Skip to content

Commit 196805c

Browse files
committed
Set flag to connection context if OOM is reported
1 parent dc0005f commit 196805c

File tree

4 files changed

+18
-9
lines changed

4 files changed

+18
-9
lines changed

src/server/cluster/cluster_family.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1039,7 +1039,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
10391039
[source_id = source_id](const auto& m) { return m.node_info.id == source_id; });
10401040
if (m_it == in_migrations.end()) {
10411041
LOG(WARNING) << "migration isn't in config";
1042-
return builder->SendError(kUnknownMigration);
1042+
return builder->SendSimpleString(kUnknownMigration);
10431043
}
10441044

10451045
auto migration = GetIncomingMigration(source_id);

src/server/cluster/incoming_slot_migration.cc

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,6 @@ class ClusterShardMigration {
7676
break;
7777
}
7878

79-
auto used_mem = used_mem_current.load(memory_order_relaxed);
80-
// If aplying transaction data will reach 90% of max_memory_limit we end migration.
81-
if ((used_mem + tx_data->command.cmd_len) > (0.9 * max_memory_limit)) {
82-
cntx->ReportError(std::string{kIncomingMigrationOOM});
83-
in_migration_->ReportFatalError(std::string{kIncomingMigrationOOM});
84-
break;
85-
}
86-
8779
while (tx_data->opcode == journal::Op::LSN) {
8880
VLOG(2) << "Attempt to finalize flow " << source_shard_id_ << " attempt " << tx_data->lsn;
8981
last_attempt_.store(tx_data->lsn);
@@ -112,6 +104,12 @@ class ClusterShardMigration {
112104
// TODO check about ping logic
113105
} else {
114106
ExecuteTx(std::move(*tx_data), cntx);
107+
// Break incoming slot migration if command reported OOM
108+
if (executor_.connection_context()->IsOOM()) {
109+
cntx->ReportError(std::string{kIncomingMigrationOOM});
110+
in_migration_->ReportFatalError(std::string{kIncomingMigrationOOM});
111+
break;
112+
}
115113
}
116114
}
117115

src/server/conn_context.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,10 @@ class ConnectionContext : public facade::ConnectionContext {
295295
return conn_state.db_index;
296296
}
297297

298+
bool IsOOM() {
299+
return std::exchange(is_oom_, false);
300+
}
301+
298302
void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args,
299303
facade::RedisReplyBuilder* rb);
300304

@@ -323,6 +327,9 @@ class ConnectionContext : public facade::ConnectionContext {
323327
// The related connection is bound to main listener or serves the memcached protocol
324328
bool has_main_or_memcache_listener = false;
325329

330+
// OOM reported while executing
331+
bool is_oom_ = false;
332+
326333
private:
327334
void EnableMonitoring(bool enable) {
328335
subscriptions++; // required to support the monitoring

src/server/main_service.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,6 +1405,10 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args,
14051405
}
14061406

14071407
if (std::string reason = builder->ConsumeLastError(); !reason.empty()) {
1408+
// Set flag if OOM reported
1409+
if (reason == kOutOfMemory) {
1410+
cmd_cntx.conn_cntx->is_oom_ = true;
1411+
}
14081412
VLOG(2) << FailedCommandToString(cid->name(), tail_args, reason);
14091413
LOG_EVERY_T(WARNING, 1) << FailedCommandToString(cid->name(), tail_args, reason);
14101414
}

0 commit comments

Comments
 (0)