Commit 038c4d9

rt: implement task dumps for multi-thread runtime (#5717)
This patch implements task dumps on the multi-thread runtime. It complements #5608, which implemented task dumps on the current-thread runtime.
1 parent 7b24b22 commit 038c4d9

10 files changed (+631, -77 lines)
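
In short, `Handle::dump()` becomes `async` and now also works under the default multi-thread scheduler. A minimal sketch of the resulting API, adapted from the updated `examples/dump.rs` below (the cfg gating shown in the example — `tokio_unstable`, Linux, supported architectures — is omitted here for brevity):

```rust
#[tokio::main]
async fn main() {
    // Spawn a task for the dump to capture.
    tokio::spawn(async {
        loop {
            tokio::task::yield_now().await;
        }
    });

    // `dump()` is now async: on the multi-thread runtime it resolves once
    // the workers have cooperated to produce a trace of every task.
    let handle = tokio::runtime::Handle::current();
    let dump = handle.dump().await;

    for (i, task) in dump.tasks().iter().enumerate() {
        println!("task {i} trace:");
        println!("{}", task.trace());
    }
}
```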

examples/dump.rs (+20, -12)

```diff
@@ -6,7 +6,7 @@
     target_os = "linux",
     any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
 ))]
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
 async fn main() {
     use std::hint::black_box;
 
@@ -22,21 +22,29 @@ async fn main() {
 
     #[inline(never)]
     async fn c() {
-        black_box(tokio::task::yield_now()).await
+        loop {
+            tokio::task::yield_now().await;
+        }
     }
 
-    tokio::spawn(a());
-    tokio::spawn(b());
-    tokio::spawn(c());
+    async fn dump() {
+        let handle = tokio::runtime::Handle::current();
+        let dump = handle.dump().await;
 
-    let handle = tokio::runtime::Handle::current();
-    let dump = handle.dump();
-
-    for (i, task) in dump.tasks().iter().enumerate() {
-        let trace = task.trace();
-        println!("task {i} trace:");
-        println!("{trace}");
+        for (i, task) in dump.tasks().iter().enumerate() {
+            let trace = task.trace();
+            println!("task {i} trace:");
+            println!("{trace}\n");
+        }
     }
+
+    tokio::select!(
+        biased;
+        _ = tokio::spawn(a()) => {},
+        _ = tokio::spawn(b()) => {},
+        _ = tokio::spawn(c()) => {},
+        _ = dump() => {},
+    );
 }
 
 #[cfg(not(all(
```

tokio/src/loom/std/barrier.rs (new file, +217)

```rust
//! A `Barrier` that provides `wait_timeout`.
//!
//! This implementation mirrors that of the Rust standard library.

use crate::loom::sync::{Condvar, Mutex};
use std::fmt;
use std::time::{Duration, Instant};

/// A barrier enables multiple threads to synchronize the beginning
/// of some computation.
///
/// # Examples
///
/// ```
/// use std::sync::{Arc, Barrier};
/// use std::thread;
///
/// let mut handles = Vec::with_capacity(10);
/// let barrier = Arc::new(Barrier::new(10));
/// for _ in 0..10 {
///     let c = Arc::clone(&barrier);
///     // The same messages will be printed together.
///     // You will NOT see any interleaving.
///     handles.push(thread::spawn(move|| {
///         println!("before wait");
///         c.wait();
///         println!("after wait");
///     }));
/// }
/// // Wait for other threads to finish.
/// for handle in handles {
///     handle.join().unwrap();
/// }
/// ```
pub(crate) struct Barrier {
    lock: Mutex<BarrierState>,
    cvar: Condvar,
    num_threads: usize,
}

// The inner state of a double barrier
struct BarrierState {
    count: usize,
    generation_id: usize,
}

/// A `BarrierWaitResult` is returned by [`Barrier::wait()`] when all threads
/// in the [`Barrier`] have rendezvoused.
///
/// # Examples
///
/// ```
/// use std::sync::Barrier;
///
/// let barrier = Barrier::new(1);
/// let barrier_wait_result = barrier.wait();
/// ```
pub(crate) struct BarrierWaitResult(bool);

impl fmt::Debug for Barrier {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Barrier").finish_non_exhaustive()
    }
}

impl Barrier {
    /// Creates a new barrier that can block a given number of threads.
    ///
    /// A barrier will block `n`-1 threads which call [`wait()`] and then wake
    /// up all threads at once when the `n`th thread calls [`wait()`].
    ///
    /// [`wait()`]: Barrier::wait
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Barrier;
    ///
    /// let barrier = Barrier::new(10);
    /// ```
    #[must_use]
    pub(crate) fn new(n: usize) -> Barrier {
        Barrier {
            lock: Mutex::new(BarrierState {
                count: 0,
                generation_id: 0,
            }),
            cvar: Condvar::new(),
            num_threads: n,
        }
    }

    /// Blocks the current thread until all threads have rendezvoused here.
    ///
    /// Barriers are re-usable after all threads have rendezvoused once, and can
    /// be used continuously.
    ///
    /// A single (arbitrary) thread will receive a [`BarrierWaitResult`] that
    /// returns `true` from [`BarrierWaitResult::is_leader()`] when returning
    /// from this function, and all other threads will receive a result that
    /// will return `false` from [`BarrierWaitResult::is_leader()`].
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::{Arc, Barrier};
    /// use std::thread;
    ///
    /// let mut handles = Vec::with_capacity(10);
    /// let barrier = Arc::new(Barrier::new(10));
    /// for _ in 0..10 {
    ///     let c = Arc::clone(&barrier);
    ///     // The same messages will be printed together.
    ///     // You will NOT see any interleaving.
    ///     handles.push(thread::spawn(move|| {
    ///         println!("before wait");
    ///         c.wait();
    ///         println!("after wait");
    ///     }));
    /// }
    /// // Wait for other threads to finish.
    /// for handle in handles {
    ///     handle.join().unwrap();
    /// }
    /// ```
    pub(crate) fn wait(&self) -> BarrierWaitResult {
        let mut lock = self.lock.lock();
        let local_gen = lock.generation_id;
        lock.count += 1;
        if lock.count < self.num_threads {
            // We need a while loop to guard against spurious wakeups.
            // https://en.wikipedia.org/wiki/Spurious_wakeup
            while local_gen == lock.generation_id {
                lock = self.cvar.wait(lock).unwrap();
            }
            BarrierWaitResult(false)
        } else {
            lock.count = 0;
            lock.generation_id = lock.generation_id.wrapping_add(1);
            self.cvar.notify_all();
            BarrierWaitResult(true)
        }
    }

    /// Blocks the current thread until all threads have rendezvoused here for
    /// at most `timeout` duration.
    pub(crate) fn wait_timeout(&self, timeout: Duration) -> Option<BarrierWaitResult> {
        // This implementation mirrors `wait`, but with each blocking operation
        // replaced by a timeout-amenable alternative.

        let deadline = Instant::now() + timeout;

        // Acquire `self.lock` with at most `timeout` duration.
        let mut lock = loop {
            if let Some(guard) = self.lock.try_lock() {
                break guard;
            } else if Instant::now() > deadline {
                return None;
            } else {
                std::thread::yield_now();
            }
        };

        // Shrink the `timeout` to account for the time taken to acquire `lock`.
        let timeout = deadline.saturating_duration_since(Instant::now());

        let local_gen = lock.generation_id;
        lock.count += 1;
        if lock.count < self.num_threads {
            // We need a while loop to guard against spurious wakeups.
            // https://en.wikipedia.org/wiki/Spurious_wakeup
            while local_gen == lock.generation_id {
                let (guard, timeout_result) = self.cvar.wait_timeout(lock, timeout).unwrap();
                lock = guard;
                if timeout_result.timed_out() {
                    return None;
                }
            }
            Some(BarrierWaitResult(false))
        } else {
            lock.count = 0;
            lock.generation_id = lock.generation_id.wrapping_add(1);
            self.cvar.notify_all();
            Some(BarrierWaitResult(true))
        }
    }
}

impl fmt::Debug for BarrierWaitResult {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BarrierWaitResult")
            .field("is_leader", &self.is_leader())
            .finish()
    }
}

impl BarrierWaitResult {
    /// Returns `true` if this thread is the "leader thread" for the call to
    /// [`Barrier::wait()`].
    ///
    /// Only one thread will have `true` returned from their result, all other
    /// threads will have `false` returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Barrier;
    ///
    /// let barrier = Barrier::new(1);
    /// let barrier_wait_result = barrier.wait();
    /// println!("{:?}", barrier_wait_result.is_leader());
    /// ```
    #[must_use]
    pub(crate) fn is_leader(&self) -> bool {
        self.0
    }
}
```
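
Relative to `std::sync::Barrier`, the piece this file adds is `wait_timeout`, which bounds how long a participant blocks. Below is a minimal standalone sketch of the same condvar-with-deadline idea using only `std`; the names and the two-participant setup are illustrative, not part of the patch:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    // Two participants are expected, but only one ever arrives, so the
    // timed wait gives up instead of blocking forever -- the property
    // `wait_timeout` adds over a plain `wait`.
    let expected = 2usize;
    let pair = Arc::new((Mutex::new(0usize), Condvar::new()));

    let p = Arc::clone(&pair);
    let waiter = thread::spawn(move || {
        let (lock, cvar) = &*p;
        let mut count = lock.lock().unwrap();
        *count += 1;
        // Wait until everyone has arrived, or until 100ms elapse.
        let (count, result) = cvar
            .wait_timeout_while(count, Duration::from_millis(100), |c| *c < expected)
            .unwrap();
        if result.timed_out() {
            println!("gave up: {} of {} arrived", *count, expected);
        } else {
            println!("all {expected} participants arrived");
        }
    });

    waiter.join().unwrap();
}
```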

tokio/src/loom/std/mod.rs (+3)

```diff
@@ -4,6 +4,7 @@ mod atomic_u16;
 mod atomic_u32;
 mod atomic_u64;
 mod atomic_usize;
+mod barrier;
 mod mutex;
 #[cfg(feature = "parking_lot")]
 mod parking_lot;
@@ -76,6 +77,8 @@ pub(crate) mod sync {
 
         pub(crate) use std::sync::atomic::{fence, AtomicBool, AtomicPtr, AtomicU8, Ordering};
     }
+
+    pub(crate) use super::barrier::Barrier;
 }
 
 pub(crate) mod sys {
```
tokio/src/runtime/handle.rs

+28-3
Original file line numberDiff line numberDiff line change
@@ -339,15 +339,40 @@ cfg_metrics! {
339339
cfg_taskdump! {
340340
impl Handle {
341341
/// Capture a snapshot of this runtime's state.
342-
pub fn dump(&self) -> crate::runtime::Dump {
342+
pub async fn dump(&self) -> crate::runtime::Dump {
343343
match &self.inner {
344344
scheduler::Handle::CurrentThread(handle) => handle.dump(),
345345
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
346-
scheduler::Handle::MultiThread(_) =>
347-
unimplemented!("taskdumps are unsupported on the multi-thread runtime"),
346+
scheduler::Handle::MultiThread(handle) => {
347+
// perform the trace in a separate thread so that the
348+
// trace itself does not appear in the taskdump.
349+
let handle = handle.clone();
350+
spawn_thread(async {
351+
let handle = handle;
352+
handle.dump().await
353+
}).await
354+
},
348355
}
349356
}
350357
}
358+
359+
cfg_rt_multi_thread! {
360+
/// Spawn a new thread and asynchronously await on its result.
361+
async fn spawn_thread<F>(f: F) -> <F as Future>::Output
362+
where
363+
F: Future + Send + 'static,
364+
<F as Future>::Output: Send + 'static
365+
{
366+
let (tx, rx) = crate::sync::oneshot::channel();
367+
crate::loom::thread::spawn(|| {
368+
let rt = crate::runtime::Builder::new_current_thread().build().unwrap();
369+
rt.block_on(async {
370+
let _ = tx.send(f.await);
371+
});
372+
});
373+
rx.await.unwrap()
374+
}
375+
}
351376
}
352377

353378
/// Error returned by `try_current` when no Runtime has been started
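
The new `spawn_thread` helper follows a common bridging pattern: run a future to completion on its own OS thread (with a throwaway current-thread runtime) and hand the result back over a oneshot channel, so the tracing work never runs on, and therefore never shows up in, the runtime being dumped. A rough public-API equivalent of that pattern, purely as an illustration (the function name and the `main` below are hypothetical, not part of the patch):

```rust
use tokio::sync::oneshot;

/// Run `f` on a freshly spawned thread that owns its own current-thread
/// runtime, and await the output from the calling task.
async fn run_on_dedicated_thread<F>(f: F) -> F::Output
where
    F: std::future::Future + Send + 'static,
    F::Output: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    std::thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        rt.block_on(async {
            // Ignore the error: the receiver may have been dropped.
            let _ = tx.send(f.await);
        });
    });
    rx.await.expect("dedicated thread dropped the sender")
}

#[tokio::main]
async fn main() {
    let answer = run_on_dedicated_thread(async { 6 * 7 }).await;
    assert_eq!(answer, 42);
}
```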

tokio/src/runtime/scheduler/multi_thread/handle.rs (+25)

```diff
@@ -95,6 +95,31 @@ cfg_metrics! {
     }
 }
 
+cfg_taskdump! {
+    impl Handle {
+        pub(crate) async fn dump(&self) -> crate::runtime::Dump {
+            let trace_status = &self.shared.trace_status;
+
+            // If a dump is in progress, block.
+            trace_status.start_trace_request(&self).await;
+
+            let result = loop {
+                if let Some(result) = trace_status.take_result() {
+                    break result;
+                } else {
+                    self.notify_all();
+                    trace_status.result_ready.notified().await;
+                }
+            };
+
+            // Allow other queued dumps to proceed.
+            trace_status.end_trace_request(&self).await;
+
+            result
+        }
+    }
+}
+
 impl fmt::Debug for Handle {
     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
         fmt.debug_struct("multi_thread::Handle { ... }").finish()
```
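
The requester side above is a request/notify/await-result handshake: register a trace request, nudge the workers with `notify_all()`, then park on `result_ready` until a worker stores the finished dump. A tiny stand-in for that handshake using public primitives (`Status`, `publish`, and the shapes of these methods are illustrative, not Tokio internals):

```rust
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;

struct Status<T> {
    result: Mutex<Option<T>>,
    result_ready: Notify,
}

impl<T> Status<T> {
    fn new() -> Self {
        Self {
            result: Mutex::new(None),
            result_ready: Notify::new(),
        }
    }

    /// Worker side: store the result and wake the requester.
    fn publish(&self, value: T) {
        *self.result.lock().unwrap() = Some(value);
        self.result_ready.notify_one();
    }

    fn take_result(&self) -> Option<T> {
        self.result.lock().unwrap().take()
    }

    /// Requester side: loop until a result has been stored.
    async fn wait_for_result(&self) -> T {
        loop {
            if let Some(result) = self.take_result() {
                return result;
            }
            // Park until a worker signals that a result is ready.
            self.result_ready.notified().await;
        }
    }
}

#[tokio::main]
async fn main() {
    let status = Arc::new(Status::new());

    let worker = {
        let status = Arc::clone(&status);
        tokio::spawn(async move { status.publish("dump complete") })
    };

    println!("{}", status.wait_for_result().await);
    worker.await.unwrap();
}
```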

tokio/src/runtime/scheduler/multi_thread/mod.rs (+4)

```diff
@@ -23,6 +23,10 @@ pub(crate) mod queue;
 mod worker;
 pub(crate) use worker::{Context, Launch, Shared};
 
+cfg_taskdump! {
+    pub(crate) use worker::Synced;
+}
+
 pub(crate) use worker::block_in_place;
 
 use crate::loom::sync::Arc;
```
