Skip to content

Commit 143433d

Browse files
authored
Merge pull request #25 from avdgrinten/barrier
barrier: Add barrier class
2 parents c76e34a + e315938 commit 143433d

File tree

1 file changed

+72
-0
lines changed

1 file changed

+72
-0
lines changed

include/async/barrier.hpp

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#pragma once
2+
3+
#include <async/recurring-event.hpp>
4+
5+
namespace async {
6+
7+
struct barrier {
8+
using arrival_token = uint64_t;
9+
10+
barrier(ptrdiff_t expected)
11+
: expected_{expected} { }
12+
13+
arrival_token arrive(ptrdiff_t n = 1) {
14+
return do_arrive(n, 0);
15+
}
16+
17+
arrival_token arrive_and_join(ptrdiff_t n = 1) {
18+
return do_arrive(n, n);
19+
}
20+
21+
arrival_token arrive_and_drop(ptrdiff_t n = 1) {
22+
return do_arrive(0, -n);
23+
}
24+
25+
auto async_wait(arrival_token s) {
26+
return evt_.async_wait_if([this, s] () -> bool {
27+
return seq_.load(std::memory_order_relaxed) == s;
28+
});
29+
}
30+
31+
private:
32+
arrival_token do_arrive(ptrdiff_t n, ptrdiff_t delta) {
33+
uint64_t s;
34+
bool advance = false;
35+
{
36+
frg::unique_lock lock{mutex_};
37+
38+
s = seq_.load(std::memory_order_relaxed);
39+
assert(expected_ + delta >= 0);
40+
41+
counter_ += n;
42+
expected_ += delta;
43+
44+
if (counter_ == expected_) {
45+
advance = true;
46+
seq_.store(s + 1, std::memory_order_relaxed);
47+
counter_ = 0;
48+
} else {
49+
assert(counter_ < expected_);
50+
}
51+
}
52+
if (advance)
53+
evt_.raise();
54+
55+
return s;
56+
}
57+
58+
platform::mutex mutex_;
59+
// Sequence number. Increased after each barrier.
60+
// Write-protected by mutex_. Can be read even without holding mutex_.
61+
std::atomic<uint64_t> seq_{0};
62+
// Expected number of arrivals.
63+
// Protected by mutex_.
64+
ptrdiff_t expected_;
65+
// Arrival count. Reset to zero on each barrier.
66+
// Protected by mutex_.
67+
ptrdiff_t counter_{0};
68+
69+
async::recurring_event evt_;
70+
};
71+
72+
} // namespace async

0 commit comments

Comments
 (0)