forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathatomic_vector.hh
67 lines (59 loc) · 2.13 KB
/
atomic_vector.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/*
* Copyright (C) 2020-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <seastar/core/rwlock.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/noncopyable_function.hh>
#include <algorithm>
#include <cstddef>
#include <vector>
// A vector wrapper in which removal is atomic (it runs under the write
// side of an rwlock and reports completion through a future), while
// insertion and iteration are not atomic and rely on index-based access.
template <typename T>
class atomic_vector {
    std::vector<T> _vec;
    // Mutable so that the const iteration methods can take the read side.
    mutable seastar::rwlock _vec_lock;

public:
    // Appends a copy of `value`. No lock is taken here.
    void add(const T& value) {
        _vec.push_back(value);
    }

    // Erases every element that compares equal to `value`, holding the
    // write side of the lock while doing so. `value` is copied into the
    // callback, so the caller's object is not referenced once this
    // function has returned. Returns the future produced by with_lock().
    seastar::future<> remove(const T& value) {
        return with_lock(_vec_lock.for_write(), [this, value] {
            // Erase-remove: compact the survivors, then drop the tail.
            auto first_dead = std::remove(_vec.begin(), _vec.end(), value);
            _vec.erase(first_dead, _vec.end());
        });
    }

    // Invokes `func` on each element while holding the read side of the
    // lock. Must be called on a thread, and `func` must not call remove().
    //
    // `func` receives each element by value rather than as a T&: callbacks
    // that kept such a reference across a preemption caused bugs in the
    // past.
    void thread_for_each(seastar::noncopyable_function<void(T)> func) const {
        _vec_lock.for_read().lock().get();
        auto release_read_side = seastar::defer([this] {
            _vec_lock.for_read().unlock();
        });
        // remove() is excluded by the lock but add() is not, so the
        // vector may be reallocated while we iterate. Walk it by index
        // instead of by iterator. The size is sampled once, up front.
        const size_t count = _vec.size();
        for (size_t idx = 0; idx != count; ++idx) {
            func(_vec[idx]);
        }
    }

    // Coroutine counterpart of thread_for_each(): awaits `func` on each
    // element while holding the read side of the lock. `func` must not
    // call remove().
    //
    // `func` receives each element by value rather than as a T&: callbacks
    // that kept such a reference across a preemption caused bugs in the
    // past.
    seastar::future<> for_each(seastar::noncopyable_function<seastar::future<>(T)> func) const {
        auto read_guard = co_await _vec_lock.hold_read_lock();
        // remove() is excluded by the lock but add() is not, so the
        // vector may be reallocated while we iterate. Walk it by index
        // instead of by iterator. The size is sampled once, up front.
        const size_t count = _vec.size();
        for (size_t idx = 0; idx != count; ++idx) {
            co_await func(_vec[idx]);
        }
    }
};