Skip to content

Commit d8d4d42

Browse files
committed
Move parking from communication into scheduler
1 parent 1257900 commit d8d4d42

File tree

8 files changed

+44
-67
lines changed

8 files changed

+44
-67
lines changed

communication/src/allocator/generic.rs

-8
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,6 @@ impl Allocate for Generic {
9393
fn receive(&mut self) { self.receive(); }
9494
fn release(&mut self) { self.release(); }
9595
fn events(&self) -> &Rc<RefCell<Vec<usize>>> { self.events() }
96-
fn await_events(&self, _duration: Option<std::time::Duration>) {
97-
match self {
98-
Generic::Thread(t) => t.await_events(_duration),
99-
Generic::Process(p) => p.await_events(_duration),
100-
Generic::ProcessBinary(pb) => pb.await_events(_duration),
101-
Generic::ZeroCopy(z) => z.await_events(_duration),
102-
}
103-
}
10496
}
10597

10698

communication/src/allocator/mod.rs

-9
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
33
use std::rc::Rc;
44
use std::cell::RefCell;
5-
use std::time::Duration;
65

76
pub use self::thread::Thread;
87
pub use self::process::Process;
@@ -57,14 +56,6 @@ pub trait Allocate {
5756
/// into a performance problem.
5857
fn events(&self) -> &Rc<RefCell<Vec<usize>>>;
5958

60-
/// Awaits communication events.
61-
///
62-
/// This method may park the current thread, for at most `duration`,
63-
/// until new events arrive.
64-
/// The method is not guaranteed to wait for any amount of time, but
65-
/// good implementations should use this as a hint to park the thread.
66-
fn await_events(&self, _duration: Option<Duration>) { }
67-
6859
/// Ensure that received messages are surfaced in each channel.
6960
///
7061
/// This method should be called to ensure that received messages are

communication/src/allocator/process.rs

-5
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::rc::Rc;
44
use std::cell::RefCell;
55
use std::sync::{Arc, Mutex};
66
use std::any::Any;
7-
use std::time::Duration;
87
use std::collections::{HashMap};
98
use crossbeam_channel::{Sender, Receiver};
109

@@ -178,10 +177,6 @@ impl Allocate for Process {
178177
self.inner.events()
179178
}
180179

181-
fn await_events(&self, duration: Option<Duration>) {
182-
self.inner.await_events(duration);
183-
}
184-
185180
fn receive(&mut self) {
186181
let mut events = self.inner.events().borrow_mut();
187182
while let Ok(index) = self.counters_recv.try_recv() {

communication/src/allocator/thread.rs

-11
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
33
use std::rc::Rc;
44
use std::cell::RefCell;
5-
use std::time::Duration;
65
use std::collections::VecDeque;
76

87
use crate::allocator::{Allocate, AllocateBuilder};
@@ -36,16 +35,6 @@ impl Allocate for Thread {
3635
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
3736
&self.events
3837
}
39-
fn await_events(&self, duration: Option<Duration>) {
40-
if self.events.borrow().is_empty() {
41-
if let Some(duration) = duration {
42-
std::thread::park_timeout(duration);
43-
}
44-
else {
45-
std::thread::park();
46-
}
47-
}
48-
}
4938
}
5039

5140
/// Thread-local counting channel push endpoint.

communication/src/allocator/zero_copy/allocator.rs

-3
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,4 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
271271
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
272272
self.inner.events()
273273
}
274-
fn await_events(&self, duration: Option<std::time::Duration>) {
275-
self.inner.await_events(duration);
276-
}
277274
}

communication/src/allocator/zero_copy/allocator_process.rs

-10
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,4 @@ impl Allocate for ProcessAllocator {
240240
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
241241
&self.events
242242
}
243-
fn await_events(&self, duration: Option<std::time::Duration>) {
244-
if self.events.borrow().is_empty() {
245-
if let Some(duration) = duration {
246-
std::thread::park_timeout(duration);
247-
}
248-
else {
249-
std::thread::park();
250-
}
251-
}
252-
}
253243
}

