add vector clocks to batch semaphore
Aurel300 committed Jul 10, 2024
1 parent 5b6a983 commit a8f1676
249 changes: 232 additions & 17 deletions src/future/batch_semaphore.rs
@@ -1,6 +1,7 @@
//! A counting semaphore supporting both async and sync operations.
use crate::current;
use crate::runtime::execution::ExecutionState;
use crate::runtime::task::TaskId;
use crate::runtime::task::{clock::VectorClock, TaskId};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
@@ -18,6 +19,7 @@ struct Waiter {
num_permits: usize,
is_queued: AtomicBool,
has_permits: AtomicBool,
clock: VectorClock,
waker: Mutex<Option<Waker>>,
}

@@ -28,11 +30,178 @@ impl Waiter {
num_permits,
is_queued: AtomicBool::new(false),
has_permits: AtomicBool::new(false),
clock: current::clock(),
waker: Mutex::new(None),
}
}
}

#[derive(Debug)]
enum PermitsAvailable {
/// Arbitrarily many permits are available to be acquired. Acquiring one
/// is causally dependent on an event with the given clock.
Infinity(VectorClock),

/// Some number of permits (`num_available`) is available to be acquired.
/// The permits are grouped into batches in the `permit_clocks` deque, such
/// that batches farther back correspond to later `release` calls. Each
/// batch is a tuple of the number of permits remaining in that batch and
/// the clock of the event from which those permits originate. Additionally,
/// `last_acquire` maintains the clock of the most recent successful
/// acquisition; it is used to establish causal dependence for failed
/// `try_acquire` calls.
// Invariant: the number of permits available is equal to the sum of the
// batch sizes in the queue.
Permits {
num_available: usize,
permit_clocks: VecDeque<(usize, VectorClock)>,
last_acquire: VectorClock,
},
}
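
// Illustrative sketch, not part of this commit: the `Permits` representation
// above with vector clocks replaced by plain string labels. Batches sit in
// FIFO order, and `num_available` must equal the sum of the batch sizes.
fn permit_batches_sketch() {
    use std::collections::VecDeque;

    // Two releases enqueue two batches; the later release sits farther back.
    let mut permit_clocks: VecDeque<(usize, &str)> = VecDeque::new();
    permit_clocks.push_back((2, "clock of release by task A"));
    permit_clocks.push_back((3, "clock of release by task B"));

    // Invariant: the available count equals the sum of the batch sizes.
    let num_available: usize = permit_clocks.iter().map(|(n, _)| *n).sum();
    assert_eq!(num_available, 5);
}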

impl PermitsAvailable {
fn new(num_permits: usize) -> Self {
if num_permits == usize::MAX {
return Self::Infinity(VectorClock::new());
}
let mut permit_clocks = VecDeque::new();
if num_permits > 0 {
permit_clocks.push_back((num_permits, VectorClock::new()));
}
Self::Permits {
num_available: num_permits,
permit_clocks,
last_acquire: VectorClock::new(),
}
}

const fn const_new(num_permits: usize) -> Self {
if num_permits == usize::MAX {
return Self::Infinity(VectorClock::new());
}
// TODO: The VecDeque cannot be populated in a const fn, due to
// TODO: allocation. If this is ever needed, another variant can be
// TODO: added to `PermitsAvailable`, which would express that a
// TODO: certain number of permits are available with a zero clock.
assert!(num_permits == 0);
Self::Permits {
num_available: num_permits,
permit_clocks: VecDeque::new(),
last_acquire: VectorClock::new(),
}
}
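
// Illustrative sketch, not part of this commit: why the TODO above exists.
// `VecDeque::new` is a `const fn`, so an empty queue is fine in const
// context, but pushing a batch allocates, which const evaluation forbids;
// hence the `num_permits == 0` assertion in `const_new`.
const EMPTY_BATCHES: VecDeque<(usize, VectorClock)> = VecDeque::new();
// const ONE_BATCH: VecDeque<(usize, VectorClock)> = {
//     let mut q = VecDeque::new();
//     q.push_back((1, VectorClock::new())); // error: allocation in constants
//     q
// };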

fn available(&self) -> usize {
match self {
Self::Infinity(_) => usize::MAX,
Self::Permits { num_available, .. } => *num_available,
}
}

fn failure_clock(&self) -> Option<&VectorClock> {
match self {
Self::Infinity(_) => None,
Self::Permits { last_acquire, .. } => Some(last_acquire),
}
}

fn acquire(&mut self, mut num_permits: usize, acquire_clock: VectorClock) -> Result<VectorClock, TryAcquireError> {
// Acquiring zero permits is always possible, and is not causally
// dependent on any event.
if num_permits == 0 {
return Ok(VectorClock::new());
}

match self {
Self::Infinity(clock) => Ok(clock.clone()),
Self::Permits {
num_available,
permit_clocks,
last_acquire,
} => {
if num_permits <= *num_available {
last_acquire.update(&acquire_clock);

// If infinite permits were released, don't decrement.
if *num_available < usize::MAX {
*num_available -= num_permits;
}

// Acquire `num_permits` from the available batches. This
// may consume one or more batches from the queue. The
// resulting clock should be the join of all the batches
// used (fully or partially), since the acquisition causally
// depends on the releases that created those batches.
let mut clock = VectorClock::new();
while let Some((batch_size, batch_clock)) = permit_clocks.front_mut() {
clock.update(batch_clock);

// An infinite number of permits was released at some
// point; switch to the `Infinity` variant to simplify future operations.
if *batch_size == usize::MAX {
*self = Self::Infinity(permit_clocks.pop_front().unwrap().1);
num_permits = 0;
break;
}

if num_permits < *batch_size {
// The current batch is larger than the number of
// permits requested: diminish batch, finish loop.
*batch_size -= num_permits;
num_permits = 0;
break;
} else {
// The current batch is fully consumed by the
// request. Remove it from the queue.
num_permits -= *batch_size;
permit_clocks.pop_front();

// Break early to avoid causally depending on the
// next batch.
if num_permits == 0 {
break;
}
}
}

assert_eq!(num_permits, 0);
Ok(clock)
} else {
// There are not enough permits to fulfill the request.
Err(TryAcquireError::NoPermits)
}
}
}
}

fn release(&mut self, num_permits: usize, clock: VectorClock) {
match self {
Self::Infinity(_) => (),
Self::Permits {
num_available,
permit_clocks,
..
} => {
if num_permits == usize::MAX {
// Acquisitions of permits already in the queue do not
// causally depend on this release of infinitely many
// permits, so we add a `usize::MAX` "batch" to the back of
// the queue; once it reaches the front, we switch to the
// `Infinity` variant.
if permit_clocks.is_empty() {
*self = Self::Infinity(clock);
} else {
*num_available = usize::MAX;
permit_clocks.push_back((num_permits, clock));
}
} else {
*num_available += num_permits;
permit_clocks.push_back((num_permits, clock));
}
}
}
}
}
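
// Illustrative sketch, not part of this commit: a standalone model of the
// batch-consuming loop in `acquire`, with `VectorClock` replaced by a
// `Vec<u32>` whose join is an elementwise max. Acquiring 4 permits from the
// batches [(2, a), (3, b)] consumes the first batch, diminishes the second,
// and yields the join of both clocks.
fn acquire_clock_join_sketch() {
    use std::collections::VecDeque;

    // Elementwise-max join, the operation `VectorClock::update` performs.
    fn join(into: &mut Vec<u32>, other: &[u32]) {
        for (i, &c) in other.iter().enumerate() {
            if i < into.len() {
                into[i] = into[i].max(c);
            } else {
                into.push(c);
            }
        }
    }

    let clock_a = vec![1, 0]; // clock of an earlier release by task 0
    let clock_b = vec![0, 1]; // clock of a later release by task 1
    let mut batches: VecDeque<(usize, Vec<u32>)> = VecDeque::new();
    batches.push_back((2, clock_a));
    batches.push_back((3, clock_b));

    let mut num_permits = 4;
    let mut clock = Vec::new();
    while let Some((batch_size, batch_clock)) = batches.front_mut() {
        join(&mut clock, batch_clock);
        if num_permits < *batch_size {
            *batch_size -= num_permits; // diminish the batch, finish
            break;
        } else {
            num_permits -= *batch_size; // batch fully consumed
            batches.pop_front();
            if num_permits == 0 {
                break;
            }
        }
    }

    // The acquisition causally depends on both releases...
    assert_eq!(clock, vec![1, 1]);
    // ...and one permit from the second batch remains.
    let expected: VecDeque<(usize, Vec<u32>)> = VecDeque::from([(1, vec![0, 1])]);
    assert_eq!(batches, expected);
}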

/// A counting semaphore which permits waiting on multiple permits at once,
/// and supports both asynchronous and synchronous blocking operations.
///
@@ -45,8 +214,8 @@ struct BatchSemaphoreState {
// Key invariants:
//
// (1) if `waiters` is nonempty and the head waiter is `H`,
// then `H.num_permits > permits_available`. (In other words, we are
// never in a state where there are enough permits available for the
// then `H.num_permits > permits_available.available()`. (In other words,
// we are never in a state where there are enough permits available for the
// first waiter. This invariant is ensured by the `drop` handler below.)
//
// (2) W is in waiters iff W.is_queued
@@ -57,17 +226,26 @@
//
// (4) closed ==> waiters.is_empty()
waiters: VecDeque<Arc<Waiter>>,
permits_available: usize,
permits_available: PermitsAvailable,
// TODO: should there be a clock for the close event?
closed: bool,
}
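
// Illustrative sketch, not part of this commit: a hypothetical debug helper
// for invariant (1) above. The head waiter, if any, must always want more
// permits than are currently available; otherwise it should have been
// granted them and dequeued.
#[cfg(debug_assertions)]
fn debug_check_head_waiter(state: &BatchSemaphoreState) {
    if let Some(head) = state.waiters.front() {
        debug_assert!(head.num_permits > state.permits_available.available());
    }
}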

impl BatchSemaphoreState {
fn acquire_permits(&mut self, num_permits: usize) -> Result<(), TryAcquireError> {
assert!(num_permits > 0);
if self.closed {
Err(TryAcquireError::Closed)
} else if self.waiters.is_empty() && num_permits <= self.permits_available {
// No one is waiting and there are enough permits available
self.permits_available -= num_permits;
} else if self.waiters.is_empty() {
// No one is waiting: try to acquire permits
let clock = self.permits_available.acquire(num_permits, current::clock())?;

// If successful, the acquisition is causally dependent on
// the event which released the acquired permits.
ExecutionState::with(|s| {
s.update_clock(&clock);
});

Ok(())
} else {
Err(TryAcquireError::NoPermits)
@@ -117,7 +295,7 @@ impl BatchSemaphore {
pub fn new(num_permits: usize) -> Self {
let state = RefCell::new(BatchSemaphoreState {
waiters: VecDeque::new(),
permits_available: num_permits,
permits_available: PermitsAvailable::new(num_permits),
closed: false,
});
Self { state }
@@ -127,7 +305,7 @@
pub const fn const_new(num_permits: usize) -> Self {
let state = RefCell::new(BatchSemaphoreState {
waiters: VecDeque::new(),
permits_available: num_permits,
permits_available: PermitsAvailable::const_new(num_permits),
closed: false,
});
Self { state }
@@ -136,14 +314,15 @@
/// Returns the current number of available permits.
pub fn available_permits(&self) -> usize {
let state = self.state.borrow();
state.permits_available
state.permits_available.available()
}

/// Closes the semaphore. This prevents the semaphore from issuing new
/// permits and notifies all pending waiters.
pub fn close(&self) {
let mut state = self.state.borrow_mut();
state.closed = true;

// Wake up all the waiters. Since we've marked the state as closed, they
// will all return `AcquireError::closed` from their acquire calls.
let ptr = &*state as *const BatchSemaphoreState;
@@ -180,6 +359,18 @@
pub fn try_acquire(&self, num_permits: usize) -> Result<(), TryAcquireError> {
let mut state = self.state.borrow_mut();
state.acquire_permits(num_permits)
.map_err(|err| {
// Conservatively, the requester causally depends on the
// last successful acquire.
// TODO: This is not precise, but the causal dependency of
// TODO: `try_acquire` is both hard to define and most
// TODO: likely not worth the effort.
let failure_clock = state.permits_available.failure_clock().unwrap();
ExecutionState::with(|s| {
s.update_clock(failure_clock);
});
err
})
}
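
// Illustrative sketch, not part of this commit: why the failure path above
// joins the caller's clock with `last_acquire`. With one permit and two
// tasks, a failing `try_acquire` is only possible because the other task's
// successful acquire happened first, so the failure causally depends on it.
// The `check_dfs` harness usage here is an assumption, not from this diff.
#[cfg(test)]
fn try_acquire_failure_sketch() {
    use std::sync::Arc;
    crate::check_dfs(
        || {
            let sem = Arc::new(BatchSemaphore::new(1));
            let sem2 = Arc::clone(&sem);
            let handle = crate::thread::spawn(move || {
                let _ = sem2.try_acquire(1);
            });
            // If this fails, the current task's clock now reflects the
            // successful acquire that drained the permit.
            let _ = sem.try_acquire(1);
            handle.join().unwrap();
        },
        None,
    );
}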

fn enqueue_waiter(&self, waiter: &Arc<Waiter>) {
@@ -228,26 +419,44 @@
}

let mut state = self.state.borrow_mut();
state.permits_available += num_permits;

if ExecutionState::should_stop() {
return;
}

// Permits released into the semaphore reflect the releasing thread's
// clock; future acquires of those permits are causally dependent on
// this event.
ExecutionState::with(|s| {
let clock = s.increment_clock();
state.permits_available.release(num_permits, clock.clone());
});

let me = ExecutionState::me();
trace!(task = ?me, avail = ?state.permits_available, waiters = ?state.waiters, "released {} permits for semaphore {:p}", num_permits, &self.state);

while let Some(front) = state.waiters.front() {
if front.num_permits <= state.permits_available {
trace!("granted {:?} permits to waiter {:?}", front.num_permits, front);
state.permits_available -= front.num_permits;
if front.num_permits <= state.permits_available.available() {
let waiter = state.waiters.pop_front().unwrap();
trace!("granted {:?} permits to waiter {:?}", waiter.num_permits, waiter);

// The clock we pass into the semaphore is the clock of the
// waiter, corresponding to the point at which the waiter was
// enqueued. The clock we get in return corresponds to the
// join of the clocks of the acquired permits, used to update
// the waiter's clock to causally depend on the release events.
let clock = state.permits_available.acquire(waiter.num_permits, waiter.clock.clone()).unwrap();

// Update waiter state as it is no longer in the queue
assert!(waiter.is_queued.swap(false, Ordering::SeqCst));
assert!(!waiter.has_permits.swap(true, Ordering::SeqCst));
ExecutionState::with(|s| {
if !s.get_mut(waiter.task_id).finished() {
s.get_mut(waiter.task_id).unblock();
let task = s.get_mut(waiter.task_id);
if !task.finished() {
// The acquisition is causally dependent on the event
// which released the acquired permits.
task.clock.update(&clock);
task.unblock();
}
});
let mut maybe_waker = waiter.waker.lock().unwrap();
@@ -318,7 +527,13 @@ impl Future for Acquire<'_> {
assert!(self.waiter.waker.lock().unwrap().is_some());
Poll::Pending
} else {
match self.semaphore.try_acquire(self.waiter.num_permits) {
// We access the semaphore state directly instead of using the
// public `try_acquire`, because in the case of `NoPermits` we do
// not want to update the clock, as this thread will be blocked below.
let mut state = self.semaphore.state.borrow_mut();
let acquire_result = state.acquire_permits(self.waiter.num_permits);
drop(state);
match acquire_result {
Ok(()) => {
assert!(!self.waiter.is_queued.load(Ordering::SeqCst));
self.waiter.has_permits.store(true, Ordering::SeqCst);
