diff --git a/communication/src/allocator/thread.rs b/communication/src/allocator/thread.rs index c957755c5..054804562 100644 --- a/communication/src/allocator/thread.rs +++ b/communication/src/allocator/thread.rs @@ -94,12 +94,12 @@ impl Pull for Puller { #[inline] fn pull(&mut self) -> &mut Option { let mut borrow = self.source.borrow_mut(); - // if let Some(element) = self.current.take() { - // // TODO : Arbitrary constant. - // if borrow.1.len() < 16 { - // borrow.1.push_back(element); - // } - // } + if let Some(element) = self.current.take() { + // TODO : Arbitrary constant. + if borrow.1.len() < 16 { + borrow.1.push_back(element); + } + } self.current = borrow.0.pop_front(); &mut self.current } diff --git a/container/src/lib.rs b/container/src/lib.rs index 9394f1ec0..0449dc9b9 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -165,6 +165,10 @@ impl> PushInto for CapacityContainerBuil // Maybe flush if self.current.at_capacity() { self.pending.push_back(std::mem::take(&mut self.current)); + if let Some(spare) = self.empty.take() { + self.current = spare; + self.current.clear(); + } } } } @@ -186,9 +190,12 @@ impl ContainerBuilder for CapacityContainerBuild fn finish(&mut self) -> Option<&mut C> { if !self.current.is_empty() { self.pending.push_back(std::mem::take(&mut self.current)); + if let Some(spare) = &mut self.empty { + std::mem::swap(&mut self.current, spare); + self.current.clear(); + } } - self.empty = self.pending.pop_front(); - self.empty.as_mut() + self.extract() } } diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index 12d362f3c..cd431ed98 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -99,7 +99,7 @@ fn main() { }); // introduce data and watch! - for round in 0..10 { + for round in 0.. { input.send(WordCountReference { text: "flat container", diff: 1 }); input.advance_to(round + 1); while probe.less_than(input.time()) { @@ -327,9 +327,12 @@ mod builder { fn finish(&mut self) -> Option<&mut Self::Container> { if !self.current.is_empty() { self.pending.push_back(Column::Typed(std::mem::take(&mut self.current))); + if let Some(Column::Typed(spare)) = self.empty.take() { + self.current = spare; + self.current.clear(); + } } - self.empty = self.pending.pop_front(); - self.empty.as_mut() + self.extract() } }