Skip to content

Commit fd33adb

Browse files
committed
chore: add a task that periodically updates the expire base time
The task runs during CPU idle periods and starts refreshing the expire base once it is several days old. While an update is in progress there are two bases, and only one of them is active: the one selected by expire_gen_id_. 1. The update process concludes when it reaches the end of the traversal without having skipped any values that should be updated. 2. It skips values whose expiry time has already passed, because it cannot delete values within the scope of the task. Signed-off-by: Roman Gershman <[email protected]>
1 parent 4f8b677 commit fd33adb

File tree

9 files changed

+169
-18
lines changed

9 files changed

+169
-18
lines changed

src/core/expire_period.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ namespace dfly {
1010

1111
class ExpirePeriod {
1212
public:
13-
static constexpr size_t kMaxGenId = 15;
13+
static constexpr std::size_t kMaxGenId = 15;
1414

1515
// Default-constructs a period with value, generation id and precision all zeroed.
ExpirePeriod() : val_(0), gen_(0), precision_(0) {
1616
// Compile-time check that the class occupies exactly 8 bytes.
static_assert(sizeof(ExpirePeriod) == 8); // TODO
1717
}
1818

19-
explicit ExpirePeriod(uint64_t ms, unsigned gen = 0) : ExpirePeriod() {
19+
explicit ExpirePeriod(uint64_t ms, unsigned gen = 0) : gen_(gen) {
2020
Set(ms);
2121
}
2222

@@ -35,7 +35,9 @@ class ExpirePeriod {
3535

3636
void Set(uint64_t ms);
3737

38-
bool is_second_precision() { return precision_ == 1;}
38+
bool is_second_precision() {
39+
return precision_ == 1;
40+
}
3941

4042
private:
4143
uint64_t val_ : 59;
@@ -48,13 +50,13 @@ inline void ExpirePeriod::Set(uint64_t ms) {
4850

4951
if (ms < kBarrier) {
5052
val_ = ms;
51-
precision_ = 0; // ms
53+
precision_ = 0; // ms
5254
return;
5355
}
5456

5557
precision_ = 1;
5658
if (ms < kBarrier << 10) {
57-
ms = (ms + 500) / 1000; // seconds
59+
ms = (ms + 500) / 1000; // seconds
5860
}
5961
val_ = ms >= kBarrier ? kBarrier - 1 : ms;
6062
}

src/server/db_slice.cc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,8 @@ class DbSlice::PrimeBumpPolicy {
398398
DbSlice::DbSlice(uint32_t index, bool cache_mode, EngineShard* owner)
399399
: shard_id_(index),
400400
cache_mode_(cache_mode),
401+
expire_allowed_(1),
402+
expire_gen_id_(0),
401403
owner_(owner),
402404
client_tracking_map_(owner->memory_resource()) {
403405
db_arr_.emplace_back();
@@ -1241,12 +1243,16 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
12411243
return {it, ExpireIterator{}};
12421244
}
12431245

1244-
// TODO: to employ multi-generation update of expire-base and the underlying values.
12451246
int64_t expire_time = ExpireTime(expire_it->second);
12461247

12471248
// Never do expiration on replica or if expiration is disabled or global lock was taken.
12481249
if (int64_t(cntx.time_now_ms) < expire_time || owner_->IsReplica() || !expire_allowed_ ||
12491250
!shard_owner()->shard_lock()->Check(IntentLock::Mode::EXCLUSIVE)) {
1251+
// Update expiry if needed.
1252+
if (expire_it->second.generation_id() != expire_gen_id_) {
1253+
expire_it->second = FromAbsoluteTime(expire_time);
1254+
}
1255+
// Keeping the entry.
12501256
return {it, expire_it};
12511257
}
12521258

src/server/db_slice.h

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,9 @@ class DbSlice {
234234
// Returns slot statistics for db 0.
235235
SlotStats GetSlotStats(SlotId sid) const;
236236

237-
void UpdateExpireBase(uint64_t now, unsigned generation) {
238-
expire_base_[generation & 1] = now;
237+
void NextExpireGen(uint64_t now_ms) {
238+
expire_gen_id_ ^= 1;
239+
expire_base_[expire_gen_id_] = now_ms;
239240
}
240241

241242
void UpdateMemoryParams(int64_t budget, size_t bytes_per_object) {
@@ -251,12 +252,17 @@ class DbSlice {
251252
return bytes_per_object_;
252253
}
253254

255+
// returns expire time in ms.
254256
int64_t ExpireTime(const ExpirePeriod& val) const {
255-
return expire_base_[0] + val.duration_ms();
257+
return expire_base_[val.generation_id()] + val.duration_ms();
256258
}
257259

258260
ExpirePeriod FromAbsoluteTime(uint64_t time_ms) const {
259-
return ExpirePeriod{time_ms - expire_base_[0]};
261+
return ExpirePeriod{time_ms - expire_base_[expire_gen_id_], expire_gen_id_};
262+
}
263+
264+
// Returns the generation id that NextExpireGen() last switched to, i.e. the index into
// expire_base_ that FromAbsoluteTime() uses for newly encoded periods.
unsigned expire_gen_id() const {
265+
return expire_gen_id_;
260266
}
261267

262268
struct ItAndUpdater {
@@ -616,11 +622,17 @@ class DbSlice {
616622

617623
ShardId shard_id_;
618624
uint8_t cache_mode_ : 1;
625+
uint8_t expire_allowed_ : 1;
626+
uint8_t expire_gen_id_ : 1;
619627

620628
EngineShard* owner_;
621629

630+
// base time for computing expirations.
631+
// the absolute expiration time is expire_base_ + relative_time.
632+
// In order to allow rolling updates of expire_base_ we maintain two
633+
// generations of expire_base_ and each expiration entry has a generation_id
634+
// that tells which expire_base_ to use.
622635
int64_t expire_base_[2]; // Used for expire logic, represents a real clock.
623-
bool expire_allowed_ = true;
624636

625637
uint64_t version_ = 1; // Used to version entries in the PrimeTable.
626638
uint64_t next_moved_id_ = 1;

src/server/debugcmd.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -872,7 +872,7 @@ optional<DebugCmd::PopulateOptions> DebugCmd::ParsePopulateArgs(CmdArgList args,
872872
}
873873
case FLAG_EXPIRE: {
874874
auto [min_ttl, max_ttl] = parser.Next<uint32_t, uint32_t>();
875-
if (min_ttl >= max_ttl) {
875+
if (min_ttl > max_ttl) {
876876
builder->SendError(kExpiryOutOfRange);
877877
(void)parser.TakeError();
878878
return nullopt;
@@ -1641,7 +1641,8 @@ void DebugCmd::DoPopulateBatch(const PopulateOptions& options, const PopulateBat
16411641
if (options.expire_ttl_range.has_value()) {
16421642
uint32_t start = options.expire_ttl_range->first;
16431643
uint32_t end = options.expire_ttl_range->second;
1644-
uint32_t expire_ttl = rand() % (end - start) + start;
1644+
uint32_t expire_ttl = start + ((end > start) ? rand() % (end - start) : 0);
1645+
16451646
VLOG(1) << "set key " << key << " expire ttl as " << expire_ttl;
16461647
auto cid = sf_.service().mutable_registry()->Find("EXPIRE");
16471648
absl::InlinedVector<string, 5> args;

src/server/dragonfly_test.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -953,6 +953,24 @@ TEST_F(DflyEngineTest, Huffman) {
953953
EXPECT_LT(metrics.heap_used_bytes, 14'000'000); // less than 15mb
954954
}
955955

956+
// Checks that refreshing the expire base keeps the absolute expiry time of a key intact.
TEST_F(DflyEngineTest, RebaseExpire) {
  constexpr int kTwoDaysMs = 2 * 24 * 3600 * 1000;

  // Populate 20000 keys, each with a TTL of exactly two weeks (1209600 seconds).
  auto populate_resp =
      Run({"debug", "populate", "20000", "key", "10", "EXPIRE", "1209600", "1209600"});
  EXPECT_EQ(populate_resp, "OK");

  // Remember the absolute expiry time of one key before the base is refreshed.
  auto before_resp = Run({"EXPIRETIME", "key:42"});
  const long expire_before = *before_resp.GetInt();
  EXPECT_GT(expire_before, 0);

  // Move the clock two days ahead.
  AdvanceTime(kTwoDaysMs);

  // Wait until every populated key has been re-encoded by the update task.
  ExpectConditionWithinTimeout([&] {
    auto current = GetMetrics();
    return current.shard_stats.total_update_expire_calls == 20'000;
  });

  // The absolute expiry time must not have changed.
  auto after_resp = Run({"EXPIRETIME", "key:42"});
  EXPECT_EQ(*after_resp.GetInt(), expire_before);
}
973+
956974
class DflyCommandAliasTest : public DflyEngineTest {
957975
protected:
958976
DflyCommandAliasTest() {

src/server/engine_shard.cc

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ string EngineShard::TxQueueInfo::Format() const {
141141
}
142142

143143
EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
144-
static_assert(sizeof(Stats) == 96);
144+
static_assert(sizeof(Stats) == 104);
145145

146146
#define ADD(x) x += o.x
147147

@@ -157,6 +157,7 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)
157157
ADD(total_heartbeat_expired_bytes);
158158
ADD(total_heartbeat_expired_calls);
159159
ADD(total_migrated_keys);
160+
ADD(total_update_expire_calls);
160161

161162
#undef ADD
162163
return *this;
@@ -301,6 +302,8 @@ std::optional<CollectedPageStats> EngineShard::DoDefrag(CollectPageStats collect
301302
return page_usage.CollectedStats();
302303
}
303304

305+
constexpr uint32_t kRunAtLowPriority = 0u;
306+
304307
// the memory defragmentation task is as follow:
305308
// 1. Check if memory usage is high enough
306309
// 2. Check if diff between committed and used memory is high enough
@@ -310,7 +313,6 @@ std::optional<CollectedPageStats> EngineShard::DoDefrag(CollectPageStats collect
310313
// priority.
311314
// otherwise lower the task priority so that it would not use the CPU when not required
312315
uint32_t EngineShard::DefragTask() {
313-
constexpr uint32_t kRunAtLowPriority = 0u;
314316
if (!namespaces) {
315317
return kRunAtLowPriority;
316318
}
@@ -326,6 +328,98 @@ uint32_t EngineShard::DefragTask() {
326328
return 6; // priority.
327329
}
328330

331+
// TODO: this is wrong. we can do better by updating the expire base in DeleteExpiredStep.
332+
uint32_t EngineShard::UpdateExpiresTask() {
333+
if (!namespaces) {
334+
return kRunAtLowPriority;
335+
}
336+
337+
DVLOG(1) << "EngineShard::UpdateExpiresTask shard_id: " << shard_id_;
338+
339+
DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id());
340+
uint64_t now_ms = GetCurrentTimeMs();
341+
342+
// measure the age of the current expire base.
343+
unsigned base_age_sec = db_slice.FromAbsoluteTime(now_ms).duration_ms() / 1000;
344+
345+
const uint64_t kLowThresholdSec = 3600 * 24 * 2; // 2 days
346+
const uint64_t kHighThresholdSec = 3600 * 24 * 7; // 7 days
347+
if (update_expire_state_ == nullptr) {
348+
if (base_age_sec < kLowThresholdSec) {
349+
// no need to update expire base if period is less than a few days.
350+
return kRunAtLowPriority;
351+
}
352+
353+
db_slice.NextExpireGen(now_ms);
354+
update_expire_state_ = new UpdateExpireState();
355+
VLOG(1) << "shard " << shard_id_ << " updated expire base to " << now_ms
356+
<< ", generation: " << db_slice.expire_gen_id();
357+
}
358+
359+
DCHECK(update_expire_state_ != nullptr);
360+
if (base_age_sec > kHighThresholdSec) {
361+
LOG_EVERY_T(ERROR, 3600) << "Expire base age is very high: " << base_age_sec;
362+
}
363+
364+
if (update_expire_state_->db_index < db_slice.db_array_size()) {
365+
unsigned current_gen_id = db_slice.expire_gen_id();
366+
367+
auto cb = [&](ExpireIterator it) {
368+
if (it->second.generation_id() != current_gen_id) {
369+
int64_t ms = db_slice.ExpireTime(it->second);
370+
371+
// only update if the expire time is in the future.
372+
// we rely on DeleteExpiredStep to delete expired keys because we may need to propagate
373+
// deletions to the journal.
374+
if (ms > int64_t(now_ms)) {
375+
DVLOG(2) << "Update expire generation from " << it->second.generation_id() << " to "
376+
<< current_gen_id;
377+
auto prev = it->second;
378+
it->second = db_slice.FromAbsoluteTime(ms);
379+
auto after = db_slice.FromAbsoluteTime(ms);
380+
DCHECK_EQ(current_gen_id, db_slice.expire_gen_id());
381+
DCHECK_EQ(ms, db_slice.ExpireTime(it->second));
382+
383+
stats_.total_update_expire_calls++;
384+
} else {
385+
update_expire_state_->stale_entries++;
386+
}
387+
}
388+
};
389+
390+
auto& expire_table = db_slice.GetDBTable(update_expire_state_->db_index)->expire;
391+
unsigned iters = 0;
392+
do {
393+
auto next = expire_table.Traverse(detail::DashCursor(update_expire_state_->cursor), cb);
394+
if (next) {
395+
update_expire_state_->cursor = next.token();
396+
} else {
397+
// finished this db, move to the next
398+
update_expire_state_->cursor = 0;
399+
++update_expire_state_->db_index;
400+
while (update_expire_state_->db_index < db_slice.db_array_size() &&
401+
!db_slice.IsDbValid(update_expire_state_->db_index)) {
402+
++update_expire_state_->db_index;
403+
}
404+
}
405+
} while (update_expire_state_->cursor && ++iters < 100);
406+
}
407+
408+
if (update_expire_state_->db_index >= db_slice.db_array_size()) {
409+
if (update_expire_state_->stale_entries == 0) {
410+
// We went over all the items and not stale items were found, we are done.
411+
delete update_expire_state_;
412+
update_expire_state_ = nullptr;
413+
return kRunAtLowPriority;
414+
}
415+
416+
// Repeat the process if we found stale entries to update.
417+
*update_expire_state_ = {};
418+
}
419+
420+
return 5; // run again soon, moderate frequency.
421+
}
422+
329423
EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
330424
: txq_([](const Transaction* t) { return t->txid(); }),
331425
queue_(kQueueLen, 1, 1),
@@ -347,6 +441,8 @@ void EngineShard::Shutdown() {
347441

348442
void EngineShard::StopPeriodicFiber() {
349443
ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
444+
ProactorBase::me()->RemoveOnIdleTask(update_expire_base_task_);
445+
350446
fiber_heartbeat_periodic_done_.Notify();
351447
if (fiber_heartbeat_periodic_.IsJoinable()) {
352448
fiber_heartbeat_periodic_.Join();
@@ -394,6 +490,7 @@ void EngineShard::StartPeriodicHeartbeatFiber(util::ProactorBase* pb) {
394490
RunFPeriodically(heartbeat, period_ms, "heartbeat", &fiber_heartbeat_periodic_done_);
395491
});
396492
defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); });
493+
update_expire_base_task_ = pb->AddOnIdleTask([this]() { return UpdateExpiresTask(); });
397494
}
398495

399496
void EngineShard::StartPeriodicShardHandlerFiber(util::ProactorBase* pb,
@@ -605,8 +702,10 @@ void EngineShard::Heartbeat() {
605702

606703
// TODO: iterate over all namespaces
607704
DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id());
705+
608706
// Skip heartbeat if we are serializing a big value
609707
static auto start = std::chrono::system_clock::now();
708+
610709
// Skip heartbeat if global transaction is in process.
611710
// This is determined by attempting to check if shard lock can be acquired.
612711
const bool can_acquire_global_lock = shard_lock()->Check(IntentLock::Mode::EXCLUSIVE);
@@ -620,6 +719,7 @@ void EngineShard::Heartbeat() {
620719
}
621720
return;
622721
}
722+
623723
start = std::chrono::system_clock::now();
624724

625725
if (!IsReplica()) { // Never run expiry/evictions on replica.

src/server/engine_shard.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class EngineShard {
4949
uint64_t total_heartbeat_expired_keys = 0;
5050
uint64_t total_heartbeat_expired_bytes = 0;
5151
uint64_t total_heartbeat_expired_calls = 0;
52+
uint64_t total_update_expire_calls = 0;
5253

5354
// cluster stats
5455
uint64_t total_migrated_keys = 0;
@@ -258,6 +259,7 @@ class EngineShard {
258259
// context of the controlling thread will access this shard!
259260
// --------------------------------------------------------------------------
260261
uint32_t DefragTask();
262+
uint32_t UpdateExpiresTask();
261263

262264
TxQueue txq_;
263265
TaskQueue queue_, queue2_;
@@ -284,8 +286,18 @@ class EngineShard {
284286
journal::Journal* journal_ = nullptr;
285287
IntentLock shard_lock_;
286288

287-
uint32_t defrag_task_ = 0;
289+
// Idle tasks.
290+
uint32_t defrag_task_ = 0, update_expire_base_task_ = 0;
291+
288292
EvictionTaskState eviction_state_; // Used on eviction fiber
293+
struct UpdateExpireState {
294+
uint64_t cursor = 0;
295+
DbIndex db_index = 0;
296+
uint64_t stale_entries = 0;
297+
};
298+
299+
UpdateExpireState* update_expire_state_ = nullptr;
300+
289301
util::fb2::Fiber fiber_heartbeat_periodic_;
290302
util::fb2::Done fiber_heartbeat_periodic_done_;
291303

src/server/namespaces.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ Namespace::Namespace() {
2222
CHECK(es != nullptr);
2323
ShardId sid = es->shard_id();
2424
shard_db_slices_[sid] = make_unique<DbSlice>(sid, absl::GetFlag(FLAGS_cache_mode), es);
25-
shard_db_slices_[sid]->UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0);
25+
shard_db_slices_[sid]->NextExpireGen(absl::GetCurrentTimeNanos() / 1000000);
2626
});
2727
}
2828

src/server/test_utils.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ void BaseFamilyTest::ResetService() {
255255
TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000;
256256
auto default_ns = &namespaces->GetDefaultNamespace();
257257
auto cb = [&](EngineShard* s) {
258-
default_ns->GetDbSlice(s->shard_id()).UpdateExpireBase(TEST_current_time_ms - 1000, 0);
258+
default_ns->GetDbSlice(s->shard_id()).NextExpireGen(TEST_current_time_ms - 1000);
259259
};
260260
shard_set->RunBriefInParallel(cb);
261261

0 commit comments

Comments
 (0)