Skip to content

Commit 16a27f6

Browse files
committed
Changes after review
1 parent 6a5d621 commit 16a27f6

6 files changed

+49
-66
lines changed

src/server/cluster/cluster_defs.h

+4
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,10 @@ class ClusterShardInfos {
174174
// MigrationState constants are ordered in state changing order
175175
enum class MigrationState : uint8_t { C_CONNECTING, C_SYNC, C_ERROR, C_FINISHED, C_FATAL };
176176

177+
// Errors during slot migration
178+
static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";
179+
static constexpr std::string_view kIncomingMigrationOOM = "INCOMING_MIGRATION_OOM";
180+
177181
// return error message if slot doesn't belong to this node
178182
facade::ErrorReply SlotOwnershipError(SlotId slot_id);
179183

src/server/cluster/cluster_family.cc

+5-17
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,13 @@
99
#include <string>
1010

1111
#include "absl/cleanup/cleanup.h"
12-
#include "absl/strings/str_cat.h"
1312
#include "base/flags.h"
1413
#include "base/logging.h"
1514
#include "facade/cmd_arg_parser.h"
1615
#include "facade/dragonfly_connection.h"
1716
#include "facade/error.h"
1817
#include "server/acl/acl_commands_def.h"
1918
#include "server/channel_store.h"
20-
#include "server/cluster/cluster_defs.h"
21-
#include "server/cluster/incoming_slot_migration.h"
22-
#include "server/cluster/outgoing_slot_migration.h"
2319
#include "server/command_registry.h"
2420
#include "server/conn_context.h"
2521
#include "server/dflycmd.h"
@@ -839,7 +835,7 @@ ClusterFamily::TakeOutOutgoingMigrations(shared_ptr<ClusterConfig> new_config,
839835
removed_slots.Merge(slots);
840836
LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to "
841837
<< migration.GetHostIp() << ":" << migration.GetPort();
842-
migration.Finish(MigrationState::C_FINISHED);
838+
migration.Finish();
843839
res.migrations.push_back(std::move(*it));
844840
outgoing_migration_jobs_.erase(it);
845841
}
@@ -930,7 +926,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
930926

931927
if (!migration) {
932928
VLOG(1) << "Unrecognized incoming migration from " << source_id;
933-
return builder->SendSimpleString(OutgoingMigration::kUnknownMigration);
929+
return builder->SendSimpleString(kUnknownMigration);
934930
}
935931

936932
if (migration->GetState() != MigrationState::C_CONNECTING) {
@@ -942,7 +938,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
942938
}
943939

944940
if (migration->GetState() == MigrationState::C_FATAL) {
945-
return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM));
941+
return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM));
946942
}
947943

948944
migration->Init(flows_num);
@@ -969,10 +965,6 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
969965
return builder->SendError(kIdNotFound);
970966
}
971967

972-
if (migration->GetState() == MigrationState::C_FATAL) {
973-
return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM));
974-
}
975-
976968
DCHECK(cntx->sync_dispatch);
977969
// we do this to be ignored by the dispatch tracker
978970
// TODO provide a clearer approach
@@ -981,10 +973,6 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
981973
builder->SendOk();
982974

983975
migration->StartFlow(shard_id, cntx->conn()->socket());
984-
985-
if (migration->GetState() == MigrationState::C_FATAL) {
986-
migration->Stop();
987-
}
988976
}
989977

990978
void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id,
@@ -1051,7 +1039,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
10511039
[source_id = source_id](const auto& m) { return m.node_info.id == source_id; });
10521040
if (m_it == in_migrations.end()) {
10531041
LOG(WARNING) << "migration isn't in config";
1054-
return builder->SendError(OutgoingMigration::kUnknownMigration);
1042+
return builder->SendError(kUnknownMigration);
10551043
}
10561044

10571045
auto migration = GetIncomingMigration(source_id);
@@ -1060,7 +1048,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
10601048

10611049
if (!migration->Join(attempt)) {
10621050
if (migration->GetState() == MigrationState::C_FATAL) {
1063-
return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM));
1051+
return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM));
10641052
} else {
10651053
return builder->SendError("Join timeout happened");
10661054
}

src/server/cluster/incoming_slot_migration.cc

+15-18
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,10 @@
77
#include <absl/cleanup/cleanup.h>
88
#include <absl/strings/str_cat.h>
99

