Skip to content

Commit 96079f8

Browse files
committed
Owned streams take 2
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 08eca72 commit 96079f8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+553
-366
lines changed

container/src/lib.rs

+2-6
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@ pub mod flatcontainer;
1414
///
1515
/// A container must implement default. The default implementation is not required to allocate
1616
/// memory for variable-length components.
17-
///
18-
/// We require the container to be cloneable to enable efficient copies when providing references
19-
/// of containers to operators. Care must be taken that the type's `clone_from` implementation
20-
/// is efficient (which is not necessarily the case when deriving `Clone`.)
2117
pub trait Container: Default {
2218
/// The type of elements when reading non-destructively from the container.
2319
type ItemRef<'a> where Self: 'a;
@@ -105,7 +101,7 @@ pub trait PushInto<T> {
105101
/// decide to represent a push order for `extract` and `finish`, or not.
106102
pub trait ContainerBuilder: Default + 'static {
107103
/// The container type we're building.
108-
type Container: Container + Clone + 'static;
104+
type Container: Container + 'static;
109105
/// Extract assembled containers, potentially leaving unfinished data behind. Can
110106
/// be called repeatedly, for example while the caller can send data.
111107
///
@@ -170,7 +166,7 @@ impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuil
170166
}
171167
}
172168

173-
impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
169+
impl<C: Container + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
174170
type Container = C;
175171

176172
#[inline]

mdbook/src/chapter_2/chapter_2_3.md

+8-8
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,12 @@ use timely::dataflow::operators::{ToStream, Partition, Inspect};
126126

127127
fn main() {
128128
timely::example(|scope| {
129-
let streams = (0..10).to_stream(scope)
129+
let mut streams = (0..10).to_stream(scope)
130130
.partition(3, |x| (x % 3, x));
131131

132-
streams[0].inspect(|x| println!("seen 0: {:?}", x));
133-
streams[1].inspect(|x| println!("seen 1: {:?}", x));
134-
streams[2].inspect(|x| println!("seen 2: {:?}", x));
132+
streams.pop().unwrap().inspect(|x| println!("seen 2: {:?}", x));
133+
streams.pop().unwrap().inspect(|x| println!("seen 1: {:?}", x));
134+
streams.pop().unwrap().inspect(|x| println!("seen 0: {:?}", x));
135135
});
136136
}
137137
```
@@ -147,11 +147,11 @@ use timely::dataflow::operators::{ToStream, Partition, Concat, Inspect};
147147

148148
fn main() {
149149
timely::example(|scope| {
150-
let streams = (0..10).to_stream(scope)
150+
let mut streams = (0..10).to_stream(scope)
151151
.partition(3, |x| (x % 3, x));
152-
streams[0]
153-
.concat(&streams[1])
154-
.concat(&streams[2])
152+
streams.pop().unwrap()
153+
.concat(streams.pop().unwrap())
154+
.concat(streams.pop().unwrap())
155155
.inspect(|x| println!("seen: {:?}", x));
156156
});
157157
}

mdbook/src/chapter_2/chapter_2_4.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ fn main() {
182182
let in1 = (0 .. 10).to_stream(scope);
183183
let in2 = (0 .. 10).to_stream(scope);
184184

185-
in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
185+
in1.binary_frontier(in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
186186

187187
let mut notificator = FrontierNotificator::default();
188188
let mut stash = HashMap::new();
@@ -233,7 +233,7 @@ fn main() {
233233
let in1 = (0 .. 10).to_stream(scope);
234234
let in2 = (0 .. 10).to_stream(scope);
235235

236-
in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
236+
in1.binary_frontier(in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
237237

238238
let mut stash = HashMap::new();
239239

mdbook/src/chapter_4/chapter_4_2.md

+7-7
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ fn main() {
2424
// circulate numbers, Collatz stepping each time.
2525
(1 .. 10)
2626
.to_stream(scope)
27-
.concat(&stream)
27+
.concat(stream)
2828
.map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 } )
2929
.inspect(|x| println!("{:?}", x))
3030
.filter(|x| *x != 1)
@@ -63,17 +63,17 @@ fn main() {
6363
let results1 = stream1.map(|x| 3 * x + 1);
6464

6565
// partition the input and feedback streams by even-ness.
66-
let parts =
66+
let mut parts =
6767
(1 .. 10)
6868
.to_stream(scope)
69-
.concat(&results0)
70-
.concat(&results1)
69+
.concat(results0)
70+
.concat(results1)
7171
.inspect(|x| println!("{:?}", x))
7272
.partition(2, |x| (x % 2, x));
7373

7474
// connect each part appropriately.
75-
parts[0].connect_loop(handle0);
76-
parts[1].connect_loop(handle1);
75+
parts.pop().unwrap().connect_loop(handle1);
76+
parts.pop().unwrap().connect_loop(handle0);
7777
});
7878
}
7979
```
@@ -103,7 +103,7 @@ fn main() {
103103

104104
input
105105
.enter(subscope)
106-
.concat(&stream)
106+
.concat(stream)
107107
.map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 } )
108108
.inspect(|x| println!("{:?}", x))
109109
.filter(|x| *x != 1)

