Skip to content

Commit c979fde

Browse files
committed
Test for columnar reachability
1 parent e79b3ee commit c979fde

File tree

4 files changed

+90
-25
lines changed

4 files changed

+90
-25
lines changed

timely/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ timely_logging = { path = "../logging", version = "0.12" }
3131
timely_communication = { path = "../communication", version = "0.12", default-features = false }
3232
timely_container = { path = "../container", version = "0.12" }
3333
crossbeam-channel = "0.5.0"
34+
columnar = { git = "https://github.com/frankmcsherry/columnar" }
35+
3436

3537
[dev-dependencies]
3638
# timely_sort="0.1.6"

timely/examples/event_driven.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ fn main() {
3434

3535
println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);
3636

37-
for round in 0 .. {
37+
for round in 0 .. 10 {
3838
let dataflow = round % dataflows;
3939
if record {
4040
inputs[dataflow].send(());

timely/src/progress/frontier.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ impl<T> Antichain<T> {
248248
/// let mut frontier = Antichain::from_elem(2);
249249
/// assert_eq!(frontier.elements(), &[2]);
250250
///```
251-
#[inline] pub fn elements(&self) -> &[T] { &self[..] }
251+
#[inline] pub fn elements(&self) -> &Vec<T> { &self.elements }
252252

253253
/// Reveals the elements in the antichain.
254254
///

timely/src/progress/reachability.rs

+86-23
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@
7575
use std::collections::{BinaryHeap, HashMap, VecDeque};
7676
use std::cmp::Reverse;
7777

78+
use columnar::{Columnar, Len, Index};
79+
use columnar::ColumnVec;
80+
7881
use crate::progress::Timestamp;
7982
use crate::progress::{Source, Target};
8083
use crate::progress::ChangeBatch;
@@ -84,6 +87,62 @@ use crate::progress::frontier::{Antichain, MutableAntichain};
8487
use crate::progress::timestamp::PathSummary;
8588

8689

90+
use vec_antichain::VecAntichain;
91+
92+
/// A stand-in for `Vec<Antichain<T>>`.
///
/// The module defines `VecAntichain`, a newtype over `columnar::ColumnVec`, and
/// forwards `Len`, `Index`, `IndexMut`, and `Columnar::copy` to the wrapped container.
93+
mod vec_antichain {
94+
95+
use columnar::{Columnar, Len, Index, IndexMut};
96+
use columnar::{ColumnVec, Slice};
97+
98+
use crate::progress::Antichain;
99+
100+
/// Newtype wrapper around a `ColumnVec<T>`; the wrapped container is private.
#[derive(Clone, Debug)]
101+
pub struct VecAntichain<T> (ColumnVec<T>);
102+
103+
impl<TC: Default> Default for VecAntichain<TC> {
104+
/// Wraps the default value of the inner container.
fn default() -> Self {
105+
Self (Default::default())
106+
}
107+
}
108+
109+
impl<TC> Len for VecAntichain<TC> {
110+
// Forwards to the length reported by the wrapped `ColumnVec`.
#[inline(always)] fn len(&self) -> usize { self.0.len() }
111+
}
112+
113+
impl<TC> Index for VecAntichain<TC> {
114+
type Index<'a> = Slice<&'a TC> where TC: 'a;
115+
116+
/// Forwards to the wrapped `ColumnVec`'s `index`, returning its `Slice` view.
#[inline(always)]
117+
fn index(&self, index: usize) -> Self::Index<'_> {
118+
self.0.index(index)
119+
}
120+
}
121+
impl<TC> IndexMut for VecAntichain<TC> {
122+
type IndexMut<'a> = Slice<&'a mut TC> where TC: 'a;
123+
124+
/// Forwards to the wrapped `ColumnVec`'s `index_mut`, returning its mutable `Slice` view.
#[inline(always)]
125+
fn index_mut(&mut self, index: usize) -> Self::IndexMut<'_> {
126+
self.0.index_mut(index)
127+
}
128+
}
129+
130+
impl<T, TC: Columnar<T>> Columnar<Antichain<T>> for VecAntichain<TC> {
131+
/// Passes the antichain's elements (`item.elements()`) to the wrapped container's `copy`.
// NOTE(review): presumably this relies on `Antichain::elements` returning `&Vec<T>`
// so that the inner `Columnar` impl is selected — confirm against the `columnar` crate.
#[inline(always)]
132+
fn copy(&mut self, item: &Antichain<T>) {
133+
self.0.copy(item.elements());
134+
}
135+
/// Not implemented: calling this panics via `unimplemented!()`.
fn clear(&mut self) {
136+
unimplemented!()
137+
}
138+
/// Not implemented: calling this panics via `unimplemented!()`.
fn heap_size(&self) -> (usize, usize) {
139+
unimplemented!()
140+
}
141+
}
142+
}
143+
144+
145+
87146
/// A topology builder, which can summarize reachability along paths.
88147
///
89148
/// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles
@@ -132,43 +191,42 @@ pub struct Builder<T: Timestamp> {
132191
/// Indexed by operator index, then input port, then output port. This is the
133192
/// same format returned by `get_internal_summary`, as if we simply appended
134193
/// all of the summaries for the hosted nodes.
135-
pub nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
194+
nodes: ColumnVec<ColumnVec<VecAntichain<Vec<T::Summary>>>>,
136195
/// Direct connections from sources to targets.
137196
///
138197
/// Edges do not affect timestamps, so we only need to know the connectivity.
139198
/// Indexed by operator index then output port.
140-
pub edges: Vec<Vec<Vec<Target>>>,
199+
edges: Vec<Vec<Vec<Target>>>,
141200
/// Numbers of inputs and outputs for each node.
142-
pub shape: Vec<(usize, usize)>,
201+
shape: Vec<(usize, usize)>,
143202
}
144203

145204
impl<T: Timestamp> Builder<T> {
146205

147206
/// Create a new empty topology builder.
148207
pub fn new() -> Self {
149208
Builder {
150-
nodes: Vec::new(),
209+
nodes: Default::default(),
151210
edges: Vec::new(),
152211
shape: Vec::new(),
153212
}
154213
}
155214

156215
/// Add links internal to operators.
157216
///
158-
/// This method overwrites any existing summary, instead of anything more sophisticated.
217+
/// Nodes must be added in consecutive order starting from zero: `index` must equal the number of nodes added so far, and the method panics otherwise.
159218
pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>) {
160219

161220
// Assert that all summaries exist.
162221
debug_assert_eq!(inputs, summary.len());
163222
for x in summary.iter() { debug_assert_eq!(outputs, x.len()); }
164223

165-
while self.nodes.len() <= index {
166-
self.nodes.push(Vec::new());
167-
self.edges.push(Vec::new());
168-
self.shape.push((0, 0));
169-
}
224+
assert_eq!(self.nodes.len(), index);
225+
226+
self.nodes.push(summary);
227+
self.edges.push(Vec::new());
228+
self.shape.push((0, 0));
170229

171-
self.nodes[index] = summary;
172230
if self.edges[index].len() != outputs {
173231
self.edges[index] = vec![Vec::new(); outputs];
174232
}
@@ -287,7 +345,7 @@ impl<T: Timestamp> Builder<T> {
287345
in_degree.entry(target).or_insert(0);
288346
for (output, summaries) in outputs.iter().enumerate() {
289347
let source = Location::new_source(index, output);
290-
for summary in summaries.elements().iter() {
348+
for summary in summaries.iter() {
291349
if summary == &Default::default() {
292350
*in_degree.entry(source).or_insert(0) += 1;
293351
}
@@ -322,9 +380,9 @@ impl<T: Timestamp> Builder<T> {
322380
}
323381
},
324382
Port::Target(port) => {
325-
for (output, summaries) in self.nodes[node][port].iter().enumerate() {
383+
for (output, summaries) in self.nodes.index(node).index(port).iter().enumerate() {
326384
let source = Location::new_source(node, output);
327-
for summary in summaries.elements().iter() {
385+
for summary in summaries.iter() {
328386
if summary == &Default::default() {
329387
*in_degree.get_mut(&source).unwrap() -= 1;
330388
if in_degree[&source] == 0 {
@@ -361,12 +419,12 @@ pub struct Tracker<T:Timestamp> {
361419
/// Indexed by operator index, then input port, then output port. This is the
362420
/// same format returned by `get_internal_summary`, as if we simply appended
363421
/// all of the summaries for the hosted nodes.
364-
nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
422+
nodes: ColumnVec<ColumnVec<VecAntichain<Vec<T::Summary>>>>,
365423
/// Direct connections from sources to targets.
366424
///
367425
/// Edges do not affect timestamps, so we only need to know the connectivity.
368426
/// Indexed by operator index then output port.
369-
edges: Vec<Vec<Vec<Target>>>,
427+
edges: ColumnVec<ColumnVec<Vec<Target>>>,
370428

371429
// TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`).
372430
// It seems we should be able to flatten most of these so that there are a few allocations
@@ -544,10 +602,15 @@ impl<T:Timestamp> Tracker<T> {
544602
let scope_outputs = builder.shape[0].0;
545603
let output_changes = vec![ChangeBatch::new(); scope_outputs];
546604

605+
let mut edges: ColumnVec<ColumnVec<Vec<Target>>> = Default::default();
606+
for edge in builder.edges {
607+
edges.push(edge);
608+
}
609+
547610
let tracker =
548611
Tracker {
549612
nodes: builder.nodes,
550-
edges: builder.edges,
613+
edges,
551614
per_operator,
552615
target_changes: ChangeBatch::new(),
553616
source_changes: ChangeBatch::new(),
@@ -663,10 +726,10 @@ impl<T:Timestamp> Tracker<T> {
663726
.update_iter(Some((time, diff)));
664727

665728
for (time, diff) in changes {
666-
let nodes = &self.nodes[location.node][port_index];
729+
let nodes = &self.nodes.index(location.node).index(port_index);
667730
for (output_port, summaries) in nodes.iter().enumerate() {
668731
let source = Location { node: location.node, port: Port::Source(output_port) };
669-
for summary in summaries.elements().iter() {
732+
for summary in summaries.iter() {
670733
if let Some(new_time) = summary.results_in(&time) {
671734
self.worklist.push(Reverse((new_time, source, diff)));
672735
}
@@ -686,7 +749,7 @@ impl<T:Timestamp> Tracker<T> {
686749
.update_iter(Some((time, diff)));
687750

688751
for (time, diff) in changes {
689-
for new_target in self.edges[location.node][port_index].iter() {
752+
for new_target in self.edges.index(location.node).index(port_index).iter() {
690753
self.worklist.push(Reverse((
691754
time.clone(),
692755
Location::from(*new_target),
@@ -738,7 +801,7 @@ impl<T:Timestamp> Tracker<T> {
738801
/// Graph locations may be missing from the output, in which case they have no
739802
/// paths to scope outputs.
740803
fn summarize_outputs<T: Timestamp>(
741-
nodes: &Vec<Vec<Vec<Antichain<T::Summary>>>>,
804+
nodes: &ColumnVec<ColumnVec<VecAntichain<Vec<T::Summary>>>>,
742805
edges: &Vec<Vec<Vec<Target>>>,
743806
) -> HashMap<Location, Vec<Antichain<T::Summary>>>
744807
{
@@ -780,7 +843,7 @@ fn summarize_outputs<T: Timestamp>(
780843
Port::Source(output_port) => {
781844

782845
// Consider each input port of the associated operator.
783-
for (input_port, summaries) in nodes[location.node].iter().enumerate() {
846+
for (input_port, summaries) in nodes.index(location.node).iter().enumerate() {
784847

785848
// Determine the current path summaries from the input port.
786849
let location = Location { node: location.node, port: Port::Target(input_port) };
@@ -792,7 +855,7 @@ fn summarize_outputs<T: Timestamp>(
792855
while antichains.len() <= output { antichains.push(Antichain::new()); }
793856

794857
// Combine each operator-internal summary to the output with `summary`.
795-
for operator_summary in summaries[output_port].elements().iter() {
858+
for operator_summary in summaries.index(output_port).iter() {
796859
if let Some(combined) = operator_summary.followed_by(&summary) {
797860
if antichains[output].insert(combined.clone()) {
798861
worklist.push_back((location, output, combined));

0 commit comments

Comments
 (0)