10-
#include <utility>
11-
1210
#include "base/flags.h"
1311
#include "base/logging.h"
1412
#include "cluster_utility.h"
15-
#include "facade/error.h"
1613
#include "server/cluster/cluster_defs.h"
17-
#include "server/cluster/outgoing_slot_migration.h"
18-
#include "server/common.h"
1914
#include "server/error.h"
2015
#include "server/journal/executor.h"
2116
#include "server/journal/tx_executor.h"
@@ -76,17 +71,11 @@ class ClusterShardMigration {
7671
break;
7772
}
7873

79-
auto oom_check = [&]() -> bool {
80-
auto used_mem = used_mem_current.load(memory_order_relaxed);
81-
if ((used_mem + tx_data->command.cmd_len) > max_memory_limit) {
82-
cntx->ReportError(IncomingSlotMigration::kMigrationOOM);
83-
in_migration_->ReportFatalError(GenericError(IncomingSlotMigration::kMigrationOOM));
84-
return true;
85-
}
86-
return false;
87-
};
88-
89-
if (oom_check()) {
74+
auto used_mem = used_mem_current.load(memory_order_relaxed);
75+
// If applying the transaction data would exceed 90% of max_memory_limit, we end the migration.
76+
if ((used_mem + tx_data->command.cmd_len) > (0.9 * max_memory_limit)) {
77+
cntx->ReportError(std::string{kIncomingMigrationOOM});
78+
in_migration_->ReportFatalError(std::string{kIncomingMigrationOOM});
9079
break;
9180
}
9281

@@ -99,8 +88,8 @@ class ClusterShardMigration {
9988
VLOG(1) << "Finalized flow " << source_shard_id_;
10089
return;
10190
}
102-
if (oom_check()) {
103-
VLOG(2) << "Flow finalization " << source_shard_id_
91+
if (in_migration_->GetState() == MigrationState::C_FATAL) {
92+
VLOG(1) << "Flow finalization " << source_shard_id_
10493
<< " canceled due memory limit reached";
10594
return;
10695
}
@@ -248,6 +237,11 @@ void IncomingSlotMigration::Stop() {
248237
}
249238
}
250239

240+
// Don't wait if we reached FATAL state
241+
if (state_ == MigrationState::C_FATAL) {
242+
return;
243+
}
244+
251245
// we need to Join the migration process to prevent data corruption
252246
const absl::Time start = absl::Now();
253247
const absl::Duration timeout =
@@ -284,6 +278,9 @@ void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* sou
284278
VLOG(1) << "Incoming flow " << shard
285279
<< (GetState() == MigrationState::C_FINISHED ? " finished " : " cancelled ") << "for "
286280
<< source_id_;
281+
if (GetState() == MigrationState::C_FATAL) {
282+
Stop();
283+
}
287284
}
288285

289286
size_t IncomingSlotMigration::GetKeyCount() const {

src/server/cluster/incoming_slot_migration.h

-5
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
//
44
#pragma once
55

6-
#include <string>
7-
8-
#include "absl/base/thread_annotations.h"
96
#include "helio/util/fiber_socket_base.h"
107
#include "server/cluster/cluster_defs.h"
118
#include "server/common.h"
@@ -82,8 +79,6 @@ class IncomingSlotMigration {
8279

8380
void Pause(bool pause);
8481

85-
static constexpr char kMigrationOOM[] = "INCOMING_MIGRATION_OOM";
86-
8782
private:
8883
std::string source_id_;
8984
Service& service_;

src/server/cluster/outgoing_slot_migration.cc

+24-23
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@
1212
#include "base/logging.h"
1313
#include "cluster_family.h"
1414
#include "cluster_utility.h"
15-
#include "facade/resp_expr.h"
1615
#include "server/cluster/cluster_defs.h"
17-
#include "server/cluster/incoming_slot_migration.h"
18-
#include "server/common.h"
1916
#include "server/db_slice.h"
2017
#include "server/engine_shard_set.h"
2118
#include "server/error.h"
@@ -40,8 +37,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
4037
SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots,
4138
journal::Journal* journal, OutgoingMigration* om)
4239
: ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) {
43-
exec_st_.SwitchErrorHandler(
44-
[om](auto ge) { om->Finish(MigrationState::C_ERROR, std::move(ge)); });
40+
exec_st_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
4541
}
4642

