-
Notifications
You must be signed in to change notification settings - Fork 281
/
Copy pathhashjoin.rs
102 lines (78 loc) · 3.84 KB
/
hashjoin.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use std::collections::HashMap;
use rand::{Rng, SeedableRng, rngs::SmallRng};
use timely::dataflow::*;
use timely::dataflow::operators::{Input, Probe};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Exchange;
/// Reads the positional command-line argument at `position` and parses it as `T`.
///
/// # Panics
///
/// Panics with a usage message if the argument is missing or fails to parse.
fn parse_arg<T: std::str::FromStr>(position: usize, name: &str) -> T {
    let usage = "usage: hashjoin <keys> <vals> <batch> [timely args...]";
    let raw = std::env::args()
        .nth(position)
        .unwrap_or_else(|| panic!("missing argument `{}`; {}", name, usage));
    raw.parse()
        .unwrap_or_else(|_| panic!("argument `{}` could not be parsed: {:?}; {}", name, raw, usage))
}

/// Number of records worker `index` (of `peers`) should produce so that the
/// shares across all workers sum to exactly `total`: every worker gets
/// `total / peers`, and the first `total % peers` workers take one extra.
fn worker_share(total: usize, index: usize, peers: usize) -> usize {
    total / peers + usize::from(index < total % peers)
}

/// Benchmarks a symmetric hash join between two randomly generated inputs.
///
/// Usage: `hashjoin <keys> <vals> <batch> [timely args...]`
///
/// * `keys`  — each record's key and value are drawn from `0..keys`.
/// * `vals`  — total number of records per input, split across all workers.
/// * `batch` — maximum records each worker sends per input before advancing the epoch.
///
/// Arguments after the third are handed to `timely::execute_from_args`. After each
/// epoch a worker steps the dataflow until the probe passes that epoch, then prints
/// the elapsed time.
///
/// # Panics
///
/// Panics if an argument is missing or malformed, or if `keys` or `batch` is zero.
fn main() {
    let keys: u64 = parse_arg(1, "keys");
    let vals: usize = parse_arg(2, "vals");
    let batch: usize = parse_arg(3, "batch");

    // `batch == 0` would never make progress in the send loop below (infinite loop),
    // and `keys == 0` would make `0..keys` an empty range for `gen_range`.
    assert!(batch > 0, "`batch` must be positive");
    assert!(keys > 0, "`keys` must be positive");

    timely::execute_from_args(std::env::args().skip(4), move |worker| {
        let index = worker.index();
        let peers = worker.peers();

        let mut input1 = InputHandle::new();
        let mut input2 = InputHandle::new();
        let mut probe = ProbeHandle::new();

        worker.dataflow(|scope| {
            let stream1 = scope.input_from(&mut input1);
            let stream2 = scope.input_from(&mut input2);

            // Route both inputs by key so records with equal keys meet on one worker.
            let exchange1 = Exchange::new(|x: &(u64, u64)| x.0);
            let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);

            stream1
                .binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
                    // All records received so far on each input, indexed by key.
                    let mut map1 = HashMap::<u64, Vec<u64>>::new();
                    let mut map2 = HashMap::<u64, Vec<u64>>::new();

                    move |input1, input2, output| {
                        // Drain first input, check second map, update first map.
                        input1.for_each(|time, data| {
                            let mut session = output.session(&time);
                            for (key, val1) in data.drain(..) {
                                if let Some(values) = map2.get(&key) {
                                    for &val2 in values.iter() {
                                        session.give((val1, val2));
                                    }
                                }
                                map1.entry(key).or_default().push(val1);
                            }
                        });

                        // Drain second input, check first map, update second map.
                        // Input 1 was already merged into `map1`, so each matching pair
                        // is emitted exactly once.
                        input2.for_each(|time, data| {
                            let mut session = output.session(&time);
                            for (key, val2) in data.drain(..) {
                                if let Some(values) = map1.get(&key) {
                                    for &val1 in values.iter() {
                                        session.give((val1, val2));
                                    }
                                }
                                map2.entry(key).or_default().push(val2);
                            }
                        });
                    }
                })
                .container::<Vec<_>>()
                .probe_with(&mut probe);
        });

        let mut rng: SmallRng = SeedableRng::seed_from_u64(index as u64);
        let timer = std::time::Instant::now();

        // This worker's portion of `vals`; portions across all workers sum to `vals`.
        let to_produce = worker_share(vals, index, peers);
        let mut sent = 0;
        while sent < to_produce {
            // Send some amount of data, no more than `batch`.
            let to_send = std::cmp::min(batch, to_produce - sent);
            for _ in 0..to_send {
                input1.send((rng.gen_range(0..keys), rng.gen_range(0..keys)));
                input2.send((rng.gen_range(0..keys), rng.gen_range(0..keys)));
            }
            sent += to_send;

            // Advance input, iterate until data cleared.
            let next = input1.epoch() + 1;
            input1.advance_to(next);
            input2.advance_to(next);
            while probe.less_than(input1.time()) {
                worker.step();
            }

            println!("{:?}\tworker {} batch complete", timer.elapsed(), index);
        }
    })
    .unwrap(); // asserts error-free execution;
}