Skip to content

Commit 0fdc20c

Browse files
committed
Simpler container calls
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 6300718 commit 0fdc20c

File tree

3 files changed

+5
-17
lines changed

3 files changed

+5
-17
lines changed

timely/examples/wordcount.rs

-2
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,11 @@ fn main() {
1717
// create a new input, exchange data, and inspect its output
1818
worker.dataflow::<usize,_,_>(|scope| {
1919
input.to_stream(scope)
20-
.container::<Vec<_>>()
2120
.flat_map(|(text, diff): (String, i64)|
2221
text.split_whitespace()
2322
.map(move |word| (word.to_owned(), diff))
2423
.collect::<Vec<_>>()
2524
)
26-
.container::<Vec<_>>()
2725
.unary_frontier(exchange, "WordCount", |_capability, _info| {
2826

2927
let mut queues = HashMap::new();

timely/src/dataflow/channels/pushers/owned.rs

+2-8
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ impl<T, D> PushOwned<T, D> {
1414
/// Create a new `PushOwned`. Similarly to `Tee`, it returns a pair where either element
1515
/// can be used as pusher or registrar.
1616
pub fn new() -> (Self, Self) {
17-
let zelf = Self(Rc::new(RefCell::new(None)));
18-
(zelf.clone(), zelf)
17+
let shared = Rc::new(RefCell::new(None));
18+
(Self(Rc::clone(&shared)), Self(shared))
1919
}
2020

2121
/// Set the downstream pusher.
@@ -30,12 +30,6 @@ impl<T, D> fmt::Debug for PushOwned<T, D> {
3030
}
3131
}
3232

33-
impl<T, D> Clone for PushOwned<T, D> {
34-
fn clone(&self) -> Self {
35-
Self(Rc::clone(&self.0))
36-
}
37-
}
38-
3933
impl<T, D> Push<Message<T, D>> for PushOwned<T, D> {
4034
#[inline]
4135
fn push(&mut self, message: &mut Option<Message<T, D>>) {

timely/src/dataflow/stream.rs

+3-7
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl<S: Scope, C: Container + 'static> OwnedStream<S, C> {
8181
}
8282

8383
/// Allows the assertion of a container type, for the benefit of type inference.
84-
pub fn container<D: Container>(self) -> <Self as AsStream<S, D>>::Stream where Self: AsStream<S, D> { self.as_stream() }
84+
pub fn container<D: Container>(self) -> OwnedStream<S, C> where Self: AsStream<S, D> { self.as_stream() }
8585
}
8686

8787
/// A stream batching data in vectors.
@@ -136,24 +136,20 @@ impl<S: Scope, C: Container> StreamCore<S, C> {
136136
pub fn scope(&self) -> S { self.scope.clone() }
137137

138138
/// Allows the assertion of a container type, for the benefit of type inference.
139-
pub fn container<D: Container>(self) -> <Self as AsStream<S, D>>::Stream where Self: AsStream<S, D> { self.as_stream() }
139+
pub fn container<D: Container>(self) -> StreamCore<S, C> where Self: AsStream<S, D> { self.as_stream() }
140140
}
141141

142142
/// A type that can be translated to a [StreamCore].
143143
pub trait AsStream<S: Scope, C> {
144-
/// The type of the stream.
145-
type Stream;
146144
/// Translate `self` to a stream.
147-
fn as_stream(self) -> Self::Stream;
145+
fn as_stream(self) -> Self;
148146
}
149147

150148
impl<S: Scope, C> AsStream<S, C> for StreamCore<S, C> {
151-
type Stream = StreamCore<S, C>;
152149
fn as_stream(self) -> Self { self }
153150
}
154151

155152
impl<G: Scope, C> AsStream<G, C> for OwnedStream<G, C> {
156-
type Stream = OwnedStream<G, C>;
157153
fn as_stream(self) -> Self { self }
158154
}
159155

0 commit comments

Comments
 (0)