4743
~SliceSlotMigration() {
@@ -143,8 +139,14 @@ void OutgoingMigration::OnAllShards(
143139
});
144140
}
145141

146-
void OutgoingMigration::Finish(MigrationState next_state, GenericError error) {
142+
void OutgoingMigration::Finish(GenericError error) {
143+
auto next_state = MigrationState::C_FINISHED;
147144
if (error) {
145+
if (error.Format() == kIncomingMigrationOOM) {
146+
next_state = MigrationState::C_FATAL;
147+
} else {
148+
next_state = MigrationState::C_ERROR;
149+
}
148150
LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": "
149151
<< migration_info_.node_info.id << " with error: " << error.Format();
150152
exec_st_.ReportError(std::move(error));
@@ -225,6 +227,15 @@ void OutgoingMigration::SyncFb() {
225227
continue;
226228
}
227229

230+
// Break outgoing migration if INIT from incoming node responded with OOM. Usually this will
231+
// happen on second iteration after first failed with OOM. Sending second INIT is required to
232+
// clean up slots on the incoming slot migration node.
233+
if (CheckRespFirstTypes({RespExpr::ERROR}) &&
234+
facade::ToSV(LastResponseArgs().front().GetBuf()) == kIncomingMigrationOOM) {
235+
ChangeState(MigrationState::C_FATAL);
236+
break;
237+
}
238+
228239
if (!CheckRespIsSimpleReply("OK")) {
229240
if (CheckRespIsSimpleReply(kUnknownMigration)) {
230241
const absl::Duration passed = absl::Now() - start_time;
@@ -275,20 +286,16 @@ void OutgoingMigration::SyncFb() {
275286
}
276287

277288
long attempt = 0;
278-
bool fatal_state = false;
279289
while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) {
280-
// Don't sleep if we ended up in FATAL state
290+
// Break loop and don't sleep in case of C_FATAL
281291
if (GetState() == MigrationState::C_FATAL) {
282-
fatal_state = true;
283292
break;
284-
} else {
285-
// Process commands that were on pause and try again
286-
VLOG(1) << "Waiting for migration to finalize...";
287-
ThisFiber::SleepFor(500ms);
288293
}
294+
// Process commands that were on pause and try again
295+
VLOG(1) << "Waiting for migration to finalize...";
296+
ThisFiber::SleepFor(500ms);
289297
}
290-
// End outgoing slot migration if we are FINISHED or are in FATAL state
291-
if (!exec_st_.IsRunning() && !fatal_state) {
298+
if (!exec_st_.IsRunning()) {
292299
continue;
293300
}
294301
break;
@@ -371,13 +378,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
371378
auto error = facade::ToSV(LastResponseArgs().front().GetBuf());
372379
LOG(WARNING) << "Error response for " << cf_->MyID() << " : " << migration_info_.node_info.id
373380
<< " attempt " << attempt << " msg: " << error;
374-
auto next_state = MigrationState::C_ERROR;
375-
// Check if there is OOM response from incoming slot migration
376-
if (error == IncomingSlotMigration::kMigrationOOM) {
377-
SetLastError(GenericError(IncomingSlotMigration::kMigrationOOM));
378-
next_state = MigrationState::C_FATAL;
379-
}
380-
Finish(next_state, std::string(error));
381+
Finish(std::string(error));
381382
return false;
382383
}
383384

@@ -397,7 +398,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
397398
}
398399

399400
if (!exec_st_.GetError()) {
400-
Finish(MigrationState::C_FINISHED);
401+
Finish();
401402
keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
402403
cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges,
403404
false);

src/server/cluster/outgoing_slot_migration.h

+1-3
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class OutgoingMigration : private ProtocolClient {
3030
// if error is empty, mark migration as FINISHED and cancel migration if it's not finished yet
3131
// can be called from any thread, but only after Start()
3232
// if error is set and migration is in progress it will be restarted, otherwise nothing happens
33-
void Finish(MigrationState next_state, GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);
33+
void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);
3434

3535
MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_);
3636

@@ -76,8 +76,6 @@ class OutgoingMigration : private ProtocolClient {
7676

7777
size_t GetKeyCount() const ABSL_LOCKS_EXCLUDED(state_mu_);
7878

79-
static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";
80-
8179
private:
8280
// should be run for all shards
8381
void StartFlow(journal::Journal* journal, io::Sink* dest);

0 commit comments

Comments
 (0)