Skip to content

Commit

Permalink
Implement RwLock::{try_read, try_write} (#72)
Browse files Browse the repository at this point in the history
This is a little tricky because `std` is unclear about whether a thread
can acquire the same read lock multiple times. For `read` it says:

> This function might panic when called if the lock is already held by
> the current thread.

So acquiring a second read lock _might_ fail. But for `try_read` it says:

> This function will return the WouldBlock error if the RwLock could not
> be acquired because it was already locked exclusively.

suggesting that `try_read` _must_ succeed the second time (the lock is
not held exclusively).

We resolve this ambiguity by choosing a conservative semantics that
always forbids a thread acquiring the read lock twice. This helps us
catch deadlocks, especially in async programs where a task might
nondeterministically migrate between threads and only deadlock if that
migration didn't happen.

Another difficulty with this change is that causality is pretty hairy
for the read side of a `RwLock`. In principle, concurrent readers
shouldn't inherit each other's causality, as they don't affect whether
the lock was readable or not. But that's really hard to implement,
especially with `try_write` in the picture too. So again we choose a
conservative implementation for our vector clocks that just always
inherits causality from all prior lock holders. This is likely too
strong — it may cause us to explore redundant, causally-equivalent
executions — but I can't convince myself of the correctness of a
weaker implementation.
  • Loading branch information
jamesbornholt authored Jul 7, 2022
1 parent 8bd99ef commit 3a007d6
Show file tree
Hide file tree
Showing 5 changed files with 525 additions and 50 deletions.
10 changes: 8 additions & 2 deletions src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,20 @@ impl TaskSet {
self.tasks.iter().all(|b| !*b)
}

pub fn insert(&mut self, tid: TaskId) {
/// Add a task to the set. If the set did not have this value present, `true` is returned. If
/// the set did have this value present, `false` is returned.
pub fn insert(&mut self, tid: TaskId) -> bool {
if tid.0 >= self.tasks.len() {
self.tasks.resize(1 + tid.0, false);
}
*self.tasks.get_mut(tid.0).unwrap() = true;
!std::mem::replace(&mut *self.tasks.get_mut(tid.0).unwrap(), true)
}

/// Removes a value from the set. Returns whether the value was present in the set.
pub fn remove(&mut self, tid: TaskId) -> bool {
    // An id beyond the backing vector was never inserted, so there is nothing to remove;
    // otherwise swap the slot to `false` and report what it previously held.
    match self.tasks.get_mut(tid.0) {
        Some(slot) => std::mem::replace(slot, false),
        None => false,
    }
}

Expand Down
154 changes: 140 additions & 14 deletions src/sync/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
use tracing::trace;

/// A reader-writer lock, the same as [`std::sync::RwLock`].
///
/// Unlike [`std::sync::RwLock`], the same thread is never allowed to acquire the read side of a
/// `RwLock` more than once. The `std` version is ambiguous about what behavior is allowed here, so
/// we choose the most conservative one.
pub struct RwLock<T: ?Sized> {
state: Rc<RefCell<RwLockState>>,
inner: std::sync::RwLock<T>,
Expand Down Expand Up @@ -99,16 +103,51 @@ impl<T: ?Sized> RwLock<T> {
/// Attempts to acquire this rwlock with shared read access.
///
/// If the access could not be granted at this time, then Err is returned. This function does
/// not block.
///
/// Note that unlike [`std::sync::RwLock::try_read`], if the current thread already holds this
/// read lock, `try_read` will return Err.
pub fn try_read(&self) -> TryLockResult<RwLockReadGuard<T>> {
    if self.try_lock(RwLockType::Read) {
        // Our model state said the lock was available, so the inner lock must agree; a
        // `WouldBlock` here means the two have diverged, which is a bug in this crate.
        match self.inner.try_read() {
            Ok(guard) => Ok(RwLockReadGuard {
                inner: Some(guard),
                state: Rc::clone(&self.state),
                me: ExecutionState::me(),
            }),
            // Surface poisoning to the caller, but still hand back a usable guard so the
            // model state is released correctly on drop.
            Err(TryLockError::Poisoned(err)) => Err(TryLockError::Poisoned(PoisonError::new(RwLockReadGuard {
                inner: Some(err.into_inner()),
                state: Rc::clone(&self.state),
                me: ExecutionState::me(),
            }))),
            Err(TryLockError::WouldBlock) => panic!("rwlock state out of sync"),
        }
    } else {
        Err(TryLockError::WouldBlock)
    }
}

/// Attempts to acquire this rwlock with exclusive write access.
///
/// If the access could not be granted at this time, then Err is returned. This function does
/// not block.
pub fn try_write(&self) -> TryLockResult<RwLockWriteGuard<T>> {
    if self.try_lock(RwLockType::Write) {
        // Our model state said the lock was available, so the inner lock must agree; a
        // `WouldBlock` here means the two have diverged, which is a bug in this crate.
        match self.inner.try_write() {
            Ok(guard) => Ok(RwLockWriteGuard {
                inner: Some(guard),
                state: Rc::clone(&self.state),
                me: ExecutionState::me(),
            }),
            // Surface poisoning to the caller, but still hand back a usable guard so the
            // model state is released correctly on drop.
            Err(TryLockError::Poisoned(err)) => Err(TryLockError::Poisoned(PoisonError::new(RwLockWriteGuard {
                inner: Some(err.into_inner()),
                state: Rc::clone(&self.state),
                me: ExecutionState::me(),
            }))),
            Err(TryLockError::WouldBlock) => panic!("rwlock state out of sync"),
        }
    } else {
        Err(TryLockError::WouldBlock)
    }
}

/// Consumes this `RwLock`, returning the underlying data
Expand All @@ -125,6 +164,7 @@ impl<T: ?Sized> RwLock<T> {
self.inner.into_inner()
}

/// Acquire the lock in the provided mode, blocking this thread until it succeeds.
fn lock(&self, typ: RwLockType) {
let me = ExecutionState::me();

Expand All @@ -133,7 +173,7 @@ impl<T: ?Sized> RwLock<T> {
holder = ?state.holder,
waiting_readers = ?state.waiting_readers,
waiting_writers = ?state.waiting_writers,
"waiting to acquire {:?} lock on rwlock {:p}",
"acquiring {:?} lock on rwlock {:p}",
typ,
self.state,
);
Expand All @@ -144,28 +184,37 @@ impl<T: ?Sized> RwLock<T> {
} else {
state.waiting_readers.insert(me);
}
// Block if the lock is in a state where we can't acquire it immediately
match &state.holder {
// Block if the lock is in a state where we can't acquire it immediately. Note that we only
// need to context switch here if we can't acquire the lock. If it's available for us to
// acquire, but there is also another thread `t` that wants to acquire it, then `t` must
// have been runnable when this thread was chosen to execute and could have been chosen
// instead.
let should_switch = match &state.holder {
RwLockHolder::Write(writer) => {
if *writer == me {
panic!("deadlock! task {:?} tried to acquire a RwLock it already holds", me);
}
ExecutionState::with(|s| s.current_mut().block());
true
}
RwLockHolder::Read(readers) => {
if readers.contains(me) {
panic!("deadlock! task {:?} tried to acquire a RwLock it already holds", me);
}
if typ == RwLockType::Write {
ExecutionState::with(|s| s.current_mut().block());
true
} else {
false
}
}
_ => {}
}
RwLockHolder::None => false,
};
drop(state);

// Acquiring a lock is a yield point
thread::switch();
if should_switch {
thread::switch();
}

let mut state = self.state.borrow_mut();
// Once the scheduler has resumed this thread, we are clear to take the lock. We might
Expand Down Expand Up @@ -203,14 +252,91 @@ impl<T: ?Sized> RwLock<T> {
typ,
self.state
);
// Update acquiring thread's clock with the clock stored in the RwLock
ExecutionState::with(|s| s.update_clock(&state.clock));

// Increment the current thread's clock and update this RwLock's clock to match.
// TODO we can likely do better here: there is no causality between multiple readers holding
// the lock at the same time.
ExecutionState::with(|s| {
s.update_clock(&state.clock);
state.clock.update(s.get_clock(me));
});

// Block all other waiters, since we won the race to take this lock
// TODO a bit of a bummer that we have to do this (it would be cleaner if those threads
// TODO never become unblocked), but might need to track more state to avoid this.
Self::block_waiters(&*state, me, typ);
drop(state);

// We need to let other threads in here so they may fail a `try_read` or `try_write`. This
// is the case because the current thread holding the lock might not have any further
// context switches until after releasing the lock.
thread::switch();
}

/// Attempt to acquire this lock in the provided mode, but without blocking. Returns `true` if
/// the lock was able to be acquired without blocking, or `false` otherwise.
fn try_lock(&self, typ: RwLockType) -> bool {
    let me = ExecutionState::me();

    let mut state = self.state.borrow_mut();
    trace!(
        holder = ?state.holder,
        waiting_readers = ?state.waiting_readers,
        waiting_writers = ?state.waiting_writers,
        "trying to acquire {:?} lock on rwlock {:p}",
        typ,
        self.state,
    );

    // Decide, without ever waiting, whether the lock can be taken in the requested mode
    // right now. Any state we can't enter immediately is simply reported as a failure.
    let acquired = match (typ, &mut state.holder) {
        // A write lock is only available when nobody holds the lock at all.
        (RwLockType::Write, RwLockHolder::None) => {
            state.holder = RwLockHolder::Write(me);
            true
        }
        (RwLockType::Read, RwLockHolder::None) => {
            let mut readers = TaskSet::new();
            readers.insert(me);
            state.holder = RwLockHolder::Read(readers);
            true
        }
        (RwLockType::Read, RwLockHolder::Read(readers)) => {
            // If we already hold the read lock, `insert` returns false, which will cause this
            // acquisition to fail with `WouldBlock` so we can diagnose potential deadlocks.
            readers.insert(me)
        }
        _ => false,
    };

    trace!(
        "{} {:?} lock on rwlock {:p}",
        if acquired { "acquired" } else { "failed to acquire" },
        typ,
        self.state,
    );

    // Update this thread's clock with the clock stored in the RwLock.
    // We need to do the vector clock update even in the failing case, because there's a causal
    // dependency: if the `try_lock` fails, the current thread `t1` knows that the thread `t2`
    // that owns the lock is not in the right state to be read/written, and therefore `t1` has a
    // causal dependency on everything that happened before in `t2` (which is recorded in the
    // RwLock's clock).
    // TODO we can likely do better here: there is no causality between successful `try_read`s
    // and other concurrent readers, and there's no need to update the clock on failed
    // `try_read`s.
    ExecutionState::with(|s| {
        s.update_clock(&state.clock);
        state.clock.update(s.get_clock(me));
    });

    // Block all other waiters, since we won the race to take this lock
    // NOTE(review): this runs even when `acquired` is false; presumably it is harmless then
    // because the holder did not change — confirm against `block_waiters`.
    Self::block_waiters(&*state, me, typ);
    drop(state);

    // We need to let other threads in here so they
    // (a) may fail a `try_lock` (in case we acquired), or
    // (b) may release the lock (in case we failed to acquire) so we can succeed in a subsequent
    // `try_lock`.
    thread::switch();

    acquired
}

fn block_waiters(state: &RwLockState, me: TaskId, typ: RwLockType) {
Expand Down Expand Up @@ -324,7 +450,7 @@ impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
state.holder = RwLockHolder::None;
}
}
_ => panic!("exiting a reader but rwlock is in the wrong state"),
_ => panic!("exiting a reader but rwlock is in the wrong state {:?}", state.holder),
}

if ExecutionState::should_stop() {
Expand Down
63 changes: 36 additions & 27 deletions tests/basic/clocks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use shuttle::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use shuttle::sync::mpsc::{channel, sync_channel};
use shuttle::sync::{Barrier, Condvar, Mutex, Once, RwLock};
use shuttle::{check_dfs, check_pct, current, thread};
use shuttle::{check_dfs, check_pct, check_random, current, thread};
use std::collections::HashSet;
use std::sync::Arc;
use test_log::test;
Expand All @@ -10,7 +10,7 @@ pub fn me() -> usize {
usize::from(thread::current().id())
}

// TODO Maybe make this a macro so backtraces are more informative
#[track_caller]
pub fn check_clock(f: impl Fn(usize, u32) -> bool) {
for (i, &c) in current::clock().iter().enumerate() {
assert!(
Expand Down Expand Up @@ -76,32 +76,34 @@ fn clock_mutex_pct() {

// RWLocks
fn clock_rwlock(num_writers: usize, num_readers: usize) {
// This test checks that when a thread acquires a RwLock, it inherits the clocks
// of any writers that accessed the lock before it, but not the clocks from any readers.
// This test checks that when a thread acquires a RwLock, it inherits the clocks of writers that
// accessed the lock before it. It's the same as `clock_mutex`, except that readers don't update
// the set S, and aren't required to appear in the clock for future lock holders.
//
// Test: create a rwlock-protected set, initialized with 0 (the id of the main thread)
// and spawn some writers and readers. Each thread does the following:
// (1) check that its own initial vector clock only has nonzero for the main thread (thread 0)
// (2w) [for writers only] acquire a write lock on the set and add its own thread id to it
// (2r) [for readers only] acquire a read lock on the set
// (3) read its own clock again, call this C
// (4) check that the only nonzero entries in C are for the threads in S and the current thread (for readers)
//
// Note: no dummy thread here since we're already checking that readers' clock entries are always zero
let mut set = HashSet::new();
set.insert(0);
let set = Arc::new(RwLock::new(set));
// TODO this test is pretty weak. Testing readers is hard because they race with each other; for
// example, a reader might see the clock update from another reader before that reader has a
// chance to update the set S. Causality is also pretty fuzzy for readers (see the TODOs in the
// RwLock implementation). So we don't test very much about them here.
let set = Arc::new(std::sync::Mutex::new(HashSet::from([0])));
let lock = Arc::new(RwLock::new(()));

// Create dummy thread (should have id 1)
thread::spawn(|| {
assert_eq!(me(), 1usize);
});

// Spawn the writers
let _thds = (0..num_writers)
.map(|_| {
let set = Arc::clone(&set);
let lock = Arc::clone(&lock);
thread::spawn(move || {
check_clock(|i, c| (c > 0) == (i == 0));
let mut set = set.write().unwrap();
let _guard = lock.write().unwrap();
let mut set = set.lock().unwrap();
set.insert(me());
// Check that the only nonzero clock entries are for the threads in the set
check_clock(|i, c| (c > 0) == set.contains(&i));
assert!(!set.contains(&1)); // dummy thread is never in the set
check_clock(|i, c| !set.contains(&i) || (c > 0));
})
})
.collect::<Vec<_>>();
Expand All @@ -110,26 +112,33 @@ fn clock_rwlock(num_writers: usize, num_readers: usize) {
let _thds = (0..num_readers)
.map(|_| {
let set = Arc::clone(&set);
let lock = Arc::clone(&lock);
thread::spawn(move || {
check_clock(|i, c| (c > 0) == (i == 0));
let set = set.read().unwrap();
// Check that the only nonzero clock entries are for threads in the set and the current thread
check_clock(|i, c| (c > 0) == (i == me() || set.contains(&i)));
let _guard = lock.read().unwrap();
let set = set.lock().unwrap();
assert!(!set.contains(&1)); // dummy thread is never in the set
check_clock(|i, c| !set.contains(&i) || (c > 0));
})
})
.collect::<Vec<_>>();
}

#[test]
fn clock_rwlock_dfs() {
// TODO 2 writers + 2 readers takes too long right now; once we reduce context switching, it should be feasible
check_dfs(|| clock_rwlock(2, 1), None);
check_dfs(|| clock_rwlock(1, 2), None);
// Unfortunately anything larger than this takes > 500k iterations, too slow to be useful :(
// But the PCT and random tests below buy us a much bigger search.
check_dfs(|| clock_rwlock(1, 1), None);
}

#[test]
fn clock_rwlock_pct() {
check_pct(|| clock_rwlock(10, 20), 10_000, 3);
check_pct(|| clock_rwlock(4, 4), 10_000, 3);
}

#[test]
fn clock_rwlock_random() {
check_random(|| clock_rwlock(4, 4), 10_000);
}

// Barrier
Expand Down Expand Up @@ -336,7 +345,7 @@ fn clock_mpsc_bounded() {
// The sender has sent a message, so its clock is nonzero
let c1 = current::clock().get(1);
assert!(c1 > 0);
let _ = rx.recv().unwrap();
rx.recv().unwrap();
// The sender has sent another message, so its clock has increased
assert!(current::clock().get(2) > c1);
// Receive the remaining messages
Expand Down
Loading

0 comments on commit 3a007d6

Please sign in to comment.