Skip to content

Commit 07e6958

Browse files
authored
fix(cluster_family): Cancel slot migration from incoming node on OOM (#5000)
If applying command on incoming node will result in OOM we are closing migration and switch state to FATAL. Already applied keys would be removed form incoming node and migration status for both nodes would be FATAL. Signed-off-by: mkaruza <[email protected]>
1 parent 9d7c713 commit 07e6958

15 files changed

+168
-17
lines changed

src/server/cluster/cluster_defs.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,12 +172,11 @@ class ClusterShardInfos {
172172
};
173173

174174
// MigrationState constants are ordered in state changing order
175-
enum class MigrationState : uint8_t {
176-
C_CONNECTING,
177-
C_SYNC,
178-
C_ERROR,
179-
C_FINISHED,
180-
};
175+
enum class MigrationState : uint8_t { C_CONNECTING, C_SYNC, C_ERROR, C_FINISHED, C_FATAL };
176+
177+
// Errors during slot migration
178+
static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";
179+
static constexpr std::string_view kIncomingMigrationOOM = "INCOMING_MIGRATION_OOM";
181180

182181
// return error message if slot doesn't belong to this node
183182
facade::ErrorReply SlotOwnershipError(SlotId slot_id);

src/server/cluster/cluster_family.cc

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,8 @@ static string_view StateToStr(MigrationState state) {
726726
return "ERROR"sv;
727727
case MigrationState::C_FINISHED:
728728
return "FINISHED"sv;
729+
case MigrationState::C_FATAL:
730+
return "FATAL"sv;
729731
}
730732
DCHECK(false) << "Unknown State value " << static_cast<underlying_type_t<MigrationState>>(state);
731733
return "UNDEFINED_STATE"sv;
@@ -765,7 +767,6 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, SinkReplyBuilder* b
765767
};
766768

767769
for (const auto& m : incoming_migrations_jobs_) {
768-
// TODO add error status
769770
append_answer("in", m->GetSourceID(), node_id, m->GetState(), m->GetKeyCount(),
770771
m->GetErrorStr());
771772
}
@@ -925,7 +926,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
925926

926927
if (!migration) {
927928
VLOG(1) << "Unrecognized incoming migration from " << source_id;
928-
return builder->SendSimpleString(OutgoingMigration::kUnknownMigration);
929+
return builder->SendSimpleString(kUnknownMigration);
929930
}
930931

931932
if (migration->GetState() != MigrationState::C_CONNECTING) {
@@ -936,6 +937,10 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
936937
DeleteSlots(slots);
937938
}
938939

940+
if (migration->GetState() == MigrationState::C_FATAL) {
941+
return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM));
942+
}
943+
939944
migration->Init(flows_num);
940945

941946
return builder->SendOk();
@@ -955,6 +960,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
955960
cntx->conn()->SetName(absl::StrCat("migration_flow_", source_id));
956961

957962
auto migration = GetIncomingMigration(source_id);
963+
958964
if (!migration) {
959965
return builder->SendError(kIdNotFound);
960966
}
@@ -1033,15 +1039,19 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
10331039
[source_id = source_id](const auto& m) { return m.node_info.id == source_id; });
10341040
if (m_it == in_migrations.end()) {
10351041
LOG(WARNING) << "migration isn't in config";
1036-
return builder->SendError(OutgoingMigration::kUnknownMigration);
1042+
return builder->SendSimpleString(kUnknownMigration);
10371043
}
10381044

10391045
auto migration = GetIncomingMigration(source_id);
10401046
if (!migration)
10411047
return builder->SendError(kIdNotFound);
10421048

10431049
if (!migration->Join(attempt)) {
1044-
return builder->SendError("Join timeout happened");
1050+
if (migration->GetState() == MigrationState::C_FATAL) {
1051+
return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM));
1052+
} else {
1053+
return builder->SendError("Join timeout happened");
1054+
}
10451055
}
10461056

10471057
ApplyMigrationSlotRangeToConfig(migration->GetSourceID(), migration->GetSlots(), true);

