From 6cddc4385e9c084728628930504a4c3652c558a1 Mon Sep 17 00:00:00 2001 From: mkaruza Date: Fri, 25 Apr 2025 13:14:31 +0200 Subject: [PATCH 1/3] fix(cluster_family): Cancel slot migration from incoming node on OOM If applying command on incoming node will result in OOM (we overflow max_memory_limit) we are closing migration and switch state to FATAL. Signed-off-by: mkaruza --- src/server/cluster/cluster_defs.h | 7 +-- src/server/cluster/cluster_family.cc | 28 ++++++++- src/server/cluster/incoming_slot_migration.cc | 34 ++++++++++- src/server/cluster/incoming_slot_migration.h | 18 +++++- src/server/cluster/outgoing_slot_migration.cc | 44 +++++++++++--- src/server/cluster/outgoing_slot_migration.h | 2 +- src/server/journal/executor.cc | 2 +- src/server/journal/serializer.cc | 4 ++ src/server/journal/types.h | 1 + tests/dragonfly/cluster_test.py | 57 +++++++++++++++++++ tools/cluster_mgr.py | 2 + 11 files changed, 177 insertions(+), 22 deletions(-) diff --git a/src/server/cluster/cluster_defs.h b/src/server/cluster/cluster_defs.h index b54bb3859e34..47771ebc525c 100644 --- a/src/server/cluster/cluster_defs.h +++ b/src/server/cluster/cluster_defs.h @@ -172,12 +172,7 @@ class ClusterShardInfos { }; // MigrationState constants are ordered in state changing order -enum class MigrationState : uint8_t { - C_CONNECTING, - C_SYNC, - C_ERROR, - C_FINISHED, -}; +enum class MigrationState : uint8_t { C_CONNECTING, C_SYNC, C_ERROR, C_FINISHED, C_FATAL }; // return error message if slot doesn't belong to this node facade::ErrorReply SlotOwnershipError(SlotId slot_id); diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 88446a1d3fe5..ff0a9922e48a 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -9,6 +9,7 @@ #include #include "absl/cleanup/cleanup.h" +#include "absl/strings/str_cat.h" #include "base/flags.h" #include "base/logging.h" #include "facade/cmd_arg_parser.h" @@ -16,6 +17,9 @@ #include "facade/error.h" #include "server/acl/acl_commands_def.h" #include "server/channel_store.h" +#include "server/cluster/cluster_defs.h" +#include "server/cluster/incoming_slot_migration.h" +#include "server/cluster/outgoing_slot_migration.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/dflycmd.h" @@ -726,6 +730,8 @@ static string_view StateToStr(MigrationState state) { return "ERROR"sv; case MigrationState::C_FINISHED: return "FINISHED"sv; + case MigrationState::C_FATAL: + return "FATAL"sv; } DCHECK(false) << "Unknown State value " << static_cast>(state); return "UNDEFINED_STATE"sv; @@ -765,7 +771,6 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, SinkReplyBuilder* b }; for (const auto& m : incoming_migrations_jobs_) { - // TODO add error status append_answer("in", m->GetSourceID(), node_id, m->GetState(), m->GetKeyCount(), m->GetErrorStr()); } @@ -834,7 +839,7 @@ ClusterFamily::TakeOutOutgoingMigrations(shared_ptr new_config, removed_slots.Merge(slots); LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to " << migration.GetHostIp() << ":" << migration.GetPort(); - migration.Finish(); + migration.Finish(MigrationState::C_FINISHED); res.migrations.push_back(std::move(*it)); outgoing_migration_jobs_.erase(it); } @@ -936,6 +941,10 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) { DeleteSlots(slots); } + if (migration->GetState() == MigrationState::C_FATAL) { + return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM)); + } + migration->Init(flows_num); return builder->SendOk(); @@ -955,10 +964,15 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder, cntx->conn()->SetName(absl::StrCat("migration_flow_", source_id)); auto migration = GetIncomingMigration(source_id); + if (!migration) { return builder->SendError(kIdNotFound); } + if (migration->GetState() == MigrationState::C_FATAL) { + return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM)); + } + DCHECK(cntx->sync_dispatch); // we do this to be ignored by the dispatch tracker // TODO provide a more clear approach @@ -967,6 +981,10 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder, builder->SendOk(); migration->StartFlow(shard_id, cntx->conn()->socket()); + + if (migration->GetState() == MigrationState::C_FATAL) { + migration->Stop(); + } } void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id, @@ -1041,7 +1059,11 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) { return builder->SendError(kIdNotFound); if (!migration->Join(attempt)) { - return builder->SendError("Join timeout happened"); + if (migration->GetState() == MigrationState::C_FATAL) { + return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM)); + } else { + return builder->SendError("Join timeout happened"); + } } ApplyMigrationSlotRangeToConfig(migration->GetSourceID(), migration->GetSlots(), true); diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index 2a6c8057a262..78a9c39ea7fb 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -7,9 +7,15 @@ #include #include +#include + #include "base/flags.h" #include "base/logging.h" #include "cluster_utility.h" +#include "facade/error.h" +#include "server/cluster/cluster_defs.h" +#include "server/cluster/outgoing_slot_migration.h" +#include "server/common.h" #include "server/error.h" #include "server/journal/executor.h" #include "server/journal/tx_executor.h" @@ -70,6 +76,20 @@ class ClusterShardMigration { break; } + auto oom_check = [&]() -> bool { + auto used_mem = used_mem_current.load(memory_order_relaxed); + if ((used_mem + tx_data->command.cmd_len) > max_memory_limit) { + cntx->ReportError(IncomingSlotMigration::kMigrationOOM); + in_migration_->ReportFatalError(GenericError(IncomingSlotMigration::kMigrationOOM)); + return true; + } + return false; + }; + + if (oom_check()) { + break; + } + while (tx_data->opcode == journal::Op::LSN) { VLOG(2) << "Attempt to finalize flow " << source_shard_id_ << " attempt " << tx_data->lsn; last_attempt_.store(tx_data->lsn); @@ -79,6 +99,11 @@ class ClusterShardMigration { VLOG(1) << "Finalized flow " << source_shard_id_; return; } + if (oom_check()) { + VLOG(2) << "Flow finalization " << source_shard_id_ + << " canceled due memory limit reached"; + return; + } if (!tx_data->command.cmd_args.empty()) { VLOG(1) << "Flow finalization failed " << source_shard_id_ << " by " << tx_data->command.cmd_args[0]; @@ -181,6 +206,11 @@ bool IncomingSlotMigration::Join(long attempt) { return false; } + // If any of migration shards reported ERROR (OOM) we can return error + if (GetState() == MigrationState::C_FATAL) { + return false; + } + // if data was sent after LSN, WaitFor() always returns false so to reduce wait time // we check current state and if WaitFor false but GetLastAttempt() == attempt // the Join is failed and we can return false @@ -251,7 +281,9 @@ void IncomingSlotMigration::Init(uint32_t shards_num) { void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) { shard_flows_[shard]->Start(&cntx_, source); - VLOG(1) << "Incoming flow " << shard << " finished for " << source_id_; + VLOG(1) << "Incoming flow " << shard + << (GetState() == MigrationState::C_FINISHED ? " finished " : " cancelled ") << "for " + << source_id_; } size_t IncomingSlotMigration::GetKeyCount() const { diff --git a/src/server/cluster/incoming_slot_migration.h b/src/server/cluster/incoming_slot_migration.h index 11edf400996f..425de870674e 100644 --- a/src/server/cluster/incoming_slot_migration.h +++ b/src/server/cluster/incoming_slot_migration.h @@ -3,6 +3,9 @@ // #pragma once +#include + +#include "absl/base/thread_annotations.h" #include "helio/util/fiber_socket_base.h" #include "server/cluster/cluster_defs.h" #include "server/common.h" @@ -50,10 +53,20 @@ class IncomingSlotMigration { return source_id_; } + // Switch to FATAL state and store error message + void ReportFatalError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(state_mu_, error_mu_) { + errors_count_.fetch_add(1, std::memory_order_relaxed); + util::fb2::LockGuard lk_state(state_mu_); + util::fb2::LockGuard lk_error(error_mu_); + state_ = MigrationState::C_FATAL; + last_error_ = std::move(err); + } + void ReportError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(error_mu_) { errors_count_.fetch_add(1, std::memory_order_relaxed); util::fb2::LockGuard lk(error_mu_); - last_error_ = std::move(err); + if (GetState() != MigrationState::C_FATAL) + last_error_ = std::move(err); } std::string GetErrorStr() const ABSL_LOCKS_EXCLUDED(error_mu_) { @@ -69,12 +82,15 @@ class IncomingSlotMigration { void Pause(bool pause); + static constexpr char kMigrationOOM[] = "INCOMING_MIGRATION_OOM"; + private: std::string source_id_; Service& service_; std::vector> shard_flows_; SlotRanges slots_; ExecutionState cntx_; + mutable util::fb2::Mutex error_mu_; dfly::GenericError last_error_ ABSL_GUARDED_BY(error_mu_); std::atomic errors_count_ = 0; diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 52d47b07be22..a813230db930 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -12,6 +12,10 @@ #include "base/logging.h" #include "cluster_family.h" #include "cluster_utility.h" +#include "facade/resp_expr.h" +#include "server/cluster/cluster_defs.h" +#include "server/cluster/incoming_slot_migration.h" +#include "server/common.h" #include "server/db_slice.h" #include "server/engine_shard_set.h" #include "server/error.h" @@ -36,7 +40,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots, journal::Journal* journal, OutgoingMigration* om) : ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) { - exec_st_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); }); + exec_st_.SwitchErrorHandler( + [om](auto ge) { om->Finish(MigrationState::C_ERROR, std::move(ge)); }); } ~SliceSlotMigration() { @@ -138,10 +143,8 @@ void OutgoingMigration::OnAllShards( }); } -void OutgoingMigration::Finish(GenericError error) { - auto next_state = MigrationState::C_FINISHED; +void OutgoingMigration::Finish(MigrationState next_state, GenericError error) { if (error) { - next_state = MigrationState::C_ERROR; LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": " << migration_info_.node_info.id << " with error: " << error.Format(); exec_st_.ReportError(std::move(error)); @@ -164,6 +167,7 @@ void OutgoingMigration::Finish(GenericError error) { case MigrationState::C_SYNC: case MigrationState::C_ERROR: + case MigrationState::C_FATAL: should_cancel_flows = true; break; } @@ -271,12 +275,20 @@ void OutgoingMigration::SyncFb() { } long attempt = 0; + bool fatal_state = false; while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) { - // process commands that were on pause and try again - VLOG(1) << "Waiting for migration to finalize..."; - ThisFiber::SleepFor(500ms); + // Don't sleep if we ended up in FATAL state + if (GetState() == MigrationState::C_FATAL) { + fatal_state = true; + break; + } else { + // Process commands that were on pause and try again + VLOG(1) << "Waiting for migration to finalize..."; + ThisFiber::SleepFor(500ms); + } } - if (!exec_st_.IsRunning()) { + // End outgoing slot migration if we are FINISHED or are in FATAL state + if (!exec_st_.IsRunning() && !fatal_state) { continue; } break; @@ -355,6 +367,20 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { return false; } + if (CheckRespFirstTypes({RespExpr::ERROR})) { + auto error = facade::ToSV(LastResponseArgs().front().GetBuf()); + LOG(WARNING) << "Error response for " << cf_->MyID() << " : " << migration_info_.node_info.id + << " attempt " << attempt << " msg: " << error; + auto next_state = MigrationState::C_ERROR; + // Check if there is OOM response from incoming slot migration + if (error == IncomingSlotMigration::kMigrationOOM) { + SetLastError(GenericError(IncomingSlotMigration::kMigrationOOM)); + next_state = MigrationState::C_FATAL; + } + Finish(next_state, std::string(error)); + return false; + } + if (!CheckRespFirstTypes({RespExpr::INT64})) { LOG(WARNING) << "Incorrect response type for " << cf_->MyID() << " : " << migration_info_.node_info.id << " attempt " << attempt @@ -371,7 +397,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { } if (!exec_st_.GetError()) { - Finish(); + Finish(MigrationState::C_FINISHED); keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges); cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges, false); diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index 6b020fc7eace..57cd8bb67f26 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -30,7 +30,7 @@ class OutgoingMigration : private ProtocolClient { // if is_error = false mark migration as FINISHED and cancel migration if it's not finished yet // can be called from any thread, but only after Start() // if is_error = true and migration is in progress it will be restarted otherwise nothing happens - void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_); + void Finish(MigrationState next_state, GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_); MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_); diff --git a/src/server/journal/executor.cc b/src/server/journal/executor.cc index bc43095c0c6d..0ff9c13a05e3 100644 --- a/src/server/journal/executor.cc +++ b/src/server/journal/executor.cc @@ -33,7 +33,7 @@ template journal::ParsedEntry::CmdData BuildFromParts(Ts... par start += part.size(); } - return {std::move(buf), std::move(slice_parts)}; + return {std::move(buf), std::move(slice_parts), cmd_str.size()}; } } // namespace diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index be7d7ba839cb..1a148fad48b4 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -162,6 +162,7 @@ std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data) size_t cmd_size = 0; SET_OR_RETURN(ReadUInt(), cmd_size); + data->cmd_len = cmd_size; // Read all strings consecutively. data->command_buf = make_unique(cmd_size); @@ -174,6 +175,9 @@ std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data) ptr += size; cmd_size -= size; } + + data->cmd_len -= cmd_size; + return {}; } diff --git a/src/server/journal/types.h b/src/server/journal/types.h index 24183623a5e3..ce06e8347384 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -73,6 +73,7 @@ struct ParsedEntry : public EntryBase { struct CmdData { std::unique_ptr command_buf; CmdArgVec cmd_args; // represents the parsed command. + size_t cmd_len{0}; }; CmdData cmd; diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index ac5faa7c042b..6e7144138f55 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -3236,3 +3236,60 @@ async def test_cancel_blocking_cmd_during_mygration_finalization(df_factory: Dfl await push_config(json.dumps(generate_config(nodes)), [node.client for node in nodes]) assert await c_nodes[1].type("list") == "none" + + +@dfly_args({"cluster_mode": "yes"}) +async def test_slot_migration_oom(df_factory): + instances = [ + df_factory.create( + port=next(next_port), + admin_port=next(next_port), + proactor_threads=4, + maxmemory="1024MB", + ), + df_factory.create( + port=next(next_port), + admin_port=next(next_port), + proactor_threads=2, + maxmemory="512MB", + ), + ] + + df_factory.start_all(instances) + + nodes = [(await create_node_info(instance)) for instance in instances] + nodes[0].slots = [(0, 16383)] + nodes[1].slots = [] + + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + await nodes[0].client.execute_command("DEBUG POPULATE 100 test 10000000") + + nodes[0].migrations.append( + MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id) + ) + + logging.info("Start migration") + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + # Wait for FATAL status + await wait_for_status(nodes[0].admin_client, nodes[1].id, "FATAL", 300) + await wait_for_status(nodes[1].admin_client, nodes[0].id, "FATAL") + + # Node_0 slot-migration-status + status = await nodes[0].admin_client.execute_command( + "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[1].id + ) + # Direction + assert status[0][0] == "out" + # Error message + assert status[0][4] == "INCOMING_MIGRATION_OOM" + + # Node_1 slot-migration-status + status = await nodes[1].admin_client.execute_command( + "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id + ) + # Direction + assert status[0][0] == "in" + # Error message + assert status[0][4] == "INCOMING_MIGRATION_OOM" diff --git a/tools/cluster_mgr.py b/tools/cluster_mgr.py index 4530d8902133..007ebbbee053 100755 --- a/tools/cluster_mgr.py +++ b/tools/cluster_mgr.py @@ -449,6 +449,8 @@ def migrate(args): continue if len(sync_status) != 1: die_with_err(f"Unexpected number of migrations {len(sync_status)}: {sync_status}") + if "FATAL" in sync_status[0]: + die_with_err(f"Error in migration {len(sync_status)}: {sync_status}") if "FINISHED" in sync_status[0]: print(f"Migration finished: {sync_status[0]}") break From f080a5208aefc8c283d86779fef20dc278033c64 Mon Sep 17 00:00:00 2001 From: mkaruza Date: Wed, 7 May 2025 13:29:55 +0200 Subject: [PATCH 2/3] Changes after review --- src/server/cluster/cluster_defs.h | 4 ++ src/server/cluster/cluster_family.cc | 22 ++------ src/server/cluster/incoming_slot_migration.cc | 33 ++++++------ src/server/cluster/incoming_slot_migration.h | 5 -- src/server/cluster/outgoing_slot_migration.cc | 52 ++++++++++--------- src/server/cluster/outgoing_slot_migration.h | 4 +- 6 files changed, 52 insertions(+), 68 deletions(-) diff --git a/src/server/cluster/cluster_defs.h b/src/server/cluster/cluster_defs.h index 47771ebc525c..b130f20b0698 100644 --- a/src/server/cluster/cluster_defs.h +++ b/src/server/cluster/cluster_defs.h @@ -174,6 +174,10 @@ class ClusterShardInfos { // MigrationState constants are ordered in state changing order enum class MigrationState : uint8_t { C_CONNECTING, C_SYNC, C_ERROR, C_FINISHED, C_FATAL }; +// Errors during slot migration +static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION"; +static constexpr std::string_view kIncomingMigrationOOM = "INCOMING_MIGRATION_OOM"; + // return error message if slot doesn't belong to this node facade::ErrorReply SlotOwnershipError(SlotId slot_id); diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index ff0a9922e48a..37419d1c332a 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -9,7 +9,6 @@ #include #include "absl/cleanup/cleanup.h" -#include "absl/strings/str_cat.h" #include "base/flags.h" #include "base/logging.h" #include "facade/cmd_arg_parser.h" @@ -17,9 +16,6 @@ #include "facade/error.h" #include "server/acl/acl_commands_def.h" #include "server/channel_store.h" -#include "server/cluster/cluster_defs.h" -#include "server/cluster/incoming_slot_migration.h" -#include "server/cluster/outgoing_slot_migration.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/dflycmd.h" @@ -839,7 +835,7 @@ ClusterFamily::TakeOutOutgoingMigrations(shared_ptr new_config, removed_slots.Merge(slots); LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to " << migration.GetHostIp() << ":" << migration.GetPort(); - migration.Finish(MigrationState::C_FINISHED); + migration.Finish(); res.migrations.push_back(std::move(*it)); outgoing_migration_jobs_.erase(it); } @@ -930,7 +926,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) { if (!migration) { VLOG(1) << "Unrecognized incoming migration from " << source_id; - return builder->SendSimpleString(OutgoingMigration::kUnknownMigration); + return builder->SendSimpleString(kUnknownMigration); } if (migration->GetState() != MigrationState::C_CONNECTING) { @@ -942,7 +938,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) { } if (migration->GetState() == MigrationState::C_FATAL) { - return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM)); + return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM)); } migration->Init(flows_num); @@ -969,10 +965,6 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder, return builder->SendError(kIdNotFound); } - if (migration->GetState() == MigrationState::C_FATAL) { - return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM)); - } - DCHECK(cntx->sync_dispatch); // we do this to be ignored by the dispatch tracker // TODO provide a more clear approach @@ -981,10 +973,6 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder, builder->SendOk(); migration->StartFlow(shard_id, cntx->conn()->socket()); - - if (migration->GetState() == MigrationState::C_FATAL) { - migration->Stop(); - } } void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id, @@ -1051,7 +1039,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) { [source_id = source_id](const auto& m) { return m.node_info.id == source_id; }); if (m_it == in_migrations.end()) { LOG(WARNING) << "migration isn't in config"; - return builder->SendError(OutgoingMigration::kUnknownMigration); + return builder->SendError(kUnknownMigration); } auto migration = GetIncomingMigration(source_id); @@ -1060,7 +1048,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) { if (!migration->Join(attempt)) { if (migration->GetState() == MigrationState::C_FATAL) { - return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM)); + return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM)); } else { return builder->SendError("Join timeout happened"); } diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index 78a9c39ea7fb..bb3c2e9c51b5 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -7,15 +7,10 @@ #include #include -#include - #include "base/flags.h" #include "base/logging.h" #include "cluster_utility.h" -#include "facade/error.h" #include "server/cluster/cluster_defs.h" -#include "server/cluster/outgoing_slot_migration.h" -#include "server/common.h" #include "server/error.h" #include "server/journal/executor.h" #include "server/journal/tx_executor.h" @@ -76,17 +71,11 @@ class ClusterShardMigration { break; } - auto oom_check = [&]() -> bool { - auto used_mem = used_mem_current.load(memory_order_relaxed); - if ((used_mem + tx_data->command.cmd_len) > max_memory_limit) { - cntx->ReportError(IncomingSlotMigration::kMigrationOOM); - in_migration_->ReportFatalError(GenericError(IncomingSlotMigration::kMigrationOOM)); - return true; - } - return false; - }; - - if (oom_check()) { + auto used_mem = used_mem_current.load(memory_order_relaxed); + // If aplying transaction data will reach 90% of max_memory_limit we end migration. + if ((used_mem + tx_data->command.cmd_len) > (0.9 * max_memory_limit)) { + cntx->ReportError(std::string{kIncomingMigrationOOM}); + in_migration_->ReportFatalError(std::string{kIncomingMigrationOOM}); break; } @@ -99,8 +88,8 @@ class ClusterShardMigration { VLOG(1) << "Finalized flow " << source_shard_id_; return; } - if (oom_check()) { - VLOG(2) << "Flow finalization " << source_shard_id_ + if (in_migration_->GetState() == MigrationState::C_FATAL) { + VLOG(1) << "Flow finalization " << source_shard_id_ << " canceled due memory limit reached"; return; } @@ -248,6 +237,11 @@ void IncomingSlotMigration::Stop() { } } + // Don't wait if we reached FATAL state + if (state_ == MigrationState::C_FATAL) { + return; + } + // we need to Join the migration process to prevent data corruption const absl::Time start = absl::Now(); const absl::Duration timeout = @@ -284,6 +278,9 @@ void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* sou VLOG(1) << "Incoming flow " << shard << (GetState() == MigrationState::C_FINISHED ? " finished " : " cancelled ") << "for " << source_id_; + if (GetState() == MigrationState::C_FATAL) { + Stop(); + } } size_t IncomingSlotMigration::GetKeyCount() const { diff --git a/src/server/cluster/incoming_slot_migration.h b/src/server/cluster/incoming_slot_migration.h index 425de870674e..481245c6d3f3 100644 --- a/src/server/cluster/incoming_slot_migration.h +++ b/src/server/cluster/incoming_slot_migration.h @@ -3,9 +3,6 @@ // #pragma once -#include - -#include "absl/base/thread_annotations.h" #include "helio/util/fiber_socket_base.h" #include "server/cluster/cluster_defs.h" #include "server/common.h" @@ -82,8 +79,6 @@ class IncomingSlotMigration { void Pause(bool pause); - static constexpr char kMigrationOOM[] = "INCOMING_MIGRATION_OOM"; - private: std::string source_id_; Service& service_; diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index a813230db930..58219a92c87c 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -12,10 +12,7 @@ #include "base/logging.h" #include "cluster_family.h" #include "cluster_utility.h" -#include "facade/resp_expr.h" #include "server/cluster/cluster_defs.h" -#include "server/cluster/incoming_slot_migration.h" -#include "server/common.h" #include "server/db_slice.h" #include "server/engine_shard_set.h" #include "server/error.h" @@ -40,8 +37,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots, journal::Journal* journal, OutgoingMigration* om) : ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) { - exec_st_.SwitchErrorHandler( - [om](auto ge) { om->Finish(MigrationState::C_ERROR, std::move(ge)); }); + exec_st_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); }); } ~SliceSlotMigration() { @@ -143,8 +139,14 @@ void OutgoingMigration::OnAllShards( }); } -void OutgoingMigration::Finish(MigrationState next_state, GenericError error) { +void OutgoingMigration::Finish(GenericError error) { + auto next_state = MigrationState::C_FINISHED; if (error) { + if (error.Format() == kIncomingMigrationOOM) { + next_state = MigrationState::C_FATAL; + } else { + next_state = MigrationState::C_ERROR; + } LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": " << migration_info_.node_info.id << " with error: " << error.Format(); exec_st_.ReportError(std::move(error)); @@ -225,6 +227,15 @@ void OutgoingMigration::SyncFb() { continue; } + // Break outgoing migration if INIT from incoming node responded with OOM. Usually this will + // happen on second iteration after first failed with OOM. Sending second INIT is required to + // cleanup slots on incoming slot migration node. + if (CheckRespFirstTypes({RespExpr::ERROR}) && + facade::ToSV(LastResponseArgs().front().GetBuf()) == kIncomingMigrationOOM) { + ChangeState(MigrationState::C_FATAL); + break; + } + if (!CheckRespIsSimpleReply("OK")) { if (CheckRespIsSimpleReply(kUnknownMigration)) { const absl::Duration passed = absl::Now() - start_time; @@ -275,20 +286,16 @@ void OutgoingMigration::SyncFb() { } long attempt = 0; - bool fatal_state = false; while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) { - // Don't sleep if we ended up in FATAL state + // Break loop and don't sleep in case of C_FATAL if (GetState() == MigrationState::C_FATAL) { - fatal_state = true; break; - } else { - // Process commands that were on pause and try again - VLOG(1) << "Waiting for migration to finalize..."; - ThisFiber::SleepFor(500ms); } + // Process commands that were on pause and try again + VLOG(1) << "Waiting for migration to finalize..."; + ThisFiber::SleepFor(500ms); } - // End outgoing slot migration if we are FINISHED or are in FATAL state - if (!exec_st_.IsRunning() && !fatal_state) { + if (!exec_st_.IsRunning()) { continue; } break; @@ -369,16 +376,11 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { if (CheckRespFirstTypes({RespExpr::ERROR})) { auto error = facade::ToSV(LastResponseArgs().front().GetBuf()); - LOG(WARNING) << "Error response for " << cf_->MyID() << " : " << migration_info_.node_info.id - << " attempt " << attempt << " msg: " << error; - auto next_state = MigrationState::C_ERROR; - // Check if there is OOM response from incoming slot migration - if (error == IncomingSlotMigration::kMigrationOOM) { - SetLastError(GenericError(IncomingSlotMigration::kMigrationOOM)); - next_state = MigrationState::C_FATAL; + // Check if returned incoming slot OOM and finish migration + if (error == kIncomingMigrationOOM) { + Finish(std::string(error)); + return false; } - Finish(next_state, std::string(error)); - return false; } if (!CheckRespFirstTypes({RespExpr::INT64})) { @@ -397,7 +399,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { } if (!exec_st_.GetError()) { - Finish(MigrationState::C_FINISHED); + Finish(); keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges); cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges, false); diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index 57cd8bb67f26..9ed9ccd4abee 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -30,7 +30,7 @@ class OutgoingMigration : private ProtocolClient { // if is_error = false mark migration as FINISHED and cancel migration if it's not finished yet // can be called from any thread, but only after Start() // if is_error = true and migration is in progress it will be restarted otherwise nothing happens - void Finish(MigrationState next_state, GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_); + void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_); MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_); @@ -76,8 +76,6 @@ class OutgoingMigration : private ProtocolClient { size_t GetKeyCount() const ABSL_LOCKS_EXCLUDED(state_mu_); - static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION"; - private: // should be run for all shards void StartFlow(journal::Journal* journal, io::Sink* dest); From 40851f9a2c9b15f9726fe6561929148c3a7655ea Mon Sep 17 00:00:00 2001 From: mkaruza Date: Wed, 7 May 2025 21:11:59 +0200 Subject: [PATCH 3/3] Simplify handling error response check --- src/server/cluster/cluster_family.cc | 2 +- src/server/cluster/incoming_slot_migration.cc | 1 - src/server/cluster/outgoing_slot_migration.cc | 28 ++++++------------- src/server/cluster/outgoing_slot_migration.h | 2 +- src/server/protocol_client.cc | 5 ++++ src/server/protocol_client.h | 3 ++ 6 files changed, 19 insertions(+), 22 deletions(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 37419d1c332a..3368baf0e756 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -835,7 +835,7 @@ ClusterFamily::TakeOutOutgoingMigrations(shared_ptr new_config, removed_slots.Merge(slots); LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to " << migration.GetHostIp() << ":" << migration.GetPort(); - migration.Finish(); + migration.Finish(MigrationState::C_FINISHED); res.migrations.push_back(std::move(*it)); outgoing_migration_jobs_.erase(it); } diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index bb3c2e9c51b5..4c0420c71de5 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -10,7 +10,6 @@ #include "base/flags.h" #include "base/logging.h" #include "cluster_utility.h" -#include "server/cluster/cluster_defs.h" #include "server/error.h" #include "server/journal/executor.h" #include "server/journal/tx_executor.h" diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 58219a92c87c..60086d784c56 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -12,7 +12,6 @@ #include "base/logging.h" #include "cluster_family.h" #include "cluster_utility.h" -#include "server/cluster/cluster_defs.h" #include "server/db_slice.h" #include "server/engine_shard_set.h" #include "server/error.h" @@ -37,7 +36,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots, journal::Journal* journal, OutgoingMigration* om) : ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) { - exec_st_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); }); + exec_st_.SwitchErrorHandler( + [om](auto ge) { om->Finish(MigrationState::C_ERROR, std::move(ge)); }); } ~SliceSlotMigration() { @@ -139,14 +139,8 @@ void OutgoingMigration::OnAllShards( }); } -void OutgoingMigration::Finish(GenericError error) { - auto next_state = MigrationState::C_FINISHED; +void OutgoingMigration::Finish(MigrationState next_state, GenericError error) { if (error) { - if (error.Format() == kIncomingMigrationOOM) { - next_state = MigrationState::C_FATAL; - } else { - next_state = MigrationState::C_ERROR; - } LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": " << migration_info_.node_info.id << " with error: " << error.Format(); exec_st_.ReportError(std::move(error)); @@ -230,8 +224,7 @@ void OutgoingMigration::SyncFb() { // Break outgoing migration if INIT from incoming node responded with OOM. Usually this will // happen on second iteration after first failed with OOM. Sending second INIT is required to // cleanup slots on incoming slot migration node. - if (CheckRespFirstTypes({RespExpr::ERROR}) && - facade::ToSV(LastResponseArgs().front().GetBuf()) == kIncomingMigrationOOM) { + if (CheckRespSimpleError(kIncomingMigrationOOM)) { ChangeState(MigrationState::C_FATAL); break; } @@ -374,13 +367,10 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { return false; } - if (CheckRespFirstTypes({RespExpr::ERROR})) { - auto error = facade::ToSV(LastResponseArgs().front().GetBuf()); - // Check if returned incoming slot OOM and finish migration - if (error == kIncomingMigrationOOM) { - Finish(std::string(error)); - return false; - } + // Check OOM from incoming slot migration on ACK request + if (CheckRespSimpleError(kIncomingMigrationOOM)) { + Finish(MigrationState::C_FATAL, std::string(kIncomingMigrationOOM)); + return false; } if (!CheckRespFirstTypes({RespExpr::INT64})) { @@ -399,7 +389,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { } if (!exec_st_.GetError()) { - Finish(); + Finish(MigrationState::C_FINISHED); keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges); cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges, false); diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index 9ed9ccd4abee..95a32feb9534 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -30,7 +30,7 @@ class OutgoingMigration : private ProtocolClient { // if is_error = false mark migration as FINISHED and cancel migration if it's not finished yet // can be called from any thread, but only after Start() // if is_error = true and migration is in progress it will be restarted otherwise nothing happens - void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_); + void Finish(MigrationState next_state, GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_); MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_); diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc index 8ecd6f65db7d..895e2af1a04a 100644 --- a/src/server/protocol_client.cc +++ b/src/server/protocol_client.cc @@ -339,6 +339,11 @@ bool ProtocolClient::CheckRespIsSimpleReply(string_view reply) const { ToSV(resp_args_.front().GetBuf()) == reply; } +bool ProtocolClient::CheckRespSimpleError(string_view error) const { + return resp_args_.size() == 1 && resp_args_.front().type == RespExpr::ERROR && + ToSV(resp_args_.front().GetBuf()) == error; +} + bool ProtocolClient::CheckRespFirstTypes(initializer_list types) const { unsigned i = 0; for (RespExpr::Type type : types) { diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h index 6c08a0bb862f..7e7ddda036b7 100644 --- a/src/server/protocol_client.h +++ b/src/server/protocol_client.h @@ -85,6 +85,9 @@ class ProtocolClient { // Check if reps_args contains a simple reply. bool CheckRespIsSimpleReply(std::string_view reply) const; + // Check if resp_args contains a simple error + bool CheckRespSimpleError(std::string_view error) const; + // Check resp_args contains the following types at front. bool CheckRespFirstTypes(std::initializer_list types) const;