Skip to content

Commit 9eb3f5b

Browse files
authored
rt(threaded): cap LIFO slot polls (#5712)
As an optimization to improve locality, the multi-threaded scheduler maintains a single slot (LIFO slot). When a task is scheduled, it goes into the LIFO slot. The scheduler will run tasks in the LIFO slot first before checking the local queue. Ping-ping style workloads where task A notifies task B, which notifies task A again, can cause starvation as these two tasks repeatedly schedule the other in the LIFO slot. #5686, a first attempt at solving this problem, consumes a unit of budget each time a task is scheduled from the LIFO slot. However, at the time of this commit, the scheduler allocates 128 units of budget for each chunk of work. This is relatively high in situations where tasks do not perform many async operations yet have meaningful poll times (even 5-10 microsecond poll times can have an outsized impact on the scheduler). In an ideal world, the scheduler would adapt to the workload it is executing. However, as a stopgap, this commit limits the times the LIFO slot is prioritized per scheduler tick.
1 parent 3a94eb0 commit 9eb3f5b

File tree

6 files changed

+119
-30
lines changed

6 files changed

+119
-30
lines changed

.github/workflows/ci.yml

+26
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ jobs:
6060
- wasm32-wasi
6161
- check-external-types
6262
- check-fuzzing
63+
- check-unstable-mt-counters
6364
steps:
6465
- run: exit 0
6566

@@ -233,6 +234,31 @@ jobs:
233234
# the unstable cfg to RustDoc
234235
RUSTDOCFLAGS: --cfg tokio_unstable --cfg tokio_taskdump
235236

237+
check-unstable-mt-counters:
238+
name: check tokio full --internal-mt-counters
239+
needs: basics
240+
runs-on: ${{ matrix.os }}
241+
strategy:
242+
matrix:
243+
include:
244+
- os: ubuntu-latest
245+
steps:
246+
- uses: actions/checkout@v3
247+
- name: Install Rust ${{ env.rust_stable }}
248+
uses: dtolnay/rust-toolchain@master
249+
with:
250+
toolchain: ${{ env.rust_stable }}
251+
- uses: Swatinem/rust-cache@v2
252+
# Run `tokio` with "unstable" and "taskdump" cfg flags.
253+
- name: check tokio full --cfg unstable --cfg internal-mt-counters
254+
run: cargo test --all-features
255+
working-directory: tokio
256+
env:
257+
RUSTFLAGS: --cfg tokio_unstable --cfg tokio_internal_mt_counters -Dwarnings
258+
# in order to run doctests for unstable features, we must also pass
259+
# the unstable cfg to RustDoc
260+
RUSTDOCFLAGS: --cfg tokio_unstable --cfg tokio_internal_mt_counters
261+
236262
miri:
237263
name: miri
238264
needs: basics

.github/workflows/loom.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ jobs:
4848
run: cargo test --lib --release --features full -- --nocapture $SCOPE
4949
working-directory: tokio
5050
env:
51-
RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings
51+
RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings -C debug-assertions
5252
LOOM_MAX_PREEMPTIONS: ${{ matrix.max_preemptions }}
5353
LOOM_MAX_BRANCHES: 10000
5454
SCOPE: ${{ matrix.scope }}

tokio/src/runtime/coop.rs

-11
Original file line numberDiff line numberDiff line change
@@ -119,17 +119,6 @@ cfg_rt_multi_thread! {
119119
pub(crate) fn set(budget: Budget) {
120120
let _ = context::budget(|cell| cell.set(budget));
121121
}
122-
123-
/// Consume one unit of progress from the current task's budget.
124-
pub(crate) fn consume_one() {
125-
let _ = context::budget(|cell| {
126-
let mut budget = cell.get();
127-
if let Some(ref mut counter) = budget.0 {
128-
*counter = counter.saturating_sub(1);
129-
}
130-
cell.set(budget);
131-
});
132-
}
133122
}
134123

135124
cfg_rt! {

tokio/src/runtime/scheduler/multi_thread/counters.rs

+16
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,23 @@ mod imp {
66
static NUM_MAINTENANCE: AtomicUsize = AtomicUsize::new(0);
77
static NUM_NOTIFY_LOCAL: AtomicUsize = AtomicUsize::new(0);
88
static NUM_UNPARKS_LOCAL: AtomicUsize = AtomicUsize::new(0);
9+
static NUM_LIFO_SCHEDULES: AtomicUsize = AtomicUsize::new(0);
10+
static NUM_LIFO_CAPPED: AtomicUsize = AtomicUsize::new(0);
911

1012
impl Drop for super::Counters {
1113
fn drop(&mut self) {
1214
let notifies_local = NUM_NOTIFY_LOCAL.load(Relaxed);
1315
let unparks_local = NUM_UNPARKS_LOCAL.load(Relaxed);
1416
let maintenance = NUM_MAINTENANCE.load(Relaxed);
17+
let lifo_scheds = NUM_LIFO_SCHEDULES.load(Relaxed);
18+
let lifo_capped = NUM_LIFO_CAPPED.load(Relaxed);
1519

1620
println!("---");
1721
println!("notifies (local): {}", notifies_local);
1822
println!(" unparks (local): {}", unparks_local);
1923
println!(" maintenance: {}", maintenance);
24+
println!(" LIFO schedules: {}", lifo_scheds);
25+
println!(" LIFO capped: {}", lifo_capped);
2026
}
2127
}
2228

@@ -31,13 +37,23 @@ mod imp {
3137
pub(crate) fn inc_num_maintenance() {
3238
NUM_MAINTENANCE.fetch_add(1, Relaxed);
3339
}
40+
41+
pub(crate) fn inc_lifo_schedules() {
42+
NUM_LIFO_SCHEDULES.fetch_add(1, Relaxed);
43+
}
44+
45+
pub(crate) fn inc_lifo_capped() {
46+
NUM_LIFO_CAPPED.fetch_add(1, Relaxed);
47+
}
3448
}
3549

3650
#[cfg(not(tokio_internal_mt_counters))]
3751
mod imp {
3852
pub(crate) fn inc_num_inc_notify_local() {}
3953
pub(crate) fn inc_num_unparks_local() {}
4054
pub(crate) fn inc_num_maintenance() {}
55+
pub(crate) fn inc_lifo_schedules() {}
56+
pub(crate) fn inc_lifo_capped() {}
4157
}
4258

4359
#[derive(Debug)]

tokio/src/runtime/scheduler/multi_thread/worker.rs

+72-18
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,14 @@ struct Core {
9090
/// When a task is scheduled from a worker, it is stored in this slot. The
9191
/// worker will check this slot for a task **before** checking the run
9292
/// queue. This effectively results in the **last** scheduled task to be run
93-
/// next (LIFO). This is an optimization for message passing patterns and
94-
/// helps to reduce latency.
93+
/// next (LIFO). This is an optimization for improving locality which
94+
/// benefits message passing patterns and helps to reduce latency.
9595
lifo_slot: Option<Notified>,
9696

97+
/// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
98+
/// they go to the back of the `run_queue`.
99+
lifo_enabled: bool,
100+
97101
/// The worker-local run queue.
98102
run_queue: queue::Local<Arc<Handle>>,
99103

@@ -191,6 +195,12 @@ type Notified = task::Notified<Arc<Handle>>;
191195
// Tracks thread-local state
192196
scoped_thread_local!(static CURRENT: Context);
193197

198+
/// Value picked out of thin-air. Running the LIFO slot a handful of times
199+
/// seemms sufficient to benefit from locality. More than 3 times probably is
200+
/// overweighing. The value can be tuned in the future with data that shows
201+
/// improvements.
202+
const MAX_LIFO_POLLS_PER_TICK: usize = 3;
203+
194204
pub(super) fn create(
195205
size: usize,
196206
park: Parker,
@@ -214,6 +224,7 @@ pub(super) fn create(
214224
cores.push(Box::new(Core {
215225
tick: 0,
216226
lifo_slot: None,
227+
lifo_enabled: !config.disable_lifo_slot,
217228
run_queue,
218229
is_searching: false,
219230
is_shutdown: false,
@@ -422,7 +433,13 @@ fn run(worker: Arc<Worker>) {
422433

423434
impl Context {
424435
fn run(&self, mut core: Box<Core>) -> RunResult {
436+
// Reset `lifo_enabled` here in case the core was previously stolen from
437+
// a task that had the LIFO slot disabled.
438+
self.reset_lifo_enabled(&mut core);
439+
425440
while !core.is_shutdown {
441+
self.assert_lifo_enabled_is_correct(&core);
442+
426443
// Increment the tick
427444
core.tick();
428445

@@ -463,13 +480,16 @@ impl Context {
463480
// another idle worker to try to steal work.
464481
core.transition_from_searching(&self.worker);
465482

483+
self.assert_lifo_enabled_is_correct(&core);
484+
466485
// Make the core available to the runtime context
467486
core.metrics.start_poll();
468487
*self.core.borrow_mut() = Some(core);
469488

470489
// Run the task
471490
coop::budget(|| {
472491
task.run();
492+
let mut lifo_polls = 0;
473493

474494
// As long as there is budget remaining and a task exists in the
475495
// `lifo_slot`, then keep running.
@@ -478,7 +498,12 @@ impl Context {
478498
// by another worker.
479499
let mut core = match self.core.borrow_mut().take() {
480500
Some(core) => core,
481-
None => return Err(()),
501+
None => {
502+
// In this case, we cannot call `reset_lifo_enabled()`
503+
// because the core was stolen. The stealer will handle
504+
// that at the top of `Context::run`
505+
return Err(());
506+
}
482507
};
483508

484509
// If task poll times is enabled, measure the poll time. Note
@@ -491,35 +516,62 @@ impl Context {
491516
// Check for a task in the LIFO slot
492517
let task = match core.lifo_slot.take() {
493518
Some(task) => task,
494-
None => return Ok(core),
519+
None => {
520+
self.reset_lifo_enabled(&mut core);
521+
return Ok(core);
522+
}
495523
};
496524

497-
// Polling a task doesn't necessarily consume any budget, if it
498-
// doesn't use any Tokio leaf futures. To prevent such tasks
499-
// from using the lifo slot in an infinite loop, we consume an
500-
// extra unit of budget between each iteration of the loop.
501-
coop::consume_one();
502-
503-
if coop::has_budget_remaining() {
504-
// Run the LIFO task, then loop
505-
core.metrics.start_poll();
506-
*self.core.borrow_mut() = Some(core);
507-
let task = self.worker.handle.shared.owned.assert_owner(task);
508-
task.run();
509-
} else {
525+
if !coop::has_budget_remaining() {
510526
// Not enough budget left to run the LIFO task, push it to
511527
// the back of the queue and return.
512528
core.run_queue.push_back_or_overflow(
513529
task,
514530
self.worker.inject(),
515531
&mut core.metrics,
516532
);
533+
// If we hit this point, the LIFO slot should be enabled.
534+
// There is no need to reset it.
535+
debug_assert!(core.lifo_enabled);
517536
return Ok(core);
518537
}
538+
539+
// Track that we are about to run a task from the LIFO slot.
540+
lifo_polls += 1;
541+
super::counters::inc_lifo_schedules();
542+
543+
// Disable the LIFO slot if we reach our limit
544+
//
545+
// In ping-ping style workloads where task A notifies task B,
546+
// which notifies task A again, continuously prioritizing the
547+
// LIFO slot can cause starvation as these two tasks will
548+
// repeatedly schedule the other. To mitigate this, we limit the
549+
// number of times the LIFO slot is prioritized.
550+
if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
551+
core.lifo_enabled = false;
552+
super::counters::inc_lifo_capped();
553+
}
554+
555+
// Run the LIFO task, then loop
556+
core.metrics.start_poll();
557+
*self.core.borrow_mut() = Some(core);
558+
let task = self.worker.handle.shared.owned.assert_owner(task);
559+
task.run();
519560
}
520561
})
521562
}
522563

564+
fn reset_lifo_enabled(&self, core: &mut Core) {
565+
core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
566+
}
567+
568+
fn assert_lifo_enabled_is_correct(&self, core: &Core) {
569+
debug_assert_eq!(
570+
core.lifo_enabled,
571+
!self.worker.handle.shared.config.disable_lifo_slot
572+
);
573+
}
574+
523575
fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
524576
if core.tick % self.worker.handle.shared.config.event_interval == 0 {
525577
super::counters::inc_num_maintenance();
@@ -573,6 +625,8 @@ impl Context {
573625
}
574626

575627
fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
628+
self.assert_lifo_enabled_is_correct(&core);
629+
576630
// Take the parker out of core
577631
let mut park = core.park.take().expect("park missing");
578632

@@ -840,7 +894,7 @@ impl Handle {
840894
// task must always be pushed to the back of the queue, enabling other
841895
// tasks to be executed. If **not** a yield, then there is more
842896
// flexibility and the task may go to the front of the queue.
843-
let should_notify = if is_yield || self.shared.config.disable_lifo_slot {
897+
let should_notify = if is_yield || !core.lifo_enabled {
844898
core.run_queue
845899
.push_back_or_overflow(task, &self.shared.inject, &mut core.metrics);
846900
true

tokio/src/runtime/tests/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ cfg_loom! {
6060
mod loom_shutdown_join;
6161
mod loom_join_set;
6262
mod loom_yield;
63+
64+
// Make sure debug assertions are enabled
65+
#[cfg(not(debug_assertions))]
66+
compiler_error!("these tests require debug assertions to be enabled");
6367
}
6468

6569
cfg_not_loom! {

0 commit comments

Comments
 (0)