src/server/cluster/incoming_slot_migration.cc

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ class ClusterShardMigration {
8585
VLOG(1) << "Finalized flow " << source_shard_id_;
8686
return;
8787
}
88+
if (in_migration_->GetState() == MigrationState::C_FATAL) {
89+
VLOG(1) << "Flow finalization " << source_shard_id_
90+
<< " canceled due memory limit reached";
91+
return;
92+
}
8893
if (!tx_data->command.cmd_args.empty()) {
8994
VLOG(1) << "Flow finalization failed " << source_shard_id_ << " by "
9095
<< tx_data->command.cmd_args[0];
@@ -99,6 +104,12 @@ class ClusterShardMigration {
99104
// TODO check about ping logic
100105
} else {
101106
ExecuteTx(std::move(*tx_data), cntx);
107+
// Break incoming slot migration if command reported OOM
108+
if (executor_.connection_context()->IsOOM()) {
109+
cntx->ReportError(std::string{kIncomingMigrationOOM});
110+
in_migration_->ReportFatalError(std::string{kIncomingMigrationOOM});
111+
break;
112+
}
102113
}
103114
}
104115

@@ -190,6 +201,11 @@ bool IncomingSlotMigration::Join(long attempt) {
190201
return false;
191202
}
192203

204+
// If any of migration shards reported ERROR (OOM) we can return error
205+
if (GetState() == MigrationState::C_FATAL) {
206+
return false;
207+
}
208+
193209
// if data was sent after LSN, WaitFor() always returns false so to reduce wait time
194210
// we check current state and if WaitFor false but GetLastAttempt() == attempt
195211
// the Join is failed and we can return false
@@ -227,6 +243,11 @@ void IncomingSlotMigration::Stop() {
227243
}
228244
}
229245

246+
// Don't wait if we reached FATAL state
247+
if (state_ == MigrationState::C_FATAL) {
248+
return;
249+
}
250+
230251
// we need to Join the migration process to prevent data corruption
231252
const absl::Time start = absl::Now();
232253
const absl::Duration timeout =
@@ -260,7 +281,12 @@ void IncomingSlotMigration::Init(uint32_t shards_num) {
260281

261282
void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
262283
shard_flows_[shard]->Start(&cntx_, source);
263-
VLOG(1) << "Incoming flow " << shard << " finished for " << source_id_;
284+
VLOG(1) << "Incoming flow " << shard
285+
<< (GetState() == MigrationState::C_FINISHED ? " finished " : " cancelled ") << "for "
286+
<< source_id_;
287+
if (GetState() == MigrationState::C_FATAL) {
288+
Stop();
289+
}
264290
}
265291

