Skip to content

Commit ba33ade

Browse files
committed
Consolidate activation requests as they occur
1 parent 08b2087 commit ba33ade

File tree

1 file changed

+127
-45
lines changed

1 file changed

+127
-45
lines changed

timely/src/scheduling/activate.rs

+127-45
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,10 @@ impl Scheduler for Box<dyn Scheduler> {
3939
/// Allocation-free activation tracker.
4040
#[derive(Debug)]
4141
pub struct Activations {
42-
clean: usize,
43-
/// `(offset, length)`
44-
bounds: Vec<(usize, usize)>,
45-
slices: Vec<usize>,
46-
buffer: Vec<usize>,
42+
/// Current activations that are being served.
43+
current: ActivationsBuffer,
44+
/// Upcoming activations that may soon be served.
45+
pending: ActivationsBuffer,
4746

4847
// Inter-thread activations.
4948
tx: Sender<Vec<usize>>,
@@ -60,10 +59,8 @@ impl Activations {
6059
pub fn new(timer: Instant) -> Self {
6160
let (tx, rx) = crossbeam_channel::unbounded();
6261
Self {
63-
clean: 0,
64-
bounds: Vec::new(),
65-
slices: Vec::new(),
66-
buffer: Vec::new(),
62+
current: ActivationsBuffer::new(),
63+
pending: ActivationsBuffer::new(),
6764
tx,
6865
rx,
6966
timer,
@@ -73,8 +70,7 @@ impl Activations {
7370

7471
/// Activates the task addressed by `path`.
7572
pub fn activate(&mut self, path: &[usize]) {
76-
self.bounds.push((self.slices.len(), path.len()));
77-
self.slices.extend(path);
73+
self.pending.activate(path);
7874
}
7975

8076
/// Schedules a future activation for the task addressed by `path`.
@@ -89,6 +85,20 @@ impl Activations {
8985
}
9086
}
9187

88+
/// Reactivates the task addressed by `path`, ideally within `delay`.
89+
///
90+
/// The task may be activated before `delay`, in which case the task should reactivate itself if
91+
/// it requires further reactivation, as this specific `delay` may no longer be in effect.
92+
pub fn activate_within(&mut self, path: &[usize], delay: Duration) {
93+
if delay == Duration::new(0, 0) {
94+
self.activate(path)
95+
}
96+
else {
97+
let moment = self.timer.elapsed() + delay;
98+
self.pending.activate_by(path, moment)
99+
}
100+
}
101+
92102
/// Discards the current active set and presents the next active set.
93103
pub fn advance(&mut self) {
94104

@@ -104,37 +114,135 @@ impl Activations {
104114
self.activate(&path[..]);
105115
}
106116

107-
self.bounds.drain(.. self.clean);
117+
self.current.clear();
118+
self.pending.compact();
119+
self.pending.extract_through(&mut self.current, now);
120+
121+
}
122+
123+
/// Maps a function across activated paths.
124+
pub fn map_active(&self, logic: impl Fn(&[usize])) {
125+
self.current.map_active(logic)
126+
}
127+
128+
/// Sets as active any symbols that follow `path`.
129+
pub fn for_extensions(&self, path: &[usize], action: impl FnMut(usize)) {
130+
self.current.for_extensions(path, action)
131+
}
132+
133+
/// Constructs a thread-safe `SyncActivations` handle to this activator.
134+
pub fn sync(&self) -> SyncActivations {
135+
SyncActivations {
136+
tx: self.tx.clone(),
137+
thread: std::thread::current(),
138+
}
139+
}
140+
141+
/// Time until next scheduled event.
142+
///
143+
/// This method should be used before putting a worker thread to sleep, as it
144+
/// indicates the amount of time before the thread should be unparked for the
145+
/// next scheduled activation.
146+
pub fn empty_for(&self) -> Option<Duration> {
147+
if !self.current.is_empty() || !self.pending.is_empty() {
148+
Some(Duration::new(0,0))
149+
}
150+
else {
151+
self.queue.peek().map(|Reverse((t,_a))| {
152+
let elapsed = self.timer.elapsed();
153+
if t < &elapsed { Duration::new(0,0) }
154+
else { *t - elapsed }
155+
})
156+
}
157+
}
158+
}
159+
160+
/// Manages delayed activations for path-named tasks.
161+
#[derive(Debug)]
162+
pub struct ActivationsBuffer {
163+
/// `(offset, length)`, and an elapsed timer duration.
164+
/// The zero duration can be used to indicate "immediately".
165+
bounds: Vec<(usize, usize, Duration)>,
166+
/// Elements of path slices.
167+
slices: Vec<usize>,
168+
/// A spare buffer to copy into.
169+
buffer: Vec<usize>,
170+
}
171+
172+
impl ActivationsBuffer {
173+
/// Creates a new activation tracker.
174+
pub fn new() -> Self {
175+
Self {
176+
bounds: Vec::new(),
177+
slices: Vec::new(),
178+
buffer: Vec::new(),
179+
}
180+
}
181+
182+
fn clear(&mut self) {
183+
self.bounds.clear();
184+
self.slices.clear();
185+
self.buffer.clear();
186+
}
187+
188+
fn is_empty(&self) -> bool {
189+
self.bounds.is_empty()
190+
}
191+
192+
/// Activates the task addressed by `path`.
193+
pub fn activate(&mut self, path: &[usize]) {
194+
self.activate_by(path, Duration::new(0, 0));
195+
}
196+
197+
/// Activates the task addressed by `path`.
198+
pub fn activate_by(&mut self, path: &[usize], duration: Duration) {
199+
self.bounds.push((self.slices.len(), path.len(), duration));
200+
self.slices.extend(path);
201+
}
202+
203+
/// Orders activations by their path, and retains only each's most immediate duration.
204+
pub fn compact(&mut self) {
108205

109206
{ // Scoped, to allow borrow to drop.
110207
let slices = &self.slices[..];
111-
self.bounds.sort_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
208+
// Sort slices by path, and within each path by duration.
209+
self.bounds.sort_by_key(|x| (&slices[x.0 .. (x.0 + x.1)], x.2));
210+
// Deduplicate by path, retaining the least duration.
112211
self.bounds.dedup_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
113212
}
114213

115214
// Compact the slices.
116215
self.buffer.clear();
117-
for (offset, length) in self.bounds.iter_mut() {
216+
for (offset, length, _duration) in self.bounds.iter_mut() {
118217
self.buffer.extend(&self.slices[*offset .. (*offset + *length)]);
119218
*offset = self.buffer.len() - *length;
120219
}
121220
::std::mem::swap(&mut self.buffer, &mut self.slices);
221+
}
122222

123-
self.clean = self.bounds.len();
223+
/// Extracts all activations less or equal to `threshold` into `other`.
224+
pub fn extract_through(&mut self, other: &mut Self, threshold: Duration) {
225+
for (offset, length, duration) in self.bounds.iter_mut() {
226+
if *duration <= threshold {
227+
other.activate_by(&self.slices[*offset .. (*offset + *length)], *duration);
228+
}
229+
}
230+
self.bounds.retain(|(_off, _len, duration)| *duration > threshold);
231+
// Could `self.compact()` here, but it will happen as part of future work.
124232
}
125233

126234
/// Maps a function across activated paths.
127235
pub fn map_active(&self, logic: impl Fn(&[usize])) {
128-
for (offset, length) in self.bounds.iter() {
236+
for (offset, length, _duration) in self.bounds.iter() {
129237
logic(&self.slices[*offset .. (*offset + *length)]);
130238
}
131239
}
132-
240+
133241
/// Sets as active any symbols that follow `path`.
134242
pub fn for_extensions(&self, path: &[usize], mut action: impl FnMut(usize)) {
135243

136244
let position =
137-
self.bounds[..self.clean]
245+
self.bounds
138246
.binary_search_by_key(&path, |x| &self.slices[x.0 .. (x.0 + x.1)]);
139247
let position = match position {
140248
Ok(x) => x,
@@ -146,7 +254,7 @@ impl Activations {
146254
.iter()
147255
.cloned()
148256
.skip(position)
149-
.map(|(offset, length)| &self.slices[offset .. (offset + length)])
257+
.map(|(offset, length, _)| &self.slices[offset .. (offset + length)])
150258
.take_while(|x| x.starts_with(path))
151259
.for_each(|x| {
152260
// push non-empty, non-duplicate extensions.
@@ -158,32 +266,6 @@ impl Activations {
158266
}
159267
});
160268
}
161-
162-
/// Constructs a thread-safe `SyncActivations` handle to this activator.
163-
pub fn sync(&self) -> SyncActivations {
164-
SyncActivations {
165-
tx: self.tx.clone(),
166-
thread: std::thread::current(),
167-
}
168-
}
169-
170-
/// Time until next scheduled event.
171-
///
172-
/// This method should be used before putting a worker thread to sleep, as it
173-
/// indicates the amount of time before the thread should be unparked for the
174-
/// next scheduled activation.
175-
pub fn empty_for(&self) -> Option<Duration> {
176-
if !self.bounds.is_empty() {
177-
Some(Duration::new(0,0))
178-
}
179-
else {
180-
self.queue.peek().map(|Reverse((t,_a))| {
181-
let elapsed = self.timer.elapsed();
182-
if t < &elapsed { Duration::new(0,0) }
183-
else { *t - elapsed }
184-
})
185-
}
186-
}
187269
}
188270

189271
/// A thread-safe handle to an `Activations`.

0 commit comments

Comments
 (0)