diff --git a/communication/src/allocator/generic.rs b/communication/src/allocator/generic.rs index 605ade7a1..bdec52710 100644 --- a/communication/src/allocator/generic.rs +++ b/communication/src/allocator/generic.rs @@ -93,14 +93,6 @@ impl Allocate for Generic { fn receive(&mut self) { self.receive(); } fn release(&mut self) { self.release(); } fn events(&self) -> &Rc>> { self.events() } - fn await_events(&self, _duration: Option) { - match self { - Generic::Thread(t) => t.await_events(_duration), - Generic::Process(p) => p.await_events(_duration), - Generic::ProcessBinary(pb) => pb.await_events(_duration), - Generic::ZeroCopy(z) => z.await_events(_duration), - } - } } diff --git a/communication/src/allocator/mod.rs b/communication/src/allocator/mod.rs index 0b04c348f..da4088cf7 100644 --- a/communication/src/allocator/mod.rs +++ b/communication/src/allocator/mod.rs @@ -2,7 +2,6 @@ use std::rc::Rc; use std::cell::RefCell; -use std::time::Duration; pub use self::thread::Thread; pub use self::process::Process; @@ -57,14 +56,6 @@ pub trait Allocate { /// into a performance problem. fn events(&self) -> &Rc>>; - /// Awaits communication events. - /// - /// This method may park the current thread, for at most `duration`, - /// until new events arrive. - /// The method is not guaranteed to wait for any amount of time, but - /// good implementations should use this as a hint to park the thread. - fn await_events(&self, _duration: Option) { } - /// Ensure that received messages are surfaced in each channel. /// /// This method should be called to ensure that received messages are diff --git a/communication/src/allocator/process.rs b/communication/src/allocator/process.rs index c255283c2..a5ca34385 100644 --- a/communication/src/allocator/process.rs +++ b/communication/src/allocator/process.rs @@ -4,7 +4,6 @@ use std::rc::Rc; use std::cell::RefCell; use std::sync::{Arc, Mutex}; use std::any::Any; -use std::time::Duration; use std::collections::{HashMap}; use crossbeam_channel::{Sender, Receiver}; @@ -178,10 +177,6 @@ impl Allocate for Process { self.inner.events() } - fn await_events(&self, duration: Option) { - self.inner.await_events(duration); - } - fn receive(&mut self) { let mut events = self.inner.events().borrow_mut(); while let Ok(index) = self.counters_recv.try_recv() { diff --git a/communication/src/allocator/thread.rs b/communication/src/allocator/thread.rs index c957755c5..ab73f285d 100644 --- a/communication/src/allocator/thread.rs +++ b/communication/src/allocator/thread.rs @@ -2,7 +2,6 @@ use std::rc::Rc; use std::cell::RefCell; -use std::time::Duration; use std::collections::VecDeque; use crate::allocator::{Allocate, AllocateBuilder}; @@ -36,16 +35,6 @@ impl Allocate for Thread { fn events(&self) -> &Rc>> { &self.events } - fn await_events(&self, duration: Option) { - if self.events.borrow().is_empty() { - if let Some(duration) = duration { - std::thread::park_timeout(duration); - } - else { - std::thread::park(); - } - } - } } /// Thread-local counting channel push endpoint. diff --git a/communication/src/allocator/zero_copy/allocator.rs b/communication/src/allocator/zero_copy/allocator.rs index f1dda0fb5..71c713fc5 100644 --- a/communication/src/allocator/zero_copy/allocator.rs +++ b/communication/src/allocator/zero_copy/allocator.rs @@ -271,7 +271,4 @@ impl Allocate for TcpAllocator { fn events(&self) -> &Rc>> { self.inner.events() } - fn await_events(&self, duration: Option) { - self.inner.await_events(duration); - } } diff --git a/communication/src/allocator/zero_copy/allocator_process.rs b/communication/src/allocator/zero_copy/allocator_process.rs index d4eed9b43..5958c1bed 100644 --- a/communication/src/allocator/zero_copy/allocator_process.rs +++ b/communication/src/allocator/zero_copy/allocator_process.rs @@ -240,14 +240,4 @@ impl Allocate for ProcessAllocator { fn events(&self) -> &Rc>> { &self.events } - fn await_events(&self, duration: Option) { - if self.events.borrow().is_empty() { - if let Some(duration) = duration { - std::thread::park_timeout(duration); - } - else { - std::thread::park(); - } - } - } } diff --git a/timely/examples/logging-send.rs b/timely/examples/logging-send.rs index 40ed2bfd2..9031e9b5a 100644 --- a/timely/examples/logging-send.rs +++ b/timely/examples/logging-send.rs @@ -15,14 +15,14 @@ fn main() { let mut probe = ProbeHandle::new(); // Register timely worker logging. - worker.log_register().insert::("timely", |_time, data| + worker.log_register().unwrap().insert::("timely", |_time, data| data.iter().for_each(|x| println!("LOG1: {:?}", x)) ); // Register timely progress logging. // Less generally useful: intended for debugging advanced custom operators or timely // internals. - worker.log_register().insert::("timely/progress", |_time, data| + worker.log_register().unwrap().insert::("timely/progress", |_time, data| data.iter().for_each(|x| { println!("PROGRESS: {:?}", x); let (_, _, ev) = x; @@ -48,7 +48,7 @@ fn main() { }); // Register timely worker logging. - worker.log_register().insert::("timely", |_time, data| + worker.log_register().unwrap().insert::("timely", |_time, data| data.iter().for_each(|x| println!("LOG2: {:?}", x)) ); @@ -61,13 +61,13 @@ fn main() { }); // Register user-level logging. - worker.log_register().insert::<(),_>("input", |_time, data| + worker.log_register().unwrap().insert::<(),_>("input", |_time, data| for element in data.iter() { println!("Round tick at: {:?}", element.0); } ); - let input_logger = worker.log_register().get::<()>("input").expect("Input logger absent"); + let input_logger = worker.log_register().unwrap().get::<()>("input").expect("Input logger absent"); let timer = std::time::Instant::now(); diff --git a/timely/examples/threadless.rs b/timely/examples/threadless.rs index 24e32d871..e5957ed96 100644 --- a/timely/examples/threadless.rs +++ b/timely/examples/threadless.rs @@ -6,7 +6,7 @@ fn main() { // create a naked single-threaded worker. let allocator = timely::communication::allocator::Thread::default(); - let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator); + let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None); // create input and probe handles. let mut input = InputHandle::new(); diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index e8872e849..d4ba08999 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -70,7 +70,7 @@ where fn peek_identifier(&self) -> usize { self.parent.peek_identifier() } - fn log_register(&self) -> ::std::cell::RefMut> { + fn log_register(&self) -> Option<::std::cell::RefMut>> { self.parent.log_register() } } diff --git a/timely/src/execute.rs b/timely/src/execute.rs index ed0ad99ed..50f3f1787 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -154,7 +154,7 @@ where F: FnOnce(&mut Worker)->T+Send+Sync+'static { let alloc = crate::communication::allocator::thread::Thread::default(); - let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc); + let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc, Some(std::time::Instant::now())); let result = func(&mut worker); while worker.has_dataflows() { worker.step_or_park(None); @@ -320,7 +320,7 @@ where T: Send+'static, F: Fn(&mut Worker<::Allocator>)->T+Send+Sync+'static { initialize_from(builders, others, move |allocator| { - let mut worker = Worker::new(worker_config.clone(), allocator); + let mut worker = Worker::new(worker_config.clone(), allocator, Some(std::time::Instant::now())); let result = func(&mut worker); while worker.has_dataflows() { worker.step_or_park(None); diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 1fdacac1e..c40050cc7 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -179,8 +179,11 @@ where let path = self.path.clone(); let reachability_logging = worker.log_register() - .get::("timely/reachability") - .map(|logger| reachability::logging::TrackerLogger::new(path, logger)); + .as_ref() + .and_then(|l| + l.get::("timely/reachability") + .map(|logger| reachability::logging::TrackerLogger::new(path, logger)) + ); let (tracker, scope_summary) = builder.build(reachability_logging); let progcaster = Progcaster::new(worker, self.path.clone(), self.logging.clone(), self.progress_logging.clone()); @@ -296,10 +299,11 @@ where self.propagate_pointstamps(); { // Enqueue active children; scoped to let borrow drop. + use crate::scheduling::activate::Scheduler; let temp_active = &mut self.temp_active; self.activations .borrow_mut() - .for_extensions(&self.path[..], |index| temp_active.push(Reverse(index))); + .extensions(&self.path[..], temp_active); } // Schedule child operators. diff --git a/timely/src/scheduling/activate.rs b/timely/src/scheduling/activate.rs index 3046f62af..30b2cf9f0 100644 --- a/timely/src/scheduling/activate.rs +++ b/timely/src/scheduling/activate.rs @@ -8,30 +8,45 @@ use std::time::{Duration, Instant}; use std::cmp::Reverse; use crossbeam_channel::{Sender, Receiver}; -/// Methods required to act as a timely scheduler. +/// Methods required to act as a scheduler for timely operators. /// -/// The core methods are the activation of "paths", sequences of integers, and -/// the enumeration of active paths by prefix. A scheduler may delay the report -/// of a path indefinitely, but it should report at least one extension for the -/// empty path `&[]` or risk parking the worker thread without a certain unpark. +/// Operators are described by "paths" of integers, indicating the path along +/// a tree of regions, arriving at the the operator. Each path is either "idle" +/// or "active", where the latter indicates that someone has requested that the +/// operator be scheduled by the worker. Operators go from idle to active when +/// the `activate(path)` method is called, and from active to idle when the path +/// is returned through a call to `extensions(path, _)`. /// -/// There is no known harm to "spurious wake-ups" where a not-active path is -/// returned through `extensions()`. +/// The worker will continually probe for extensions to the root empty path `[]`, +/// and then follow all returned addresses, recursively. A scheduler need not +/// schedule all active paths, but it should return *some* active path when the +/// worker probes the empty path, or the worker may put the thread to sleep. +/// +/// There is no known harm to scheduling an idle path. +/// The worker may speculatively schedule paths of its own accord. pub trait Scheduler { /// Mark a path as immediately scheduleable. + /// + /// The scheduler is not required to immediately schedule the path, but it + /// should not signal that it has no work until the path has been scheduled. fn activate(&mut self, path: &[usize]); /// Populates `dest` with next identifiers on active extensions of `path`. /// /// This method is where a scheduler is allowed to exercise some discretion, /// in that it does not need to present *all* extensions, but it can instead - /// present only those that the runtime should schedule. - fn extensions(&mut self, path: &[usize], dest: &mut Vec); + /// present only those that the runtime should immediately schedule. + /// + /// The worker *will* schedule all extensions before probing new prefixes. + /// The scheduler is invited to rely on this, and to schedule in "batches", + /// where the next time the worker probes for extensions to the empty path + /// then all addresses in the batch have certainly been scheduled. + fn extensions(&mut self, path: &[usize], dest: &mut BinaryHeap>); } // Trait objects can be schedulers too. impl Scheduler for Box { fn activate(&mut self, path: &[usize]) { (**self).activate(path) } - fn extensions(&mut self, path: &[usize], dest: &mut Vec) { (**self).extensions(path, dest) } + fn extensions(&mut self, path: &[usize], dest: &mut BinaryHeap>) { (**self).extensions(path, dest) } } /// Allocation-free activation tracker. @@ -48,14 +63,14 @@ pub struct Activations { rx: Receiver>, // Delayed activations. - timer: Instant, + timer: Option, queue: BinaryHeap)>>, } impl Activations { /// Creates a new activation tracker. - pub fn new(timer: Instant) -> Self { + pub fn new(timer: Option) -> Self { let (tx, rx) = crossbeam_channel::unbounded(); Self { clean: 0, @@ -77,18 +92,23 @@ impl Activations { /// Schedules a future activation for the task addressed by `path`. pub fn activate_after(&mut self, path: &[usize], delay: Duration) { - // TODO: We could have a minimum delay and immediately schedule anything less than that delay. - if delay == Duration::new(0, 0) { - self.activate(path); - } + if let Some(timer) = self.timer { + // TODO: We could have a minimum delay and immediately schedule anything less than that delay. + if delay == Duration::new(0, 0) { + self.activate(path); + } + else { + let moment = timer.elapsed() + delay; + self.queue.push(Reverse((moment, path.to_vec()))); + } + } else { - let moment = self.timer.elapsed() + delay; - self.queue.push(Reverse((moment, path.to_vec()))); + self.activate(path); } } /// Discards the current active set and presents the next active set. - pub fn advance(&mut self) { + fn advance(&mut self) { // Drain inter-thread activations. while let Ok(path) = self.rx.try_recv() { @@ -96,10 +116,12 @@ impl Activations { } // Drain timer-based activations. - let now = self.timer.elapsed(); - while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) { - let Reverse((_time, path)) = self.queue.pop().unwrap(); - self.activate(&path[..]); + if let Some(timer) = self.timer { + let now = timer.elapsed(); + while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) { + let Reverse((_time, path)) = self.queue.pop().unwrap(); + self.activate(&path[..]); + } } self.bounds.drain(.. self.clean); @@ -121,15 +143,15 @@ impl Activations { self.clean = self.bounds.len(); } - /// Maps a function across activated paths. - pub fn map_active(&self, logic: impl Fn(&[usize])) { - for (offset, length) in self.bounds.iter() { - logic(&self.slices[*offset .. (*offset + *length)]); - } - } - /// Sets as active any symbols that follow `path`. - pub fn for_extensions(&self, path: &[usize], mut action: impl FnMut(usize)) { + fn for_extensions(&mut self, path: &[usize], mut action: impl FnMut(usize)) { + + // Each call for the root path is a moment where the worker has reset. + // This relies on a worker implementation that follows the scheduling + // instructions perfectly; if any offered paths are not explored, oops. + if path.is_empty() { + self.advance(); + } let position = self.bounds[..self.clean] @@ -170,18 +192,47 @@ impl Activations { /// This method should be used before putting a worker thread to sleep, as it /// indicates the amount of time before the thread should be unparked for the /// next scheduled activation. - pub fn empty_for(&self) -> Option { - if !self.bounds.is_empty() { + fn empty_for(&self) -> Option { + if !self.bounds.is_empty() || self.timer.is_none() { Some(Duration::new(0,0)) } else { self.queue.peek().map(|Reverse((t,_a))| { - let elapsed = self.timer.elapsed(); + let elapsed = self.timer.unwrap().elapsed(); if t < &elapsed { Duration::new(0,0) } else { *t - elapsed } }) } } + + /// Indicates that there is nothing to do for `timeout`, and that the scheduler + /// can allow the thread to sleep until then. + /// + /// The method does not *need* to park the thread, and indeed it may elect to + /// unpark earlier if there are deferred activations. + pub fn park_timeout(&self, timeout: Option) { + let empty_for = self.empty_for(); + let timeout = match (timeout, empty_for) { + (Some(x), Some(y)) => Some(std::cmp::min(x,y)), + (x, y) => x.or(y), + }; + + if let Some(timeout) = timeout { + std::thread::park_timeout(timeout); + } + else { + std::thread::park(); + } + } +} + +impl Scheduler for Activations { + fn activate(&mut self, path: &[usize]) { + self.activate(path); + } + fn extensions(&mut self, path: &[usize], dest: &mut BinaryHeap>) { + self.for_extensions(path, |index| dest.push(Reverse(index))); + } } /// A thread-safe handle to an `Activations`. diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 9459a107b..240c49ac7 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -2,10 +2,11 @@ use std::rc::Rc; use std::cell::{RefCell, RefMut}; +use std::cmp::Reverse; use std::any::Any; use std::str::FromStr; use std::time::{Instant, Duration}; -use std::collections::HashMap; +use std::collections::{HashMap, BinaryHeap}; use std::collections::hash_map::Entry; use std::sync::Arc; @@ -198,26 +199,30 @@ pub trait AsWorker : Scheduler { /// The next worker-unique identifier to be allocated. fn peek_identifier(&self) -> usize; /// Provides access to named logging streams. - fn log_register(&self) -> ::std::cell::RefMut>; + fn log_register(&self) -> Option<::std::cell::RefMut>>; /// Provides access to the timely logging stream. - fn logging(&self) -> Option { self.log_register().get("timely") } + fn logging(&self) -> Option { self.log_register().and_then(|l| l.get("timely")) } } /// A `Worker` is the entry point to a timely dataflow computation. It wraps a `Allocate`, /// and has a list of dataflows that it manages. pub struct Worker { config: Config, - timer: Instant, + /// An optional instant from which the start of the computation should be reckoned. + /// + /// If this is set to none, system time-based functionality will be unavailable or work badly. + /// For example, logging will be unavailable, and activation after a delay will be unavailable. + timer: Option, paths: Rc>>>, allocator: Rc>, identifiers: Rc>, // dataflows: Rc>>, dataflows: Rc>>, dataflow_counter: Rc>, - logging: Rc>>, + logging: Option>>>, activations: Rc>, - active_dataflows: Vec, + active_dataflows: BinaryHeap>, // Temporary storage for channel identifiers during dataflow construction. // These are then associated with a dataflow once constructed. @@ -245,7 +250,7 @@ impl AsWorker for Worker { fn new_identifier(&mut self) -> usize { self.new_identifier() } fn peek_identifier(&self) -> usize { self.peek_identifier() } - fn log_register(&self) -> RefMut> { + fn log_register(&self) -> Option>> { self.log_register() } } @@ -258,8 +263,7 @@ impl Scheduler for Worker { impl Worker { /// Allocates a new `Worker` bound to a channel allocator. - pub fn new(config: Config, c: A) -> Worker { - let now = Instant::now(); + pub fn new(config: Config, c: A, now: Option) -> Worker { let index = c.index(); Worker { config, @@ -269,7 +273,7 @@ impl Worker { identifiers: Default::default(), dataflows: Default::default(), dataflow_counter: Default::default(), - logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now, index))), + logging: now.map(|now| Rc::new(RefCell::new(crate::logging_core::Registry::new(now, index)))), activations: Rc::new(RefCell::new(Activations::new(now))), active_dataflows: Default::default(), temp_channel_ids: Default::default(), @@ -328,7 +332,7 @@ impl Worker { /// worker.step_or_park(Some(Duration::from_secs(1))); /// }); /// ``` - pub fn step_or_park(&mut self, duration: Option) -> bool { + pub fn step_or_park(&mut self, timeout: Option) -> bool { { // Process channel events. Activate responders. let mut allocator = self.allocator.borrow_mut(); @@ -352,43 +356,41 @@ impl Worker { } } - // Organize activations. - self.activations - .borrow_mut() - .advance(); - - // Consider parking only if we have no pending events, some dataflows, and a non-zero duration. - let empty_for = self.activations.borrow().empty_for(); - // Determine the minimum park duration, where `None` are an absence of a constraint. - let delay = match (duration, empty_for) { - (Some(x), Some(y)) => Some(std::cmp::min(x,y)), - (x, y) => x.or(y), - }; + // Commence a new round of scheduling, starting with dataflows. + // We probe the scheduler for active prefixes, where an empty response + // indicates that the scheduler has no work for us at the moment. + { // Scoped to let borrow of `self.active_dataflows` drop. + use crate::scheduling::activate::Scheduler; + let active_dataflows = &mut self.active_dataflows; + self.activations + .borrow_mut() + .extensions(&[], active_dataflows); + } - if delay != Some(Duration::new(0,0)) { + // If no dataflows are active, there is nothing to do. Consider parking. + if self.active_dataflows.is_empty() { - // Log parking and flush log. - if let Some(l) = self.logging().as_mut() { - l.log(crate::logging::ParkEvent::park(delay)); - l.flush(); - } + // If the timeout is zero, don't bother trying to park. + // More generally, we could put some threshold in here. + if timeout != Some(Duration::new(0, 0)) { + // Log parking and flush log. + if let Some(l) = self.logging().as_mut() { + l.log(crate::logging::ParkEvent::park(timeout)); + l.flush(); + } - self.allocator - .borrow() - .await_events(delay); + // We have just drained `allocator.events()` up above; + // otherwise we should first check it for emptiness. + self.activations.borrow().park_timeout(timeout); - // Log return from unpark. - self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark())); + // Log return from unpark. + self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark())); + } } - else { // Schedule active dataflows. - - let active_dataflows = &mut self.active_dataflows; - self.activations - .borrow_mut() - .for_extensions(&[], |index| active_dataflows.push(index)); + else { // Schedule all active dataflows. let mut dataflows = self.dataflows.borrow_mut(); - for index in active_dataflows.drain(..) { + for Reverse(index) in self.active_dataflows.drain() { // Step dataflow if it exists, remove if not incomplete. if let Entry::Occupied(mut entry) = dataflows.entry(index) { // TODO: This is a moment at which a scheduling decision is being made. @@ -405,7 +407,7 @@ impl Worker { } // Clean up, indicate if dataflows remain. - self.logging.borrow_mut().flush(); + self.logging.as_ref().map(|l| l.borrow_mut().flush()); self.allocator.borrow_mut().release(); !self.dataflows.borrow().is_empty() } @@ -476,7 +478,7 @@ impl Worker { /// /// let index = worker.index(); /// let peers = worker.peers(); - /// let timer = worker.timer(); + /// let timer = worker.timer().unwrap(); /// /// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers); /// @@ -491,7 +493,7 @@ impl Worker { /// /// let index = worker.index(); /// let peers = worker.peers(); - /// let timer = worker.timer(); + /// let timer = worker.timer().unwrap(); /// /// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers); /// @@ -507,13 +509,13 @@ impl Worker { /// /// let index = worker.index(); /// let peers = worker.peers(); - /// let timer = worker.timer(); + /// let timer = worker.timer().unwrap(); /// /// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers); /// /// }); /// ``` - pub fn timer(&self) -> Instant { self.timer } + pub fn timer(&self) -> Option { self.timer } /// Allocate a new worker-unique identifier. /// @@ -537,13 +539,14 @@ impl Worker { /// timely::execute_from_args(::std::env::args(), |worker| { /// /// worker.log_register() + /// .unwrap() /// .insert::("timely", |time, data| /// println!("{:?}\t{:?}", time, data) /// ); /// }); /// ``` - pub fn log_register(&self) -> ::std::cell::RefMut> { - self.logging.borrow_mut() + pub fn log_register(&self) -> Option<::std::cell::RefMut>> { + self.logging.as_ref().map(|l| l.borrow_mut()) } /// Construct a new dataflow. @@ -566,7 +569,7 @@ impl Worker { T: Refines<()>, F: FnOnce(&mut Child)->R, { - let logging = self.logging.borrow_mut().get("timely"); + let logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely")); self.dataflow_core("Dataflow", logging, Box::new(()), |_, child| func(child)) } @@ -590,7 +593,7 @@ impl Worker { T: Refines<()>, F: FnOnce(&mut Child)->R, { - let logging = self.logging.borrow_mut().get("timely"); + let logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely")); self.dataflow_core(name, logging, Box::new(()), |_, child| func(child)) } @@ -629,7 +632,7 @@ impl Worker { let addr = vec![dataflow_index].into(); let identifier = self.new_identifier(); - let progress_logging = self.logging.borrow_mut().get("timely/progress"); + let progress_logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely/progress")); let subscope = SubgraphBuilder::new_from(addr, logging.clone(), progress_logging.clone(), name); let subscope = RefCell::new(subscope); @@ -726,7 +729,7 @@ impl Clone for Worker { dataflow_counter: self.dataflow_counter.clone(), logging: self.logging.clone(), activations: self.activations.clone(), - active_dataflows: Vec::new(), + active_dataflows: Default::default(), temp_channel_ids: self.temp_channel_ids.clone(), } }