@@ -649,6 +649,183 @@ void DflyCmd::Load(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
649649  rb->SendOk ();
650650}
651651
652+ namespace  {
653+ 
654+ struct  ShardJournalChannel  : journal::JournalConsumerInterface, io::Source {
655+   void  ConsumeJournalChange (const  journal::JournalChangeItem& item) override  {
656+     //  TODO don't block journal slice
657+     ec.await ([&] { return  wpos < cap; });
658+     buffer[wpos++] = item.journal_item .data ;
659+     ec.notifyAll ();
660+     parent.notifyAll ();
661+   }
662+ 
663+   io::Result<unsigned  long > ReadSome (const  iovec* v, uint32_t  len) override  {
664+     ec.await ([&] { return  rpos < wpos; });
665+ 
666+     auto  bytes_read = 0 ;
667+     size_t  chunk_size = 0 ;
668+     bool  partial_read = false ;
669+     while  (rpos < wpos && len > 0 ) {
670+       auto & item = buffer[rpos];
671+       chunk_size = std::min (v->iov_len , item.size ());
672+       std::copy_n (item.data (), chunk_size, static_cast <char *>(v->iov_base ));
673+ 
674+       bytes_read += chunk_size;
675+       ++v;
676+       --len;
677+ 
678+       //  If we were not able to fully consume the string, erase the front of it but remain at same
679+       //  index
680+       partial_read = chunk_size < item.size ();
681+       if  (!partial_read) {
682+         ++rpos;
683+       } else  {
684+         item.erase (0 , chunk_size);
685+       }
686+     }
687+ 
688+     if  (rpos == wpos && wpos == cap) {
689+       auto  new_size = 0 ;
690+       if  (partial_read) {
691+         buffer[0 ] = std::move (buffer[rpos]);
692+         new_size = 1 ;
693+       }
694+ 
695+       rpos = 0 ;
696+       wpos = new_size;
697+       buffer.resize (new_size);
698+     }
699+ 
700+     ec.notifyAll ();
701+     return  bytes_read;
702+   }
703+ 
704+   bool  HasData () const  {
705+     return  rpos > 0 ;
706+   }
707+ 
708+   fb2::EventCount ec;
709+   size_t  rpos{0 };
710+   size_t  wpos{0 };
711+   size_t  cap{5 };
712+   std::vector<std::string> buffer;
713+   fb2::EventCount& parent;
714+ };
715+ 
716+ struct  Pipe  final  : io::Source, io::Sink {
717+   io::Result<unsigned  long > ReadSome (const  iovec* v, uint32_t  len) override  {
718+     if  (done) {
719+       return  0 ;
720+     }
721+ 
722+     ec.await ([&] { return  rpos < wpos; });
723+     auto  bytes_read = 0 ;
724+ 
725+     while  (rpos < wpos && len > 0 ) {
726+       const  auto  chunk_size = min (wpos - rpos, v->iov_len );
727+       std::copy_n (buffer.begin () + rpos, chunk_size, static_cast <char *>(v->iov_base ));
728+       bytes_read += chunk_size;
729+       rpos += chunk_size;
730+       ++v;
731+       --len;
732+     }
733+ 
734+     if  (rpos == wpos && wpos == cap) {
735+       rpos = 0 ;
736+       wpos = 0 ;
737+       ec.notifyAll ();
738+     }
739+ 
740+     return  bytes_read;
741+   }
742+ 
743+   io::Result<unsigned  long > WriteSome (const  iovec* v, uint32_t  len) override  {
744+     CHECK (!done);
745+     ec.await ([&] { return  wpos < cap; });
746+     int  bytes_written = 0 ;
747+ 
748+     while  (wpos < cap && len > 0 ) {
749+       const  auto  chunk_size = std::min (cap - wpos, v->iov_len );
750+       auto  p = static_cast <const  char *>(v->iov_base );
751+       std::copy_n (p, chunk_size, buffer.begin () + wpos);
752+       bytes_written += chunk_size;
753+       wpos += chunk_size;
754+       ++v;
755+       --len;
756+     }
757+ 
758+     std::string debugging{reinterpret_cast <const  char *>(buffer.data () + 5 ), wpos};
759+     VLOG (1 ) << " debugging: "   << debugging;
760+     ec.notifyAll ();
761+     return  bytes_written;
762+   }
763+ 
764+   std::array<uint8_t , 1024 > buffer;
765+   size_t  rpos{0 };
766+   size_t  wpos{0 };
767+   size_t  cap{1024 };
768+   std::atomic_bool done{false };
769+   fb2::EventCount ec;
770+ };
771+ 
772+ }  //  namespace
773+ 
774+ void  DflyCmd::StartValkeySync () {
775+   auto  Write = [this ](auto  v) {
776+     const  auto  buf = io::Bytes (reinterpret_cast <const  unsigned  char *>(v.data ()), v.size ());
777+     CHECK (!_valkey_replica->conn ->socket ()->Write (buf));
778+   };
779+ 
780+   CHECK (_valkey_replica.has_value ()) << " There is no valkey replica to sync with"  ;
781+ 
782+   //  Since we do not know the size of rdb up front, use the EOF protocol, send
783+   //  "$EOF:<40-random-chars>\n" first, then the same 40 chars at the end
784+   std::string eof_mark (40 , ' X'  );
785+   std::string eof_mark_with_prefix = absl::StrCat (" $EOF:"  , eof_mark, " \n "  );
786+ 
787+   Write (eof_mark_with_prefix);
788+ 
789+   for  (unsigned  i = 0 ; i < shard_set->size (); ++i) {
790+     Pipe p;
791+     auto  cb = [&] {
792+       std::array<uint8_t , 128 > backing;
793+       const  io::MutableBytes mb{backing};
794+       while  (!p.done ) {
795+         if  (auto  n = p.Read (mb); !n.has_value () || n.value () == 0 ) {
796+           break ;
797+         }
798+         CHECK (!_valkey_replica->conn ->socket ()->Write (mb));
799+       }
800+ 
801+       if  (auto  n = p.Read (mb); n.has_value () && n.value ()) {
802+         CHECK (!_valkey_replica->conn ->socket ()->Write (mb));
803+       }
804+     };
805+     auto  drain_fb = fb2::Fiber (" replica-drain-fb"  , cb);
806+ 
807+     shard_set->Await (i, [&p, this , i] {
808+       auto  shard = EngineShard::tlocal ();
809+       auto  mode = i == 0  ? SaveMode::SINGLE_SHARD_WITH_SUMMARY : SaveMode::SINGLE_SHARD;
810+       RdbSaver saver{&p, mode, false , " "  };
811+       if  (mode == SaveMode::SINGLE_SHARD_WITH_SUMMARY) {
812+         CHECK (!saver.SaveHeader (saver.GetGlobalData (&sf_->service ())));
813+       }
814+ 
815+       saver.StartSnapshotInShard (false , &_valkey_replica->exec_st , shard);
816+       CHECK (!saver.WaitSnapshotInShard (shard));
817+       p.done  = true ;
818+       VLOG (1 ) << " finished writing snapshot for shard "   << shard->shard_id ();
819+     });
820+ 
821+     drain_fb.JoinIfNeeded ();
822+   }
823+ 
824+   Write (eof_mark);
825+ 
826+   //  Stable sync
827+ }
828+ 
652829OpStatus DflyCmd::StartFullSyncInThread (FlowInfo* flow, ExecutionState* exec_st,
653830                                        EngineShard* shard) {
654831  DCHECK (shard);
@@ -730,6 +907,11 @@ void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* exec_st, E
730907  };
731908}
732909
910+ void  DflyCmd::CreateValkeySyncSession (facade::Connection* conn) {
911+   fb2::LockGuard lk (mu_);
912+   _valkey_replica.emplace (conn, [](const  GenericError&) {});
913+ }
914+ 
733915auto  DflyCmd::CreateSyncSession (ConnectionState* state) -> std::pair<uint32_t, unsigned> {
734916  util::fb2::LockGuard lk (mu_);
735917  unsigned  sync_id = next_sync_id_++;
0 commit comments