Skip to content

Commit

Permalink
Fix issue with StrictlyFair BatchSemaphores
Browse files Browse the repository at this point in the history
  • Loading branch information
sarsko committed Dec 3, 2024
1 parent 4915ff6 commit 45f8714
Showing 1 changed file with 55 additions and 42 deletions.
97 changes: 55 additions & 42 deletions src/future/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,46 @@ impl BatchSemaphoreState {
Err(TryAcquireError::NoPermits)
}
}

// Unblocks waiters in FIFO order.
fn fair_release(&mut self) {
// in a strictly fair mode we will always pick the first waiter
// in the queue, as long as there are enough permits available
while let Some(front) = self.waiters.front() {
if front.num_permits <= self.permits_available.available() {
let waiter = self.waiters.pop_front().unwrap();

// The clock we pass into the semaphore is the clock of the
// waiter, corresponding to the point at which the waiter was
// enqueued. The clock we get in return corresponds to the
// join of the clocks of the acquired permits, used to update
// the waiter's clock to causally depend on the release events.
let clock = self
.permits_available
.acquire(waiter.num_permits, waiter.clock.clone())
.unwrap();
trace!("granted {:?} permits to waiter {:?}", waiter.num_permits, waiter);

// Update waiter state as it is no longer in the queue
assert!(waiter.is_queued.swap(false, Ordering::SeqCst));
assert!(!waiter.has_permits.swap(true, Ordering::SeqCst));
ExecutionState::with(|s| {
let task = s.get_mut(waiter.task_id);
assert!(!task.finished());
// The acquiry is causally dependent on the event
// which released the acquired permits.
task.clock.update(&clock);
task.unblock();
});
let mut maybe_waker = waiter.waker.lock().unwrap();
if let Some(waker) = maybe_waker.take() {
waker.wake();
}
} else {
break;
}
}
}
}

/// Counting semaphore
Expand Down Expand Up @@ -501,48 +541,7 @@ impl BatchSemaphore {

match self.fairness {
Fairness::StrictlyFair => {
// in a strictly fair mode we will always pick the first waiter
// in the queue, as long as there are enough permits available
while let Some(front) = state.waiters.front() {
if front.num_permits <= state.permits_available.available() {
let waiter = state.waiters.pop_front().unwrap();

crate::annotations::record_semaphore_acquire_unblocked(
state.id.unwrap(),
waiter.task_id,
waiter.num_permits,
);

// The clock we pass into the semaphore is the clock of the
// waiter, corresponding to the point at which the waiter was
// enqueued. The clock we get in return corresponds to the
// join of the clocks of the acquired permits, used to update
// the waiter's clock to causally depend on the release events.
let clock = state
.permits_available
.acquire(waiter.num_permits, waiter.clock.clone())
.unwrap();
trace!("granted {:?} permits to waiter {:?}", waiter.num_permits, waiter);

// Update waiter state as it is no longer in the queue
assert!(waiter.is_queued.swap(false, Ordering::SeqCst));
assert!(!waiter.has_permits.swap(true, Ordering::SeqCst));
ExecutionState::with(|s| {
let task = s.get_mut(waiter.task_id);
assert!(!task.finished());
// The acquiry is causally dependent on the event
// which released the acquired permits.
task.clock.update(&clock);
task.unblock();
});
let mut maybe_waker = waiter.waker.lock().unwrap();
if let Some(waker) = maybe_waker.take() {
waker.wake();
}
} else {
break;
}
}
state.fair_release();
}
Fairness::Unfair => {
// in an unfair mode, we will unblock all the waiters for which
Expand All @@ -568,6 +567,16 @@ impl BatchSemaphore {
// Releasing a semaphore is a yield point
thread::switch();
}

fn unblock_if_fair(&self) {
match self.fairness {
Fairness::StrictlyFair => {
let mut state = self.state.borrow_mut();
state.fair_release();
}
Fairness::Unfair => {}
}
}
}

// Safety: Semaphore is never actually passed across true threads, only across continuations. The
Expand Down Expand Up @@ -713,6 +722,10 @@ impl Drop for Acquire<'_> {
if self.waiter.is_queued.load(Ordering::SeqCst) {
// If the associated waiter is in the wait list, remove it
self.semaphore.remove_waiter(&self.waiter);
// If this is a fair semaphore, then the `drop` may cause other waiters to become unblocked.
// This happens if we tried to acquire more permits than are available, and later acquire attempts tried
// to acquire less permits than are available.
self.semaphore.unblock_if_fair();
} else if self.waiter.has_permits.load(Ordering::SeqCst) && !self.completed {
// If the waiter was granted permits, release them
self.semaphore.release(self.waiter.num_permits);
Expand Down

0 comments on commit 45f8714

Please sign in to comment.