1111
1212use std:: any:: Any ;
1313use std:: cell:: RefCell ;
14- use std:: collections:: { BTreeMap , VecDeque } ;
14+ use std:: collections:: { BTreeMap , BTreeSet , VecDeque } ;
1515use std:: rc:: Rc ;
1616use std:: time:: Duration ;
1717
@@ -113,6 +113,11 @@ pub enum ComputeEvent {
113113 /// Operator index
114114 operator : usize ,
115115 } ,
116+ /// All operators of a dataflow have shut down.
117+ DataflowShutdown {
118+ /// Timely worker index of the dataflow.
119+ dataflow_index : usize ,
120+ } ,
116121}
117122
118123/// A logged peek event.
@@ -176,6 +181,7 @@ pub(super) fn construct<A: Allocate + 'static>(
176181 let ( mut frontier_delay_out, frontier_delay) = demux. new_output ( ) ;
177182 let ( mut peek_out, peek) = demux. new_output ( ) ;
178183 let ( mut peek_duration_out, peek_duration) = demux. new_output ( ) ;
184+ let ( mut shutdown_duration_out, shutdown_duration) = demux. new_output ( ) ;
179185 let ( mut arrangement_heap_size_out, arrangement_heap_size) = demux. new_output ( ) ;
180186 let ( mut arrangement_heap_capacity_out, arrangement_heap_capacity) = demux. new_output ( ) ;
181187 let ( mut arrangement_heap_allocations_out, arrangement_heap_allocations) =
@@ -192,6 +198,7 @@ pub(super) fn construct<A: Allocate + 'static>(
192198 let mut frontier_delay = frontier_delay_out. activate ( ) ;
193199 let mut peek = peek_out. activate ( ) ;
194200 let mut peek_duration = peek_duration_out. activate ( ) ;
201+ let mut shutdown_duration = shutdown_duration_out. activate ( ) ;
195202 let mut arrangement_heap_size = arrangement_heap_size_out. activate ( ) ;
196203 let mut arrangement_heap_capacity = arrangement_heap_capacity_out. activate ( ) ;
197204 let mut arrangement_heap_allocations = arrangement_heap_allocations_out. activate ( ) ;
@@ -207,6 +214,7 @@ pub(super) fn construct<A: Allocate + 'static>(
207214 frontier_delay : frontier_delay. session ( & cap) ,
208215 peek : peek. session ( & cap) ,
209216 peek_duration : peek_duration. session ( & cap) ,
217+ shutdown_duration : shutdown_duration. session ( & cap) ,
210218 arrangement_heap_size : arrangement_heap_size. session ( & cap) ,
211219 arrangement_heap_capacity : arrangement_heap_capacity. session ( & cap) ,
212220 arrangement_heap_allocations : arrangement_heap_allocations. session ( & cap) ,
@@ -280,7 +288,13 @@ pub(super) fn construct<A: Allocate + 'static>(
280288 let peek_duration = peek_duration. as_collection ( ) . map ( move |bucket| {
281289 Row :: pack_slice ( & [
282290 Datum :: UInt64 ( u64:: cast_from ( worker_id) ) ,
283- Datum :: UInt64 ( bucket. try_into ( ) . expect ( "pow too big" ) ) ,
291+ Datum :: UInt64 ( bucket. try_into ( ) . expect ( "bucket too big" ) ) ,
292+ ] )
293+ } ) ;
294+ let shutdown_duration = shutdown_duration. as_collection ( ) . map ( move |bucket| {
295+ Row :: pack_slice ( & [
296+ Datum :: UInt64 ( u64:: cast_from ( worker_id) ) ,
297+ Datum :: UInt64 ( bucket. try_into ( ) . expect ( "bucket too big" ) ) ,
284298 ] )
285299 } ) ;
286300
@@ -312,6 +326,7 @@ pub(super) fn construct<A: Allocate + 'static>(
312326 ( FrontierDelay , frontier_delay) ,
313327 ( PeekCurrent , peek_current) ,
314328 ( PeekDuration , peek_duration) ,
329+ ( ShutdownDuration , shutdown_duration) ,
315330 ( ArrangementHeapSize , arrangement_heap_size) ,
316331 ( ArrangementHeapCapacity , arrangement_heap_capacity) ,
317332 ( ArrangementHeapAllocations , arrangement_heap_allocations) ,
@@ -361,6 +376,12 @@ struct DemuxState<A: Allocate> {
361376 export_dataflows : BTreeMap < GlobalId , usize > ,
362377 /// Maps dataflow exports to their imports and frontier delay tracking state.
363378 export_imports : BTreeMap < GlobalId , BTreeMap < GlobalId , FrontierDelayState > > ,
379+ /// Maps live dataflows to counts of their exports.
380+ dataflow_export_counts : BTreeMap < usize , u32 > ,
381+ /// Maps dropped dataflows to their drop time (in ns).
382+ dataflow_drop_times : BTreeMap < usize , u128 > ,
383+ /// Contains dataflows that have shut down but not yet been dropped.
384+ shutdown_dataflows : BTreeSet < usize > ,
364385 /// Maps pending peeks to their installation time (in ns).
365386 peek_stash : BTreeMap < Uuid , u128 > ,
366387 /// Arrangement size stash
@@ -373,6 +394,9 @@ impl<A: Allocate> DemuxState<A> {
373394 worker,
374395 export_dataflows : Default :: default ( ) ,
375396 export_imports : Default :: default ( ) ,
397+ dataflow_export_counts : Default :: default ( ) ,
398+ dataflow_drop_times : Default :: default ( ) ,
399+ shutdown_dataflows : Default :: default ( ) ,
376400 peek_stash : Default :: default ( ) ,
377401 arrangement_size : Default :: default ( ) ,
378402 }
@@ -403,6 +427,7 @@ struct DemuxOutput<'a> {
403427 frontier_delay : OutputSession < ' a , FrontierDelayDatum > ,
404428 peek : OutputSession < ' a , Peek > ,
405429 peek_duration : OutputSession < ' a , u128 > ,
430+ shutdown_duration : OutputSession < ' a , u128 > ,
406431 arrangement_heap_size : OutputSession < ' a , ArrangementHeapDatum > ,
407432 arrangement_heap_capacity : OutputSession < ' a , ArrangementHeapDatum > ,
408433 arrangement_heap_allocations : OutputSession < ' a , ArrangementHeapDatum > ,
@@ -514,6 +539,7 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
514539 ArrangementHeapSizeOperatorDrop { operator } => {
515540 self . handle_arrangement_heap_size_operator_dropped ( operator)
516541 }
542+ DataflowShutdown { dataflow_index } => self . handle_dataflow_shutdown ( dataflow_index) ,
517543 }
518544 }
519545
@@ -524,13 +550,30 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
524550
525551 self . state . export_dataflows . insert ( id, dataflow_id) ;
526552 self . state . export_imports . insert ( id, BTreeMap :: new ( ) ) ;
553+ * self
554+ . state
555+ . dataflow_export_counts
556+ . entry ( dataflow_id)
557+ . or_default ( ) += 1 ;
527558 }
528559
529560 fn handle_export_dropped ( & mut self , id : GlobalId ) {
530561 let ts = self . ts ( ) ;
531562 if let Some ( dataflow_id) = self . state . export_dataflows . remove ( & id) {
532563 let datum = ExportDatum { id, dataflow_id } ;
533564 self . output . export . give ( ( datum, ts, -1 ) ) ;
565+
566+ match self . state . dataflow_export_counts . get_mut ( & dataflow_id) {
567+ entry @ Some ( 0 ) | entry @ None => {
568+ error ! (
569+ export = ?id,
570+ dataflow = ?dataflow_id,
571+ "invalid dataflow_export_counts entry at time of export drop: {entry:?}" ,
572+ ) ;
573+ }
574+ Some ( 1 ) => self . handle_dataflow_dropped ( dataflow_id) ,
575+ Some ( count) => * count -= 1 ,
576+ }
534577 } else {
535578 error ! (
536579 export = ?id,
@@ -564,6 +607,41 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
564607 }
565608 }
566609
610+ fn handle_dataflow_dropped ( & mut self , id : usize ) {
611+ self . state . dataflow_export_counts . remove ( & id) ;
612+
613+ if self . state . shutdown_dataflows . remove ( & id) {
614+ // Dataflow has already shut down before it was dropped.
615+ self . output . shutdown_duration . give ( ( 0 , self . ts ( ) , 1 ) ) ;
616+ } else {
617+ // Dataflow has not yet shut down.
618+ let existing = self
619+ . state
620+ . dataflow_drop_times
621+ . insert ( id, self . time . as_nanos ( ) ) ;
622+ if existing. is_some ( ) {
623+ error ! ( dataflow = ?id, "dataflow already dropped" ) ;
624+ }
625+ }
626+ }
627+
628+ fn handle_dataflow_shutdown ( & mut self , id : usize ) {
629+ if let Some ( start) = self . state . dataflow_drop_times . remove ( & id) {
630+ // Dataflow has alredy been dropped.
631+ let elapsed_ns = self . time . as_nanos ( ) - start;
632+ let elapsed_pow = elapsed_ns. next_power_of_two ( ) ;
633+ self . output
634+ . shutdown_duration
635+ . give ( ( elapsed_pow, self . ts ( ) , 1 ) ) ;
636+ } else {
637+ // Dataflow has not yet been dropped.
638+ let was_new = self . state . shutdown_dataflows . insert ( id) ;
639+ if !was_new {
640+ error ! ( dataflow = ?id, "dataflow already shutdown" ) ;
641+ }
642+ }
643+ }
644+
567645 fn handle_export_dependency ( & mut self , export_id : GlobalId , import_id : GlobalId ) {
568646 let ts = self . ts ( ) ;
569647 let datum = DependencyDatum {
0 commit comments