266292
size_t IncomingSlotMigration::GetKeyCount() const {

src/server/cluster/incoming_slot_migration.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,20 @@ class IncomingSlotMigration {
5050
return source_id_;
5151
}
5252

53+
// Switch to FATAL state and store error message
54+
void ReportFatalError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(state_mu_, error_mu_) {
55+
errors_count_.fetch_add(1, std::memory_order_relaxed);
56+
util::fb2::LockGuard lk_state(state_mu_);
57+
util::fb2::LockGuard lk_error(error_mu_);
58+
state_ = MigrationState::C_FATAL;
59+
last_error_ = std::move(err);
60+
}
61+
5362
void ReportError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(error_mu_) {
5463
errors_count_.fetch_add(1, std::memory_order_relaxed);
5564
util::fb2::LockGuard lk(error_mu_);
56-
last_error_ = std::move(err);
65+
if (GetState() != MigrationState::C_FATAL)
66+
last_error_ = std::move(err);
5767
}
5868

5969
std::string GetErrorStr() const ABSL_LOCKS_EXCLUDED(error_mu_) {
@@ -75,6 +85,7 @@ class IncomingSlotMigration {
7585
std::vector<std::unique_ptr<ClusterShardMigration>> shard_flows_;
7686
SlotRanges slots_;
7787
ExecutionState cntx_;
88+
7889
mutable util::fb2::Mutex error_mu_;
7990
dfly::GenericError last_error_ ABSL_GUARDED_BY(error_mu_);
8091
std::atomic<size_t> errors_count_ = 0;

src/server/cluster/outgoing_slot_migration.cc

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,12 @@ void OutgoingMigration::OnAllShards(
145145
void OutgoingMigration::Finish(GenericError error) {
146146
auto next_state = MigrationState::C_FINISHED;
147147
if (error) {
148-
next_state = MigrationState::C_ERROR;
148+
// If OOM error move to FATAL, non-recoverable state
149+
if (error == errc::not_enough_memory) {
150+
next_state = MigrationState::C_FATAL;
151+
} else {
152+
next_state = MigrationState::C_ERROR;
153+
}
149154
LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": "
150155
<< migration_info_.node_info.id << " with error: " << error.Format();
151156
exec_st_.ReportError(std::move(error));
@@ -168,6 +173,7 @@ void OutgoingMigration::Finish(GenericError error) {
168173

169174
case MigrationState::C_SYNC:
170175
case MigrationState::C_ERROR:
176+
case MigrationState::C_FATAL:
171177
should_cancel_flows = true;
172178
break;
173179
}
@@ -230,6 +236,13 @@ void OutgoingMigration::SyncFb() {
230236
}
231237

232238
if (!CheckRespIsSimpleReply("OK")) {
239+
// Break outgoing migration if INIT from incoming node responded with OOM. Usually this will
240+
// happen on second iteration after first failed with OOM. Sending second INIT is required to
241+
// cleanup slots on incoming slot migration node.
242+
if (CheckRespSimpleError(kIncomingMigrationOOM)) {
243+
ChangeState(MigrationState::C_FATAL);
244+
break;
245+
}
233246
if (CheckRespIsSimpleReply(kUnknownMigration)) {
234247
const absl::Duration passed = absl::Now() - start_time;
235248
// we provide 30 seconds to distribute the config to all nodes to avoid extra errors
@@ -280,7 +293,11 @@ void OutgoingMigration::SyncFb() {
280293

281294
long attempt = 0;
282295
while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) {
283-
// process commands that were on pause and try again
296+
// Break loop and don't sleep in case of C_FATAL
297+
if (GetState() == MigrationState::C_FATAL) {
298+
break;
299+
}
300+
// Process commands that were on pause and try again
284301
VLOG(1) << "Waiting for migration to finalize...";
285302
ThisFiber::SleepFor(500ms);
286303
}
@@ -367,6 +384,13 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
367384
return false;
368385
}
369386

387+
// Check OOM from incoming slot migration on ACK request
388+
if (CheckRespSimpleError(kIncomingMigrationOOM)) {
389+
Finish(GenericError{std::make_error_code(errc::not_enough_memory),
390+
std::string(kIncomingMigrationOOM)});
391+
return false;
392+
}
393+
370394
if (!CheckRespFirstTypes({RespExpr::INT64})) {
371395
LOG(WARNING) << "Incorrect response type for " << cf_->MyID() << " : "
372396
<< migration_info_.node_info.id << " attempt " << attempt

src/server/cluster/outgoing_slot_migration.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,6 @@ class OutgoingMigration : private ProtocolClient {
7676

7777
size_t GetKeyCount() const ABSL_LOCKS_EXCLUDED(state_mu_);
7878

79-
static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";
80-
8179
private:
8280
// should be run for all shards
8381
void StartFlow(journal::Journal* journal, io::Sink* dest);

src/server/conn_context.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,10 @@ class ConnectionContext : public facade::ConnectionContext {
295295
return conn_state.db_index;
296296
}
297297

298+
bool IsOOM() {
299+
return std::exchange(is_oom_, false);
300+
}
301+
298302
void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args,
299303
facade::RedisReplyBuilder* rb);
300304

@@ -323,6 +327,9 @@ class ConnectionContext : public facade::ConnectionContext {
323327
// The related connection is bound to main listener or serves the memcached protocol
324328
bool has_main_or_memcache_listener = false;
325329

330+
// OOM reported while executing
331+
bool is_oom_ = false;
332+
326333
private:
327334
void EnableMonitoring(bool enable) {
328335
subscriptions++; // required to support the monitoring

src/server/journal/executor.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ template <typename... Ts> journal::ParsedEntry::CmdData BuildFromParts(Ts... par
3333
start += part.size();
3434
}
3535

36-
return {std::move(buf), std::move(slice_parts)};
36+
return {std::move(buf), std::move(slice_parts), cmd_str.size()};
3737
}
3838
} // namespace
3939

src/server/journal/serializer.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data)
162162

163163
size_t cmd_size = 0;
164164
SET_OR_RETURN(ReadUInt<uint64_t>(), cmd_size);
165+
data->cmd_len = cmd_size;
165166

166167
// Read all strings consecutively.
167168
data->command_buf = make_unique<uint8_t[]>(cmd_size);
@@ -174,6 +175,9 @@ std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data)
174175
ptr += size;
175176
cmd_size -= size;
176177
}
178+
179+
data->cmd_len -= cmd_size;
180+
177181
return {};
178182
}
179183

src/server/journal/types.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ struct ParsedEntry : public EntryBase {
7373
struct CmdData {
7474
std::unique_ptr<uint8_t[]> command_buf;
7575
CmdArgVec cmd_args; // represents the parsed command.
76+
size_t cmd_len{0};
7677
};
7778
CmdData cmd;
7879

0 commit comments

Comments
 (0)