Skip to content

Commit 2e0a45f

Browse files
authored
fix(set_family): Don't replicate SPOP if nothing was removed (#5936)
* fix(set_family): Don't replicate SPOP if nothing was removed If nothing is removed with SPOP command we are rewriting command as SREM with invalid syntax. This replicated SREM command fails on replica side and we don't increment LSN. Fixed by skipping replication if SPOP didn't remove anything. Enable logging on replica if command verification failed. Signed-off-by: mkaruza <[email protected]> * fix(zset_family): Disable AUTOJOURNAL for ZRANGESTORE ZRANGESTORE is rewriten as SREM and ZADD so auto journal needs to be disabled. * feat(replica): Report DFATAL error only in ACTIVE and RUNNING state --------- Signed-off-by: mkaruza <[email protected]>
1 parent 370ef9c commit 2e0a45f

File tree

4 files changed

+14
-5
lines changed

4 files changed

+14
-5
lines changed

src/server/main_service.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1452,7 +1452,8 @@ DispatchResult Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder
14521452
}
14531453

14541454
if (auto err = VerifyCommandState(cid, args_no_cmd, *dfly_cntx); err) {
1455-
LOG_IF(WARNING, cntx->replica_conn) << "VerifyCommandState error: " << err->ToSv();
1455+
LOG_IF(WARNING, cntx->replica_conn || !cntx->conn() /* no owner in replica context */)
1456+
<< "VerifyCommandState error: " << err->ToSv();
14561457
if (auto& exec_info = dfly_cntx->conn_state.exec_info; exec_info.IsCollecting())
14571458
exec_info.state = ConnectionState::ExecInfo::EXEC_ERROR;
14581459

src/server/replica.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,6 +1006,13 @@ void DflyShardReplica::StableSyncDflyReadFb(ExecutionState* cntx) {
10061006
// inconsistent data because the replica will resume from the next
10071007
// lsn of the master and this lsn entry will be lost.
10081008
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
1009+
} else {
1010+
// We only report DFATAL:
1011+
// 1. Context is running
1012+
// 2. We are ACTIVE global state
1013+
if (cntx->IsRunning() && ((*ServerState::tlocal()).gstate() == GlobalState::ACTIVE)) {
1014+
LOG(DFATAL) << "ExecuteTx() on replica should be successful.";
1015+
}
10091016
}
10101017
}
10111018

src/server/set_family.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, facade::ArgRang
610610
if (isempty) {
611611
db_slice.Del(op_args.db_cntx, find_res->it);
612612
}
613-
if (journal_rewrite && op_args.shard->journal()) {
613+
if (removed && journal_rewrite && op_args.shard->journal()) {
614614
vector<string_view> mapped(vals.Size() + 1);
615615
mapped[0] = key;
616616
std::copy(vals.begin(), vals.end(), mapped.begin() + 1);
@@ -950,13 +950,13 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, unsigned count
950950
StringVec result = RandMemberSet(db_cntx, co, generator, picks_count);
951951

952952
// Remove selected members
953-
bool is_empty = RemoveSet(db_cntx, result, &co).second;
953+
auto [removed, is_empty] = RemoveSet(db_cntx, result, &co);
954954
find_res->post_updater.Run();
955955

956956
CHECK(!is_empty);
957957

958958
// Replicate as SREM with removed keys, because SPOP is not deterministic.
959-
if (op_args.shard->journal()) {
959+
if (removed && op_args.shard->journal()) {
960960
vector<string_view> mapped(result.size() + 1);
961961
mapped[0] = key;
962962
copy(result.begin(), result.end(), mapped.begin() + 1);

src/server/zset_family.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2855,7 +2855,8 @@ void ZSetFamily::Register(CommandRegistry* registry) {
28552855
<< CI{"ZRANK", CO::READONLY | CO::FAST, -3, 1, 1}.HFUNC(ZRank)
28562856
<< CI{"ZRANGEBYLEX", CO::READONLY, -4, 1, 1}.HFUNC(ZRangeByLex)
28572857
<< CI{"ZRANGEBYSCORE", CO::READONLY, -4, 1, 1}.HFUNC(ZRangeByScore)
2858-
<< CI{"ZRANGESTORE", CO::WRITE | CO::DENYOOM, -5, 1, 2}.HFUNC(ZRangeStore)
2858+
<< CI{"ZRANGESTORE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -5, 1, 2}.HFUNC(
2859+
ZRangeStore)
28592860
<< CI{"ZSCORE", CO::READONLY | CO::FAST, 3, 1, 1}.HFUNC(ZScore)
28602861
<< CI{"ZMSCORE", CO::READONLY | CO::FAST, -3, 1, 1}.HFUNC(ZMScore)
28612862
<< CI{"ZREMRANGEBYRANK", CO::WRITE, 4, 1, 1}.HFUNC(ZRemRangeByRank)

0 commit comments

Comments
 (0)