Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/core/expire_period.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ namespace dfly {

class ExpirePeriod {
public:
static constexpr size_t kMaxGenId = 15;
static constexpr std::size_t kMaxGenId = 15;

ExpirePeriod() : val_(0), gen_(0), precision_(0) {
static_assert(sizeof(ExpirePeriod) == 8); // TODO
}

explicit ExpirePeriod(uint64_t ms, unsigned gen = 0) : ExpirePeriod() {
explicit ExpirePeriod(uint64_t ms, unsigned gen = 0) : gen_(gen) {
Set(ms);
}

Expand All @@ -35,7 +35,9 @@ class ExpirePeriod {

void Set(uint64_t ms);

bool is_second_precision() { return precision_ == 1;}
bool is_second_precision() {
return precision_ == 1;
}
Comment on lines +38 to +40
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bool is_second_precision() {
return precision_ == 1;
}
bool is_second_precision() const {
return precision_ == 1;
}


private:
uint64_t val_ : 59;
Expand All @@ -48,13 +50,13 @@ inline void ExpirePeriod::Set(uint64_t ms) {

if (ms < kBarrier) {
val_ = ms;
precision_ = 0; // ms
precision_ = 0; // ms
return;
}

precision_ = 1;
if (ms < kBarrier << 10) {
ms = (ms + 500) / 1000; // seconds
ms = (ms + 500) / 1000; // seconds
}
val_ = ms >= kBarrier ? kBarrier - 1 : ms;
}
Expand Down
8 changes: 7 additions & 1 deletion src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ class DbSlice::PrimeBumpPolicy {
DbSlice::DbSlice(uint32_t index, bool cache_mode, EngineShard* owner)
: shard_id_(index),
cache_mode_(cache_mode),
expire_allowed_(1),
expire_gen_id_(0),
owner_(owner),
client_tracking_map_(owner->memory_resource()) {
db_arr_.emplace_back();
Expand Down Expand Up @@ -1241,12 +1243,16 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
return {it, ExpireIterator{}};
}

// TODO: to employ multi-generation update of expire-base and the underlying values.
int64_t expire_time = ExpireTime(expire_it->second);

// Never do expiration on replica or if expiration is disabled or global lock was taken.
if (int64_t(cntx.time_now_ms) < expire_time || owner_->IsReplica() || !expire_allowed_ ||
!shard_owner()->shard_lock()->Check(IntentLock::Mode::EXCLUSIVE)) {
// Update expiry if needed.
if (expire_it->second.generation_id() != expire_gen_id_) {
expire_it->second = FromAbsoluteTime(expire_time);
}
// Keeping the entry.
return {it, expire_it};
}

Expand Down
22 changes: 17 additions & 5 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,9 @@ class DbSlice {
// Returns slot statistics for db 0.
SlotStats GetSlotStats(SlotId sid) const;

void UpdateExpireBase(uint64_t now, unsigned generation) {
expire_base_[generation & 1] = now;
void NextExpireGen(uint64_t now_ms) {
expire_gen_id_ ^= 1;
expire_base_[expire_gen_id_] = now_ms;
}

void UpdateMemoryParams(int64_t budget, size_t bytes_per_object) {
Expand All @@ -251,12 +252,17 @@ class DbSlice {
return bytes_per_object_;
}

// returns expire time in ms.
int64_t ExpireTime(const ExpirePeriod& val) const {
return expire_base_[0] + val.duration_ms();
return expire_base_[val.generation_id()] + val.duration_ms();
}

ExpirePeriod FromAbsoluteTime(uint64_t time_ms) const {
return ExpirePeriod{time_ms - expire_base_[0]};
return ExpirePeriod{time_ms - expire_base_[expire_gen_id_], expire_gen_id_};
}

unsigned expire_gen_id() const {
return expire_gen_id_;
}

struct ItAndUpdater {
Expand Down Expand Up @@ -616,11 +622,17 @@ class DbSlice {

ShardId shard_id_;
uint8_t cache_mode_ : 1;
uint8_t expire_allowed_ : 1;
uint8_t expire_gen_id_ : 1;

EngineShard* owner_;

// base time for computing expirations.
// the absolute expiration time is expire_base_ + relative_time.
// In order to allow rolling updates of expire_base_ we maintain two
// generations of expire_base_ and each expiration entry has a generation_id
// that tells which expire_base_ to use.
int64_t expire_base_[2]; // Used for expire logic, represents a real clock.
bool expire_allowed_ = true;

uint64_t version_ = 1; // Used to version entries in the PrimeTable.
uint64_t next_moved_id_ = 1;
Expand Down
5 changes: 3 additions & 2 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ optional<DebugCmd::PopulateOptions> DebugCmd::ParsePopulateArgs(CmdArgList args,
}
case FLAG_EXPIRE: {
auto [min_ttl, max_ttl] = parser.Next<uint32_t, uint32_t>();
if (min_ttl >= max_ttl) {
if (min_ttl > max_ttl) {
builder->SendError(kExpiryOutOfRange);
(void)parser.TakeError();
return nullopt;
Expand Down Expand Up @@ -1641,7 +1641,8 @@ void DebugCmd::DoPopulateBatch(const PopulateOptions& options, const PopulateBat
if (options.expire_ttl_range.has_value()) {
uint32_t start = options.expire_ttl_range->first;
uint32_t end = options.expire_ttl_range->second;
uint32_t expire_ttl = rand() % (end - start) + start;
uint32_t expire_ttl = start + ((end > start) ? rand() % (end - start) : 0);

VLOG(1) << "set key " << key << " expire ttl as " << expire_ttl;
auto cid = sf_.service().mutable_registry()->Find("EXPIRE");
absl::InlinedVector<string, 5> args;
Expand Down
18 changes: 18 additions & 0 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,24 @@ TEST_F(DflyEngineTest, Huffman) {
EXPECT_LT(metrics.heap_used_bytes, 14'000'000); // less than 15mb
}

TEST_F(DflyEngineTest, RebaseExpire) {
// expire: two weeks in seconds
auto resp = Run({"debug", "populate", "20000", "key", "10", "EXPIRE", "1209600", "1209600"});
EXPECT_EQ(resp, "OK");
resp = Run({"EXPIRETIME", "key:42"});
long exp_time = *resp.GetInt();
EXPECT_GT(exp_time, 0);
AdvanceTime(2 * 24 * 3600 * 1000); // advance 2 days.

// verify that all keys have been updated.
ExpectConditionWithinTimeout([&] {
auto metrics = GetMetrics();
return metrics.shard_stats.total_update_expire_calls == 20'000;
});
resp = Run({"EXPIRETIME", "key:42"});
EXPECT_EQ(*resp.GetInt(), exp_time);
}

class DflyCommandAliasTest : public DflyEngineTest {
protected:
DflyCommandAliasTest() {
Expand Down
103 changes: 101 additions & 2 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ string EngineShard::TxQueueInfo::Format() const {
}

EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
static_assert(sizeof(Stats) == 96);
static_assert(sizeof(Stats) == 104);

#define ADD(x) x += o.x

Expand All @@ -157,6 +157,7 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)
ADD(total_heartbeat_expired_bytes);
ADD(total_heartbeat_expired_calls);
ADD(total_migrated_keys);
ADD(total_update_expire_calls);

#undef ADD
return *this;
Expand Down Expand Up @@ -301,6 +302,8 @@ std::optional<CollectedPageStats> EngineShard::DoDefrag(CollectPageStats collect
return page_usage.CollectedStats();
}

constexpr uint32_t kRunAtLowPriority = 0u;

// the memory defragmentation task is as follow:
// 1. Check if memory usage is high enough
// 2. Check if diff between commited and used memory is high enough
Expand All @@ -310,7 +313,6 @@ std::optional<CollectedPageStats> EngineShard::DoDefrag(CollectPageStats collect
// priority.
// otherwise lower the task priority so that it would not use the CPU when not required
uint32_t EngineShard::DefragTask() {
constexpr uint32_t kRunAtLowPriority = 0u;
if (!namespaces) {
return kRunAtLowPriority;
}
Expand All @@ -326,6 +328,97 @@ uint32_t EngineShard::DefragTask() {
return 6; // priority.
}

// TODO: this is wrong. we can do better by updating the expire base in DeleteExpiredStep.
uint32_t EngineShard::UpdateExpiresTask() {
if (!namespaces) {
return kRunAtLowPriority;
}

DVLOG(1) << "EngineShard::UpdateExpiresTask shard_id: " << shard_id_;

DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id());
uint64_t now_ms = GetCurrentTimeMs();

// measure the age of the current expire base.
unsigned base_age_sec = db_slice.FromAbsoluteTime(now_ms).duration_ms() / 1000;

const uint64_t kLowThresholdSec = 3600 * 24 * 2; // 2 days
const uint64_t kHighThresholdSec = 3600 * 24 * 7; // 7 days
if (update_expire_state_ == nullptr) {
if (base_age_sec < kLowThresholdSec) {
// no need to update expire base if period is less than a few days.
return kRunAtLowPriority;
}

db_slice.NextExpireGen(now_ms);
update_expire_state_ = new UpdateExpireState();
VLOG(1) << "shard " << shard_id_ << " updated expire base to " << now_ms
<< ", generation: " << db_slice.expire_gen_id();
}

DCHECK(update_expire_state_ != nullptr);
if (base_age_sec > kHighThresholdSec) {
LOG_EVERY_T(ERROR, 3600) << "Expire base age is very high: " << base_age_sec;
}

if (update_expire_state_->db_index < db_slice.db_array_size()) {
unsigned current_gen_id = db_slice.expire_gen_id();

auto cb = [&](ExpireIterator it) {
if (it->second.generation_id() != current_gen_id) {
int64_t ms = db_slice.ExpireTime(it->second);

// only update if the expire time is in the future.
// we rely on DeleteExpiredStep to delete expired keys because we may need to propagate
// deletions to the journal.
if (ms > int64_t(now_ms)) {
DVLOG(2) << "Update expire generation from " << it->second.generation_id() << " to "
<< current_gen_id;
it->second = db_slice.FromAbsoluteTime(ms);

DCHECK_EQ(current_gen_id, db_slice.expire_gen_id());
DCHECK_EQ(ms, db_slice.ExpireTime(it->second));

stats_.total_update_expire_calls++;
} else {
update_expire_state_->stale_entries++;
}
}
};

auto& expire_table = db_slice.GetDBTable(update_expire_state_->db_index)->expire;
unsigned iters = 0;
do {
auto next = expire_table.Traverse(detail::DashCursor(update_expire_state_->cursor), cb);
if (next) {
update_expire_state_->cursor = next.token();
} else {
// finished this db, move to the next
update_expire_state_->cursor = 0;
++update_expire_state_->db_index;
while (update_expire_state_->db_index < db_slice.db_array_size() &&
!db_slice.IsDbValid(update_expire_state_->db_index)) {
++update_expire_state_->db_index;
}
}
} while (update_expire_state_->cursor && ++iters < 100);
}

if (update_expire_state_->db_index >= db_slice.db_array_size()) {
if (update_expire_state_->stale_entries == 0) {
// We went over all the items and not stale items were found, we are done.
delete update_expire_state_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't followed the logic of this function completely but this seems dangerous if the state is kept among different idle runs of this function. Why ? Because:

  1. IdleTsk runs -> allocates update_expire_state_
  2. We exit so we don't run the task again
  3. Memory leak since we don't call delete update_expire_state_ anywhere else

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IdleTask exits only on process shutdown. if it concludes the traversal, it deletes the state.

update_expire_state_ = nullptr;
return kRunAtLowPriority;
}

// Repeat the process if we found stale entries to update.
*update_expire_state_ = {};
}

return 5; // run again soon, moderate frequency.
}

EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
: txq_([](const Transaction* t) { return t->txid(); }),
queue_(kQueueLen, 1, 1),
Expand All @@ -347,6 +440,8 @@ void EngineShard::Shutdown() {

void EngineShard::StopPeriodicFiber() {
ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
ProactorBase::me()->RemoveOnIdleTask(update_expire_base_task_);

fiber_heartbeat_periodic_done_.Notify();
if (fiber_heartbeat_periodic_.IsJoinable()) {
fiber_heartbeat_periodic_.Join();
Expand Down Expand Up @@ -394,6 +489,7 @@ void EngineShard::StartPeriodicHeartbeatFiber(util::ProactorBase* pb) {
RunFPeriodically(heartbeat, period_ms, "heartbeat", &fiber_heartbeat_periodic_done_);
});
defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); });
update_expire_base_task_ = pb->AddOnIdleTask([this]() { return UpdateExpiresTask(); });
}

void EngineShard::StartPeriodicShardHandlerFiber(util::ProactorBase* pb,
Expand Down Expand Up @@ -605,8 +701,10 @@ void EngineShard::Heartbeat() {

// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id());

// Skip heartbeat if we are serializing a big value
static auto start = std::chrono::system_clock::now();

// Skip heartbeat if global transaction is in process.
// This is determined by attempting to check if shard lock can be acquired.
const bool can_acquire_global_lock = shard_lock()->Check(IntentLock::Mode::EXCLUSIVE);
Expand All @@ -620,6 +718,7 @@ void EngineShard::Heartbeat() {
}
return;
}

start = std::chrono::system_clock::now();

if (!IsReplica()) { // Never run expiry/evictions on replica.
Expand Down
14 changes: 13 additions & 1 deletion src/server/engine_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class EngineShard {
uint64_t total_heartbeat_expired_keys = 0;
uint64_t total_heartbeat_expired_bytes = 0;
uint64_t total_heartbeat_expired_calls = 0;
uint64_t total_update_expire_calls = 0;

// cluster stats
uint64_t total_migrated_keys = 0;
Expand Down Expand Up @@ -258,6 +259,7 @@ class EngineShard {
// context of the controlling thread will access this shard!
// --------------------------------------------------------------------------
uint32_t DefragTask();
uint32_t UpdateExpiresTask();

TxQueue txq_;
TaskQueue queue_, queue2_;
Expand All @@ -284,8 +286,18 @@ class EngineShard {
journal::Journal* journal_ = nullptr;
IntentLock shard_lock_;

uint32_t defrag_task_ = 0;
// Idle tasks.
uint32_t defrag_task_ = 0, update_expire_base_task_ = 0;

EvictionTaskState eviction_state_; // Used on eviction fiber
struct UpdateExpireState {
uint64_t cursor = 0;
DbIndex db_index = 0;
uint64_t stale_entries = 0;
};

UpdateExpireState* update_expire_state_ = nullptr;

util::fb2::Fiber fiber_heartbeat_periodic_;
util::fb2::Done fiber_heartbeat_periodic_done_;

Expand Down
2 changes: 1 addition & 1 deletion src/server/namespaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Namespace::Namespace() {
CHECK(es != nullptr);
ShardId sid = es->shard_id();
shard_db_slices_[sid] = make_unique<DbSlice>(sid, absl::GetFlag(FLAGS_cache_mode), es);
shard_db_slices_[sid]->UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0);
shard_db_slices_[sid]->NextExpireGen(absl::GetCurrentTimeNanos() / 1000000);
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ void BaseFamilyTest::ResetService() {
TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000;
auto default_ns = &namespaces->GetDefaultNamespace();
auto cb = [&](EngineShard* s) {
default_ns->GetDbSlice(s->shard_id()).UpdateExpireBase(TEST_current_time_ms - 1000, 0);
default_ns->GetDbSlice(s->shard_id()).NextExpireGen(TEST_current_time_ms - 1000);
};
shard_set->RunBriefInParallel(cb);

Expand Down
Loading