12
12
#include " base/logging.h"
13
13
#include " cluster_family.h"
14
14
#include " cluster_utility.h"
15
- #include " server/cluster/cluster_defs.h"
16
15
#include " server/db_slice.h"
17
16
#include " server/engine_shard_set.h"
18
17
#include " server/error.h"
@@ -37,7 +36,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
37
36
SliceSlotMigration (DbSlice* slice, ServerContext server_context, SlotSet slots,
38
37
journal::Journal* journal, OutgoingMigration* om)
39
38
: ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) {
40
- exec_st_.SwitchErrorHandler ([om](auto ge) { om->Finish (std::move (ge)); });
39
+ exec_st_.SwitchErrorHandler (
40
+ [om](auto ge) { om->Finish (MigrationState::C_ERROR, std::move (ge)); });
41
41
}
42
42
43
43
~SliceSlotMigration () {
@@ -139,14 +139,8 @@ void OutgoingMigration::OnAllShards(
139
139
});
140
140
}
141
141
142
- void OutgoingMigration::Finish (GenericError error) {
143
- auto next_state = MigrationState::C_FINISHED;
142
+ void OutgoingMigration::Finish (MigrationState next_state, GenericError error) {
144
143
if (error) {
145
- if (error.Format () == kIncomingMigrationOOM ) {
146
- next_state = MigrationState::C_FATAL;
147
- } else {
148
- next_state = MigrationState::C_ERROR;
149
- }
150
144
LOG (WARNING) << " Finish outgoing migration for " << cf_->MyID () << " : "
151
145
<< migration_info_.node_info .id << " with error: " << error.Format ();
152
146
exec_st_.ReportError (std::move (error));
@@ -230,8 +224,7 @@ void OutgoingMigration::SyncFb() {
230
224
// Break outgoing migration if INIT from incoming node responded with OOM. Usually this will
231
225
// happen on second iteration after first failed with OOM. Sending second INIT is required to
232
226
// cleanup slots on incoming slot migration node.
233
- if (CheckRespFirstTypes ({RespExpr::ERROR}) &&
234
- facade::ToSV (LastResponseArgs ().front ().GetBuf ()) == kIncomingMigrationOOM ) {
227
+ if (CheckRespSimpleError (kIncomingMigrationOOM )) {
235
228
ChangeState (MigrationState::C_FATAL);
236
229
break ;
237
230
}
@@ -374,13 +367,10 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
374
367
return false ;
375
368
}
376
369
377
- if (CheckRespFirstTypes ({RespExpr::ERROR})) {
378
- auto error = facade::ToSV (LastResponseArgs ().front ().GetBuf ());
379
- // Check if returned incoming slot OOM and finish migration
380
- if (error == kIncomingMigrationOOM ) {
381
- Finish (std::string (error));
382
- return false ;
383
- }
370
+ // Check OOM from incoming slot migration on ACK request
371
+ if (CheckRespSimpleError (kIncomingMigrationOOM )) {
372
+ Finish (MigrationState::C_FATAL, std::string (kIncomingMigrationOOM ));
373
+ return false ;
384
374
}
385
375
386
376
if (!CheckRespFirstTypes ({RespExpr::INT64})) {
@@ -399,7 +389,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
399
389
}
400
390
401
391
if (!exec_st_.GetError ()) {
402
- Finish ();
392
+ Finish (MigrationState::C_FINISHED );
403
393
keys_number_ = cluster::GetKeyCount (migration_info_.slot_ranges );
404
394
cf_->ApplyMigrationSlotRangeToConfig (migration_info_.node_info .id , migration_info_.slot_ranges ,
405
395
false );
0 commit comments