3232  fastsync recv <server-addr> <num-streams> [options] 
3333
3434Common options: 
35-   --incremental                   Transfer only files that have not been transferred already.  
35+   --continuous                    Keep syncing until no changes detected, skipping unchanged files  
3636
3737Sender options: 
3838  <listen-addr>                  Address (IP and port) for the sending side to bind to and 
@@ -151,13 +151,13 @@ fn main() {
151151    match  args. first ( ) . map ( |s| & s[ ..] )  { 
152152        Some ( "send" )  if  args. len ( )  >= 3  => { 
153153            let  addr = & args[ 1 ] ; 
154-             let  mut  incremental  = false ; 
154+             let  mut  continuous  = false ; 
155155            let  mut  max_bandwidth = None ; 
156156            let  mut  i = 2 ; 
157157
158158            while  i < args. len ( )  && args[ i] . starts_with ( '-' )  { 
159159                match  args[ i] . as_str ( )  { 
160-                     "--incremental "  => incremental  = true , 
160+                     "--continuous "  => continuous  = true , 
161161                    "--max-bandwidth-mbps"  => { 
162162                        i += 1 ; 
163163                        max_bandwidth = Some ( 
@@ -182,25 +182,36 @@ fn main() {
182182                return ; 
183183            } 
184184
185-             main_send ( 
186-                 SocketAddr :: from_str ( addr) . expect ( "Invalid send address" ) , 
187-                 fnames, 
188-                 WIRE_PROTO_VERSION , 
189-                 events_tx, 
190-                 max_bandwidth, 
191-                 incremental, 
192-             ) 
193-             . expect ( "Failed to send." ) ; 
185+             if  continuous { 
186+                 main_send_continuous ( 
187+                     SocketAddr :: from_str ( addr) . expect ( "Invalid send address" ) , 
188+                     fnames, 
189+                     WIRE_PROTO_VERSION , 
190+                     events_tx, 
191+                     max_bandwidth, 
192+                 ) 
193+                 . expect ( "Failed to send in continuous mode." ) ; 
194+             }  else  { 
195+                 main_send ( 
196+                     SocketAddr :: from_str ( addr) . expect ( "Invalid send address" ) , 
197+                     fnames, 
198+                     WIRE_PROTO_VERSION , 
199+                     events_tx, 
200+                     max_bandwidth, 
201+                     continuous, 
202+                 ) 
203+                 . expect ( "Failed to send." ) ; 
204+             } 
194205        } 
195206        Some ( "recv" )  if  args. len ( )  >= 3  => { 
196207            let  addr = & args[ 1 ] ; 
197208            let  n_conn = & args[ 2 ] ; 
198-             let  mut  incremental  = false ; 
209+             let  mut  continuous  = false ; 
199210            let  mut  i = 3 ; 
200211
201212            while  i < args. len ( )  && args[ i] . starts_with ( '-' )  { 
202213                match  args[ i] . as_str ( )  { 
203-                     "--incremental "  => incremental  = true , 
214+                     "--continuous "  => continuous  = true , 
204215                    _ => { 
205216                        eprintln ! ( "Unknown option: {}" ,  args[ i] ) ; 
206217                        eprintln ! ( "{}" ,  USAGE ) ; 
@@ -210,14 +221,37 @@ fn main() {
210221                i += 1 ; 
211222            } 
212223
213-             main_recv ( 
214-                 SocketAddr :: from_str ( addr) . expect ( "Invalid recv address" ) , 
215-                 n_conn, 
216-                 WriteMode :: AskConfirm , 
217-                 WIRE_PROTO_VERSION , 
218-                 incremental, 
219-             ) 
220-             . expect ( "Failed to receive." ) ; 
224+             if  continuous { 
225+                 // In continuous mode, keep retrying on errors 
226+                 loop  { 
227+                     match  main_recv ( 
228+                         SocketAddr :: from_str ( addr) . expect ( "Invalid recv address" ) , 
229+                         n_conn, 
230+                         WriteMode :: AskConfirm , 
231+                         WIRE_PROTO_VERSION , 
232+                         continuous, 
233+                     )  { 
234+                         Ok ( ( ) )  => { 
235+                             // Should never reach here in continuous mode 
236+                             println ! ( "Continuous mode unexpectedly exited" ) ; 
237+                             break ; 
238+                         } 
239+                         Err ( e)  => { 
240+                             eprintln ! ( "Error in continuous mode: {}, retrying in 1 second..." ,  e) ; 
241+                             std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) ; 
242+                         } 
243+                     } 
244+                 } 
245+             }  else  { 
246+                 main_recv ( 
247+                     SocketAddr :: from_str ( addr) . expect ( "Invalid recv address" ) , 
248+                     n_conn, 
249+                     WriteMode :: AskConfirm , 
250+                     WIRE_PROTO_VERSION , 
251+                     continuous, 
252+                 ) 
253+                 . expect ( "Failed to receive." ) ; 
254+             } 
221255        } 
222256        _ => eprintln ! ( "{}" ,  USAGE ) , 
223257    } 
@@ -382,13 +416,44 @@ fn all_filenames_from_path_names(fnames: &[String]) -> Result<Vec<String>> {
382416    Ok ( all_files) 
383417} 
384418
419+ fn  main_send_continuous ( 
420+     addr :  SocketAddr , 
421+     fnames :  & [ String ] , 
422+     protocol_version :  u16 , 
423+     sender_events :  std:: sync:: mpsc:: Sender < SenderEvent > , 
424+     max_bandwidth_mbps :  Option < u64 > , 
425+ )  -> Result < ( ) >  { 
426+     let  mut  round = 1 ; 
427+ 
428+     loop  { 
429+         println ! ( "\n === Continuous sync round {} ===" ,  round) ; 
430+ 
431+         // For continuous mode, always use the standard main_send which already handles 
432+         // the manifest-based sync properly 
433+         main_send ( 
434+             addr, 
435+             fnames, 
436+             protocol_version, 
437+             sender_events. clone ( ) , 
438+             max_bandwidth_mbps, 
439+             true ,  // continuous flag 
440+         ) ?; 
441+ 
442+         // Wait a bit before next round 
443+         std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) ; 
444+         round += 1 ; 
445+ 
446+         println ! ( "\n Waiting for changes..." ) ; 
447+     } 
448+ } 
449+ 
385450fn  main_send ( 
386451    addr :  SocketAddr , 
387452    fnames :  & [ String ] , 
388453    protocol_version :  u16 , 
389454    sender_events :  std:: sync:: mpsc:: Sender < SenderEvent > , 
390455    max_bandwidth_mbps :  Option < u64 > , 
391-     incremental :  bool , 
456+     continuous :  bool , 
392457)  -> Result < ( ) >  { 
393458    let  mut  plan = TransferPlan  { 
394459        proto_version :  protocol_version, 
@@ -453,8 +518,8 @@ fn main_send(
453518            stream. write_all ( & buffer[ ..] ) ?; 
454519            println ! ( "Waiting for the receiver to accept ..." ) ; 
455520
456-             // In incremental  mode, receive manifest reply and build actions 
457-             if  incremental  { 
521+             // In continuous  mode, receive manifest reply and build actions 
522+             if  continuous  { 
458523                let  manifest:  ManifestReply  = ManifestReply :: deserialize_reader ( & mut  stream) ?; 
459524                let  mut  actions = Vec :: new ( ) ; 
460525
@@ -479,7 +544,7 @@ fn main_send(
479544                } 
480545
481546                // Update send states based on actions 
482-                 println ! ( "\n  Incremental  sync actions:" ) ; 
547+                 println ! ( "\n  Continuous mode  sync actions:" ) ; 
483548                for  ( i,  action)  in  actions. iter ( ) . enumerate ( )  { 
484549                    match  action { 
485550                        Action :: Skip  => { 
@@ -682,30 +747,60 @@ fn main_recv(
682747    n_conn :  & str , 
683748    write_mode :  WriteMode , 
684749    protocol_version :  u16 , 
685-     incremental :  bool , 
750+     continuous :  bool , 
686751)  -> Result < ( ) >  { 
687752    let  n_connections:  u32  = u32:: from_str ( n_conn) . expect ( "Failed to parse number of connections." ) ; 
753+     let  mut  round = 1 ; 
754+     let  mut  ask_confirm = write_mode; 
688755
689-     // First we initiate one connection. The sender will send the plan over 
690-     // that. We read it. Unbuffered, because we want to skip the buffer for the 
691-     // remaining reads, but the header is tiny so it should be okay. 
692-     let  mut  stream = TcpStream :: connect ( addr) ?; 
693-     let  plan = TransferPlan :: deserialize_reader ( & mut  stream) ?; 
694-     if  plan. proto_version  != protocol_version { 
695-         return  Err ( Error :: new ( 
696-             ErrorKind :: InvalidData , 
697-             format ! ( 
698-                 "Sender is version {} and we only support {WIRE_PROTO_VERSION}" , 
699-                 plan. proto_version
700-             ) , 
701-         ) ) ; 
702-     } 
703-     if  write_mode == WriteMode :: AskConfirm  { 
704-         plan. ask_confirm_receive ( ) ?; 
705-     } 
756+     loop  { 
757+         if  continuous && round > 1  { 
758+             println ! ( "\n === Continuous sync round {} ===" ,  round) ; 
759+         } 
706760
707-     // Build manifest reply if incremental mode 
708-     let  manifest_reply = if  incremental { 
761+         // First we initiate one connection. The sender will send the plan over 
762+         // that. We read it. Unbuffered, because we want to skip the buffer for the 
763+         // remaining reads, but the header is tiny so it should be okay. 
764+         let  mut  stream = match  TcpStream :: connect ( addr)  { 
765+             Ok ( s)  => s, 
766+             Err ( e)  if  e. kind ( )  == ErrorKind :: ConnectionRefused  && continuous => { 
767+                 // In continuous mode, sender might be restarting between rounds 
768+                 println ! ( "\n Sender not available, retrying in 1 second..." ) ; 
769+                 std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) ; 
770+                 continue ; 
771+             } 
772+             Err ( e)  => return  Err ( e) , 
773+         } ; 
774+ 
775+         let  plan = match  TransferPlan :: deserialize_reader ( & mut  stream)  { 
776+             Ok ( p)  => p, 
777+             Err ( e)  if  e. kind ( )  == ErrorKind :: ConnectionReset  && continuous => { 
778+                 // In continuous mode, connection reset means sender closed between rounds 
779+                 println ! ( "\n Connection reset by sender, retrying in 1 second..." ) ; 
780+                 std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) ; 
781+                 continue ; 
782+             } 
783+             Err ( e)  => return  Err ( e) , 
784+         } ; 
785+         if  plan. proto_version  != protocol_version { 
786+             return  Err ( Error :: new ( 
787+                 ErrorKind :: InvalidData , 
788+                 format ! ( 
789+                     "Sender is version {} and we only support {WIRE_PROTO_VERSION}" , 
790+                     plan. proto_version
791+                 ) , 
792+             ) ) ; 
793+         } 
794+         if  ask_confirm == WriteMode :: AskConfirm  { 
795+             plan. ask_confirm_receive ( ) ?; 
796+             // Only ask confirmation on first round in continuous mode 
797+             if  continuous { 
798+                 ask_confirm = WriteMode :: Force ; 
799+             } 
800+         } 
801+ 
802+     // Build manifest reply if continuous mode 
803+     let  manifest_reply = if  continuous { 
709804        let  mut  manifest = ManifestReply  {  files :  Vec :: new ( )  } ; 
710805
711806        for  file_plan in  & plan. files  { 
@@ -735,8 +830,24 @@ fn main_recv(
735830
736831        // Send manifest reply 
737832        let  manifest_data = borsh:: to_vec ( & manifest) ?; 
738-         stream. write_all ( & manifest_data) ?; 
739-         stream. flush ( ) ?; 
833+         if  let  Err ( e)  = stream. write_all ( & manifest_data)  { 
834+             if  e. kind ( )  == ErrorKind :: ConnectionReset  || e. kind ( )  == ErrorKind :: BrokenPipe  { 
835+                 // Connection closed by sender, retry 
836+                 println ! ( "\n Connection closed while sending manifest, retrying in 1 second..." ) ; 
837+                 std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) ; 
838+                 continue ; 
839+             } 
840+             return  Err ( e) ; 
841+         } 
842+         if  let  Err ( e)  = stream. flush ( )  { 
843+             if  e. kind ( )  == ErrorKind :: ConnectionReset  || e. kind ( )  == ErrorKind :: BrokenPipe  { 
844+                 // Connection closed by sender, retry 
845+                 println ! ( "\n Connection closed while flushing, retrying in 1 second..." ) ; 
846+                 std:: thread:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) ; 
847+                 continue ; 
848+             } 
849+             return  Err ( e) ; 
850+         } 
740851
741852        Some ( manifest) 
742853    }  else  { 
@@ -751,8 +862,8 @@ fn main_recv(
751862    let  ( sender,  receiver)  = mpsc:: sync_channel :: < Chunk > ( 16 ) ; 
752863
753864    let  writer_thread = std:: thread:: spawn :: < _ ,  ( ) > ( move  || { 
754-         let  ( total_len,  is_incremental )  = if  let  Some ( _manifest)  = manifest_reply { 
755-             // For incremental  mode, use original total as an estimate for progress 
865+         let  ( total_len,  is_continuous )  = if  let  Some ( _manifest)  = manifest_reply { 
866+             // For continuous  mode, use original total as an estimate for progress 
756867            // but don't enforce strict byte count at the end 
757868            ( plan. files . iter ( ) . map ( |f| f. len ) . sum ( ) ,  true ) 
758869        }  else  { 
@@ -774,8 +885,8 @@ fn main_recv(
774885            print_progress ( bytes_received,  total_len,  start_time) ; 
775886        } 
776887
777-         // Only check exact byte count in non-incremental  mode 
778-         if  !is_incremental  && bytes_received < total_len { 
888+         // Only check exact byte count in non-continuous  mode 
889+         if  !is_continuous  && bytes_received < total_len { 
779890            panic ! ( "Transmission ended, but not all data was received." ) ; 
780891        } 
781892    } ) ; 
@@ -855,14 +966,25 @@ fn main_recv(
855966    // crates, and create gigabytes of build artifacts, just to do a clean exit. 
856967    // So as a hack, just connect one more time to wake up the sender's accept() 
857968    // loop. It will conclude there is nothing to send and then exit. 
858-     match  TcpStream :: connect ( addr)  { 
859-         Ok ( stream)  => std:: mem:: drop ( stream) , 
860-         // Too bad if we can't wake up the sender, but it's not our problem. 
861-         Err ( _)  => { } 
969+     if  let  Ok ( stream)  = TcpStream :: connect ( addr)  { 
970+         std:: mem:: drop ( stream) 
862971    } 
972+     // Too bad if we can't wake up the sender, but it's not our problem. 
863973
864974    writer_thread. join ( ) . expect ( "Failed to join writer thread." ) ; 
865975
976+         // In continuous mode, loop back for next round 
977+         if  continuous { 
978+             round += 1 ; 
979+             // Add a small delay before next round 
980+             std:: thread:: sleep ( std:: time:: Duration :: from_millis ( 100 ) ) ; 
981+             continue ; 
982+         }  else  { 
983+             // In normal mode, we're done 
984+             break ; 
985+         } 
986+     } 
987+ 
866988    Ok ( ( ) ) 
867989} 
868990
@@ -1038,7 +1160,7 @@ mod tests {
10381160    } 
10391161
10401162    #[ test]  
1041-     fn  test_incremental_sync ( )  { 
1163+     fn  test_continuous_sync ( )  { 
10421164        let  ( events_tx,  events_rx)  = std:: sync:: mpsc:: channel :: < SenderEvent > ( ) ; 
10431165        env:: set_current_dir ( "/tmp/" ) . unwrap ( ) ; 
10441166        let  cwd = env:: current_dir ( ) . unwrap ( ) ; 
0 commit comments