forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathphased_barrier.hh
97 lines (84 loc) · 2.5 KB
/
phased_barrier.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/*
* Copyright 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/shared_ptr.hh>
#include "seastarx.hh"
namespace utils {
// Synchronization primitive that tracks asynchronous operations and lets a
// caller wait for the ones that were already in progress when the wait began.
//
// Every phase is backed by its own seastar::gate: start() enters the gate of
// the current phase, while advance_and_await() installs a fresh gate for the
// next phase and closes the previous one.
class phased_barrier {
public:
    using phase_type = uint64_t;

private:
    using gate = seastar::gate;

    // Gate of the current phase; also referenced by every live operation
    // handle that was started during this phase.
    lw_shared_ptr<gate> _gate;
    // Bumped exactly once by each advance_and_await() call.
    phase_type _phase = 0;

public:
    phased_barrier()
        : _gate(make_lw_shared<gate>())
    { }

    // Move-only handle representing one started operation. A handle that
    // holds a gate calls leave() on it when the handle is destroyed or
    // move-assigned over.
    class operation {
        lw_shared_ptr<gate> _gate;
    public:
        // Empty handle: holds no gate, so its destructor does nothing.
        operation() : _gate() {}
        // Wraps a gate without entering it; start() performs the enter()
        // before constructing the handle.
        operation(lw_shared_ptr<gate> g) : _gate(std::move(g)) {}
        operation(const operation&) = delete;
        operation(operation&&) = default;
        operation& operator=(operation&& o) noexcept {
            if (this == &o) {
                return *this;
            }
            // Run the destructor to end whatever operation this handle
            // currently tracks, then move-construct in place from `o`.
            this->~operation();
            new (this) operation(std::move(o));
            return *this;
        }
        ~operation() {
            if (!_gate) {
                return;
            }
            _gate->leave();
        }
    };

    // Begins a new operation in the current phase. The operation is over once
    // the returned handle is destroyed. Because the handle shares ownership of
    // the gate, it may outlive the phased_barrier itself.
    // See seastar::gate::enter() for what happens if the gate is closed.
    operation start() {
        _gate->enter();
        return operation(_gate);
    }

    // Closes the gate of the current phase and returns the future produced by
    // gate::close().
    // NOTE(review): if advance_and_await() runs afterwards while operations
    // are still in progress, it calls close() on this same gate again and
    // installs a fresh gate -- confirm callers never do that.
    future<> close() noexcept {
        return _gate->close();
    }

    // Reports whether the gate of the current phase is closed.
    bool is_closed() const noexcept {
        return _gate->is_closed();
    }

    // Opens a new phase and waits for every operation started in any earlier
    // phase. Several awaits may be started in parallel.
    // Cannot fail.
    future<> advance_and_await() noexcept {
        // Fast path: nothing to wait for, so the current gate is kept.
        if (operations_in_progress() == 0) {
            ++_phase;
            return make_ready_future();
        }

        // Allocate the next phase's gate inside a critical allocation section.
        lw_shared_ptr<gate> next_gate;
        {
            seastar::memory::scoped_critical_alloc_section critical;
            next_gate = make_lw_shared<gate>();
        }

        ++_phase;
        auto prev_gate = std::exchange(_gate, std::move(next_gate));
        // The continuation keeps the previous gate alive until its close()
        // resolves. It also holds an operation started on the *new* gate for
        // the same duration -- presumably so that a later advance_and_await()
        // cannot resolve before this one does.
        return prev_gate->close().then([prev_gate, op = start()] {});
    }

    // Current phase number; starts at 0.
    phase_type phase() const {
        return _phase;
    }

    // Count of operations in progress in the current phase.
    size_t operations_in_progress() const {
        return _gate->get_count();
    }
};
}