Skip to content

Introduce non-clone streams #508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ pub mod flatcontainer;
///
/// A container must implement default. The default implementation is not required to allocate
/// memory for variable-length components.
///
/// We require the container to be cloneable to enable efficient copies when providing references
/// of containers to operators. Care must be taken that the type's `clone_from` implementation
/// is efficient (which is not necessarily the case when deriving `Clone`.)
pub trait Container: Default {
/// The type of elements when reading non-destructively from the container.
type ItemRef<'a> where Self: 'a;
Expand Down Expand Up @@ -105,7 +101,7 @@ pub trait PushInto<T> {
/// decide to represent a push order for `extract` and `finish`, or not.
pub trait ContainerBuilder: Default + 'static {
/// The container type we're building.
type Container: Container + Clone + 'static;
type Container: Container + 'static;
/// Extract assembled containers, potentially leaving unfinished data behind. Can
/// be called repeatedly, for example while the caller can send data.
///
Expand Down Expand Up @@ -170,7 +166,7 @@ impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuil
}
}

impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
impl<C: Container + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
type Container = C;

#[inline]
Expand Down
19 changes: 10 additions & 9 deletions mdbook/src/chapter_2/chapter_2_3.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ use timely::dataflow::operators::{ToStream, Partition, Inspect};

fn main() {
timely::example(|scope| {
let streams = (0..10).to_stream(scope)
let mut streams = (0..10).to_stream(scope)
.partition(3, |x| (x % 3, x));

streams[0].inspect(|x| println!("seen 0: {:?}", x));
streams[1].inspect(|x| println!("seen 1: {:?}", x));
streams[2].inspect(|x| println!("seen 2: {:?}", x));
streams.pop().unwrap().inspect(|x| println!("seen 2: {:?}", x));
streams.pop().unwrap().inspect(|x| println!("seen 1: {:?}", x));
streams.pop().unwrap().inspect(|x| println!("seen 0: {:?}", x));
});
}
```
Expand All @@ -147,11 +147,11 @@ use timely::dataflow::operators::{ToStream, Partition, Concat, Inspect};

fn main() {
timely::example(|scope| {
let streams = (0..10).to_stream(scope)
let mut streams = (0..10).to_stream(scope)
.partition(3, |x| (x % 3, x));
streams[0]
.concat(&streams[1])
.concat(&streams[2])
streams.pop().unwrap()
.concat(streams.pop().unwrap())
.concat(streams.pop().unwrap())
.inspect(|x| println!("seen: {:?}", x));
});
}
Expand All @@ -169,7 +169,8 @@ fn main() {
let streams = (0..10).to_stream(scope)
.partition(3, |x| (x % 3, x));

scope.concatenate(streams)
scope.clone()
.concatenate(streams)
.inspect(|x| println!("seen: {:?}", x));
});
}
Expand Down
4 changes: 2 additions & 2 deletions mdbook/src/chapter_2/chapter_2_4.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ fn main() {
let in1 = (0 .. 10).to_stream(scope);
let in2 = (0 .. 10).to_stream(scope);

in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
in1.binary_frontier(in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {

let mut notificator = FrontierNotificator::default();
let mut stash = HashMap::new();
Expand Down Expand Up @@ -233,7 +233,7 @@ fn main() {
let in1 = (0 .. 10).to_stream(scope);
let in2 = (0 .. 10).to_stream(scope);

in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
in1.binary_frontier(in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {

let mut stash = HashMap::new();

Expand Down
14 changes: 7 additions & 7 deletions mdbook/src/chapter_4/chapter_4_2.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {
// circulate numbers, Collatz stepping each time.
(1 .. 10)
.to_stream(scope)
.concat(&stream)
.concat(stream)
.map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 } )
.inspect(|x| println!("{:?}", x))
.filter(|x| *x != 1)
Expand Down Expand Up @@ -63,17 +63,17 @@ fn main() {
let results1 = stream1.map(|x| 3 * x + 1);

// partition the input and feedback streams by even-ness.
let parts =
let mut parts =
(1 .. 10)
.to_stream(scope)
.concat(&results0)
.concat(&results1)
.concat(results0)
.concat(results1)
.inspect(|x| println!("{:?}", x))
.partition(2, |x| (x % 2, x));

// connect each part appropriately.
parts[0].connect_loop(handle0);
parts[1].connect_loop(handle1);
parts.pop().unwrap().connect_loop(handle1);
parts.pop().unwrap().connect_loop(handle0);
});
}
```
Expand Down Expand Up @@ -103,7 +103,7 @@ fn main() {

input
.enter(subscope)
.concat(&stream)
.concat(stream)
.map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 } )
.inspect(|x| println!("{:?}", x))
.filter(|x| *x != 1)
Expand Down
2 changes: 1 addition & 1 deletion mdbook/src/chapter_4/chapter_4_3.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ fn main() {
// Assign timestamps to records so that not much work is in each time.
.delay(|number, time| number / 100 )
// Buffer records until all prior timestamps have completed.
.binary_frontier(&cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {
.binary_frontier(cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {

move |input1, input2, output| {

Expand Down
4 changes: 2 additions & 2 deletions timely/examples/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn main() {

// use the stream of edges
graph.binary_notify(
&stream,
stream,
Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
"BFS",
Expand Down Expand Up @@ -130,7 +130,7 @@ fn main() {
});
}
)
.concat(&(0..1).map(|x| (x,x)).to_stream(scope))
.concat((0..1).map(|x| (x,x)).to_stream(scope))
.connect_loop(handle);
});
}).unwrap(); // asserts error-free execution;
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/hashjoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn main() {
let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);

stream1
.binary(&stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
.binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {

let mut map1 = HashMap::<u64, Vec<u64>>::new();
let mut map2 = HashMap::<u64, Vec<u64>>::new();
Expand Down
8 changes: 4 additions & 4 deletions timely/examples/loopdemo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ fn main() {

let step =
stream
.concat(&loop_stream)
.concat(loop_stream)
.map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 })
.filter(|x| x > &1);

step.connect_loop(loop_handle);
step.probe_with(&mut probe);
step
.probe_with(&mut probe)
.connect_loop(loop_handle);
});