mdbook/src/chapter_4/chapter_4_3.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ fn main() {
7676
// Assign timestamps to records so that not much work is in each time.
7777
.delay(|number, time| number / 100 )
7878
// Buffer records until all prior timestamps have completed.
79-
.binary_frontier(&cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {
79+
.binary_frontier(cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {
8080
8181
move |input1, input2, output| {
8282

timely/examples/bfs.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ fn main() {
4545

4646
// use the stream of edges
4747
graph.binary_notify(
48-
&stream,
48+
stream,
4949
Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
5050
Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
5151
"BFS",
@@ -130,7 +130,7 @@ fn main() {
130130
});
131131
}
132132
)
133-
.concat(&(0..1).map(|x| (x,x)).to_stream(scope))
133+
.concat((0..1).map(|x| (x,x)).to_stream(scope))
134134
.connect_loop(handle);
135135
});
136136
}).unwrap(); // asserts error-free execution;

timely/examples/hashjoin.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ fn main() {
3232
let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);
3333

3434
stream1
35-
.binary(&stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
35+
.binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
3636

3737
let mut map1 = HashMap::<u64, Vec<u64>>::new();
3838
let mut map2 = HashMap::<u64, Vec<u64>>::new();

timely/examples/loopdemo.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ fn main() {
2727

2828
let step =
2929
stream
30-
.concat(&loop_stream)
30+
.concat(loop_stream)
3131
.map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 })
3232
.filter(|x| x > &1);
33-
34-
step.connect_loop(loop_handle);
35-
step.probe_with(&mut probe);
33+
step
34+
.probe_with(&mut probe)
35+
.connect_loop(loop_handle);
3636
});
3737

3838
let ns_per_request = 1_000_000_000 / rate;

