Skip to content

Commit 14d2fab

Browse files
committed
Get OOM status from SinkReplyBuilder in incoming slot migration
Modify SinkReplyBuilder to contain also OpStatus when error is set. Don't reset error status after command is done, it will be done on next invoicecmd/dispatchcmd call.
1 parent 983156a commit 14d2fab

File tree

8 files changed

+37
-18
lines changed

8 files changed

+37
-18
lines changed

src/facade/op_status.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#pragma once
66

77
#include <cstdint>
8+
#include <limits>
89
#include <ostream>
910

1011
namespace facade {
@@ -32,6 +33,7 @@ enum class OpStatus : uint16_t {
3233
MEMBER_NOTFOUND,
3334
INVALID_JSON_PATH,
3435
INVALID_JSON,
36+
CUSTOM_ERROR = std::numeric_limits<uint16_t>::max() // Custom status error
3537
};
3638

3739
class OpResultBase {

src/facade/reply_builder.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ void SinkReplyBuilder::SendError(ErrorReply error) {
9090
void SinkReplyBuilder::SendError(OpStatus status) {
9191
if (status == OpStatus::OK)
9292
return SendSimpleString("OK");
93+
last_error_status_ = status;
9394
SendError(StatusToMsg(status));
9495
}
9596

@@ -258,6 +259,7 @@ void MCReplyBuilder::SendLong(long val) {
258259

259260
void MCReplyBuilder::SendError(string_view str, std::string_view type) {
260261
last_error_ = str;
262+
last_error_status_ = OpStatus::CUSTOM_ERROR;
261263
SendSimpleString(absl::StrCat("SERVER_ERROR ", str));
262264
}
263265

@@ -368,6 +370,7 @@ void RedisReplyBuilderBase::SendError(std::string_view str, std::string_view typ
368370
}
369371
tl_facade_stats->reply_stats.err_count[type]++;
370372
last_error_ = str;
373+
last_error_status_ = OpStatus::CUSTOM_ERROR;
371374

372375
if (str[0] != '-') {
373376
WritePieces("-ERR ");

src/facade/reply_builder.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,17 @@ class SinkReplyBuilder {
120120
void SendError(ErrorReply error);
121121
virtual void SendProtocolError(std::string_view str) = 0;
122122

123-
std::string ConsumeLastError() {
124-
return std::exchange(last_error_, {});
123+
void ResetLastError() {
124+
last_error_status_ = OpStatus::OK;
125+
std::exchange(last_error_, {});
126+
}
127+
128+
std::string_view GetLastError() {
129+
return last_error_;
130+
}
131+
132+
OpStatus GetLastErrorStatus() const {
133+
return last_error_status_;
125134
}
126135

127136
uint64_t GetLastSendTimeNs() const;
@@ -137,6 +146,7 @@ class SinkReplyBuilder {
137146
protected:
138147
size_t replies_recorded_ = 0;
139148
std::string last_error_;
149+
OpStatus last_error_status_;
140150

141151
private:
142152
io::Sink* sink_;

src/facade/reply_capture.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ CapturingReplyBuilder::Payload CapturingReplyBuilder::Take() {
6565
CHECK(stack_.empty());
6666
Payload pl = std::move(current_);
6767
current_ = monostate{};
68-
ConsumeLastError();
68+
ResetLastError();
6969
return pl;
7070
}
7171

@@ -165,7 +165,7 @@ void CapturingReplyBuilder::Apply(Payload&& pl, RedisReplyBuilder* rb) {
165165
CaptureVisitor cv{rb};
166166
visit(cv, std::move(pl));
167167
// Consumed and printed by InvokeCmd. We just send the actual error here
168-
rb->ConsumeLastError();
168+
rb->ResetLastError();
169169
}
170170

171171
void CapturingReplyBuilder::SetReplyMode(ReplyMode mode) {

src/server/cluster/incoming_slot_migration.cc

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,6 @@ class ClusterShardMigration {
7676
break;
7777
}
7878

79-
auto used_mem = used_mem_current.load(memory_order_relaxed);
80-
// If aplying transaction data will reach 90% of max_memory_limit we end migration.
81-
if ((used_mem + tx_data->command.cmd_len) > (0.9 * max_memory_limit)) {
82-
cntx->ReportError(std::string{kIncomingMigrationOOM});
83-
in_migration_->ReportFatalError(std::string{kIncomingMigrationOOM});
84-
break;
85-
}
86-
8779
while (tx_data->opcode == journal::Op::LSN) {
8880
VLOG(2) << "Attempt to finalize flow " << source_shard_id_ << " attempt " << tx_data->lsn;
8981
last_attempt_.store(tx_data->lsn);
@@ -112,6 +104,12 @@ class ClusterShardMigration {
112104
// TODO check about ping logic
113105
} else {
114106
ExecuteTx(std::move(*tx_data), cntx);
107+
// Check for OOM error
108+
if (executor_.ExecutionLastErrorStatus() == facade::OpStatus::OUT_OF_MEMORY) {
109+
cntx->ReportError(std::string{kIncomingMigrationOOM});
110+
in_migration_->ReportFatalError(std::string{kIncomingMigrationOOM});
111+
break;
112+
}
115113
}
116114
}
117115

src/server/journal/executor.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ class JournalExecutor {
3232
return &conn_context_;
3333
}
3434

35+
facade::OpStatus ExecutionLastErrorStatus() {
36+
return reply_builder_.GetLastErrorStatus();
37+
}
38+
3539
private:
3640
void Execute(journal::ParsedEntry::CmdData& cmd);
3741

src/server/main_service.cc

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,9 +1146,10 @@ void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
11461146
DCHECK(!args.empty());
11471147
DCHECK_NE(0u, shard_set->size()) << "Init was not called";
11481148

1149-
absl::Cleanup clear_last_error([builder]() { builder->ConsumeLastError(); });
11501149
ServerState& etl = *ServerState::tlocal();
11511150

1151+
builder->ResetLastError();
1152+
11521153
string cmd = absl::AsciiStrToUpper(args[0]);
11531154
const auto [cid, args_no_cmd] = registry_.FindExtended(cmd, args.subspan(1));
11541155

@@ -1312,6 +1313,9 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args,
13121313
DCHECK(builder);
13131314
DCHECK(cntx);
13141315

1316+
// Reset
1317+
builder->ResetLastError();
1318+
13151319
if (auto err = VerifyCommandExecution(cid, cntx, tail_args); err) {
13161320
// We need to skip this because ACK's should not be replied to
13171321
// Bonus points because this allows to continue replication with ACL users who got
@@ -1320,7 +1324,6 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args,
13201324
return true;
13211325
}
13221326
builder->SendError(std::move(*err));
1323-
builder->ConsumeLastError();
13241327
return true; // return false only for internal error aborts
13251328
}
13261329

@@ -1354,16 +1357,15 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args,
13541357
ReplyGuard reply_guard(cid->name(), builder, cntx);
13551358
#endif
13561359
uint64_t invoke_time_usec = 0;
1357-
auto last_error = builder->ConsumeLastError();
1358-
DCHECK(last_error.empty());
13591360
try {
13601361
invoke_time_usec = cid->Invoke(tail_args, cmd_cntx);
13611362
} catch (std::exception& e) {
13621363
LOG(ERROR) << "Internal error, system probably unstable " << e.what();
13631364
return false;
13641365
}
13651366

1366-
if (std::string reason = builder->ConsumeLastError(); !reason.empty()) {
1367+
if (builder->GetLastErrorStatus() != facade::OpStatus::OK) {
1368+
std::string_view reason = builder->GetLastError();
13671369
VLOG(2) << FailedCommandToString(cid->name(), tail_args, reason);
13681370
LOG_EVERY_T(WARNING, 1) << FailedCommandToString(cid->name(), tail_args, reason);
13691371
}

src/server/multi_command_squasher.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ bool MultiCommandSquasher::ExecuteStandalone(RedisReplyBuilder* rb, const Stored
145145
if (opts_.verify_commands) {
146146
if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) {
147147
rb->SendError(std::move(*err));
148-
rb->ConsumeLastError();
148+
rb->ResetLastError();
149149
return !opts_.error_abort;
150150
}
151151
}

0 commit comments

Comments
 (0)