timely/src/scheduling/activate.rs

+29-1
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ impl Activations {
177177
/// This method should be used before putting a worker thread to sleep, as it
178178
/// indicates the amount of time before the thread should be unparked for the
179179
/// next scheduled activation.
180-
pub fn empty_for(&self) -> Option<Duration> {
180+
fn empty_for(&self) -> Option<Duration> {
181181
if !self.bounds.is_empty() || self.timer.is_none() {
182182
Some(Duration::new(0,0))
183183
}
@@ -189,6 +189,34 @@ impl Activations {
189189
})
190190
}
191191
}
192+
193+
/// Indicates that there is nothing to do for `timeout`, and that the scheduler
194+
/// can allow the thread to sleep until then.
195+
///
196+
/// The method does not *need* to park the thread, and indeed it may elect to
197+
/// unpark earlier if there are deferred activations.
198+
pub fn park_timeout(&self, timeout: Option<Duration>) {
199+
let empty_for = self.empty_for();
200+
let timeout = match (timeout, empty_for) {
201+
(Some(x), Some(y)) => Some(std::cmp::min(x,y)),
202+
(x, y) => x.or(y),
203+
};
204+
205+
if let Some(timeout) = timeout {
206+
std::thread::park_timeout(timeout);
207+
}
208+
else {
209+
std::thread::park();
210+
}
211+
}
212+
213+
/// True iff there are no immediate activations.
214+
///
215+
/// Used by others to guard work done in anticipation of potentially parking.
216+
/// An alternate method name could be `would_park`.
217+
pub fn is_idle(&self) -> bool {
218+
self.bounds.is_empty() && self.timer.is_none()
219+
}
192220
}
193221

194222
/// A thread-safe handle to an `Activations`.

timely/src/worker.rs

+15-20
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ impl<A: Allocate> Worker<A> {
331331
/// worker.step_or_park(Some(Duration::from_secs(1)));
332332
/// });
333333
/// ```
334-
pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool {
334+
pub fn step_or_park(&mut self, timeout: Option<Duration>) -> bool {
335335

336336
{ // Process channel events. Activate responders.
337337
let mut allocator = self.allocator.borrow_mut();
@@ -360,28 +360,23 @@ impl<A: Allocate> Worker<A> {
360360
.borrow_mut()
361361
.advance();
362362

363-
// Consider parking only if we have no pending events, some dataflows, and a non-zero duration.
364-
let empty_for = self.activations.borrow().empty_for();
365-
// Determine the minimum park duration, where `None` are an absence of a constraint.
366-
let delay = match (duration, empty_for) {
367-
(Some(x), Some(y)) => Some(std::cmp::min(x,y)),
368-
(x, y) => x.or(y),
369-
};
363+
if self.activations.borrow().is_idle() {
364+
// If the timeout is zero, don't bother trying to park.
365+
// More generally, we could put some threshold in here.
366+
if timeout != Some(Duration::new(0, 0)) {
367+
// Log parking and flush log.
368+
if let Some(l) = self.logging().as_mut() {
369+
l.log(crate::logging::ParkEvent::park(timeout));
370+
l.flush();
371+
}
370372

371-
if delay != Some(Duration::new(0,0)) {
373+
// We have just drained `allocator.events()` up above;
374+
// otherwise we should first check it for emptiness.
375+
self.activations.borrow().park_timeout(timeout);
372376

373-
// Log parking and flush log.
374-
if let Some(l) = self.logging().as_mut() {
375-
l.log(crate::logging::ParkEvent::park(delay));
376-
l.flush();
377+
// Log return from unpark.
378+
self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
377379
}
378-
379-
self.allocator
380-
.borrow()
381-
.await_events(delay);
382-
383-
// Log return from unpark.
384-
self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
385380
}
386381
else { // Schedule active dataflows.
387382

0 commit comments

Comments
 (0)