From 0c9308482bcf5dc22aa8cf2e348738ad8aa00a85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Wed, 26 Jun 2024 00:53:36 -0700 Subject: [PATCH] Add support for 'abort'ing shuttle::future::JoinHandle's --- src/future/mod.rs | 14 +++++- src/runtime/task/mod.rs | 10 ++++- src/runtime/thread/continuation.rs | 15 +++++-- tests/future/basic.rs | 72 ++++++++++++++++++++++++++++++ 4 files changed, 104 insertions(+), 7 deletions(-) diff --git a/src/future/mod.rs b/src/future/mod.rs index 79fee118..d11a87e9 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -54,12 +54,22 @@ impl Default for JoinHandleInner { } impl JoinHandle { + /// Detach the task associated with the handle. + fn detach(&self) { + ExecutionState::try_with(|state| { + if !state.is_finished() { + let task = state.get_mut(self.task_id); + task.detach(); + } + }); + } + /// Abort the task associated with the handle. pub fn abort(&self) { ExecutionState::try_with(|state| { if !state.is_finished() { let task = state.get_mut(self.task_id); - task.detach(); + task.abort(); } }); } @@ -96,7 +106,7 @@ impl Error for JoinError {} impl Drop for JoinHandle { fn drop(&mut self) { - self.abort(); + self.detach(); } } diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs index 71acc0a2..5953ffa0 100644 --- a/src/runtime/task/mod.rs +++ b/src/runtime/task/mod.rs @@ -343,6 +343,12 @@ impl Task { self.detached = true; } + pub(crate) fn abort(&mut self) { + self.finish(); + let mut continuation = self.continuation.borrow_mut(); + continuation.wipe(); + } + pub(crate) fn waker(&self) -> Waker { self.waker.clone() } @@ -373,8 +379,10 @@ impl Task { self.park_state.blocked_in_park = false; } + // TODO: Investigate whether we should move `wipe` here. (I think the correct scheme is to have it + // toggleable by the instantiator of the `Task` — those modelling async `JoinHandle`s should + // clean eagerly, thos modelling sync `JoinHandle`s should not.) pub(crate) fn finish(&mut self) { - assert!(self.state != TaskState::Finished); self.state = TaskState::Finished; } diff --git a/src/runtime/thread/continuation.rs b/src/runtime/thread/continuation.rs index 7474a80a..27236dc1 100644 --- a/src/runtime/thread/continuation.rs +++ b/src/runtime/thread/continuation.rs @@ -222,12 +222,19 @@ pub(crate) struct PooledContinuation { queue: Rc>>, } +impl PooledContinuation { + pub fn wipe(&mut self) { + if let Some(c) = self.continuation.take() { + if c.reusable() { + self.queue.borrow_mut().push_back(c); + } + } + } +} + impl Drop for PooledContinuation { fn drop(&mut self) { - let c = self.continuation.take().unwrap(); - if c.reusable() { - self.queue.borrow_mut().push_back(c); - } + self.wipe() } } diff --git a/tests/future/basic.rs b/tests/future/basic.rs index c998406d..e5f5eccd 100644 --- a/tests/future/basic.rs +++ b/tests/future/basic.rs @@ -221,6 +221,78 @@ fn async_counter() { }); } +// We need a way to hold the `MutexGuard`, which is `!Send`, across an `await`. +struct WrappedMutexGuard<'a> { + #[allow(unused)] + guard: shuttle::sync::MutexGuard<'a, ()>, +} + +unsafe impl<'a> Send for WrappedMutexGuard<'a> {} + +async fn acquire_and_loop(mutex: Arc>) { + let _g = WrappedMutexGuard { + guard: mutex.lock().unwrap(), + }; + loop { + future::yield_now().await; + } +} + +// The idea is to acquire a mutex, abort the JoinHandle, then acquire the Mutex. +// This should succeed, because `JoinHandle::abort()` should free the Mutex. +#[test] +fn abort_frees_mutex() { + check_random( + || { + let mutex = Arc::new(Mutex::new(())); + let jh = future::spawn(acquire_and_loop(mutex.clone())); + + jh.abort(); // this unblocks + + let _g = mutex.lock(); + }, + 1000, + ); +} + +// The idea is to acquire a mutex, drop the JoinHandle, then acquire the Mutex. +// This should fail, because `drop`ping the JoinHandle just detaches it, meaning +// it keeps holding the Mutex. +#[test] +#[should_panic(expected = "exceeded max_steps bound")] +fn drop_join_handle_deadlocks() { + check_random( + || { + let mutex = Arc::new(Mutex::new(())); + let jh = future::spawn(acquire_and_loop(mutex.clone())); + + drop(jh); + + let _g = mutex.lock(); + }, + 1000, + ); +} + +// The idea is to acquire a mutex, forget the JoinHandle, then acquire the Mutex. +// This should fail, because `forget`ting the JoinHandle doesn't cause it to release +// the Mutex. +#[test] +#[should_panic(expected = "exceeded max_steps bound")] +fn forget_join_handle_deadlocks() { + check_random( + || { + let mutex = Arc::new(Mutex::new(())); + let jh = future::spawn(acquire_and_loop(mutex.clone())); + + std::mem::forget(jh); + + let _g = mutex.lock(); + }, + 1000, + ); +} + #[test] fn async_counter_random() { check_random(async_counter, 5000)