let ns_per_request = 1_000_000_000 / rate;
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/pagerank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn main() {

// bring edges and ranks together!
let changes = edge_stream.binary_frontier(
&rank_stream,
rank_stream,
Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
Exchange::new(|x: &(usize, i64)| x.0 as u64),
"PageRank",
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/pingpong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn main() {
(0 .. elements)
.filter(move |&x| (x as usize) % peers == index)
.to_stream(scope)
.concat(&cycle)
.concat(cycle)
.exchange(|&x| x)
.map_in_place(|x| *x += 1)
.branch_when(move |t| t < &iterations).1
Expand Down
4 changes: 2 additions & 2 deletions timely/examples/unionfind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ fn main() {
}

trait UnionFind {
fn union_find(&self) -> Self;
fn union_find(self) -> Self;
}

impl<G: Scope> UnionFind for Stream<G, (usize, usize)> {
fn union_find(&self) -> Stream<G, (usize, usize)> {
fn union_find(self) -> Stream<G, (usize, usize)> {

self.unary(Pipeline, "UnionFind", |_,_| {

Expand Down
3 changes: 1 addition & 2 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::dataflow::channels::Message;
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
use crate::progress::Timestamp;
use crate::worker::AsWorker;
use crate::Data;

/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContract<T, C> {
Expand Down Expand Up @@ -84,7 +83,7 @@ impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for
where
CB: ContainerBuilder,
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
CB::Container: Data + Send + SizableContainer + crate::dataflow::channels::ContainerBytes,
CB::Container: Send + SizableContainer + crate::dataflow::channels::ContainerBytes,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
{
type Pusher = ExchangePusher<T, CB, LogPusher<T, CB::Container, Box<dyn Push<Message<T, CB::Container>>>>, H>;
Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
use crate::dataflow::channels::Message;
use crate::dataflow::operators::Capability;
use crate::progress::Timestamp;
use crate::{Container, Data};
use crate::Container;

/// Buffers data sent at the same time, for efficient communication.
///
Expand Down Expand Up @@ -44,7 +44,7 @@ impl<T, CB: Default, P> Buffer<T, CB, P> {
}
}

impl<T, C: Container + Data, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
impl<T, C: Container + 'static, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
/// Returns a `Session`, which accepts data to send at the associated time
#[inline]
pub fn session(&mut self, time: &T) -> Session<T, CapacityContainerBuilder<C>, P> {
Expand Down Expand Up @@ -133,7 +133,7 @@ pub struct Session<'a, T, CB, P> {
buffer: &'a mut Buffer<T, CB, P>,
}

impl<'a, T, C: Container + Data, P> Session<'a, T, CapacityContainerBuilder<C>, P>
impl<'a, T, C: Container + 'static, P> Session<'a, T, CapacityContainerBuilder<C>, P>
where
T: Eq + Clone + 'a,
P: Push<Message<T, C>> + 'a,
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::communication::Push;
use crate::container::{ContainerBuilder, SizableContainer, PushInto};
use crate::dataflow::channels::Message;
use crate::{Container, Data};
use crate::Container;

// TODO : Software write combining
/// Distributes records among target pushees according to a distribution function.
Expand Down Expand Up @@ -50,7 +50,7 @@ where
}
}

impl<T: Eq+Data, CB, P, H> Push<Message<T, CB::Container>> for Exchange<T, CB, P, H>
impl<T: Eq+Clone, CB, P, H> Push<Message<T, CB::Container>> for Exchange<T, CB, P, H>
where
CB: ContainerBuilder,
CB::Container: SizableContainer,
Expand Down
6 changes: 4 additions & 2 deletions timely/src/dataflow/channels/pushers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
pub use self::tee::{Tee, TeeHelper};
pub use self::exchange::Exchange;
pub use self::counter::Counter;
pub use self::exchange::Exchange;
pub use self::owned::PushOwned;
pub use self::tee::{Tee, TeeHelper};

pub mod owned;
pub mod tee;
pub mod exchange;
pub mod counter;
Expand Down
43 changes: 43 additions & 0 deletions timely/src/dataflow/channels/pushers/owned.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//! A `Push` implementor with a single target.

use std::cell::RefCell;
use std::fmt;
use std::rc::Rc;

use timely_communication::Push;
use crate::dataflow::channels::Message;

/// A pusher that can bind to a single downstream pusher.
pub struct PushOwned<T, D>(Rc<RefCell<Option<Box<dyn Push<Message<T, D>>>>>>);

impl<T, D> PushOwned<T, D> {
/// Create a new `PushOwned`. Similarly to `Tee`, it returns a pair where either element
/// can be used as pusher or registrar.
pub fn new() -> (Self, Self) {
let shared = Rc::new(RefCell::new(None));
(Self(Rc::clone(&shared)), Self(shared))
}

/// Set the downstream pusher.
///
/// Consumes `Self` as only a single pusher can be set.
pub fn set_pusher<P: Push<Message<T, D>> + 'static>(self, pusher: P) {
*self.0.borrow_mut() = Some(Box::new(pusher));
}
}

impl<T, D> fmt::Debug for PushOwned<T, D> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PushOwned").finish_non_exhaustive()
}
}

impl<T, D> Push<Message<T, D>> for PushOwned<T, D> {
#[inline]
fn push(&mut self, message: &mut Option<Message<T, D>>) {
let mut pusher = self.0.borrow_mut();
if let Some(pusher) = pusher.as_mut() {
pusher.push(message);
}
}
}
11 changes: 6 additions & 5 deletions timely/src/dataflow/channels/pushers/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ use std::rc::Rc;
use crate::dataflow::channels::Message;

use crate::communication::Push;
use crate::{Container, Data};
use crate::Container;
use crate::dataflow::channels::pushers::PushOwned;

type PushList<T, C> = Rc<RefCell<Vec<Box<dyn Push<Message<T, C>>>>>>;
type PushList<T, C> = Rc<RefCell<Vec<PushOwned<T, C>>>>;

/// Wraps a shared list of `Box<Push>` to forward pushes to. Owned by `Stream`.
pub struct Tee<T, C> {
buffer: C,
shared: PushList<T, C>,
}

impl<T: Data, C: Container + Data> Push<Message<T, C>> for Tee<T, C> {
impl<T: Clone + 'static, C: Container + Clone + 'static> Push<Message<T, C>> for Tee<T, C> {
#[inline]
fn push(&mut self, message: &mut Option<Message<T, C>>) {
let mut pushers = self.shared.borrow_mut();
Expand Down Expand Up @@ -86,8 +87,8 @@ pub struct TeeHelper<T, C> {

impl<T, C> TeeHelper<T, C> {
/// Adds a new `Push` implementor to the list of recipients shared with a `Stream`.
pub fn add_pusher<P: Push<Message<T, C>>+'static>(&self, pusher: P) {
self.shared.borrow_mut().push(Box::new(pusher));
pub fn add_pusher(&self, pusher: PushOwned<T, C>) {
self.shared.borrow_mut().push(pusher);
}
}

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//! });
//! ```

pub use self::stream::{StreamCore, Stream};
pub use self::stream::{StreamTee, Stream, StreamCore};
pub use self::scopes::{Scope, ScopeParent};

pub use self::operators::core::input::Handle as InputHandleCore;
Expand Down
Loading
Loading