@@ -170,9 +170,13 @@ std::optional<Replica::LastMasterSyncData> Replica::Stop() {
170170  sync_fb_.JoinIfNeeded ();
171171  DVLOG (1 ) << " MainReplicationFb stopped " this ;
172172  acks_fb_.JoinIfNeeded ();
173-   for  (auto & flow : shard_flows_) {
174-     flow.reset ();
175-   }
173+ 
174+   proactor_->Await ([this ]() {
175+     for  (auto & flow : shard_flows_) {
176+       flow.reset ();
177+     }
178+     shard_flows_.clear ();
179+   });
176180
177181  if  (last_journal_LSNs_.has_value ()) {
178182    return  LastMasterSyncData{master_context_.master_repl_id , last_journal_LSNs_.value ()};
@@ -501,29 +505,41 @@ error_code Replica::InitiatePSync() {
501505  return  error_code{};
502506}
503507
508+ void  Replica::InitializeShardFlows () {
509+   decltype (shard_flows_) shard_flows_copy;
510+   shard_flows_copy.resize (master_context_.num_flows );
511+   DCHECK (!shard_flows_copy.empty ());
512+   thread_flow_map_ = Partition (shard_flows_copy.size ());
513+   const  size_t  pool_sz = shard_set->pool ()->size ();
514+ 
515+   shard_set->pool ()->AwaitFiberOnAll ([pool_sz, this , &shard_flows_copy](auto  index, auto * ctx) {
516+     for  (unsigned  i = index; i < shard_flows_copy.size (); i += pool_sz) {
517+       uint64_t  partial_sync_lsn = 0 ;
518+       if  (shard_flows_[i]) {
519+         partial_sync_lsn = shard_flows_[i]->JournalExecutedCount ();
520+       }
521+       shard_flows_copy[i].reset (
522+           new  DflyShardReplica (server (), master_context_, i, &service_, multi_shard_exe_));
523+       if  (partial_sync_lsn > 0 ) {
524+         shard_flows_[i]->SetRecordsExecuted (partial_sync_lsn);
525+       }
526+     }
527+   });
528+   //  now update shard_flows on proactor thread
529+   shard_flows_ = std::move (shard_flows_copy);
530+ }
531+ 
504532//  Initialize and start sub-replica for each flow.
505533error_code Replica::InitiateDflySync (std::optional<LastMasterSyncData> last_master_sync_data) {
506534  auto  start_time = absl::Now ();
507535
508536  //  Initialize MultiShardExecution.
509537  multi_shard_exe_.reset (new  MultiShardExecution ());
510538
511-   //  Initialize shard flows.
512-   shard_flows_.resize (master_context_.num_flows );
513-   DCHECK (!shard_flows_.empty ());
514-   for  (unsigned  i = 0 ; i < shard_flows_.size (); ++i) {
515-     //  Transfer LSN state for partial sync
516-     uint64_t  partial_sync_lsn = 0 ;
517-     if  (shard_flows_[i]) {
518-       partial_sync_lsn = shard_flows_[i]->JournalExecutedCount ();
519-     }
520-     shard_flows_[i].reset (
521-         new  DflyShardReplica (server (), master_context_, i, &service_, multi_shard_exe_));
522-     if  (partial_sync_lsn > 0 ) {
523-       shard_flows_[i]->SetRecordsExecuted (partial_sync_lsn);
524-     }
525-   }
526-   thread_flow_map_ = Partition (shard_flows_.size ());
539+   //  Initialize shard flows. The update to the shard_flows_ should be done by this thread.
540+   //  Otherwise, there is a race condition between GetSummary() and the shard_flows_[i].reset()
541+   //  below.
542+   InitializeShardFlows ();
527543
528544  //  Blocked on until all flows got full sync cut.
529545  BlockingCounter sync_block{unsigned (shard_flows_.size ())};
@@ -1215,6 +1231,7 @@ auto Replica::GetSummary() const -> Summary {
12151231    //  Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here
12161232    //  it's unlikely to cause a real bug.
12171233    for  (const  auto & flow : shard_flows_) {  //  Get last io time from all sub flows.
1234+       DCHECK (Proactor () == ProactorBase::me ());
12181235      last_io_time = std::max (last_io_time, flow->LastIoTime ());
12191236    }
12201237
@@ -1246,25 +1263,14 @@ auto Replica::GetSummary() const -> Summary {
12461263    return  res;
12471264  };
12481265
1249-   if  (Sock ())
1250-     return  Proactor ()->AwaitBrief (f);
1251- 
1252-   /* *
1253-    * when this branch happens: there is a very short grace period 
1254-    * where Sock() is not initialized, yet the server can 
1255-    * receive ROLE/INFO commands. That period happens when launching 
1256-    * an instance with '--replicaof' and then immediately 
1257-    * sending a command. 
1258-    * 
1259-    * In that instance, we have to run f() on the current fiber. 
1260-    */  
1261-   return  f ();
1266+   return  Proactor ()->AwaitBrief (f);
12621267}
12631268
12641269std::vector<uint64_t > Replica::GetReplicaOffset () const  {
12651270  std::vector<uint64_t > flow_rec_count;
12661271  flow_rec_count.resize (shard_flows_.size ());
12671272  for  (const  auto & flow : shard_flows_) {
1273+     DCHECK (flow.get ());
12681274    uint32_t  flow_id = flow->FlowId ();
12691275    uint64_t  rec_count = flow->JournalExecutedCount ();
12701276    DCHECK_LT (flow_id, shard_flows_.size ());
0 commit comments