From 76c6cf3f9a71bbc0b40993f456d6411c83635492 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 14 Jul 2025 13:05:04 -0400 Subject: [PATCH 1/2] WIP --- differential-dataflow/examples/columnar.rs | 1501 ++++++++++------- .../implementations/chainless_batcher.rs | 113 ++ .../src/trace/implementations/mod.rs | 1 + .../src/trace/implementations/ord_neu.rs | 33 +- 4 files changed, 1053 insertions(+), 595 deletions(-) create mode 100644 differential-dataflow/src/trace/implementations/chainless_batcher.rs diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index ac6a9e431..d5b9f9d11 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -1,18 +1,16 @@ //! Wordcount based on `columnar`. -use timely::container::{CapacityContainerBuilder, PushInto}; -use timely::dataflow::channels::pact::ExchangeCore; +use timely::container::{ContainerBuilder, PushInto}; +use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::InputHandleCore; use timely::dataflow::ProbeHandle; -use differential_dataflow::trace::implementations::ord_neu::ColKeySpine; - use differential_dataflow::operators::arrange::arrangement::arrange_core; fn main() { - type WordCount = ((String, ()), u64, i64); - type Container = Column; + type WordCount = (Vec, u64, i64); + type Builder = KeyColBuilder; let _config = timely::Config { communication: timely::CommunicationConfig::ProcessBinary(3), @@ -29,8 +27,8 @@ fn main() { // timely::execute(_config, move |worker| { timely::execute_from_args(std::env::args(), move |worker| { - let mut data_input = >>::new(); - let mut keys_input = >>::new(); + let mut data_input = >::new_with_builder(); + let mut keys_input = >::new_with_builder(); let mut probe = ProbeHandle::new(); // create a new input, exchange data, and inspect its output @@ -39,21 +37,17 @@ fn main() { let data = data_input.to_stream(scope); let keys = keys_input.to_stream(scope); - let data_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); - let keys_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); - - let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data"); - let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys"); + let data = arrange_core::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(&data, Pipeline, "Data"); + let keys = arrange_core::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(&keys, Pipeline, "Keys"); - keys.join_core(&data, |_k, &(), &()| Option::<()>::None) + keys.join_core(&data, |_k, (), ()| { Option::<()>::None }) .probe_with(&mut probe); - }); // Resources for placing input data in containers. use std::fmt::Write; let mut buffer = String::default(); - let mut container = Container::default(); + let mut builder = KeyColBuilder::::default(); // Load up data in batches. let mut counter = 0; @@ -63,732 +57,1065 @@ fn main() { while i < size { let val = (counter + i) % keys; write!(buffer, "{:?}", val).unwrap(); - container.push_into(((&buffer, ()), time, 1)); + builder.push_into((buffer.as_bytes(), time, 1)); buffer.clear(); i += worker.peers(); } - data_input.send_batch(&mut container); - container.clear(); + while let Some(container) = builder.finish() { + data_input.send_batch(container); + } counter += size; data_input.advance_to(data_input.time() + 1); keys_input.advance_to(keys_input.time() + 1); while probe.less_than(data_input.time()) { - worker.step(); + worker.step_or_park(None); } } println!("{:?}\tloading complete", timer1.elapsed()); let mut queries = 0; - while queries < 10 * keys { let mut i = worker.index(); let time = *data_input.time(); while i < size { let val = (queries + i) % keys; write!(buffer, "{:?}", val).unwrap(); - container.push_into(((&buffer, ()), time, 1)); + builder.push_into((buffer.as_bytes(), time, 1)); buffer.clear(); i += worker.peers(); } - keys_input.send_batch(&mut container); - container.clear(); + while let Some(container) = builder.finish() { + keys_input.send_batch(container); + } queries += size; data_input.advance_to(data_input.time() + 1); keys_input.advance_to(keys_input.time() + 1); while probe.less_than(data_input.time()) { - worker.step(); + worker.step_or_park(None); } } - println!("{:?}\tqueries complete", timer1.elapsed()); - }) .unwrap(); println!("{:?}\tshut down", timer2.elapsed()); } +pub use layout::{ColumnarLayout, ColumnarUpdate}; +pub mod layout { -pub use container::Column; -mod container { - + use std::fmt::Debug; use columnar::Columnar; - use columnar::Container as FooBozzle; - - use timely::bytes::arc::Bytes; - - /// A container based on a columnar store, encoded in aligned bytes. - pub enum Column { - /// The typed variant of the container. - Typed(C::Container), - /// The binary variant of the container. - Bytes(Bytes), - /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason. - /// - /// Reasons could include misalignment, cloning of data, or wanting - /// to release the `Bytes` as a scarce resource. - Align(Box<[u64]>), + use differential_dataflow::trace::implementations::{Layout, OffsetList}; + use differential_dataflow::difference::Semigroup; + use differential_dataflow::lattice::Lattice; + use timely::progress::Timestamp; + + /// A layout based on columnar + pub struct ColumnarLayout { + phantom: std::marker::PhantomData, } - impl Default for Column { - fn default() -> Self { Self::Typed(Default::default()) } + impl ColumnarUpdate for (K, V, T, R) + where + K: Columnar + Debug + Ord + Clone + 'static, + V: Columnar + Debug + Ord + Clone + 'static, + T: Columnar + Debug + Ord + Default + Clone + Lattice + Timestamp, + R: Columnar + Debug + Ord + Default + Semigroup + 'static, + { + type Key = K; + type Val = V; + type Time = T; + type Diff = R; } - impl> Clone for Column { - fn clone(&self) -> Self { - match self { - Column::Typed(t) => Column::Typed(t.clone()), - Column::Bytes(b) => { - assert!(b.len() % 8 == 0); - let mut alloc: Vec = vec![0; b.len() / 8]; - bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&b[..]); - Self::Align(alloc.into()) - }, - Column::Align(a) => Column::Align(a.clone()), - } - } + impl ColumnarUpdate for (K, T, R) + where + K: Columnar + Debug + Ord + Clone + 'static, + T: Columnar + Debug + Ord + Default + Clone + Lattice + Timestamp, + R: Columnar + Debug + Ord + Default + Semigroup + 'static, + { + type Key = K; + type Val = (); + type Time = T; + type Diff = R; } - use columnar::{Clear, Len, Index, FromBytes}; - use columnar::bytes::{EncodeDecode, Indexed}; - use columnar::common::IterOwn; - type BorrowedOf<'a, C> = <::Container as columnar::Container>::Borrowed<'a>; + use crate::arrangement::Coltainer; + impl Layout for ColumnarLayout { + type KeyContainer = Coltainer; + type ValContainer = Coltainer; + type TimeContainer = Coltainer; + type DiffContainer = Coltainer; + type OffsetContainer = OffsetList; + } - impl Column { - pub fn borrow(&self) -> BorrowedOf<'_, C> { - match self { - Column::Typed(t) => t.borrow(), - Column::Bytes(b) => as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))), - Column::Align(a) => as FromBytes>::from_bytes(&mut Indexed::decode(a)), - } - } - pub fn get(&self, index: usize) -> columnar::Ref<'_, C> { - self.borrow().get(index) + /// A type that names constituent update types. + /// + /// We will use their associated `Columnar::Container` + pub trait ColumnarUpdate : Debug + 'static { + type Key: Columnar + Debug + Ord + Clone + 'static; + type Val: Columnar + Debug + Ord + Clone + 'static; + type Time: Columnar + Debug + Ord + Default + Clone + Lattice + Timestamp; + type Diff: Columnar + Debug + Ord + Default + Semigroup + 'static; + } + + /// A container whose references can be ordered. + pub trait OrdContainer : for<'a> columnar::Container : Ord> { } + impl columnar::Container : Ord>> OrdContainer for C { } + +} + +pub use storage::val::ValStorage; +pub use storage::key::KeyStorage; +pub mod storage { + + pub mod val { + + use std::fmt::Debug; + use columnar::{Container, ContainerOf, Index, Len, Push}; + use columnar::Vecs; + + use crate::layout::ColumnarUpdate as Update; + + /// Trie-shaped update storage. + #[derive(Debug)] + pub struct ValStorage { + /// An ordered list of keys. + pub keys: ContainerOf, + /// For each key in `keys`, a list of values. + pub vals: Vecs>, + /// For each val in `vals`, a list of (time, diff) updates. + pub upds: Vecs<(ContainerOf, ContainerOf)>, } - // This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into. - pub(crate) fn clear(&mut self) { - match self { - Column::Typed(t) => t.clear(), - Column::Bytes(_) => *self = Column::Typed(Default::default()), - Column::Align(_) => *self = Column::Typed(Default::default()), + + impl Default for ValStorage { fn default() -> Self { Self { keys: Default::default(), vals: Default::default(), upds: Default::default(), } } } + impl Clone for ValStorage { fn clone(&self) -> Self { Self { keys: self.keys.clone(), vals: self.vals.clone(), upds: self.upds.clone(), } } } + + pub type Tuple = (::Key, ::Val, ::Time, ::Diff); + + use std::ops::Range; + impl ValStorage { + + /// Forms `Self` from sorted update tuples. + pub fn form<'a>(sorted: impl Iterator>>) -> Self { + + let mut output = Self::default(); + let mut sorted = sorted.peekable(); + + if let Some((key,val,time,diff)) = sorted.next() { + output.keys.push(key); + output.vals.values.push(val); + output.upds.values.push((time, diff)); + for (key,val,time,diff) in sorted { + let mut differs = false; + // We would now iterate over layers. + // We'll do that manually, as the types are all different. + // Keys first; non-standard logic because they are not (yet) a list of lists. + let keys_len = output.keys.len(); + differs |= ContainerOf::::reborrow_ref(key) != output.keys.borrow().get(keys_len-1); + if differs { output.keys.push(key); } + // Vals next + let vals_len = output.vals.values.len(); + if differs { output.vals.bounds.push(vals_len as u64); } + differs |= ContainerOf::::reborrow_ref(val) != output.vals.values.borrow().get(vals_len-1); + if differs { output.vals.values.push(val); } + // Upds last + let upds_len = output.upds.values.len(); + if differs { output.upds.bounds.push(upds_len as u64); } + // differs |= ContainerOf::<(U::Time,U::Diff)>::reborrow_ref((time,diff)) != output.upds.values.borrow().get(upds_len-1); + differs = true; + if differs { output.upds.values.push((time,diff)); } + } + // output.keys.bounds.push(vals_len as u64); + output.vals.bounds.push(output.vals.values.len() as u64); + output.upds.bounds.push(output.upds.values.len() as u64); + } + + assert_eq!(output.keys.len(), output.vals.len()); + assert_eq!(output.vals.values.len(), output.upds.len()); + + output } - } - #[inline] - pub fn len(&self) -> usize { - match self { - Column::Typed(t) => t.len(), - Column::Bytes(b) => as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(), - Column::Align(a) => as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(), + + pub fn vals_bounds(&self, range: Range) -> Range { + if !range.is_empty() { + let lower = if range.start == 0 { 0 } else { Index::get(self.vals.bounds.borrow(), range.start-1) as usize }; + let upper = Index::get(self.vals.bounds.borrow(), range.end-1) as usize; + lower .. upper + } else { range } } - } - } - impl timely::Accountable for Column { - #[inline] fn record_count(&self) -> i64 { self.len() as i64 } - } + pub fn upds_bounds(&self, range: Range) -> Range { + if !range.is_empty() { + let lower = if range.start == 0 { 0 } else { Index::get(self.upds.bounds.borrow(), range.start-1) as usize }; + let upper = Index::get(self.upds.bounds.borrow(), range.end-1) as usize; + lower .. upper + } else { range } + } - impl timely::container::IterContainer for Column { - type ItemRef<'a> = columnar::Ref<'a, C>; - type Iter<'a> = IterOwn>; - fn iter<'a>(&'a self) -> Self::Iter<'a> { - match self { - Column::Typed(t) => t.borrow().into_index_iter(), - Column::Bytes(b) => as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(), - Column::Align(a) => as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(), + /// Copies `other[range]` into self, keys and all. + pub fn extend_from_keys(&mut self, other: &Self, range: Range) { + self.keys.extend_from_self(other.keys.borrow(), range.clone()); + self.vals.extend_from_self(other.vals.borrow(), range.clone()); + self.upds.extend_from_self(other.upds.borrow(), other.vals_bounds(range)); } - } - } - impl timely::container::DrainContainer for Column { - type Item<'a> = columnar::Ref<'a, C>; - type DrainIter<'a> = IterOwn>; - fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { - match self { - Column::Typed(t) => t.borrow().into_index_iter(), - Column::Bytes(b) => as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).into_index_iter(), - Column::Align(a) => as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(), + pub fn extend_from_vals(&mut self, other: &Self, range: Range) { + self.vals.values.extend_from_self(other.vals.values.borrow(), range.clone()); + self.upds.extend_from_self(other.upds.borrow(), range); } } } - use timely::container::SizableContainer; - impl SizableContainer for Column { - fn at_capacity(&self) -> bool { - match self { - Self::Typed(t) => { - let length_in_bytes = Indexed::length_in_bytes(&t.borrow()); - length_in_bytes >= (1 << 20) - }, - Self::Bytes(_) => true, - Self::Align(_) => true, - } + pub mod key { + + use std::fmt::Debug; + use columnar::{Container, ContainerOf, Index, Len, Push}; + use columnar::Vecs; + + use crate::layout::ColumnarUpdate as Update; + + /// Trie-shaped update storage. + #[derive(Debug)] + pub struct KeyStorage { + /// An ordered list of keys. + pub keys: ContainerOf, + /// For each val in `vals`, a list of (time, diff) updates. + pub upds: Vecs<(ContainerOf, ContainerOf)>, } - fn ensure_capacity(&mut self, _stash: &mut Option) { } - } - use timely::container::PushInto; - impl>> PushInto for Column { - #[inline] - fn push_into(&mut self, item: T) { - use columnar::Push; - match self { - Column::Typed(t) => t.push(item), - Column::Align(_) | Column::Bytes(_) => { - // We really oughtn't be calling this in this case. - // We could convert to owned, but need more constraints on `C`. - unimplemented!("Pushing into Column::Bytes without first clearing"); + impl Default for KeyStorage { fn default() -> Self { Self { keys: Default::default(), upds: Default::default(), } } } + impl Clone for KeyStorage { fn clone(&self) -> Self { Self { keys: self.keys.clone(), upds: self.upds.clone(), } } } + + pub type Tuple = (::Key, ::Time, ::Diff); + + use std::ops::Range; + impl KeyStorage { + + /// Forms `Self` from sorted update tuples. + pub fn form<'a>(sorted: impl Iterator>>) -> Self { + + let mut output = Self::default(); + let mut sorted = sorted.peekable(); + + if let Some((key,time,diff)) = sorted.next() { + output.keys.push(key); + output.upds.values.push((time, diff)); + for (key,time,diff) in sorted { + let mut differs = false; + // We would now iterate over layers. + // We'll do that manually, as the types are all different. + // Keys first; non-standard logic because they are not (yet) a list of lists. + let keys_len = output.keys.len(); + differs |= ContainerOf::::reborrow_ref(key) != output.keys.borrow().get(keys_len-1); + if differs { output.keys.push(key); } + // Upds last + let upds_len = output.upds.values.len(); + if differs { output.upds.bounds.push(upds_len as u64); } + // differs |= ContainerOf::<(U::Time,U::Diff)>::reborrow_ref((time,diff)) != output.upds.values.borrow().get(upds_len-1); + differs = true; + if differs { output.upds.values.push((time,diff)); } + } + output.upds.bounds.push(output.upds.values.len() as u64); } - } - } - } - use timely::dataflow::channels::ContainerBytes; - impl ContainerBytes for Column { - fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { - // Our expectation / hope is that `bytes` is `u64` aligned and sized. - // If the alignment is borked, we can relocate. IF the size is borked, - // not sure what we do in that case. - assert!(bytes.len() % 8 == 0); - if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) { - Self::Bytes(bytes) - } - else { - // println!("Re-locating bytes for alignment reasons"); - let mut alloc: Vec = vec![0; bytes.len() / 8]; - bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&bytes[..]); - Self::Align(alloc.into()) + assert_eq!(output.keys.len(), output.upds.len()); + + output } - } - fn length_in_bytes(&self) -> usize { - match self { - // We'll need one u64 for the length, then the length rounded up to a multiple of 8. - Column::Typed(t) => Indexed::length_in_bytes(&t.borrow()), - Column::Bytes(b) => b.len(), - Column::Align(a) => 8 * a.len(), + pub fn upds_bounds(&self, range: Range) -> Range { + if !range.is_empty() { + let lower = if range.start == 0 { 0 } else { Index::get(self.upds.bounds.borrow(), range.start-1) as usize }; + let upper = Index::get(self.upds.bounds.borrow(), range.end-1) as usize; + lower .. upper + } else { range } } - } - fn into_bytes(&self, writer: &mut W) { - match self { - Column::Typed(t) => Indexed::write(writer, &t.borrow()).unwrap(), - Column::Bytes(b) => writer.write_all(b).unwrap(), - Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(), + /// Copies `other[range]` into self, keys and all. + pub fn extend_from_keys(&mut self, other: &Self, range: Range) { + self.keys.extend_from_self(other.keys.borrow(), range.clone()); + self.upds.extend_from_self(other.upds.borrow(), range.clone()); } } } } +mod container { -use builder::ColumnBuilder; -mod builder { - use std::collections::VecDeque; - - use columnar::{Columnar, Clear, Len, Push}; - use columnar::bytes::{EncodeDecode, Indexed}; + mod val { - use super::Column; + use crate::layout::ColumnarUpdate as Update; + use crate::ValStorage; - /// A container builder for `Column`. - pub struct ColumnBuilder { - /// Container that we're writing to. - current: C::Container, - /// Empty allocation. - empty: Option>, - /// Completed containers pending to be sent. - pending: VecDeque>, - } + impl timely::Accountable for ValStorage { + #[inline] fn record_count(&self) -> i64 { use columnar::Len; self.upds.values.len() as i64 } + } - use timely::container::PushInto; - impl>> PushInto for ColumnBuilder { - #[inline] - fn push_into(&mut self, item: T) { - self.current.push(item); - // If there is less than 10% slop with 2MB backing allocations, mint a container. - use columnar::Container; - let words = Indexed::length_in_words(&self.current.borrow()); - let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1); - if round - words < round / 10 { - let mut alloc = Vec::with_capacity(words); - Indexed::encode(&mut alloc, &self.current.borrow()); - self.pending.push_back(Column::Align(alloc.into_boxed_slice())); - self.current.clear(); - } + use timely::container::SizableContainer; + impl SizableContainer for ValStorage { + fn at_capacity(&self) -> bool { false } + fn ensure_capacity(&mut self, _stash: &mut Option) { } } - } - impl Default for ColumnBuilder { - fn default() -> Self { - ColumnBuilder { - current: Default::default(), - empty: None, - pending: Default::default(), - } + use timely::dataflow::channels::ContainerBytes; + impl ContainerBytes for ValStorage { + fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } + fn length_in_bytes(&self) -> usize { unimplemented!() } + fn into_bytes(&self, _writer: &mut W) { unimplemented!() } } } - use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder}; - impl> ContainerBuilder for ColumnBuilder { - type Container = Column; + mod key { - #[inline] - fn extract(&mut self) -> Option<&mut Self::Container> { - if let Some(container) = self.pending.pop_front() { - self.empty = Some(container); - self.empty.as_mut() - } else { - None - } + use crate::layout::ColumnarUpdate as Update; + use crate::KeyStorage; + + impl timely::Accountable for KeyStorage { + #[inline] fn record_count(&self) -> i64 { use columnar::Len; self.upds.values.len() as i64 } } - #[inline] - fn finish(&mut self) -> Option<&mut Self::Container> { - if !self.current.is_empty() { - use columnar::Container; - let words = Indexed::length_in_words(&self.current.borrow()); - let mut alloc = Vec::with_capacity(words); - Indexed::encode(&mut alloc, &self.current.borrow()); - self.pending.push_back(Column::Align(alloc.into_boxed_slice())); - self.current.clear(); - } - self.empty = self.pending.pop_front(); - self.empty.as_mut() + use timely::container::SizableContainer; + impl SizableContainer for KeyStorage { + fn at_capacity(&self) -> bool { false } + fn ensure_capacity(&mut self, _stash: &mut Option) { } } - } - impl> LengthPreservingContainerBuilder for ColumnBuilder { } + use timely::dataflow::channels::ContainerBytes; + impl ContainerBytes for KeyStorage { + fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } + fn length_in_bytes(&self) -> usize { unimplemented!() } + fn into_bytes(&self, _writer: &mut W) { unimplemented!() } + } + } } -use batcher::Col2KeyBatcher; +pub use column_builder::{val::ValBuilder as ValColBuilder, key::KeyBuilder as KeyColBuilder}; +mod column_builder { -/// Types for consolidating, merging, and extracting columnar update collections. -pub mod batcher { + pub mod val { - use std::collections::VecDeque; - use columnar::Columnar; - use timely::Container; - use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer}; - use differential_dataflow::difference::Semigroup; - use crate::Column; + use std::collections::VecDeque; + use columnar::{Columnar, Clear, Len, Push}; - use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; + use crate::layout::ColumnarUpdate as Update; + use crate::ValStorage; - /// A batcher for columnar storage. - pub type Col2ValBatcher = MergeBatcher, Chunker>, merger::ColumnMerger<(K,V),T,R>>; - pub type Col2KeyBatcher = Col2ValBatcher; - - // First draft: build a "chunker" and a "merger". - - #[derive(Default)] - pub struct Chunker { - /// Buffer into which we'll consolidate. - /// - /// Also the buffer where we'll stage responses to `extract` and `finish`. - /// When these calls return, the buffer is available for reuse. - empty: C, - /// Consolidated buffers ready to go. - ready: VecDeque, - } + type TupleContainer = <(::Key, ::Val, ::Time, ::Diff) as Columnar>::Container; - impl ContainerBuilder for Chunker { - type Container = C; + /// A container builder for `Column`. + pub struct ValBuilder { + /// Container that we're writing to. + current: TupleContainer, + /// Empty allocation. + empty: Option>, + /// Completed containers pending to be sent. + pending: VecDeque>, + } - fn extract(&mut self) -> Option<&mut Self::Container> { - if let Some(ready) = self.ready.pop_front() { - self.empty = ready; - Some(&mut self.empty) - } else { - None + use timely::container::PushInto; + impl PushInto for ValBuilder where TupleContainer : Push { + #[inline] + fn push_into(&mut self, item: T) { + self.current.push(item); + if self.current.len() > 1024 { + // TODO: Consolidate the batch? + use columnar::{Container, Index}; + let mut refs = self.current.borrow().into_index_iter().collect::>(); + refs.sort(); + let storage = ValStorage::form(refs.into_iter()); + self.pending.push_back(storage); + self.current.clear(); + } } } - fn finish(&mut self) -> Option<&mut Self::Container> { - self.extract() + impl Default for ValBuilder { + fn default() -> Self { + ValBuilder { + current: Default::default(), + empty: None, + pending: Default::default(), + } + } } - } - impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker - where - D: for<'b> Columnar, - for<'b> columnar::Ref<'b, D>: Ord, - T: for<'b> Columnar, - for<'b> columnar::Ref<'b, T>: Ord, - R: for<'b> Columnar + for<'b> Semigroup>, - for<'b> columnar::Ref<'b, R>: Ord, - C2: Container + SizableContainer + for<'b, 'c> PushInto<(columnar::Ref<'b, D>, columnar::Ref<'b, T>, &'c R)>, - { - fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) { - let mut target: C2 = Default::default(); - target.ensure_capacity(&mut Some(std::mem::take(&mut self.empty))); - - // Scoped to let borrow through `permutation` drop. - { - // Sort input data - // TODO: consider `Vec` that we retain, containing indexes. - let mut permutation = Vec::with_capacity(container.len()); - permutation.extend(container.drain()); - permutation.sort(); - - // Iterate over the data, accumulating diffs for like keys. - let mut iter = permutation.drain(..); - if let Some((data, time, diff)) = iter.next() { - - let mut prev_data = data; - let mut prev_time = time; - let mut prev_diff = ::into_owned(diff); - - for (data, time, diff) in iter { - if (&prev_data, &prev_time) == (&data, &time) { - prev_diff.plus_equals(&diff); - } - else { - if !prev_diff.is_zero() { - let tuple = (prev_data, prev_time, &prev_diff); - target.push_into(tuple); - } - prev_data = data; - prev_time = time; - prev_diff = ::into_owned(diff); - } - } + use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder}; + impl ContainerBuilder for ValBuilder { + type Container = ValStorage; - if !prev_diff.is_zero() { - let tuple = (prev_data, prev_time, &prev_diff); - target.push_into(tuple); - } + #[inline] + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(container) = self.pending.pop_front() { + self.empty = Some(container); + self.empty.as_mut() + } else { + None } } - if !target.is_empty() { - self.ready.push_back(target); + #[inline] + fn finish(&mut self) -> Option<&mut Self::Container> { + if !self.current.is_empty() { + // TODO: Consolidate the batch? + use columnar::{Container, Index}; + let mut refs = self.current.borrow().into_index_iter().collect::>(); + refs.sort(); + let storage = ValStorage::form(refs.into_iter()); + self.pending.push_back(storage); + self.current.clear(); + } + self.empty = self.pending.pop_front(); + self.empty.as_mut() } + + // fn partition(container: &mut Self::Container, builders: &mut [Self], mut index: I) + // where + // Self: for<'a> PushInto<::Item<'a>>, + // I: for<'a> FnMut(&::Item<'a>) -> usize, + // { + // println!("Exchanging!"); + // for datum in container.drain() { + // let index = index(&datum); + // builders[index].push_into(datum); + // } + // container.clear(); + // } + } - } - /// Implementations of `ContainerQueue` and `MergerChunk` for `Column` containers (columnar). - pub mod merger { + impl LengthPreservingContainerBuilder for ValBuilder { } + } - use timely::progress::{Antichain, frontier::AntichainRef}; - use columnar::Columnar; - use timely::container::PushInto; - use crate::container::Column; - use differential_dataflow::difference::Semigroup; + pub mod key { - use differential_dataflow::trace::implementations::merge_batcher::container::{ContainerQueue, MergerChunk}; - use differential_dataflow::trace::implementations::merge_batcher::container::ContainerMerger; + use std::collections::VecDeque; + use columnar::{Columnar, Clear, Len, Push}; - /// A `Merger` implementation backed by `Column` containers (Columnar). - pub type ColumnMerger = ContainerMerger,ColumnQueue<(D, T, R)>>; + use crate::layout::ColumnarUpdate as Update; + use crate::KeyStorage; + type TupleContainer = <(::Key, ::Time, ::Diff) as Columnar>::Container; - /// TODO - pub struct ColumnQueue { - list: Column, - head: usize, + /// A container builder for `Column`. + pub struct KeyBuilder { + /// Container that we're writing to. + current: TupleContainer, + /// Empty allocation. + empty: Option>, + /// Completed containers pending to be sent. + pending: VecDeque>, } - impl ContainerQueue> for ColumnQueue<(D, T, R)> - where - D: for<'a> Columnar, - for<'b> columnar::Ref<'b, D>: Ord, - T: for<'a> Columnar, - for<'b> columnar::Ref<'b, T>: Ord, - R: Columnar, - { - fn next_or_alloc(&mut self) -> Result, Column<(D, T, R)>> { - if self.is_empty() { - Err(std::mem::take(&mut self.list)) - } - else { - Ok(self.pop()) + use timely::container::PushInto; + impl PushInto for KeyBuilder where TupleContainer : Push { + #[inline] + fn push_into(&mut self, item: T) { + self.current.push(item); + if self.current.len() > 1024 { + // TODO: Consolidate the batch? + use columnar::{Container, Index}; + let mut refs = self.current.borrow().into_index_iter().collect::>(); + refs.sort(); + let storage = KeyStorage::form(refs.into_iter()); + self.pending.push_back(storage); + self.current.clear(); } } - fn is_empty(&self) -> bool { - self.head == self.list.len() - } - fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering { - let (data1, time1, _) = self.peek(); - let (data2, time2, _) = other.peek(); - - (data1, time1).cmp(&(data2, time2)) - } - fn from(list: Column<(D, T, R)>) -> Self { - ColumnQueue { list, head: 0 } - } } - impl ColumnQueue { - fn pop(&mut self) -> columnar::Ref<'_, T> { - self.head += 1; - self.list.get(self.head - 1) - } + impl Default for KeyBuilder { fn default() -> Self { KeyBuilder { current: Default::default(), empty: None, pending: Default::default(), } } } - fn peek(&self) -> columnar::Ref<'_, T> { - self.list.get(self.head) - } - } + use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder}; + impl ContainerBuilder for KeyBuilder { + type Container = KeyStorage; - impl MergerChunk for Column<(D, T, R)> - where - D: Columnar + 'static, - T: timely::PartialOrder + Clone + Columnar + 'static, - R: Default + Semigroup + Columnar + 'static - { - type TimeOwned = T; - type DiffOwned = R; - - fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool { - let time = T::into_owned(*time); - if upper.less_equal(&time) { - frontier.insert(time); - true + #[inline] + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(container) = self.pending.pop_front() { + self.empty = Some(container); + self.empty.as_mut() + } else { + None } - else { false } - } - fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned) { - let (data, time, diff1) = item1; - let (_data, _time, diff2) = item2; - stash.copy_from(diff1); - let stash2: R = R::into_owned(diff2); - stash.plus_equals(&stash2); - if !stash.is_zero() { - self.push_into((data, time, &*stash)); + } + + #[inline] + fn finish(&mut self) -> Option<&mut Self::Container> { + if !self.current.is_empty() { + // TODO: Consolidate the batch? + use columnar::{Container, Index}; + let mut refs = self.current.borrow().into_index_iter().collect::>(); + refs.sort(); + let storage = KeyStorage::form(refs.into_iter()); + self.pending.push_back(storage); + self.current.clear(); } + self.empty = self.pending.pop_front(); + self.empty.as_mut() } - fn account(&self) -> (usize, usize, usize, usize) { - (0, 0, 0, 0) - // unimplemented!() - // use timely::Container; - // let (mut size, mut capacity, mut allocations) = (0, 0, 0); - // let cb = |siz, cap| { - // size += siz; - // capacity += cap; - // allocations += 1; - // }; - // self.heap_size(cb); - // (self.len(), size, capacity, allocations) - } - #[inline] fn clear(&mut self) { self.clear() } + + // fn partition(container: &mut Self::Container, builders: &mut [Self], mut index: I) + // where + // Self: for<'a> PushInto<::Item<'a>>, + // I: for<'a> FnMut(&::Item<'a>) -> usize, + // { + // println!("Exchanging!"); + // for datum in container.drain() { + // let index = index(&datum); + // builders[index].push_into(datum); + // } + // container.clear(); + // } + } - } + impl LengthPreservingContainerBuilder for KeyBuilder { } + } } -use dd_builder::ColKeyBuilder; +pub use arrangement::{ValBatcher, ValBuilder, ValSpine, KeyBatcher, KeyBuilder, KeySpine}; +pub mod arrangement { -pub mod dd_builder { + use std::rc::Rc; + use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdKeyBatch}; + use differential_dataflow::trace::rc_blanket_impls::RcBuilder; + use differential_dataflow::trace::implementations::spine_fueled::Spine; - use columnar::Columnar; - use timely::container::DrainContainer; - use differential_dataflow::trace::Builder; - use differential_dataflow::trace::Description; - use differential_dataflow::trace::implementations::Layout; - use differential_dataflow::trace::implementations::layout; - use differential_dataflow::trace::implementations::BatchContainer; - use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, val_batch::OrdValStorage, OrdKeyBatch, Vals, Upds, layers::UpdsBuilder}; - use differential_dataflow::trace::implementations::ord_neu::key_batch::OrdKeyStorage; - use crate::Column; + use crate::layout::ColumnarLayout; + /// A trace implementation backed by columnar storage. + pub type ValSpine = Spine>>>; + /// A batcher for columnar storage. + pub type ValBatcher = ValBatcher2<(K,V,T,R)>; + /// A builder for columnar storage. + pub type ValBuilder = RcBuilder>; + + /// A trace implementation backed by columnar storage. + pub type KeySpine = Spine>>>; + /// A batcher for columnar storage + pub type KeyBatcher = KeyBatcher2<(K,T,R)>; + /// A builder for columnar storage + pub type KeyBuilder = RcBuilder>; + + /// A batch container implementation for Column. + pub use batch_container::Coltainer; + pub mod batch_container { + + use columnar::{Columnar, Container, Clear, Push, Index, Len}; + use differential_dataflow::trace::implementations::BatchContainer; + + /// Container, anchored by `C` to provide an owned type. + pub struct Coltainer { + pub container: C::Container, + } - use differential_dataflow::trace::rc_blanket_impls::RcBuilder; - use differential_dataflow::trace::implementations::TStack; - - pub type ColValBuilder = RcBuilder>>; - pub type ColKeyBuilder = RcBuilder>>; - - /// A builder for creating layers from unsorted update tuples. - pub struct OrdValBuilder { - /// The in-progress result. - /// - /// This is public to allow container implementors to set and inspect their container. - pub result: OrdValStorage, - staging: UpdsBuilder, - } + impl Default for Coltainer { + fn default() -> Self { Self { container: Default::default() } } + } - // The layout `L` determines the key, val, time, and diff types. - impl Builder for OrdValBuilder - where - L: Layout, - layout::Key: Columnar, - layout::Val: Columnar, - layout::Time: Columnar, - layout::Diff: Columnar, - { - type Input = Column<((layout::Key,layout::Val),layout::Time,layout::Diff)>; - type Time = layout::Time; - type Output = OrdValBatch; - - fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { - Self { - result: OrdValStorage { - keys: L::KeyContainer::with_capacity(keys), - vals: Vals::with_capacity(keys + 1, vals), - upds: Upds::with_capacity(vals + 1, upds), - }, - staging: UpdsBuilder::default(), - } + impl BatchContainer for Coltainer where for<'a> columnar::Ref<'a, C> : Ord { + + type ReadItem<'a> = columnar::Ref<'a, C>; + type Owned = C; + + #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { C::into_owned(item) } + #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.copy_from(item) } + + #[inline(always)] fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.container.push(item) } + #[inline(always)] fn push_own(&mut self, item: &Self::Owned) { self.container.push(item) } + + /// Clears the container. May not release resources. + fn clear(&mut self) { self.container.clear() } + + /// Creates a new container with sufficient capacity. + fn with_capacity(_size: usize) -> Self { Self::default() } + /// Creates a new container with sufficient capacity. + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + Self { + container: ::Container::with_capacity_for([cont1.container.borrow(), cont2.container.borrow()].into_iter()), + } + } + + /// Converts a read item into one with a narrower lifetime. + #[inline(always)] fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { columnar::ContainerOf::::reborrow_ref(item) } + + /// Reference to the element at this position. + #[inline(always)] fn index(&self, index: usize) -> Self::ReadItem<'_> { self.container.borrow().get(index) } + + #[inline(always)] fn len(&self) -> usize { self.container.len() } } + } - #[inline] - fn push(&mut self, chunk: &mut Self::Input) { - // NB: Maintaining owned key and val across iterations to track the "last", which we clone into, - // is somewhat appealing from an ease point of view. Might still allocate, do work we don't need, - // but avoids e.g. calls into `last()` and breaks horrid trait requirements. - // Owned key and val would need to be members of `self`, as this method can be called multiple times, - // and we need to correctly cache last for reasons of correctness, not just performance. - - let mut key_con = L::KeyContainer::with_capacity(1); - let mut val_con = L::ValContainer::with_capacity(1); - - for ((key,val),time,diff) in chunk.drain() { - // It would be great to avoid. - let key = as Columnar>::into_owned(key); - let val = as Columnar>::into_owned(val); - // These feel fine (wrt the other versions) - let time = as Columnar>::into_owned(time); - let diff = as Columnar>::into_owned(diff); - - key_con.clear(); key_con.push_own(&key); - val_con.clear(); val_con.push_own(&val); - - // Pre-load the first update. - if self.result.keys.is_empty() { - self.result.vals.vals.push_own(&val); - self.result.keys.push_own(&key); - self.staging.push(time, diff); + use crate::{ColumnarUpdate, ValStorage, KeyStorage}; + use differential_dataflow::trace::implementations::chainless_batcher as chainless; + type ValBatcher2 = chainless::Batcher<::Time, ValStorage>; + type KeyBatcher2 = chainless::Batcher<::Time, KeyStorage>; + pub mod batcher { + + use std::ops::Range; + use columnar::{Columnar, Container, Index, Len, Push}; + use differential_dataflow::trace::implementations::chainless_batcher as chainless; + use differential_dataflow::difference::{Semigroup, IsZero}; + use timely::progress::frontier::{Antichain, AntichainRef}; + + use crate::ColumnarUpdate as Update; + use crate::{ValStorage, KeyStorage}; + + impl chainless::BatcherStorage for ValStorage { + + fn len(&self) -> usize { self.upds.values.len() } + + fn merge(self, other: Self) -> Self { + + let mut this_sum = U::Diff::default(); + let mut that_sum = U::Diff::default(); + + let mut merged = Self::default(); + let this = self; + let that = other; + let mut this_key_range = 0 .. this.keys.len(); + let mut that_key_range = 0 .. that.keys.len(); + while !this_key_range.is_empty() && !that_key_range.is_empty() { + let this_key = this.keys.borrow().get(this_key_range.start); + let that_key = that.keys.borrow().get(that_key_range.start); + match this_key.cmp(&that_key) { + std::cmp::Ordering::Less => { + let lower = this_key_range.start; + gallop(this.keys.borrow(), &mut this_key_range, |x| x < that_key); + merged.extend_from_keys(&this, lower .. this_key_range.start); + }, + std::cmp::Ordering::Equal => { + // keys are equal; must make a bespoke vals list. + // only push the key if merged.vals.values.len() advances. + let values_len = merged.vals.values.len(); + let mut this_val_range = this.vals_bounds(this_key_range.start .. this_key_range.start+1); + let mut that_val_range = that.vals_bounds(that_key_range.start .. that_key_range.start+1); + while !this_val_range.is_empty() && !that_val_range.is_empty() { + let this_val = this.vals.values.borrow().get(this_val_range.start); + let that_val = that.vals.values.borrow().get(that_val_range.start); + match this_val.cmp(&that_val) { + std::cmp::Ordering::Less => { + let lower = this_val_range.start; + gallop(this.vals.values.borrow(), &mut this_val_range, |x| x < that_val); + merged.extend_from_vals(&this, lower .. this_val_range.start); + }, + std::cmp::Ordering::Equal => { + // vals are equal; must make a bespoke upds list. + // only push the val if merged.upds.values.len() advances. + let updates_len = merged.upds.values.len(); + let mut this_upd_range = this.upds_bounds(this_val_range.start .. this_val_range.start+1); + let mut that_upd_range = that.upds_bounds(that_val_range.start .. that_val_range.start+1); + + while !this_upd_range.is_empty() && !that_upd_range.is_empty() { + let (this_time, this_diff) = this.upds.values.borrow().get(this_upd_range.start); + let (that_time, that_diff) = that.upds.values.borrow().get(that_upd_range.start); + match this_time.cmp(&that_time) { + std::cmp::Ordering::Less => { + let lower = this_upd_range.start; + gallop(this.upds.values.0.borrow(), &mut this_upd_range, |x| x < that_time); + merged.upds.values.extend_from_self(this.upds.values.borrow(), lower .. this_upd_range.start); + }, + std::cmp::Ordering::Equal => { + // times are equal; must add diffs. + this_sum.copy_from(this_diff); + that_sum.copy_from(that_diff); + this_sum.plus_equals(&that_sum); + if !this_sum.is_zero() { merged.upds.values.push((this_time, &this_sum)); } + // Advance the update ranges by one. + this_upd_range.start += 1; + that_upd_range.start += 1; + }, + std::cmp::Ordering::Greater => { + let lower = that_upd_range.start; + gallop(that.upds.values.0.borrow(), &mut that_upd_range, |x| x < this_time); + merged.upds.values.extend_from_self(that.upds.values.borrow(), lower .. that_upd_range.start); + }, + } + } + // Extend with the remaining this and that updates. + merged.upds.values.extend_from_self(this.upds.values.borrow(), this_upd_range); + merged.upds.values.extend_from_self(that.upds.values.borrow(), that_upd_range); + // Seal the updates and push the val. + if merged.upds.values.len() > updates_len { + merged.upds.bounds.push(merged.upds.values.len() as u64); + merged.vals.values.push(this_val); + } + // Advance the val ranges by one. + this_val_range.start += 1; + that_val_range.start += 1; + }, + std::cmp::Ordering::Greater => { + let lower = that_val_range.start; + gallop(that.vals.values.borrow(), &mut that_val_range, |x| x < this_val); + merged.extend_from_vals(&that, lower .. that_val_range.start); + }, + } + } + // Extend with the remaining this and that values. + merged.extend_from_vals(&this, this_val_range); + merged.extend_from_vals(&that, that_val_range); + // Seal the values and push the key. + if merged.vals.values.len() > values_len { + merged.vals.bounds.push(merged.vals.values.len() as u64); + merged.keys.push(this_key); + } + // Advance the key ranges by one. + this_key_range.start += 1; + that_key_range.start += 1; + }, + std::cmp::Ordering::Greater => { + let lower = that_key_range.start; + gallop(that.keys.borrow(), &mut that_key_range, |x| x < this_key); + merged.extend_from_keys(&that, lower .. that_key_range.start); + }, + } } - // Perhaps this is a continuation of an already received key. - else if self.result.keys.last() == key_con.get(0) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.vals.last() == val_con.get(0) { - self.staging.push(time, diff); - } else { - // New value; complete representation of prior value. - self.staging.seal(&mut self.result.upds); - self.staging.push(time, diff); - self.result.vals.vals.push_own(&val); + // Extend with the remaining this and that keys. + merged.extend_from_keys(&this, this_key_range); + merged.extend_from_keys(&that, that_key_range); + + merged + } + + fn split(&mut self, frontier: AntichainRef) -> Self { + // Unfortunately the times are at the leaves, so there can be no bulk copying. + let mut ship = Self::default(); + let mut keep = Self::default(); + let mut time = U::Time::default(); + for key_idx in 0 .. self.keys.len() { + let key = self.keys.borrow().get(key_idx); + let keep_vals_len = keep.vals.values.len(); + let ship_vals_len = ship.vals.values.len(); + for val_idx in self.vals_bounds(key_idx..key_idx+1) { + let val = self.vals.values.borrow().get(val_idx); + let keep_upds_len = keep.upds.values.len(); + let ship_upds_len = ship.upds.values.len(); + for upd_idx in self.upds_bounds(val_idx..val_idx+1) { + let (t, diff) = self.upds.values.borrow().get(upd_idx); + time.copy_from(t); + if frontier.less_equal(&time) { + keep.upds.values.push((t, diff)); + } + else { + ship.upds.values.push((t, diff)); + } + } + if keep.upds.values.len() > keep_upds_len { + keep.upds.bounds.push(keep.upds.values.len() as u64); + keep.vals.values.push(val); + } + if ship.upds.values.len() > ship_upds_len { + ship.upds.bounds.push(ship.upds.values.len() as u64); + ship.vals.values.push(val); + } + } + if keep.vals.values.len() > keep_vals_len { + keep.vals.bounds.push(keep.vals.values.len() as u64); + keep.keys.push(key); + } + if ship.vals.values.len() > ship_vals_len { + ship.vals.bounds.push(ship.vals.values.len() as u64); + ship.keys.push(key); } - } else { - // New key; complete representation of prior key. - self.staging.seal(&mut self.result.upds); - self.staging.push(time, diff); - self.result.vals.offs.push_ref(self.result.vals.len()); - self.result.vals.vals.push_own(&val); - self.result.keys.push_own(&key); } + + *self = keep; + ship } - } - #[inline(never)] - fn done(mut self, description: Description) -> OrdValBatch { - self.staging.seal(&mut self.result.upds); - self.result.vals.offs.push_ref(self.result.vals.len()); - OrdValBatch { - updates: self.staging.total(), - storage: self.result, - description, + fn lower(&self, frontier: &mut Antichain) { + use columnar::Columnar; + let mut times = self.upds.values.0.borrow().into_index_iter(); + if let Some(time_ref) = times.next() { + let mut time = ::into_owned(time_ref); + frontier.insert_ref(&time); + for time_ref in times { + ::copy_from(&mut time, time_ref); + frontier.insert_ref(&time); + } + } } } - fn seal(chain: &mut Vec, description: Description) -> Self::Output { - // let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]); - // let mut builder = Self::with_capacity(keys, vals, upds); - let mut builder = Self::with_capacity(0, 0, 0); - for mut chunk in chain.drain(..) { - builder.push(&mut chunk); + impl chainless::BatcherStorage for KeyStorage { + + fn len(&self) -> usize { self.upds.values.len() } + + fn merge(self, other: Self) -> Self { + + let mut this_sum = U::Diff::default(); + let mut that_sum = U::Diff::default(); + + let mut merged = Self::default(); + let this = self; + let that = other; + let mut this_key_range = 0 .. this.keys.len(); + let mut that_key_range = 0 .. that.keys.len(); + while !this_key_range.is_empty() && !that_key_range.is_empty() { + let this_key = this.keys.borrow().get(this_key_range.start); + let that_key = that.keys.borrow().get(that_key_range.start); + match this_key.cmp(&that_key) { + std::cmp::Ordering::Less => { + let lower = this_key_range.start; + gallop(this.keys.borrow(), &mut this_key_range, |x| x < that_key); + merged.extend_from_keys(&this, lower .. this_key_range.start); + }, + std::cmp::Ordering::Equal => { + // keys are equal; must make a bespoke vals list. + // only push the key if merged.vals.values.len() advances. + let updates_len = merged.upds.values.len(); + let mut this_upd_range = this.upds_bounds(this_key_range.start .. this_key_range.start+1); + let mut that_upd_range = that.upds_bounds(that_key_range.start .. that_key_range.start+1); + + while !this_upd_range.is_empty() && !that_upd_range.is_empty() { + let (this_time, this_diff) = this.upds.values.borrow().get(this_upd_range.start); + let (that_time, that_diff) = that.upds.values.borrow().get(that_upd_range.start); + match this_time.cmp(&that_time) { + std::cmp::Ordering::Less => { + let lower = this_upd_range.start; + gallop(this.upds.values.0.borrow(), &mut this_upd_range, |x| x < that_time); + merged.upds.values.extend_from_self(this.upds.values.borrow(), lower .. this_upd_range.start); + }, + std::cmp::Ordering::Equal => { + // times are equal; must add diffs. + this_sum.copy_from(this_diff); + that_sum.copy_from(that_diff); + this_sum.plus_equals(&that_sum); + if !this_sum.is_zero() { merged.upds.values.push((this_time, &this_sum)); } + // Advance the update ranges by one. + this_upd_range.start += 1; + that_upd_range.start += 1; + }, + std::cmp::Ordering::Greater => { + let lower = that_upd_range.start; + gallop(that.upds.values.0.borrow(), &mut that_upd_range, |x| x < this_time); + merged.upds.values.extend_from_self(that.upds.values.borrow(), lower .. that_upd_range.start); + }, + } + } + // Extend with the remaining this and that updates. + merged.upds.values.extend_from_self(this.upds.values.borrow(), this_upd_range); + merged.upds.values.extend_from_self(that.upds.values.borrow(), that_upd_range); + // Seal the values and push the key. + if merged.upds.values.len() > updates_len { + merged.upds.bounds.push(merged.upds.values.len() as u64); + merged.keys.push(this_key); + } + // Advance the key ranges by one. + this_key_range.start += 1; + that_key_range.start += 1; + }, + std::cmp::Ordering::Greater => { + let lower = that_key_range.start; + gallop(that.keys.borrow(), &mut that_key_range, |x| x < this_key); + merged.extend_from_keys(&that, lower .. that_key_range.start); + }, + } + } + // Extend with the remaining this and that keys. + merged.extend_from_keys(&this, this_key_range); + merged.extend_from_keys(&that, that_key_range); + + merged } - builder.done(description) - } - } + fn split(&mut self, frontier: AntichainRef) -> Self { + // Unfortunately the times are at the leaves, so there can be no bulk copying. + let mut ship = Self::default(); + let mut keep = Self::default(); + let mut time = U::Time::default(); + for key_idx in 0 .. self.keys.len() { + let key = self.keys.borrow().get(key_idx); + let keep_upds_len = keep.upds.values.len(); + let ship_upds_len = ship.upds.values.len(); + for upd_idx in self.upds_bounds(key_idx..key_idx+1) { + let (t, diff) = self.upds.values.borrow().get(upd_idx); + time.copy_from(t); + if frontier.less_equal(&time) { + keep.upds.values.push((t, diff)); + } + else { + ship.upds.values.push((t, diff)); + } + } + if keep.upds.values.len() > keep_upds_len { + keep.upds.bounds.push(keep.upds.values.len() as u64); + keep.keys.push(key); + } + if ship.upds.values.len() > ship_upds_len { + ship.upds.bounds.push(ship.upds.values.len() as u64); + ship.keys.push(key); + } + } - /// A builder for creating layers from unsorted update tuples. - pub struct OrdKeyBuilder { - /// The in-progress result. - /// - /// This is public to allow container implementors to set and inspect their container. - pub result: OrdKeyStorage, - staging: UpdsBuilder, - } + *self = keep; + ship + } - // The layout `L` determines the key, val, time, and diff types. - impl Builder for OrdKeyBuilder - where - L: Layout, - layout::Key: Columnar, - layout::Val: Columnar, - layout::Time: Columnar, - layout::Diff: Columnar, - { - type Input = Column<((layout::Key,layout::Val),layout::Time,layout::Diff)>; - type Time = layout::Time; - type Output = OrdKeyBatch; - - fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self { - Self { - result: OrdKeyStorage { - keys: L::KeyContainer::with_capacity(keys), - upds: Upds::with_capacity(keys + 1, upds), - }, - staging: UpdsBuilder::default(), + fn lower(&self, frontier: &mut Antichain) { + use columnar::Columnar; + let mut times = self.upds.values.0.borrow().into_index_iter(); + if let Some(time_ref) = times.next() { + let mut time = ::into_owned(time_ref); + frontier.insert_ref(&time); + for time_ref in times { + ::copy_from(&mut time, time_ref); + frontier.insert_ref(&time); + } + } } } - #[inline] - fn push(&mut self, chunk: &mut Self::Input) { - // NB: Maintaining owned key and val across iterations to track the "last", which we clone into, - // is somewhat appealing from an ease point of view. Might still allocate, do work we don't need, - // but avoids e.g. calls into `last()` and breaks horrid trait requirements. - // Owned key and val would need to be members of `self`, as this method can be called multiple times, - // and we need to correctly cache last for reasons of correctness, not just performance. - - let mut key_con = L::KeyContainer::with_capacity(1); - - for ((key,_val),time,diff) in chunk.drain() { - // It would be great to avoid. - let key = as Columnar>::into_owned(key); - // These feel fine (wrt the other versions) - let time = as Columnar>::into_owned(time); - let diff = as Columnar>::into_owned(diff); - - key_con.clear(); key_con.push_own(&key); - - // Pre-load the first update. - if self.result.keys.is_empty() { - self.result.keys.push_own(&key); - self.staging.push(time, diff); + #[inline(always)] + pub(crate) fn gallop(input: TC, range: &mut Range, mut cmp: impl FnMut(::Ref) -> bool) { + // if empty input, or already >= element, return + if !Range::::is_empty(range) && cmp(input.get(range.start)) { + let mut step = 1; + while range.start + step < range.end && cmp(input.get(range.start + step)) { + range.start += step; + step <<= 1; } - // Perhaps this is a continuation of an already received key. - else if self.result.keys.last() == key_con.get(0) { - self.staging.push(time, diff); - } else { - // New key; complete representation of prior key. - self.staging.seal(&mut self.result.upds); - self.staging.push(time, diff); - self.result.keys.push_own(&key); + + step >>= 1; + while step > 0 { + if range.start + step < range.end && cmp(input.get(range.start + step)) { + range.start += step; + } + step >>= 1; } + + range.start += 1; } } + } + + use builder::val::ValMirror; + use builder::key::KeyMirror; + pub mod builder { + + pub mod val { + + use differential_dataflow::trace::implementations::ord_neu::{Vals, Upds}; + use differential_dataflow::trace::implementations::ord_neu::val_batch::{OrdValBatch, OrdValStorage}; + use differential_dataflow::trace::Description; + + use crate::ValStorage; + use crate::layout::ColumnarUpdate as Update; + use crate::layout::ColumnarLayout as Layout; + use crate::arrangement::Coltainer; + + use differential_dataflow::trace::implementations::OffsetList; + fn vec_u64_to_offset_list(list: Vec) -> OffsetList { + let mut output = OffsetList::with_capacity(list.len()); + output.push(0); + for item in list { output.push(item as usize); } + output + } - #[inline(never)] - fn done(mut self, description: Description) -> OrdKeyBatch { - self.staging.seal(&mut self.result.upds); - OrdKeyBatch { - updates: self.staging.total(), - storage: self.result, - description, + pub struct ValMirror { marker: std::marker::PhantomData } + impl differential_dataflow::trace::Builder for ValMirror { + type Time = U::Time; + type Input = ValStorage; + type Output = OrdValBatch>; + + fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { Self { marker: std::marker::PhantomData } } + fn push(&mut self, _chunk: &mut Self::Input) { unimplemented!() } + fn done(self, _description: Description) -> Self::Output { unimplemented!() } + fn seal(chain: &mut Vec, description: Description) -> Self::Output { + if chain.len() == 0 { + let storage = OrdValStorage { + keys: Default::default(), + vals: Default::default(), + upds: Default::default(), + }; + OrdValBatch { storage, description, updates: 0 } + } + else if chain.len() == 1 { + use columnar::Len; + let storage = chain.pop().unwrap(); + let updates = storage.upds.len(); + let storage = OrdValStorage { + keys: Coltainer { container: storage.keys }, + vals: Vals { + offs: vec_u64_to_offset_list(storage.vals.bounds), + vals: Coltainer { container: storage.vals.values }, + }, + upds: Upds { + offs: vec_u64_to_offset_list(storage.upds.bounds), + times: Coltainer { container: storage.upds.values.0 }, + diffs: Coltainer { container: storage.upds.values.1 }, + }, + }; + OrdValBatch { storage, description, updates } + } + else { + println!("chain length: {:?}", chain.len()); + unimplemented!() + } + } } } - fn seal(chain: &mut Vec, description: Description) -> Self::Output { - let mut builder = Self::with_capacity(0, 0, 0); - for mut chunk in chain.drain(..) { - builder.push(&mut chunk); + pub mod key { + + use differential_dataflow::trace::implementations::ord_neu::Upds; + use differential_dataflow::trace::implementations::ord_neu::key_batch::{OrdKeyBatch, OrdKeyStorage}; + use differential_dataflow::trace::Description; + + use crate::KeyStorage; + use crate::layout::ColumnarUpdate as Update; + use crate::layout::ColumnarLayout as Layout; + use crate::arrangement::Coltainer; + + use differential_dataflow::trace::implementations::OffsetList; + fn vec_u64_to_offset_list(list: Vec) -> OffsetList { + let mut output = OffsetList::with_capacity(list.len()); + output.push(0); + for item in list { output.push(item as usize); } + output } - builder.done(description) + pub struct KeyMirror { marker: std::marker::PhantomData } + impl> differential_dataflow::trace::Builder for KeyMirror { + type Time = U::Time; + type Input = KeyStorage; + type Output = OrdKeyBatch>; + + fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { Self { marker: std::marker::PhantomData } } + fn push(&mut self, _chunk: &mut Self::Input) { unimplemented!() } + fn done(self, _description: Description) -> Self::Output { unimplemented!() } + fn seal(chain: &mut Vec, description: Description) -> Self::Output { + if chain.len() == 0 { + let storage = OrdKeyStorage { + keys: Default::default(), + upds: Default::default(), + }; + OrdKeyBatch { storage, description, updates: 0, value: OrdKeyBatch::>::create_value() } + } + else if chain.len() == 1 { + use columnar::Len; + let storage = chain.pop().unwrap(); + let updates = storage.upds.len(); + let storage = OrdKeyStorage { + keys: Coltainer { container: storage.keys }, + upds: Upds { + offs: vec_u64_to_offset_list(storage.upds.bounds), + times: Coltainer { container: storage.upds.values.0 }, + diffs: Coltainer { container: storage.upds.values.1 }, + }, + }; + OrdKeyBatch { storage, description, updates, value: OrdKeyBatch::>::create_value() } + } + else { + println!("chain length: {:?}", chain.len()); + unimplemented!() + } + } + } } } } diff --git a/differential-dataflow/src/trace/implementations/chainless_batcher.rs b/differential-dataflow/src/trace/implementations/chainless_batcher.rs new file mode 100644 index 000000000..2129c2c84 --- /dev/null +++ b/differential-dataflow/src/trace/implementations/chainless_batcher.rs @@ -0,0 +1,113 @@ +//! A `Batcher` implementation based on merge sort. +//! +//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger". +//! The chunker receives input batches and consolidates them, producing sorted output +//! "chunks" that are fully consolidated (no adjacent updates can be accumulated). +//! The merger implements the [`Merger`] trait, and provides hooks for manipulating +//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also +//! splitting them apart based on time. +//! +//! Implementations of `MergeBatcher` can be instantiated through the choice of both +//! the chunker and the merger, provided their respective output and input types align. + +use timely::progress::frontier::AntichainRef; +use timely::progress::{frontier::Antichain, Timestamp}; + +use crate::logging::Logger; +use crate::trace; + +/// A type that can be used as storage within a merge batcher. +pub trait BatcherStorage : Default + Sized { + /// Number of contained updates. + fn len(&self) -> usize; + /// Merges two storage containers into one. + /// + /// This is expected to consolidate updates as it goes. + fn merge(self, other: Self) -> Self; + /// Extracts elements not greater or equal to the frontier. + fn split(&mut self, frontier: AntichainRef) -> Self; + /// Ensures `frontier` is less or equal to all contained times. + /// + /// Consider merging with `split`, but needed for new stores as well. + fn lower(&self, frontier: &mut Antichain); +} + +/// A batcher that simple merges `BatcherStorage` implementors. +pub struct Batcher> { + /// Each store is at least twice the size of the next. + storages: Vec, + /// The lower bound of timestamps of the maintained updates. + lower: Antichain, + /// The previosly minted frontier. + prior: Antichain, + + /// Logger for size accounting. + _logger: Option, + /// Timely operator ID. + _operator_id: usize, +} + +impl> Batcher { + /// Ensures lists decrease in size geometrically. + fn tidy(&mut self) { + self.storages.retain(|x| x.len() > 0); + self.storages.sort_by_key(|x| x.len()); + self.storages.reverse(); + while let Some(pos) = (1..self.storages.len()).position(|i| self.storages[i-1].len() < 2 * self.storages[i].len()) { + while self.storages.len() > pos + 1 { + let x = self.storages.pop().unwrap(); + let y = self.storages.pop().unwrap(); + self.storages.push(x.merge(y)); + self.storages.sort_by_key(|x| x.len()); + self.storages.reverse(); + } + } + } +} + +impl> trace::Batcher for Batcher { + type Time = T; + type Input = S; + type Output = S; + + fn new(logger: Option, operator_id: usize) -> Self { + Self { + storages: Vec::default(), + lower: Default::default(), + prior: Antichain::from_elem(T::minimum()), + _logger: logger, + _operator_id: operator_id, + } + } + + fn push_container(&mut self, batch: &mut Self::Input) { + if batch.len() > 0 { + batch.lower(&mut self.lower); + self.storages.push(std::mem::take(batch)); + self.tidy(); + } + } + + fn seal>(&mut self, upper: Antichain) -> B::Output { + let description = trace::Description::new(self.prior.clone(), upper.clone(), Antichain::new()); + self.prior = upper.clone(); + let mut stores = self.storages.iter_mut().rev(); + if let Some(store) = stores.next() { + self.lower.clear(); + let mut ship = store.split(upper.borrow()); + store.lower(&mut self.lower); + for store in stores { + let split = store.split(upper.borrow()); + ship = ship.merge(split); + store.lower(&mut self.lower); + } + self.tidy(); + B::seal(&mut vec![ship], description) + } + else { + B::seal(&mut vec![], description) + } + } + + fn frontier(&mut self) -> AntichainRef<'_, Self::Time> { self.lower.borrow() } +} diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index ea393701d..d73eb71f4 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -41,6 +41,7 @@ pub mod spine_fueled; pub mod merge_batcher; +pub mod chainless_batcher; pub mod ord_neu; pub mod rhh; pub mod huffman_container; diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 218fc612a..b6e57ab0b 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -776,6 +776,7 @@ pub mod key_batch { #[derive(Serialize, Deserialize)] #[serde(bound = " L::KeyContainer: Serialize + for<'a> Deserialize<'a>, + L::ValContainer: Serialize + for<'a> Deserialize<'a>, L::OffsetContainer: Serialize + for<'a> Deserialize<'a>, L::TimeContainer: Serialize + for<'a> Deserialize<'a>, L::DiffContainer: Serialize + for<'a> Deserialize<'a>, @@ -791,13 +792,25 @@ pub mod key_batch { /// we may have many more updates than `storage.updates.len()`. It should equal that /// length, plus the number of singleton optimizations employed. pub updates: usize, + + /// Single value to return if asked. + pub value: L::ValContainer, + } + + impl>> OrdKeyBatch { + /// Creates a container with one value, to slot in to `self.value`. + pub fn create_value() -> L::ValContainer { + let mut value = L::ValContainer::with_capacity(1); + value.push_own(&Default::default()); + value + } } - impl Layout = &'a ()>>> WithLayout for OrdKeyBatch { + impl>> WithLayout for OrdKeyBatch { type Layout = L; } - impl Layout = &'a ()>>> BatchReader for OrdKeyBatch { + impl>> BatchReader for OrdKeyBatch { type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { @@ -815,7 +828,7 @@ pub mod key_batch { fn description(&self) -> &Description> { &self.description } } - impl Layout = &'a ()>>> Batch for OrdKeyBatch { + impl>> Batch for OrdKeyBatch { type Merger = OrdKeyMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef>) -> Self::Merger { @@ -831,6 +844,7 @@ pub mod key_batch { }, description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())), updates: 0, + value: Self::create_value(), } } } @@ -850,7 +864,7 @@ pub mod key_batch { staging: UpdsBuilder, } - impl Merger> for OrdKeyMerger + impl>> Merger> for OrdKeyMerger where OrdKeyBatch: Batch>, { @@ -882,6 +896,7 @@ pub mod key_batch { updates: self.staging.total(), storage: self.result, description: self.description, + value: OrdKeyBatch::::create_value(), } } fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { @@ -983,19 +998,19 @@ pub mod key_batch { } use crate::trace::implementations::WithLayout; - impl Layout = &'a ()>>> WithLayout for OrdKeyCursor { + impl> WithLayout for OrdKeyCursor { type Layout = L; } - impl Layout = &'a ()>>> Cursor for OrdKeyCursor { + impl Layout>> Cursor for OrdKeyCursor { type Storage = OrdKeyBatch; fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { storage.storage.keys.get(self.key_cursor) } - fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a ()> { if self.val_valid(storage) { Some(&()) } else { None } } + fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } } fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) } - fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } + fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { storage.value.index(0) } fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) { let (lower, upper) = storage.storage.upds.bounds(self.key_cursor); for index in lower .. upper { @@ -1048,6 +1063,7 @@ pub mod key_batch { impl Builder for OrdKeyBuilder where L: for<'a> Layout>>, + L: Layout>, CI: BuilderInput, Diff=layout::Diff>, { @@ -1092,6 +1108,7 @@ pub mod key_batch { updates: self.staging.total(), storage: self.result, description, + value: OrdKeyBatch::::create_value(), } } From 341704bd7be04645cebda943c9b2e3bdc1367167 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 16 Sep 2025 10:55:02 -0400 Subject: [PATCH 2/2] Implement Distributor --- differential-dataflow/examples/columnar.rs | 163 ++++++++++++--------- 1 file changed, 94 insertions(+), 69 deletions(-) diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index d5b9f9d11..d68c4729e 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -1,7 +1,6 @@ //! Wordcount based on `columnar`. use timely::container::{ContainerBuilder, PushInto}; -use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::InputHandleCore; use timely::dataflow::ProbeHandle; @@ -37,8 +36,12 @@ fn main() { let data = data_input.to_stream(scope); let keys = keys_input.to_stream(scope); - let data = arrange_core::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(&data, Pipeline, "Data"); - let keys = arrange_core::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(&keys, Pipeline, "Keys"); + use differential_dataflow::Hashable; + let data_pact = KeyPact { hashfunc: |k: columnar::Ref<'_, Vec>| k.hashed() }; + let keys_pact = KeyPact { hashfunc: |k: columnar::Ref<'_, Vec>| k.hashed() }; + + let data = arrange_core::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(&data, data_pact, "Data"); + let keys = arrange_core::<_,_,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>(&keys, keys_pact, "Keys"); keys.join_core(&data, |_k, (), ()| { Option::<()>::None }) .probe_with(&mut probe); @@ -268,6 +271,17 @@ pub mod storage { self.upds.extend_from_self(other.upds.borrow(), range); } } + + impl timely::Accountable for ValStorage { + #[inline] fn record_count(&self) -> i64 { use columnar::Len; self.upds.values.len() as i64 } + } + + use timely::dataflow::channels::ContainerBytes; + impl ContainerBytes for ValStorage { + fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } + fn length_in_bytes(&self) -> usize { unimplemented!() } + fn into_bytes(&self, _writer: &mut W) { unimplemented!() } + } } pub mod key { @@ -283,7 +297,7 @@ pub mod storage { pub struct KeyStorage { /// An ordered list of keys. pub keys: ContainerOf, - /// For each val in `vals`, a list of (time, diff) updates. + /// For each key in `keys`, a list of (time, diff) updates. pub upds: Vecs<(ContainerOf, ContainerOf)>, } @@ -341,49 +355,11 @@ pub mod storage { self.upds.extend_from_self(other.upds.borrow(), range.clone()); } } - } -} - -mod container { - - mod val { - - use crate::layout::ColumnarUpdate as Update; - use crate::ValStorage; - - impl timely::Accountable for ValStorage { - #[inline] fn record_count(&self) -> i64 { use columnar::Len; self.upds.values.len() as i64 } - } - - use timely::container::SizableContainer; - impl SizableContainer for ValStorage { - fn at_capacity(&self) -> bool { false } - fn ensure_capacity(&mut self, _stash: &mut Option) { } - } - - use timely::dataflow::channels::ContainerBytes; - impl ContainerBytes for ValStorage { - fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } - fn length_in_bytes(&self) -> usize { unimplemented!() } - fn into_bytes(&self, _writer: &mut W) { unimplemented!() } - } - } - - mod key { - - use crate::layout::ColumnarUpdate as Update; - use crate::KeyStorage; impl timely::Accountable for KeyStorage { #[inline] fn record_count(&self) -> i64 { use columnar::Len; self.upds.values.len() as i64 } } - use timely::container::SizableContainer; - impl SizableContainer for KeyStorage { - fn at_capacity(&self) -> bool { false } - fn ensure_capacity(&mut self, _stash: &mut Option) { } - } - use timely::dataflow::channels::ContainerBytes; impl ContainerBytes for KeyStorage { fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } @@ -471,20 +447,6 @@ mod column_builder { self.empty = self.pending.pop_front(); self.empty.as_mut() } - - // fn partition(container: &mut Self::Container, builders: &mut [Self], mut index: I) - // where - // Self: for<'a> PushInto<::Item<'a>>, - // I: for<'a> FnMut(&::Item<'a>) -> usize, - // { - // println!("Exchanging!"); - // for datum in container.drain() { - // let index = index(&datum); - // builders[index].push_into(datum); - // } - // container.clear(); - // } - } impl LengthPreservingContainerBuilder for ValBuilder { } @@ -557,23 +519,86 @@ mod column_builder { self.empty = self.pending.pop_front(); self.empty.as_mut() } + } + + impl LengthPreservingContainerBuilder for KeyBuilder { } + } +} + +use distributor::key::KeyPact; +mod distributor { + + pub mod key { + + use std::rc::Rc; - // fn partition(container: &mut Self::Container, builders: &mut [Self], mut index: I) - // where - // Self: for<'a> PushInto<::Item<'a>>, - // I: for<'a> FnMut(&::Item<'a>) -> usize, - // { - // println!("Exchanging!"); - // for datum in container.drain() { - // let index = index(&datum); - // builders[index].push_into(datum); - // } - // container.clear(); - // } + use columnar::{Container, Index, Len}; + use timely::container::{ContainerBuilder, PushInto}; + use timely::logging::TimelyLogger; + use timely::dataflow::channels::pushers::{Exchange, exchange::Distributor}; + use timely::dataflow::channels::Message; + use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract}; + use timely::progress::Timestamp; + use timely::worker::AsWorker; + + use crate::layout::ColumnarUpdate as Update; + use crate::{KeyColBuilder, KeyStorage}; + pub struct KeyDistributor { + builders: Vec>, + hashfunc: H, } - impl LengthPreservingContainerBuilder for KeyBuilder { } + impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor> for KeyDistributor { + fn partition>>>(&mut self, container: &mut KeyStorage, time: &T, pushers: &mut [P]) { + // For each key, partition and copy (key, time, diff) into the appropriate self.builder. + for index in 0 .. container.keys.len() { + let key = container.keys.borrow().get(index); + let idx = ((self.hashfunc)(key) as usize) % self.builders.len(); + for (t, diff) in container.upds.borrow().get(index).into_index_iter() { + self.builders[idx].push_into((key, t, diff)); + } + while let Some(produced) = self.builders[idx].extract() { + Message::push_at(produced, time.clone(), &mut pushers[idx]); + } + } + } + fn flush>>>(&mut self, time: &T, pushers: &mut [P]) { + for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) { + while let Some(container) = builder.finish() { + Message::push_at(container, time.clone(), pusher); + } + } + } + fn relax(&mut self) { } + } + + pub struct KeyPact { pub hashfunc: H } + + // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. + impl ParallelizationContract> for KeyPact + where + T: Timestamp, + U: Update, + H: for<'a> FnMut(columnar::Ref<'a, U::Key>)->u64 + 'static, + { + type Pusher = Exchange< + T, + LogPusher>>>>, + KeyDistributor + >; + type Puller = LogPuller>>>>; + + fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { + let (senders, receiver) = allocator.allocate::>>(identifier, address); + let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); + let distributor = KeyDistributor { + builders: std::iter::repeat_with(Default::default).take(allocator.peers()).collect(), + hashfunc: self.hashfunc, + }; + (Exchange::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) + } + } } }