// loading_cache.hh (forked from scylladb/scylladb)
/*
* Copyright (C) 2016-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <chrono>
#include <memory_resource>
#include <optional>
#include <ranges>
#include <algorithm>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/parent_from_member.hpp>
#include <seastar/core/loop.hh>
#include <seastar/core/timer.hh>
#include <seastar/core/gate.hh>
#include "exceptions/exceptions.hh"
#include "utils/assert.hh"
#include "utils/loading_shared_values.hh"
#include "utils/chunked_vector.hh"
#include "utils/log.hh"
namespace bi = boost::intrusive;
namespace utils {
enum class loading_cache_reload_enabled { no, yes };
template <typename Clock>
struct loading_cache_config_base final {
size_t max_size = 0;
Clock::duration expiry;
Clock::duration refresh;
};
using loading_cache_config = loading_cache_config_base<seastar::lowres_clock>;
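// A configuration sketch (the field values below are illustrative, not recommendations):
//
//   loading_cache_config cfg;
//   cfg.max_size = 1000;                    // maximum total size, as measured by the EntrySize predicate
//   cfg.expiry = std::chrono::minutes(1);   // entries not read for this long are evicted
//   cfg.refresh = std::chrono::seconds(10); // cached values are reloaded this often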
template <typename Tp>
struct simple_entry_size {
size_t operator()(const Tp& val) {
return 1;
}
};
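// A sketch of a custom EntrySize predicate, assuming (hypothetically) that the cached
// value is a seastar::sstring and that its footprint is approximated by its length:
//
//   struct sstring_entry_size {
//       size_t operator()(const seastar::sstring& val) {
//           return sizeof(seastar::sstring) + val.size();
//       }
//   };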
struct do_nothing_loading_cache_stats {
// Counts events in which entries are evicted from the unprivileged cache section due to the size restriction.
// These events are interesting because they indicate cache pollution.
static void inc_unprivileged_on_cache_size_eviction() noexcept {}
// A metric complementary to the one above. Combined, the two give the total number of evictions due to cache size.
static void inc_privileged_on_cache_size_eviction() noexcept {}
};
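// A sketch of a counting alternative (hypothetical; any class with these two
// noexcept static members can be plugged in as the LoadingCacheStats parameter):
//
//   struct counting_loading_cache_stats {
//       static inline uint64_t unprivileged_evictions = 0;
//       static inline uint64_t privileged_evictions = 0;
//       static void inc_unprivileged_on_cache_size_eviction() noexcept { ++unprivileged_evictions; }
//       static void inc_privileged_on_cache_size_eviction() noexcept { ++privileged_evictions; }
//   };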
/// \brief Loading cache is a cache that loads the value into the cache using the given asynchronous callback.
///
/// If reloading is enabled (\tparam ReloadEnabled == loading_cache_reload_enabled::yes), each cached value is
/// reloaded after the "refresh" time period has passed since it was last loaded.
///
/// Values are evicted from the cache if they have not been accessed during the "expiration" period, or have not
/// been reloaded even once during that period.
///
/// If "expiration" is set to zero - the caching is going to be disabled and get_XXX(...) is going to call the "loader" callback
/// every time in order to get the requested value.
///
/// \note To avoid eviction of cached entries merely due to "aging" of the contained value, choose an
/// "expiration" of at least ("refresh" + "max load latency"). This way the value stays in the cache and is
/// read in a non-blocking way as long as it is frequently accessed. Note, however, that since reloading is an
/// asynchronous procedure it may be delayed by other running tasks. Therefore, choosing an "expiration" too close
/// to ("refresh" + "max load latency") risks having cache values evicted when the system is heavily loaded.
///
/// The cache is also limited in size: if adding the next value would exceed the size limit, the least recently
/// used value(s) are evicted until adding the new value no longer breaks the limit. If the new entry's size is
/// greater than the cache size limit, the get_XXX(...) method returns a future with the
/// loading_cache::entry_is_too_big exception.
///
/// The cache comprises two dynamic sections.
/// The total size of both sections may not exceed the maximum cache size.
/// A new cache entry is always added to the unprivileged section.
/// After a cache entry is read more than SectionHitThreshold times, it moves to the second (privileged) section.
/// Entries of both sections obey the expiration and reload rules explained above.
/// When cache entries need to be evicted due to the size restriction, the least recently used entries of the
/// unprivileged section are evicted first. If the cache is still too big even after the unprivileged section is
/// empty, the least recently used entries of the privileged section are evicted until the size restriction is met.
///
/// The size of the cache is defined as the sum of the sizes of all cached entries.
/// The size of each entry is the value returned by the \tparam EntrySize predicate applied to it.
///
/// The get(key) and get_ptr(key) methods ensure that the "loader" callback is called only once for each cached
/// entry, regardless of how many callers call get_XXX(key) for the same "key" at the same time. Only after a value
/// is evicted from the cache is it "loaded" again in the context of get_XXX(key). As long as the value is cached,
/// get_XXX(key) returns the cached value immediately and reloads it in the background every "refresh" period, as
/// described above.
///
/// \tparam Key type of the cache key
/// \tparam Tp type of the cached value
/// \tparam SectionHitThreshold number of hits after which a cache item is going to be moved to the privileged cache section.
/// \tparam ReloadEnabled if loading_cache_reload_enabled::yes allow reloading the values otherwise don't reload
/// \tparam EntrySize predicate to calculate the entry size
/// \tparam Hash hash function
/// \tparam EqualPred equality predicate
/// \tparam LoadingSharedValuesStats statistics incrementing class (see utils::loading_shared_values)
/// \tparam Alloc elements allocator
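///
/// A usage sketch (illustrative only; my_logger and load_from_db() are assumed to exist,
/// with load_from_db() returning future<int>, and cfg being a loading_cache_config):
///
///   utils::loading_cache<sstring, int, 1, utils::loading_cache_reload_enabled::yes> cache(
///       cfg, my_logger, [] (const sstring& key) { return load_from_db(key); });
///   future<int> v = cache.get("some_key"); // loads on a miss, returns the cached value on a hit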
template<typename Key,
typename Tp,
int SectionHitThreshold = 0,
loading_cache_reload_enabled ReloadEnabled = loading_cache_reload_enabled::no,
typename EntrySize = simple_entry_size<Tp>,
typename Hash = std::hash<Key>,
typename EqualPred = std::equal_to<Key>,
typename LoadingSharedValuesStats = utils::do_nothing_loading_shared_values_stats,
typename LoadingCacheStats = utils::do_nothing_loading_cache_stats,
typename Clock = seastar::lowres_clock,
typename Alloc = std::pmr::polymorphic_allocator<>>
class loading_cache {
public:
using config = loading_cache_config;
private:
using loading_cache_clock_type = Clock;
using time_point = loading_cache_clock_type::time_point;
using duration = loading_cache_clock_type::duration;
using safe_link_list_hook = bi::list_base_hook<bi::link_mode<bi::safe_link>>;
class timestamped_val {
public:
using value_type = Tp;
using loading_values_type = typename utils::loading_shared_values<Key, timestamped_val, Hash, EqualPred, LoadingSharedValuesStats, 256>;
class lru_entry;
class value_ptr;
private:
value_type _value;
loading_cache_clock_type::time_point _loaded;
loading_cache_clock_type::time_point _last_read;
lru_entry* _lru_entry_ptr = nullptr; /// MRU item is at the front, LRU - at the back
size_t _size = 0;
public:
timestamped_val(value_type val)
: _value(std::move(val))
, _loaded(loading_cache_clock_type::now())
, _last_read(_loaded)
, _size(EntrySize()(_value))
{}
timestamped_val(timestamped_val&&) = default;
timestamped_val& operator=(value_type new_val) {
SCYLLA_ASSERT(_lru_entry_ptr);
_value = std::move(new_val);
_loaded = loading_cache_clock_type::now();
_lru_entry_ptr->owning_section_size() -= _size;
_size = EntrySize()(_value);
_lru_entry_ptr->owning_section_size() += _size;
return *this;
}
value_type& value() noexcept { return _value; }
const value_type& value() const noexcept { return _value; }
static const timestamped_val& container_of(const value_type& value) {
return *bi::get_parent_from_member(&value, &timestamped_val::_value);
}
loading_cache_clock_type::time_point last_read() const noexcept {
return _last_read;
}
loading_cache_clock_type::time_point loaded() const noexcept {
return _loaded;
}
size_t size() const noexcept {
return _size;
}
bool ready() const noexcept {
return _lru_entry_ptr;
}
lru_entry* lru_entry_ptr() const noexcept {
return _lru_entry_ptr;
}
private:
void touch() noexcept {
_last_read = loading_cache_clock_type::now();
if (_lru_entry_ptr) {
_lru_entry_ptr->touch();
}
}
void set_anchor_back_reference(lru_entry* lru_entry_ptr) noexcept {
_lru_entry_ptr = lru_entry_ptr;
}
};
private:
using loading_values_type = typename timestamped_val::loading_values_type;
using timestamped_val_ptr = typename loading_values_type::entry_ptr;
using ts_value_lru_entry = typename timestamped_val::lru_entry;
using lru_list_type = typename ts_value_lru_entry::lru_list_type;
using list_iterator = typename lru_list_type::iterator;
public:
using value_type = Tp;
using key_type = Key;
using value_ptr = typename timestamped_val::value_ptr;
class entry_is_too_big : public std::exception {};
private:
loading_cache(config cfg, logging::logger& logger)
: _cfg(std::move(cfg))
, _logger(logger)
, _timer([this] { on_timer(); })
{
static_assert(noexcept(LoadingCacheStats::inc_unprivileged_on_cache_size_eviction()), "LoadingCacheStats::inc_unprivileged_on_cache_size_eviction must be non-throwing");
static_assert(noexcept(LoadingCacheStats::inc_privileged_on_cache_size_eviction()), "LoadingCacheStats::inc_privileged_on_cache_size_eviction must be non-throwing");
_logger.debug("Loading cache; max_size: {}, expiry: {}ms, refresh: {}ms", _cfg.max_size,
std::chrono::duration_cast<std::chrono::milliseconds>(_cfg.expiry).count(),
std::chrono::duration_cast<std::chrono::milliseconds>(_cfg.refresh).count());
if (!validate_config(_cfg)) {
throw exceptions::configuration_exception("loading_cache: caching is enabled but refresh period and/or max_size are zero");
}
}
bool validate_config(const config& cfg) const noexcept {
// Sanity check: if an expiration period is given, then a non-zero refresh period and a non-zero maximum size are required
if (cfg.expiry != duration(0) && (cfg.max_size == 0 || cfg.refresh == duration(0))) {
return false;
}
return true;
}
public:
template<typename Func>
requires std::is_invocable_r_v<future<value_type>, Func, const key_type&>
loading_cache(config cfg, logging::logger& logger, Func&& load)
: loading_cache(std::move(cfg), logger)
{
static_assert(ReloadEnabled == loading_cache_reload_enabled::yes, "This constructor should only be invoked when ReloadEnabled == loading_cache_reload_enabled::yes");
_load = std::forward<Func>(load);
// If expiration period is zero - caching is disabled
if (!caching_enabled()) {
return;
}
_timer_period = std::min(_cfg.expiry, _cfg.refresh);
_timer.arm(_timer_period);
}
loading_cache(size_t max_size, duration expiry, logging::logger& logger)
: loading_cache({max_size, expiry, time_point::max().time_since_epoch()}, logger)
{
static_assert(ReloadEnabled == loading_cache_reload_enabled::no, "This constructor should only be invoked when ReloadEnabled == loading_cache_reload_enabled::no");
// If expiration period is zero - caching is disabled
if (!caching_enabled()) {
return;
}
_timer_period = _cfg.expiry;
_timer.arm(_timer_period);
}
~loading_cache() {
auto value_destroyer = [] (ts_value_lru_entry* ptr) { loading_cache::destroy_ts_value(ptr); };
_unprivileged_lru_list.erase_and_dispose(_unprivileged_lru_list.begin(), _unprivileged_lru_list.end(), value_destroyer);
_lru_list.erase_and_dispose(_lru_list.begin(), _lru_list.end(), value_destroyer);
}
void reset() noexcept {
_logger.info("Resetting cache");
remove_if([](const value_type&){ return true; });
}
bool update_config(config cfg) {
_logger.info("Updating loading cache; max_size: {}, expiry: {}ms, refresh: {}ms", cfg.max_size,
std::chrono::duration_cast<std::chrono::milliseconds>(cfg.expiry).count(),
std::chrono::duration_cast<std::chrono::milliseconds>(cfg.refresh).count());
if (!validate_config(cfg)) {
_logger.debug("loading_cache: caching is enabled but refresh period and/or max_size are zero");
return false;
}
_updated_cfg.emplace(std::move(cfg));
// * If the timer is already armed, we need to rearm it so that the config changes can take effect.
// * If the timer is not armed and caching is enabled, on_timer was executed but its continuation hasn't finished yet,
//   so we don't need to rearm it here: on_timer's continuation will take care of that.
// * If caching is disabled and is being enabled here in update_config, we also need to arm the timer so that the
//   config changes can take effect.
if (_timer.armed() ||
(!caching_enabled() && _updated_cfg->expiry != duration(0))) {
_timer.rearm(loading_cache_clock_type::now() + duration(std::chrono::milliseconds(1)));
}
return true;
}
template <typename LoadFunc>
requires std::is_invocable_r_v<future<value_type>, LoadFunc, const key_type&>
future<value_ptr> get_ptr(const Key& k, LoadFunc&& load) {
// We shouldn't be here if caching is disabled
SCYLLA_ASSERT(caching_enabled());
return _loading_values.get_or_load(k, [load = std::forward<LoadFunc>(load)] (const Key& k) mutable {
return load(k).then([] (value_type val) {
return timestamped_val(std::move(val));
});
}).then([this, k] (timestamped_val_ptr ts_val_ptr) {
// check again since it could have already been inserted and initialized
if (!ts_val_ptr->ready() && !ts_val_ptr.orphaned()) {
_logger.trace("{}: storing the value for the first time", k);
if (ts_val_ptr->size() > _cfg.max_size) {
return make_exception_future<value_ptr>(entry_is_too_big());
}
ts_value_lru_entry* new_lru_entry = Alloc().template allocate_object<ts_value_lru_entry>();
// Remove the least recently used items if the map is too big.
shrink();
new(new_lru_entry) ts_value_lru_entry(std::move(ts_val_ptr), *this);
// This will "touch" the entry and add it to the LRU list - we must do this before the shrink() call.
value_ptr vp(new_lru_entry->timestamped_value_ptr());
return make_ready_future<value_ptr>(std::move(vp));
}
return make_ready_future<value_ptr>(std::move(ts_val_ptr));
});
}
future<value_ptr> get_ptr(const Key& k) {
static_assert(ReloadEnabled == loading_cache_reload_enabled::yes, "");
return get_ptr(k, _load);
}
future<Tp> get(const Key& k) {
static_assert(ReloadEnabled == loading_cache_reload_enabled::yes, "");
// If caching is disabled - always load in the foreground
if (!caching_enabled()) {
return _load(k);
}
return get_ptr(k).then([] (value_ptr v_ptr) {
return make_ready_future<Tp>(*v_ptr);
});
}
future<> stop() {
return _timer_reads_gate.close().finally([this] { _timer.cancel(); });
}
/// Find a value for a specific Key value and touch() it.
/// \tparam KeyType Key type
/// \tparam KeyHasher Hash functor type
/// \tparam KeyEqual Equality functor type
///
/// \param key Key value to look for
/// \param key_hasher_func Hash functor
/// \param key_equal_func Equality functor
/// \return a value_ptr object pointing to the found value, or nullptr otherwise.
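///
/// For example (a sketch; key and consume() are illustrative):
///
///   if (value_ptr vp = cache.find(key)) {
///       consume(*vp); // value found and touched; use it
///   }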
template<typename KeyType, typename KeyHasher, typename KeyEqual>
value_ptr find(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
// The value_ptr constructor updates the "last read" timestamp of the corresponding object and moves
// the object to the front of the LRU list.
return set_find(key, std::move(key_hasher_func), std::move(key_equal_func));
}
value_ptr find(const Key& k) noexcept {
return set_find(k);
}
// Removes all values matching a given predicate and values which are currently loading.
// Guarantees that no values which match the predicate and whose loading was initiated
// before this call will be present after this call (or appear at any time later).
// The predicate may be invoked multiple times on the same value.
// It must return the same result for a given value (it must be a pure function).
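//
// For example (a sketch; is_stale() is a hypothetical predicate on the cached value type):
//
//   cache.remove_if([] (const value_type& v) { return v.is_stale(); });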
template <typename Pred>
requires std::is_invocable_r_v<bool, Pred, const value_type&>
void remove_if(Pred&& pred) {
auto cond_pred = [&pred] (const ts_value_lru_entry& v) {
return pred(v.timestamped_value().value());
};
auto value_destroyer = [] (ts_value_lru_entry* p) {
loading_cache::destroy_ts_value(p);
};
_unprivileged_lru_list.remove_and_dispose_if(cond_pred, value_destroyer);
_lru_list.remove_and_dispose_if(cond_pred, value_destroyer);
_loading_values.remove_if([&pred] (const timestamped_val& v) {
return pred(v.value());
});
}
// Removes a given key from the cache.
// The key is removed immediately.
// After this, get_ptr() is guaranteed to reload the value before returning it.
// As a consequence, if a get_ptr() call is in progress concurrently with this call,
// its value will not populate the cache. The get_ptr() call itself will still succeed.
void remove(const Key& k) {
remove_ts_value(set_find(k));
// set_find() returns nullptr for a key which is currently loading, which we want to remove too.
_loading_values.remove(k);
}
// Removes a given key from the cache.
// Same guarantees as with remove(key).
template<typename KeyType, typename KeyHasher, typename KeyEqual>
void remove(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
remove_ts_value(set_find(key, key_hasher_func, key_equal_func));
// set_find() returns nullptr for a key which is currently loading, which we want to remove too.
_loading_values.remove(key, key_hasher_func, key_equal_func);
}
size_t size() const {
return _lru_list.size() + _unprivileged_lru_list.size();
}
/// \brief returns the memory size the currently cached entries occupy according to the EntrySize predicate.
size_t memory_footprint() const noexcept {
return _unprivileged_section_size + _privileged_section_size;
}
/// \brief returns the memory size the currently cached entries occupy in the privileged section according to the EntrySize predicate.
size_t privileged_section_memory_footprint() const noexcept {
return _privileged_section_size;
}
/// \brief returns the memory size the currently cached entries occupy in the unprivileged section according to the EntrySize predicate.
size_t unprivileged_section_memory_footprint() const noexcept {
return _unprivileged_section_size;
}
private:
void remove_ts_value(timestamped_val_ptr ts_ptr) {
if (!ts_ptr) {
return;
}
ts_value_lru_entry* lru_entry_ptr = ts_ptr->lru_entry_ptr();
lru_list_type& entry_list = container_list(*lru_entry_ptr);
entry_list.erase_and_dispose(entry_list.iterator_to(*lru_entry_ptr), [] (ts_value_lru_entry* p) { loading_cache::destroy_ts_value(p); });
}
timestamped_val_ptr ready_entry_ptr(timestamped_val_ptr tv_ptr) {
if (!tv_ptr || !tv_ptr->ready()) {
return nullptr;
}
return std::move(tv_ptr);
}
lru_list_type& container_list(const ts_value_lru_entry& lru_entry_ptr) noexcept {
return (lru_entry_ptr.touch_count() > SectionHitThreshold) ? _lru_list : _unprivileged_lru_list;
}
template<typename KeyType, typename KeyHasher, typename KeyEqual>
timestamped_val_ptr set_find(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
return ready_entry_ptr(_loading_values.find(key, std::move(key_hasher_func), std::move(key_equal_func)));
}
// Keep the default non-templated overloads to ease the load on the compiler for specializations
// that do not require the templated find().
timestamped_val_ptr set_find(const Key& key) noexcept {
return ready_entry_ptr(_loading_values.find(key));
}
bool caching_enabled() const {
return _cfg.expiry != duration(0);
}
static void destroy_ts_value(ts_value_lru_entry* val) noexcept {
Alloc().delete_object(val);
}
/// This is the core method of the two-section LRU implementation.
/// It sets the given item as the most recently used item in the corresponding cache section.
/// The MRU item is at the front of the list, the LRU item at the back.
/// An entry initially enters the "unprivileged" section (represented by _unprivileged_lru_list).
/// After an entry is touched more than SectionHitThreshold times, it moves to the "privileged" section
/// (represented by _lru_list).
///
/// \param lru_entry Cache item that has been "touched"
void touch_lru_entry_2_sections(ts_value_lru_entry& lru_entry) {
if (lru_entry.is_linked()) {
lru_list_type& lru_list = container_list(lru_entry);
lru_list.erase(lru_list.iterator_to(lru_entry));
}
if (lru_entry.touch_count() < SectionHitThreshold) {
_logger.trace("Putting key {} into the unprivileged section", lru_entry.key());
_unprivileged_lru_list.push_front(lru_entry);
lru_entry.inc_touch_count();
} else {
_logger.trace("Putting key {} into the privileged section", lru_entry.key());
_lru_list.push_front(lru_entry);
// Bump it up only once to avoid a wrap around
if (lru_entry.touch_count() == SectionHitThreshold) {
// This code will run only once, when a promotion
// from unprivileged to privileged section happens.
// Update section size bookkeeping.
lru_entry.owning_section_size() -= lru_entry.timestamped_value().size();
lru_entry.inc_touch_count();
lru_entry.owning_section_size() += lru_entry.timestamped_value().size();
}
}
}
future<> reload(timestamped_val_ptr ts_value_ptr) {
const Key& key = loading_values_type::to_key(ts_value_ptr);
// Do nothing if the entry has been dropped before we got here (e.g. by the _load() call on another key that is
// also being reloaded).
if (!ts_value_ptr->lru_entry_ptr()) {
_logger.trace("{}: entry was dropped before the reload", key);
return make_ready_future<>();
}
return _load(key).then_wrapped([this, ts_value_ptr = std::move(ts_value_ptr), &key] (auto&& f) mutable {
// if the entry has been evicted by now - simply end here
if (!ts_value_ptr->lru_entry_ptr()) {
_logger.trace("{}: entry was dropped during the reload", key);
return make_ready_future<>();
}
// The exceptions are related to the load operation itself.
// We should ignore them for the background reads - if
// they persist the value will age and will be reloaded in
// the foreground. If the foreground READ fails the error
// will be propagated up to the user and will fail the
// corresponding query.
try {
*ts_value_ptr = f.get();
} catch (std::exception& e) {
_logger.debug("{}: reload failed: {}", key, e.what());
} catch (...) {
_logger.debug("{}: reload failed: unknown error", key);
}
return make_ready_future<>();
});
}
void drop_expired() {
auto now = loading_cache_clock_type::now();
auto expiration_cond = [now, this] (const ts_value_lru_entry& lru_entry) {
using namespace std::chrono;
// An entry should be discarded if it hasn't been reloaded for too long or nobody cares about it anymore
const timestamped_val& v = lru_entry.timestamped_value();
auto since_last_read = now - v.last_read();
auto since_loaded = now - v.loaded();
if (_cfg.expiry < since_last_read || (ReloadEnabled == loading_cache_reload_enabled::yes && _cfg.expiry < since_loaded)) {
_logger.trace("drop_expired(): {}: dropping the entry: expiry {}, ms passed since: loaded {} last_read {}", lru_entry.key(), _cfg.expiry.count(), duration_cast<milliseconds>(since_loaded).count(), duration_cast<milliseconds>(since_last_read).count());
return true;
}
return false;
};
auto value_destroyer = [] (ts_value_lru_entry* p) {
loading_cache::destroy_ts_value(p);
};
_unprivileged_lru_list.remove_and_dispose_if(expiration_cond, value_destroyer);
_lru_list.remove_and_dispose_if(expiration_cond, value_destroyer);
}
// Shrink the cache to max_size, discarding the least recently used items.
// Get rid of the unprivileged-section entries (those used only a few times) first.
void shrink() noexcept {
using namespace std::chrono;
auto drop_privileged_entry = [&] {
ts_value_lru_entry& lru_entry = *_lru_list.rbegin();
_logger.trace("shrink(): {}: dropping the entry: ms since last_read {}", lru_entry.key(), duration_cast<milliseconds>(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count());
loading_cache::destroy_ts_value(&lru_entry);
LoadingCacheStats::inc_privileged_on_cache_size_eviction();
};
auto drop_unprivileged_entry = [&] {
ts_value_lru_entry& lru_entry = *_unprivileged_lru_list.rbegin();
_logger.trace("shrink(): {}: dropping the unprivileged entry: ms since last_read {}", lru_entry.key(), duration_cast<milliseconds>(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count());
loading_cache::destroy_ts_value(&lru_entry);
LoadingCacheStats::inc_unprivileged_on_cache_size_eviction();
};
// When cache entries need to be evicted due to a size restriction,
// unprivileged section entries are evicted first.
//
// However, we make sure that the unprivileged section does not get
// too small, because that could lead to starving it.
// For example, if the cache could store at most 50 entries and there were 49 entries in the
// privileged section, then after adding 5 entries (which go to the unprivileged
// section) 4 of them would get evicted and only the 5th one would stay.
// This caused problems with BATCH statements, where all prepared statements
// in the batch have to stay in the cache at the same time for the batch to
// execute correctly.
auto minimum_unprivileged_section_size = _cfg.max_size / 2;
while (memory_footprint() >= _cfg.max_size && _unprivileged_section_size > minimum_unprivileged_section_size) {
drop_unprivileged_entry();
}
while (memory_footprint() >= _cfg.max_size && !_lru_list.empty()) {
drop_privileged_entry();
}
// If dropping entries from privileged section did not help,
// we have to drop entries from unprivileged section,
// going below minimum_unprivileged_section_size.
while (memory_footprint() >= _cfg.max_size) {
drop_unprivileged_entry();
}
}
// Try to bring the load factors of the _loading_values into a known range.
void periodic_rehash() noexcept {
try {
_loading_values.rehash();
} catch (...) {
// if rehashing fails - continue with the current buckets array
}
}
void on_timer() {
_logger.trace("on_timer(): start");
if (_updated_cfg) {
_cfg = *_updated_cfg;
_updated_cfg.reset();
_timer_period = std::min(_cfg.expiry, _cfg.refresh);
}
// Caching might have been disabled during a config update
if (!caching_enabled()) {
reset();
return;
}
// Clean up items that were not touched for the whole expiry period.
drop_expired();
// check if rehashing is needed and do it if it is.
periodic_rehash();
if constexpr (ReloadEnabled == loading_cache_reload_enabled::no) {
_logger.trace("on_timer(): rearming");
_timer.arm(loading_cache_clock_type::now() + _timer_period);
return;
}
// Reload all entries whose values need to be reloaded.
// Future is waited on indirectly in `stop()` (via `_timer_reads_gate`).
// FIXME: error handling
(void)with_gate(_timer_reads_gate, [this] {
auto now = loading_cache_clock_type::now();
auto to_reload = std::array<lru_list_type*, 2>({&_unprivileged_lru_list, &_lru_list})
| std::views::transform([] (auto* list_ptr) -> decltype(auto) { return *list_ptr; })
| std::views::join
| std::views::filter([this, now] (ts_value_lru_entry& lru_entry) {
return lru_entry.timestamped_value().loaded() + _cfg.refresh < now;
})
| std::views::transform([] (ts_value_lru_entry& lru_entry) {
return lru_entry.timestamped_value_ptr();
})
| std::ranges::to<utils::chunked_vector<timestamped_val_ptr>>();
return parallel_for_each(std::move(to_reload), [this] (timestamped_val_ptr ts_value_ptr) {
_logger.trace("on_timer(): {}: reloading the value", loading_values_type::to_key(ts_value_ptr));
return this->reload(std::move(ts_value_ptr));
}).finally([this] {
_logger.trace("on_timer(): rearming");
// If the config was updated after on_timer and before this continuation finished
// it's necessary to run on_timer again to make sure that everything will be reloaded correctly
if (_updated_cfg) {
_timer.arm(loading_cache_clock_type::now() + duration(std::chrono::milliseconds(1)));
} else {
_timer.arm(loading_cache_clock_type::now() + _timer_period);
}
});
});
}
loading_values_type _loading_values;
lru_list_type _lru_list; // list containing "privileged" section entries
lru_list_type _unprivileged_lru_list; // list containing "unprivileged" section entries
size_t _privileged_section_size = 0;
size_t _unprivileged_section_size = 0;
loading_cache_clock_type::duration _timer_period;
config _cfg;
std::optional<config> _updated_cfg;
logging::logger& _logger;
std::function<future<Tp>(const Key&)> _load;
timer<loading_cache_clock_type> _timer;
seastar::gate _timer_reads_gate;
};
template<typename Key, typename Tp, int SectionHitThreshold, loading_cache_reload_enabled ReloadEnabled, typename EntrySize, typename Hash, typename EqualPred, typename LoadingSharedValuesStats, typename LoadingCacheStats, typename Clock, typename Alloc>
class loading_cache<Key, Tp, SectionHitThreshold, ReloadEnabled, EntrySize, Hash, EqualPred, LoadingSharedValuesStats, LoadingCacheStats, Clock, Alloc>::timestamped_val::value_ptr {
private:
using loading_values_type = typename timestamped_val::loading_values_type;
public:
using timestamped_val_ptr = typename loading_values_type::entry_ptr;
using value_type = Tp;
private:
timestamped_val_ptr _ts_val_ptr;
public:
value_ptr(timestamped_val_ptr ts_val_ptr) : _ts_val_ptr(std::move(ts_val_ptr)) {
if (_ts_val_ptr) {
_ts_val_ptr->touch();
}
}
value_ptr(std::nullptr_t) noexcept : _ts_val_ptr() {}
bool operator==(const value_ptr&) const = default;
explicit operator bool() const noexcept { return bool(_ts_val_ptr); }
value_type& operator*() const noexcept { return _ts_val_ptr->value(); }
value_type* operator->() const noexcept { return &_ts_val_ptr->value(); }
friend std::ostream& operator<<(std::ostream& os, const value_ptr& vp) {
return os << vp._ts_val_ptr;
}
};
/// \brief This is an LRU list entry which is also an anchor for a loading_cache value.
template<typename Key, typename Tp, int SectionHitThreshold, loading_cache_reload_enabled ReloadEnabled, typename EntrySize, typename Hash, typename EqualPred, typename LoadingSharedValuesStats, typename LoadingCacheStats, typename Clock, typename Alloc>
class loading_cache<Key, Tp, SectionHitThreshold, ReloadEnabled, EntrySize, Hash, EqualPred, LoadingSharedValuesStats, LoadingCacheStats, Clock, Alloc>::timestamped_val::lru_entry : public safe_link_list_hook {
private:
using loading_values_type = typename timestamped_val::loading_values_type;
public:
using lru_list_type = bi::list<lru_entry>;
using timestamped_val_ptr = typename loading_values_type::entry_ptr;
private:
timestamped_val_ptr _ts_val_ptr;
loading_cache& _parent;
int _touch_count;
public:
lru_entry(timestamped_val_ptr ts_val, loading_cache& owner_cache)
: _ts_val_ptr(std::move(ts_val))
, _parent(owner_cache)
, _touch_count(0)
{
// We don't want to allow SectionHitThreshold to be greater than half the max value of _touch_count to avoid a wrap around
static_assert(SectionHitThreshold <= std::numeric_limits<decltype(_touch_count)>::max() / 2, "SectionHitThreshold value is too big");
_ts_val_ptr->set_anchor_back_reference(this);
owning_section_size() += _ts_val_ptr->size();
}
void inc_touch_count() noexcept {
++_touch_count;
}
int touch_count() const noexcept {
return _touch_count;
}
~lru_entry() {
if (safe_link_list_hook::is_linked()) {
lru_list_type& lru_list = _parent.container_list(*this);
lru_list.erase(lru_list.iterator_to(*this));
}
owning_section_size() -= _ts_val_ptr->size();
_ts_val_ptr->set_anchor_back_reference(nullptr);
}
size_t& owning_section_size() noexcept {
return _touch_count <= SectionHitThreshold ? _parent._unprivileged_section_size : _parent._privileged_section_size;
}
void touch() noexcept {
_parent.touch_lru_entry_2_sections(*this);
}
const Key& key() const noexcept {
return loading_values_type::to_key(_ts_val_ptr);
}
timestamped_val& timestamped_value() noexcept { return *_ts_val_ptr; }
const timestamped_val& timestamped_value() const noexcept { return *_ts_val_ptr; }
timestamped_val_ptr timestamped_value_ptr() noexcept { return _ts_val_ptr; }
};
}