@@ -8,30 +8,45 @@ use std::time::{Duration, Instant};
8
8
use std:: cmp:: Reverse ;
9
9
use crossbeam_channel:: { Sender , Receiver } ;
10
10
11
- /// Methods required to act as a timely scheduler .
11
+ /// Methods required to act as a scheduler for timely operators .
12
12
///
13
- /// The core methods are the activation of "paths", sequences of integers, and
14
- /// the enumeration of active paths by prefix. A scheduler may delay the report
15
- /// of a path indefinitely, but it should report at least one extension for the
16
- /// empty path `&[]` or risk parking the worker thread without a certain unpark.
13
+ /// Operators are described by "paths" of integers, indicating the path along
14
+ /// a tree of regions, arriving at the the operator. Each path is either "idle"
15
+ /// or "active", where the latter indicates that someone has requested that the
16
+ /// operator be scheduled by the worker. Operators go from idle to active when
17
+ /// the `activate(path)` method is called, and from active to idle when the path
18
+ /// is returned through a call to `extensions(path, _)`.
17
19
///
18
- /// There is no known harm to "spurious wake-ups" where a not-active path is
19
- /// returned through `extensions()`.
20
+ /// The worker will continually probe for extensions to the root empty path `[]`,
21
+ /// and then follow all returned addresses, recursively. A scheduler need not
22
+ /// schedule all active paths, but it should return *some* active path when the
23
+ /// worker probes the empty path, or the worker may put the thread to sleep.
24
+ ///
25
+ /// There is no known harm to scheduling an idle path.
26
+ /// The worker may speculatively schedule paths of its own accord.
20
27
pub trait Scheduler {
21
28
/// Mark a path as immediately scheduleable.
29
+ ///
30
+ /// The scheduler is not required to immediately schedule the path, but it
31
+ /// should not signal that it has no work until the path has been scheduled.
22
32
fn activate ( & mut self , path : & [ usize ] ) ;
23
33
/// Populates `dest` with next identifiers on active extensions of `path`.
24
34
///
25
35
/// This method is where a scheduler is allowed to exercise some discretion,
26
36
/// in that it does not need to present *all* extensions, but it can instead
27
- /// present only those that the runtime should schedule.
28
- fn extensions ( & mut self , path : & [ usize ] , dest : & mut Vec < usize > ) ;
37
+ /// present only those that the runtime should immediately schedule.
38
+ ///
39
+ /// The worker *will* schedule all extensions before probing new prefixes.
40
+ /// The scheduler is invited to rely on this, and to schedule in "batches",
41
+ /// where the next time the worker probes for extensions to the empty path
42
+ /// then all addresses in the batch have certainly been scheduled.
43
+ fn extensions ( & mut self , path : & [ usize ] , dest : & mut BinaryHeap < Reverse < usize > > ) ;
29
44
}
30
45
31
46
// Trait objects can be schedulers too.
32
47
impl Scheduler for Box < dyn Scheduler > {
33
48
fn activate ( & mut self , path : & [ usize ] ) { ( * * self ) . activate ( path) }
34
- fn extensions ( & mut self , path : & [ usize ] , dest : & mut Vec < usize > ) { ( * * self ) . extensions ( path, dest) }
49
+ fn extensions ( & mut self , path : & [ usize ] , dest : & mut BinaryHeap < Reverse < usize > > ) { ( * * self ) . extensions ( path, dest) }
35
50
}
36
51
37
52
/// Allocation-free activation tracker.
@@ -93,7 +108,7 @@ impl Activations {
93
108
}
94
109
95
110
/// Discards the current active set and presents the next active set.
96
- pub fn advance ( & mut self ) {
111
+ fn advance ( & mut self ) {
97
112
98
113
// Drain inter-thread activations.
99
114
while let Ok ( path) = self . rx . try_recv ( ) {
@@ -128,15 +143,15 @@ impl Activations {
128
143
self . clean = self . bounds . len ( ) ;
129
144
}
130
145
131
- /// Maps a function across activated paths.
132
- pub fn map_active ( & self , logic : impl Fn ( & [ usize ] ) ) {
133
- for ( offset, length) in self . bounds . iter ( ) {
134
- logic ( & self . slices [ * offset .. ( * offset + * length) ] ) ;
135
- }
136
- }
137
-
138
146
/// Sets as active any symbols that follow `path`.
139
- pub fn for_extensions ( & self , path : & [ usize ] , mut action : impl FnMut ( usize ) ) {
147
+ fn for_extensions ( & mut self , path : & [ usize ] , mut action : impl FnMut ( usize ) ) {
148
+
149
+ // Each call for the root path is a moment where the worker has reset.
150
+ // This relies on a worker implementation that follows the scheduling
151
+ // instructions perfectly; if any offered paths are not explored, oops.
152
+ if path. is_empty ( ) {
153
+ self . advance ( ) ;
154
+ }
140
155
141
156
let position =
142
157
self . bounds [ ..self . clean ]
@@ -209,13 +224,14 @@ impl Activations {
209
224
std:: thread:: park ( ) ;
210
225
}
211
226
}
227
+ }
212
228
213
- /// True iff there are no immediate activations.
214
- ///
215
- /// Used by others to guard work done in anticipation of potentially parking.
216
- /// An alternate method name could be `would_park`.
217
- pub fn is_idle ( & self ) -> bool {
218
- self . bounds . is_empty ( ) && self . timer . is_none ( )
229
+ impl Scheduler for Activations {
230
+ fn activate ( & mut self , path : & [ usize ] ) {
231
+ self . activate ( path ) ;
232
+ }
233
+ fn extensions ( & mut self , path : & [ usize ] , dest : & mut BinaryHeap < Reverse < usize > > ) {
234
+ self . for_extensions ( path , |index| dest . push ( Reverse ( index ) ) ) ;
219
235
}
220
236
}
221
237
0 commit comments