forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathloading_shared_values.hh
369 lines (322 loc) · 13.3 KB
/
loading_shared_values.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
/*
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "utils/assert.hh"
#include <vector>
#include <seastar/core/shared_future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/future.hh>
#include <seastar/core/bitops.hh>
#include <boost/intrusive/unordered_set.hpp>
#include "seastarx.hh"
namespace bi = boost::intrusive;
namespace utils {
// Statistics policy for loading_shared_values that records nothing.
// Every hook is a static, noexcept no-op, which satisfies the container's Stats requirements.
struct do_nothing_loading_shared_values_stats {
    // Reported when a lookup finds a value that is already loaded.
    static void inc_hits() noexcept { }

    // Reported when a lookup finds no entry for the key, so a load is started.
    static void inc_misses() noexcept { }

    // Reported when a lookup has to wait because the value is not loaded yet.
    static void inc_blocks() noexcept { }

    // Reported whenever an entry is destroyed.
    static void inc_evictions() noexcept { }
};
// Entries stay around as long as there is any live external reference (entry_ptr) to them.
// Supports asynchronous insertion, ensures that only one entry will be loaded.
// InitialBucketsCount is required to be greater than zero. Otherwise a constructor will throw an
// std::invalid_argument exception.
template<typename Key,
typename Tp,
typename Hash = std::hash<Key>,
typename EqualPred = std::equal_to<Key>,
typename Stats = do_nothing_loading_shared_values_stats,
size_t InitialBucketsCount = 16>
requires requires () {
Stats::inc_hits();
Stats::inc_misses();
Stats::inc_blocks();
Stats::inc_evictions();
}
class loading_shared_values {
public:
using key_type = Key;
using value_type = Tp;
static constexpr size_t initial_buckets_count = InitialBucketsCount;
private:
class entry : public bi::unordered_set_base_hook<bi::store_hash<true>>, public enable_lw_shared_from_this<entry> {
private:
loading_shared_values& _parent;
key_type _key;
std::optional<value_type> _val;
shared_promise<> _loaded;
public:
const key_type& key() const noexcept {
return _key;
}
const value_type& value() const noexcept {
return *_val;
}
value_type& value() noexcept {
return *_val;
}
/// \brief "Release" the object from the contained value.
/// After this call the state of the value kept inside this object is undefined and it may no longer be used.
///
/// \return The r-value reference to the value kept inside this object.
value_type&& release() {
return *std::move(_val);
}
void set_value(value_type new_val) {
_val.emplace(std::move(new_val));
}
bool orphaned() const {
return !is_linked();
}
shared_promise<>& loaded() {
return _loaded;
}
bool ready() const noexcept {
return bool(_val);
}
entry(loading_shared_values& parent, key_type k)
: _parent(parent), _key(std::move(k)) {}
~entry() {
if (is_linked()) {
_parent._set.erase(_parent._set.iterator_to(*this));
}
Stats::inc_evictions();
}
friend bool operator==(const entry& a, const entry& b){
return EqualPred()(a.key(), b.key());
}
friend std::size_t hash_value(const entry& v) {
return Hash()(v.key());
}
};
template<typename KeyType, typename KeyEqual>
struct key_eq {
bool operator()(const KeyType& k, const entry& c) const {
return KeyEqual()(k, c.key());
}
bool operator()(const entry& c, const KeyType& k) const {
return KeyEqual()(c.key(), k);
}
};
using set_type = bi::unordered_set<entry, bi::power_2_buckets<true>, bi::compare_hash<true>>;
using bi_set_bucket_traits = typename set_type::bucket_traits;
using set_iterator = typename set_type::iterator;
enum class shrinking_is_allowed { no, yes };
public:
// Pointer to entry value
class entry_ptr {
lw_shared_ptr<entry> _e;
public:
using element_type = value_type;
entry_ptr() = default;
entry_ptr(std::nullptr_t) noexcept : _e() {};
explicit entry_ptr(lw_shared_ptr<entry> e) : _e(std::move(e)) {}
entry_ptr& operator=(std::nullptr_t) noexcept {
_e = nullptr;
return *this;
}
explicit operator bool() const noexcept { return bool(_e); }
bool operator==(const entry_ptr&) const = default;
element_type& operator*() const noexcept { return _e->value(); }
element_type* operator->() const noexcept { return &_e->value(); }
/// \brief Get the wrapped value. Avoid the copy if this is the last reference to this value.
/// If this is the last reference then the wrapped value is going to be std::move()ed. Otherwise it's going to
/// be copied.
/// \return The wrapped value.
element_type release() {
auto res = _e.owned() ? _e->release() : _e->value();
_e = nullptr;
return res;
}
// Returns the key this entry is associated with.
// Valid if bool(*this).
const key_type& key() const {
return _e->key();
}
// Returns true iff the entry is not linked in the set.
// Call only when bool(*this).
bool orphaned() const {
return _e->orphaned();
}
friend class loading_shared_values;
friend std::ostream& operator<<(std::ostream& os, const entry_ptr& ep) {
return os << ep._e.get();
}
};
private:
std::vector<typename set_type::bucket_type> _buckets;
set_type _set;
public:
static const key_type& to_key(const entry_ptr& e_ptr) noexcept {
return e_ptr._e->key();
}
/// \throw std::invalid_argument if InitialBucketsCount is zero
loading_shared_values()
: _buckets(InitialBucketsCount)
, _set(bi_set_bucket_traits(_buckets.data(), _buckets.size()))
{
static_assert(noexcept(Stats::inc_evictions()), "Stats::inc_evictions must be non-throwing");
static_assert(noexcept(Stats::inc_hits()), "Stats::inc_hits must be non-throwing");
static_assert(noexcept(Stats::inc_misses()), "Stats::inc_misses must be non-throwing");
static_assert(noexcept(Stats::inc_blocks()), "Stats::inc_blocks must be non-throwing");
static_assert(InitialBucketsCount && ((InitialBucketsCount & (InitialBucketsCount - 1)) == 0), "Initial buckets count should be a power of two");
}
loading_shared_values(loading_shared_values&&) = default;
loading_shared_values(const loading_shared_values&) = delete;
~loading_shared_values() {
SCYLLA_ASSERT(!_set.size());
}
/// \brief
/// Returns a future which resolves with a shared pointer to the entry for the given key.
/// Always returns a valid pointer if succeeds.
///
/// If entry is missing, the loader is invoked. If entry is already loading, this invocation
/// will wait for prior loading to complete and use its result when it's done.
///
/// The loader object does not survive deferring, so the caller must deal with its liveness.
template<typename Loader>
requires std::same_as<typename futurize<std::invoke_result_t<Loader, const key_type&>>::type, future<value_type>>
future<entry_ptr> get_or_load(const key_type& key, Loader&& loader) noexcept {
try {
auto i = _set.find(key, Hash(), key_eq<key_type, EqualPred>());
lw_shared_ptr<entry> e;
future<> f = make_ready_future<>();
if (i != _set.end()) {
e = i->shared_from_this();
// take a short cut if the value is ready
if (e->ready()) {
Stats::inc_hits();
return make_ready_future<entry_ptr>(entry_ptr(std::move(e)));
}
f = e->loaded().get_shared_future();
} else {
Stats::inc_misses();
e = make_lw_shared<entry>(*this, key);
rehash_before_insert();
_set.insert(*e);
// get_shared_future() may throw, so make sure to call it before invoking the loader(key)
f = e->loaded().get_shared_future();
// Future indirectly forwarded to `e`.
(void)futurize_invoke([&] { return loader(key); }).then_wrapped([e](future<value_type>&& val_fut) mutable {
if (val_fut.failed()) {
e->loaded().set_exception(val_fut.get_exception());
} else {
e->set_value(val_fut.get());
e->loaded().set_value();
}
});
}
if (!f.available()) {
Stats::inc_blocks();
return f.then([e]() mutable {
return entry_ptr(std::move(e));
});
} else if (f.failed()) {
return make_exception_future<entry_ptr>(std::move(f).get_exception());
} else {
Stats::inc_hits();
return make_ready_future<entry_ptr>(entry_ptr(std::move(e)));
}
} catch (...) {
return make_exception_future<entry_ptr>(std::current_exception());
}
}
/// \brief Try to rehash the container so that the load factor is between 0.25 and 0.75.
/// \throw May throw if allocation of a new buckets array throws.
void rehash() {
rehash<shrinking_is_allowed::yes>(_set.size());
}
size_t buckets_count() const {
return _buckets.size();
}
size_t size() const {
return _set.size();
}
template<typename KeyType, typename KeyHasher, typename KeyEqual>
entry_ptr find(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
set_iterator it = _set.find(key, std::move(key_hasher_func), key_eq<KeyType, KeyEqual>());
if (it == _set.end() || !it->ready()) {
return entry_ptr();
}
return entry_ptr(it->shared_from_this());
};
// Removes a given key from this container.
// If a given key is currently loading, the loading will succeed and will return entry_ptr
// to the caller, but the value will not be present in the container. It will be removed
// when the last entry_ptr dies, as usual.
//
// Post-condition: !find(key)
template<typename KeyType, typename KeyHasher, typename KeyEqual>
void remove(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) {
set_iterator it = _set.find(key, std::move(key_hasher_func), key_eq<KeyType, KeyEqual>());
if (it != _set.end()) {
_set.erase(it);
}
}
// Removes a given key from this container.
// If a given key is currently loading, the loading will succeed and will return entry_ptr
// to the caller, but the value will not be present in the container. It will be removed
// when the last entry_ptr dies, as usual.
//
// Post-condition: !find(key)
template<typename KeyType>
void remove(const KeyType& key) {
remove(key, Hash(), EqualPred());
}
// Removes all values which match a given predicate or are currently loading.
// Guarantees that no values which match the predicate and whose loading was initiated
// before this call will be present after this call (or appear at any time later).
// Same effects as if remove(e.key()) was called on each matching entry.
template<typename Pred>
requires std::is_invocable_r_v<bool, Pred, const Tp&>
void remove_if(const Pred& pred) {
auto it = _set.begin();
while (it != _set.end()) {
if (!it->ready() || pred(it->value())) {
auto next = std::next(it);
_set.erase(it);
it = next;
} else {
++it;
}
}
}
// keep the default non-templated overloads to ease on the compiler for specifications
// that do not require the templated find().
entry_ptr find(const key_type& key) noexcept {
return find(key, Hash(), EqualPred());
}
private:
void rehash_before_insert() noexcept {
try {
rehash<shrinking_is_allowed::no>(_set.size() + 1);
} catch (...) {
// if rehashing fails - continue with the current buckets array
}
}
template <shrinking_is_allowed ShrinkingIsAllowed>
void rehash(size_t new_size) {
size_t new_buckets_count = 0;
// Try to keep the load factor between 0.25 (when shrinking is allowed) and 0.75.
if (ShrinkingIsAllowed == shrinking_is_allowed::yes && new_size < buckets_count() / 4) {
if (!new_size) {
new_buckets_count = 1;
} else {
new_buckets_count = size_t(1) << log2floor(new_size * 4);
}
} else if (new_size > 3 * buckets_count() / 4) {
new_buckets_count = buckets_count() * 2;
}
if (new_buckets_count < InitialBucketsCount) {
return;
}
std::vector<typename set_type::bucket_type> new_buckets(new_buckets_count);
_set.rehash(bi_set_bucket_traits(new_buckets.data(), new_buckets.size()));
_buckets = std::move(new_buckets);
}
};
}