@@ -94,13 +94,21 @@ impl UpstreamQuery for GetProjectStates {
9494 }
9595}
9696
97+ /// Error returned to the requester.
98+ #[ derive( Copy , Clone , Debug , thiserror:: Error ) ]
99+ pub enum Error {
100+ /// Fetching the project state exceeded the configured deadline.
101+ #[ error( "deadline exceeded while fetching project state" ) ]
102+ DeadlineExceeded ,
103+ }
104+
97105/// The wrapper struct for the incoming external requests which also keeps addition information.
98106#[ derive( Debug ) ]
99107struct ProjectStateChannel {
100108 // Main broadcast channel.
101- channel : BroadcastChannel < SourceProjectState > ,
109+ channel : BroadcastChannel < Message > ,
102110 // Additional broadcast channels tracked from merge operations.
103- merged : Vec < BroadcastChannel < SourceProjectState > > ,
111+ merged : Vec < BroadcastChannel < Message > > ,
104112 revision : Revision ,
105113 deadline : Instant ,
106114 no_cache : bool ,
@@ -113,7 +121,7 @@ struct ProjectStateChannel {
113121
114122impl ProjectStateChannel {
115123 pub fn new (
116- sender : BroadcastSender < SourceProjectState > ,
124+ sender : BroadcastSender < Message > ,
117125 revision : Revision ,
118126 timeout : Duration ,
119127 no_cache : bool ,
@@ -141,22 +149,23 @@ impl ProjectStateChannel {
141149 /// If the new revision is different from the contained revision this clears the revision.
142150 /// To not have multiple fetches per revision per batch, we need to find a common denominator
143151 /// for requests with different revisions, which is always to fetch the full project config.
144- pub fn attach ( & mut self , sender : BroadcastSender < SourceProjectState > , revision : Revision ) {
152+ pub fn attach ( & mut self , sender : BroadcastSender < Message > , revision : Revision ) {
145153 self . channel . attach ( sender) ;
146154 if self . revision != revision {
147155 self . revision = Revision :: default ( ) ;
148156 }
149157 }
150158
151159 pub fn send ( self , state : SourceProjectState ) {
152- for channel in self . merged {
153- channel. send ( state. clone ( ) ) ;
154- }
155- self . channel . send ( state)
160+ self . do_send ( Ok ( state) ) ;
161+ }
162+
163+ pub fn error ( self , err : Error ) {
164+ self . do_send ( Err ( err) ) ;
156165 }
157166
158- pub fn expired ( & self ) -> bool {
159- Instant :: now ( ) > self . deadline
167+ pub fn expired ( & self , now : Instant ) -> bool {
168+ now > self . deadline
160169 }
161170
162171 pub fn merge ( & mut self , channel : ProjectStateChannel ) {
@@ -182,28 +191,35 @@ impl ProjectStateChannel {
182191 self . errors += errors;
183192 self . pending += pending;
184193 }
194+
195+ fn do_send ( self , message : Message ) {
196+ for channel in self . merged {
197+ channel. send ( message. clone ( ) ) ;
198+ }
199+ self . channel . send ( message)
200+ }
185201}
186202
187203/// The map of project keys with their project state channels.
188204type ProjectStateChannels = HashMap < ProjectKey , ProjectStateChannel > ;
189205
206+ /// The message used to communicate with the requester.
207+ type Message = Result < SourceProjectState , Error > ;
208+
190209/// This is the [`UpstreamProjectSourceService`] interface.
191210///
192211/// The service is responsible for fetching the [`ParsedProjectState`] from the upstream.
193212/// Internally it maintains the buffer queue of the incoming requests, which got scheduled to fetch the
194213/// state and takes care of the backoff in case there is a problem with the requests.
195214#[ derive( Debug ) ]
196- pub struct UpstreamProjectSource ( FetchProjectState , BroadcastSender < SourceProjectState > ) ;
215+ pub struct UpstreamProjectSource ( FetchProjectState , BroadcastSender < Message > ) ;
197216
198217impl Interface for UpstreamProjectSource { }
199218
200219impl FromMessage < FetchProjectState > for UpstreamProjectSource {
201- type Response = BroadcastResponse < SourceProjectState > ;
220+ type Response = BroadcastResponse < Message > ;
202221
203- fn from_message (
204- message : FetchProjectState ,
205- sender : BroadcastSender < SourceProjectState > ,
206- ) -> Self {
222+ fn from_message ( message : FetchProjectState , sender : BroadcastSender < Message > ) -> Self {
207223 Self ( message, sender)
208224 }
209225}
@@ -271,6 +287,7 @@ impl UpstreamProjectSourceService {
271287 /// Prepares the batches of the cache and nocache channels which could be used to request the
272288 /// project states.
273289 fn prepare_batches ( & mut self ) -> ChannelsBatch {
290+ let now = Instant :: now ( ) ;
274291 let batch_size = self . config . query_batch_size ( ) ;
275292 let num_batches = self . config . max_concurrent_queries ( ) ;
276293
@@ -286,26 +303,31 @@ impl UpstreamProjectSourceService {
286303
287304 let fresh_channels = ( projects. iter ( ) )
288305 . filter_map ( |id| Some ( ( * id, self . state_channels . remove ( id) ?) ) )
289- . filter ( |( id, channel) | {
290- if channel. expired ( ) {
291- metric ! (
292- distribution( RelayDistributions :: ProjectStateAttempts ) = channel. attempts,
293- result = "timeout" ,
294- ) ;
295- metric ! (
296- counter( RelayCounters :: ProjectUpstreamCompleted ) += 1 ,
297- result = "timeout" ,
298- ) ;
299- relay_log:: error!(
300- errors = channel. errors,
301- pending = channel. pending,
302- tags. did_error = channel. errors > 0 ,
303- tags. was_pending = channel. pending > 0 ,
304- tags. project_key = id. to_string( ) ,
305- "error fetching project state {id}: deadline exceeded" ,
306- ) ;
306+ . filter_map ( |( id, channel) | {
307+ if !channel. expired ( now) {
308+ return Some ( ( id, channel) ) ;
307309 }
308- !channel. expired ( )
310+
311+ // Channel is expired, emit telemetry and notify the other end.
312+ metric ! (
313+ distribution( RelayDistributions :: ProjectStateAttempts ) = channel. attempts,
314+ result = "timeout" ,
315+ ) ;
316+ metric ! (
317+ counter( RelayCounters :: ProjectUpstreamCompleted ) += 1 ,
318+ result = "timeout" ,
319+ ) ;
320+ relay_log:: warn!(
321+ errors = channel. errors,
322+ pending = channel. pending,
323+ tags. did_error = channel. errors > 0 ,
324+ tags. was_pending = channel. pending > 0 ,
325+ tags. project_key = %id,
326+ "error fetching project state {id}: deadline exceeded" ,
327+ ) ;
328+ channel. error ( Error :: DeadlineExceeded ) ;
329+
330+ None
309331 } ) ;
310332
311333 // Separate regular channels from those with the `nocache` flag. The latter go in separate
@@ -730,8 +752,8 @@ mod tests {
730752 . await ;
731753
732754 let ( response1, response2) = futures:: future:: join ( response1, response2) . await ;
733- assert ! ( matches!( response1, Ok ( SourceProjectState :: NotModified ) ) ) ;
734- assert ! ( matches!( response2, Ok ( SourceProjectState :: NotModified ) ) ) ;
755+ assert ! ( matches!( response1, Ok ( Ok ( SourceProjectState :: NotModified ) ) ) ) ;
756+ assert ! ( matches!( response2, Ok ( Ok ( SourceProjectState :: NotModified ) ) ) ) ;
735757
736758 // No more messages to upstream expected.
737759 assert ! ( upstream_rx. try_recv( ) . is_err( ) ) ;
0 commit comments