timely/examples/pagerank.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ fn main() {
2323

2424
// bring edges and ranks together!
2525
let changes = edge_stream.binary_frontier(
26-
&rank_stream,
26+
rank_stream,
2727
Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
2828
Exchange::new(|x: &(usize, i64)| x.0 as u64),
2929
"PageRank",

timely/examples/pingpong.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ fn main() {
1414
(0 .. elements)
1515
.filter(move |&x| (x as usize) % peers == index)
1616
.to_stream(scope)
17-
.concat(&cycle)
17+
.concat(cycle)
1818
.exchange(|&x| x)
1919
.map_in_place(|x| *x += 1)
2020
.branch_when(move |t| t < &iterations).1

timely/examples/unionfind.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ fn main() {
4747
}).unwrap(); // asserts error-free execution;
4848
}
4949

50-
trait UnionFind {
51-
fn union_find(&self) -> Self;
50+
trait UnionFind<G: Scope> {
51+
fn union_find(self) -> OwnedStream<G, Vec<(usize, usize)>>;
5252
}
5353

54-
impl<G: Scope> UnionFind for Stream<G, (usize, usize)> {
55-
fn union_find(&self) -> Stream<G, (usize, usize)> {
54+
impl<G: Scope, S: StreamLike<G, Vec<(usize, usize)>>> UnionFind<G> for S {
55+
fn union_find(self) -> OwnedStream<G, Vec<(usize, usize)>> {
5656

5757
self.unary(Pipeline, "UnionFind", |_,_| {
5858

timely/src/dataflow/channels/pushers/buffer.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
66
use crate::dataflow::channels::Message;
77
use crate::dataflow::operators::Capability;
88
use crate::progress::Timestamp;
9-
use crate::{Container, Data};
9+
use crate::Container;
1010

1111
/// Buffers data sent at the same time, for efficient communication.
1212
///
@@ -44,7 +44,7 @@ impl<T, CB: Default, P> Buffer<T, CB, P> {
4444
}
4545
}
4646

47-
impl<T, C: Container + Data, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
47+
impl<T, C: Container + 'static, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
4848
/// Returns a `Session`, which accepts data to send at the associated time
4949
#[inline]
5050
pub fn session(&mut self, time: &T) -> Session<T, CapacityContainerBuilder<C>, P> {
@@ -133,7 +133,7 @@ pub struct Session<'a, T, CB, P> {
133133
buffer: &'a mut Buffer<T, CB, P>,
134134
}
135135

136-
impl<'a, T, C: Container + Data, P> Session<'a, T, CapacityContainerBuilder<C>, P>
136+
impl<'a, T, C: Container + 'static, P> Session<'a, T, CapacityContainerBuilder<C>, P>
137137
where
138138
T: Eq + Clone + 'a,
139139
P: Push<Message<T, C>> + 'a,

timely/src/dataflow/channels/pushers/mod.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
pub use self::tee::{Tee, TeeHelper};
2-
pub use self::exchange::Exchange;
31
pub use self::counter::Counter;
2+
pub use self::exchange::Exchange;
3+
pub use self::owned::PushOwned;
4+
pub use self::tee::{Tee, TeeHelper};
45

6+
pub mod owned;
57
pub mod tee;
68
pub mod exchange;
79
pub mod counter;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
//! A `Push` implementor with a single target.
2+
3+
use std::cell::RefCell;
4+
use std::fmt;
5+
use std::rc::Rc;
6+
7+
use timely_communication::Push;
8+
use crate::{Data, Container};
9+
use crate::dataflow::channels::Message;
10+
11+
/// A pusher that can bind to a single downstream pusher.
12+
pub struct PushOwned<T, D>(Rc<RefCell<Option<Box<dyn Push<Message<T, D>>>>>>);
13+
14+
impl<T, D> PushOwned<T, D> {
15+
/// Create a new `PushOwned`. Similarly to `Tee`, it returns a pair where either element
16+
/// can be used as pusher or registrar.
17+
pub fn new() -> (Self, Self) {
18+
let zelf = Self(Rc::new(RefCell::new(None)));
19+
(zelf.clone(), zelf)
20+
}
21+
22+
/// Set the downstream pusher.
23+
pub fn set<P: Push<Message<T, D>> + 'static>(self, pusher: P) {
24+
*self.0.borrow_mut() = Some(Box::new(pusher));
25+
}
26+
}
27+
28+
impl<T, D> fmt::Debug for PushOwned<T, D> {
29+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30+
f.debug_struct("PushOwned").finish_non_exhaustive()
31+
}
32+
}
33+
34+
impl<T, D> Clone for PushOwned<T, D> {
35+
fn clone(&self) -> Self {
36+
Self(Rc::clone(&self.0))
37+
}
38+
}
39+
40+
impl<T: Data, D: Container> Push<Message<T, D>> for PushOwned<T, D> {
41+
#[inline]
42+
fn push(&mut self, message: &mut Option<Message<T, D>>) {
43+
let mut pusher = self.0.borrow_mut();
44+
if let Some(pusher) = pusher.as_mut() {
45+
pusher.push(message);
46+
}
47+
}
48+
}

timely/src/dataflow/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
//! });
1414
//! ```
1515
16-
pub use self::stream::{StreamCore, Stream};
16+
pub use self::stream::{StreamCore, Stream, StreamLike, OwnedStream};
1717
pub use self::scopes::{Scope, ScopeParent};
1818

1919
pub use self::operators::core::input::Handle as InputHandleCore;

timely/src/dataflow/operators/aggregation/aggregate.rs

+12-6
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::hash::Hash;
33
use std::collections::HashMap;
44

55
use crate::{Data, ExchangeData};
6-
use crate::dataflow::{Stream, Scope};
6+
use crate::dataflow::{Scope, StreamLike, OwnedStream};
77
use crate::dataflow::operators::generic::operator::Operator;
88
use crate::dataflow::channels::pact::Exchange;
99

@@ -61,19 +61,25 @@ pub trait Aggregate<S: Scope, K: ExchangeData+Hash, V: ExchangeData> {
6161
/// });
6262
/// ```
6363
fn aggregate<R: Data, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
64-
&self,
64+
self,
6565
fold: F,
6666
emit: E,
67-
hash: H) -> Stream<S, R> where S::Timestamp: Eq;
67+
hash: H) -> OwnedStream<S, Vec<R>> where S::Timestamp: Eq;
6868
}
6969

70-
impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for Stream<S, (K, V)> {
70+
impl<G, K, V, S> Aggregate<G, K, V> for S
71+
where
72+
G: Scope,
73+
K: ExchangeData + Hash + Eq + Clone,
74+
V: ExchangeData,
75+
S: StreamLike<G, Vec<(K, V)>>,
76+
{
7177

7278
fn aggregate<R: Data, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
73-
&self,
79+
self,
7480
fold: F,
7581
emit: E,
76-
hash: H) -> Stream<S, R> where S::Timestamp: Eq {
82+
hash: H) -> OwnedStream<G, Vec<R>> where G::Timestamp: Eq {
7783

7884
let mut aggregates = HashMap::new();
7985
self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {

timely/src/dataflow/operators/aggregation/state_machine.rs

+10-4
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::hash::Hash;
33
use std::collections::HashMap;
44

55
use crate::{Data, ExchangeData};
6-
use crate::dataflow::{Stream, Scope};
6+
use crate::dataflow::{OwnedStream, Scope, StreamLike};
77
use crate::dataflow::operators::generic::operator::Operator;
88
use crate::dataflow::channels::pact::Exchange;
99

@@ -51,17 +51,23 @@ pub trait StateMachine<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> {
5151
I: IntoIterator<Item=R>, // type of output iterator
5252
F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic
5353
H: Fn(&K)->u64+'static, // "hash" function for keys
54-
>(&self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq ;
54+
>(self, fold: F, hash: H) -> OwnedStream<S, Vec<R>> where S::Timestamp : Hash+Eq ;
5555
}
5656

57-
impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> for Stream<S, (K, V)> {
57+
impl<G, K, V, S> StateMachine<G, K, V> for S
58+
where
59+
G: Scope,
60+
K: ExchangeData + Hash + Eq + Clone,
61+
V: ExchangeData,
62+
S: StreamLike<G, Vec<(K, V)>>,
63+
{
5864
fn state_machine<
5965
R: Data, // output type
6066
D: Default+'static, // per-key state (data)
6167
I: IntoIterator<Item=R>, // type of output iterator
6268
F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic
6369
H: Fn(&K)->u64+'static, // "hash" function for keys
64-
>(&self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq {
70+
>(self, fold: F, hash: H) -> OwnedStream<G, Vec<R>> where G::Timestamp : Hash+Eq {
6571

6672
let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state)
6773
let mut states = HashMap::new(); // keys -> state

0 commit comments

Comments
 (0)