Skip to content

Commit 9e4821d

Browse files
authored
Merge pull request #19 from ArsenArsen/waitgroups
wait-group: implement Go-ish wait groups
2 parents 81abd12 + 1bd774c commit 9e4821d

File tree

7 files changed

+413
-134
lines changed

7 files changed

+413
-134
lines changed

docs/src/SUMMARY.md

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
- [spawn](headers/basic/spawn.md)
2424
- [async/result.hpp](headers/result.md)
2525
- [async/oneshot.hpp](headers/oneshot-event.md)
26+
- [async/wait-group.hpp](headers/wait-group.md)
27+
- [wait\_group](headers/wait-group/wait_group.md)
28+
- [wait\_in\_group](headers/wait-group/wait_in_group.md)
2629
- [async/recurring.hpp](headers/recurring-event.md)
2730
- [async/sequenced.hpp](headers/sequenced-event.md)
2831
- [async/cancellation.hpp](headers/cancellation.md)

docs/src/headers/wait-group.html

Whitespace-only changes.
+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# wait\_group
2+
3+
```cpp
4+
#include <async/wait-group.hpp>
5+
```
6+
7+
`wait_group` is a synchronization primitive that waits for a counter (that can
8+
be incremented) to reach zero. It conceptually maps to a group of related work
9+
being done in parallel, and a few consumers waiting for that work to be done.
10+
The amount of work is increased by calls to `add()` and decreased by calls to
11+
`done()`.
12+
13+
This struct also implements
14+
[BasicLockable](https://en.cppreference.com/w/cpp/named_req/BasicLockable) so
15+
that it can be used with `std::unique_lock`.
16+
17+
## Prototype
18+
19+
```cpp
20+
struct wait_group {
21+
void done(); // (1)
22+
void add(int n); // (2)
23+
24+
sender wait(cancellation_token ct); // (3)
25+
sender wait(); // (4)
26+
27+
void lock(); // (5)
28+
void unlock(); // (6)
29+
};
30+
```
31+
32+
1. "Finishes" a work (decrements the work count).
33+
2. "Adds" more work (increments the work count by `n`).
34+
3. Returns a sender for the wait operation. The operation waits for the counter
35+
to drop to zero.
36+
4. Same as (3) but it cannot be cancelled.
37+
5. Equivalent to `add(1)`.
38+
5. Equivalent to `done()`.
39+
40+
### Arguments
41+
42+
- `n` - amount of work to "add" to this work group
43+
- `ct` - the cancellation token to use to listen for cancellation.
44+
45+
### Return values
46+
47+
1. This method doesn't return any value.
48+
2. This method doesn't return any value.
49+
3. This method returns a sender of unspecified type. The sender completes with
50+
either `true` to indicate success, or `false` to indicate that the wait was cancelled.
51+
4. Same as (3) except the sender completes without a value.
52+
53+
## Examples
54+
55+
```cpp
56+
async::wait_group wg { 3 };
57+
58+
([&wg] () -> async::detached {
59+
std::cout << "before wait" << std::endl;
60+
co_await wg.wait();
61+
std::cout << "after wait" << std::endl;
62+
})();
63+
64+
auto done = [&wg] () {
65+
std::cout << "before done" << std::endl;
66+
wg.done();
67+
std::cout << "after done" << std::endl;
68+
};
69+
70+
done();
71+
done();
72+
std::cout << "before add" << std::endl;
73+
wg.add(2);
74+
std::cout << "after add" << std::endl;
75+
done();
76+
done();
77+
done();
78+
```
79+
80+
Output:
81+
82+
```
83+
before wait
84+
before done
85+
after done
86+
before done
87+
after done
88+
before add
89+
after add
90+
before done
91+
after done
92+
before done
93+
after done
94+
before done
95+
after wait
96+
after done
97+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# wait\_in\_group
2+
3+
`wait_in_group(wg, S)` takes a sender `S` and adds it to the work group `wg`
4+
(calls `wg.add(1)`) immediately before it's started and marks it as done
5+
(calls `wg.done()`) immediately after.
6+
7+
## Prototype
8+
9+
```cpp
10+
template<typename S>
11+
sender wait_in_group(wait_group &wg, S sender);
12+
```
13+
14+
## Requirements
15+
`S` is a sender.
16+
17+
## Arguments
18+
- `wg` - wait group to wait in
19+
- `sender` - sender to wrap in the wait group
20+
21+
## Return value
22+
The value produced by the `sender`.
23+
24+
## Examples
25+
26+
```cpp
27+
bool should_run() {
28+
/* ... */
29+
}
30+
async::result<void> handle_conn(tcp_socket conn) {
31+
/* ... */
32+
}
33+
34+
/* ... */
35+
36+
tcp_socket server;
37+
server.bind(":80");
38+
server.listen(32);
39+
async::wait_group handlers { 0 };
40+
while (should_run()) {
41+
auto conn = socket.accept();
42+
async::detach(async::wait_in_group(handlers, handle_conn(std::move(conn))));
43+
}
44+
45+
/* wait for all connections to terminate */
46+
handlers.wait();
47+
```

docs/src/wait-group.md

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# wait-group
2+
3+
This header includes utilities related to the `wait_group` primitive.
4+
5+
```cpp
6+
#include <async/wait-group.hpp>
7+
```
8+
9+
Wait groups are synchronization primitives that wait for a counter to reach
10+
zero. This counter is conceptually bound to a group of related work that the
11+
consumer would like to simultaneously wait for (hence the name wait groups).

include/async/oneshot-event.hpp

+6-134
Original file line numberDiff line numberDiff line change
@@ -1,155 +1,27 @@
11
#pragma once
22

33
#include <async/algorithm.hpp>
4+
#include <async/wait-group.hpp>
45
#include <async/cancellation.hpp>
56
#include <frg/functional.hpp>
67
#include <frg/list.hpp>
78

89
namespace async {
910

1011
struct oneshot_event {
11-
private:
12-
struct node {
13-
friend struct oneshot_event;
14-
15-
node() = default;
16-
17-
node(const node &) = delete;
18-
19-
node &operator= (const node &) = delete;
20-
21-
virtual void complete() = 0;
22-
23-
protected:
24-
~node() = default;
25-
26-
private:
27-
// Protected by mutex_.
28-
frg::default_list_hook<node> _hook;
29-
};
30-
31-
public:
3212
void raise() {
33-
// Grab all items and mark them as retired while we hold the lock.
34-
frg::intrusive_list<
35-
node,
36-
frg::locate_member<
37-
node,
38-
frg::default_list_hook<node>,
39-
&node::_hook
40-
>
41-
> items;
42-
{
43-
frg::unique_lock lock(mutex_);
44-
assert(!raised_);
45-
46-
items.splice(items.end(), queue_);
47-
raised_ = true;
48-
}
49-
50-
// Now invoke the individual callbacks.
51-
while(!items.empty()) {
52-
auto item = items.front();
53-
items.pop_front();
54-
item->complete();
55-
}
13+
wg_.done();
5614
}
5715

58-
// ----------------------------------------------------------------------------------
59-
// wait() and its boilerplate.
60-
// ----------------------------------------------------------------------------------
61-
62-
template<typename Receiver>
63-
struct wait_operation final : private node {
64-
wait_operation(oneshot_event *evt, cancellation_token ct, Receiver r)
65-
: evt_{evt}, ct_{std::move(ct)}, r_{std::move(r)}, cobs_{this} { }
66-
67-
bool start_inline() {
68-
bool cancelled = false;
69-
{
70-
frg::unique_lock lock(evt_->mutex_);
71-
72-
if(!evt_->raised_) {
73-
if(!cobs_.try_set(ct_)) {
74-
cancelled = true;
75-
}else{
76-
evt_->queue_.push_back(this);
77-
return false;
78-
}
79-
}
80-
}
81-
82-
execution::set_value_inline(r_, !cancelled);
83-
return true;
84-
}
85-
86-
private:
87-
void cancel() {
88-
bool cancelled = false;
89-
{
90-
frg::unique_lock lock(evt_->mutex_);
91-
92-
if(!evt_->raised_) {
93-
cancelled = true;
94-
auto it = evt_->queue_.iterator_to(this);
95-
evt_->queue_.erase(it);
96-
}
97-
}
98-
99-
execution::set_value_noinline(r_, !cancelled_);
100-
}
101-
102-
void complete() override {
103-
if(cobs_.try_reset())
104-
execution::set_value_noinline(r_, true);
105-
}
106-
107-
oneshot_event *evt_;
108-
cancellation_token ct_;
109-
Receiver r_;
110-
cancellation_observer<frg::bound_mem_fn<&wait_operation::cancel>> cobs_;
111-
bool cancelled_ = false;
112-
};
113-
114-
struct [[nodiscard]] wait_sender {
115-
using value_type = bool;
116-
117-
template<typename Receiver>
118-
friend wait_operation<Receiver> connect(wait_sender s, Receiver r) {
119-
return {s.evt, s.ct, std::move(r)};
120-
}
121-
122-
sender_awaiter<wait_sender, bool> operator co_await () {
123-
return {*this};
124-
}
125-
126-
oneshot_event *evt;
127-
cancellation_token ct;
128-
};
129-
130-
wait_sender wait(cancellation_token ct) {
131-
return {this, ct};
16+
auto wait(cancellation_token ct) {
17+
return wg_.wait(ct);
13218
}
13319

13420
auto wait() {
135-
return async::transform(wait(cancellation_token{}), [] (bool waitSuccess) {
136-
assert(waitSuccess);
137-
});
21+
return wg_.wait();
13822
}
139-
14023
private:
141-
platform::mutex mutex_;
142-
143-
bool raised_ = false;
144-
145-
frg::intrusive_list<
146-
node,
147-
frg::locate_member<
148-
node,
149-
frg::default_list_hook<node>,
150-
&node::_hook
151-
>
152-
> queue_;
24+
wait_group wg_ { 1 };
15325
};
15426

15527
} // namespace async

0 commit comments

Comments
 (0)