Skip to content

Commit 0467bcd

Browse files
committed
Add continuous mode
1 parent f449afc commit 0467bcd

File tree

4 files changed

+343
-69
lines changed

4 files changed

+343
-69
lines changed

README.md

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,38 @@ On the receiving end, suppose we download with 32 TCP connections:
4848

4949
File modification timestamps are preserved during all transfers.
5050

51-
## Incremental transfers
51+
## Continuous mode
5252

53-
Fastsync supports incremental transfers with the `--incremental` flag. When enabled, fastsync will:
53+
Fastsync supports continuous mode with the `--continuous` flag. When enabled, fastsync will:
5454

5555
1. Compare files by name, size, and modification timestamp
5656
2. Skip files that already exist at the destination with matching size and timestamp
5757
3. Transfer only files that are missing or have different size/timestamp
58+
4. Keep syncing in rounds until no changes are detected
59+
5. After each transfer round, check if any files were modified during the transfer
60+
6. If changes are detected, start another sync round
61+
7. Stop when a complete round finishes with no changes detected
5862

59-
Both sender and receiver must use the `--incremental` flag:
63+
Both sender and receiver must use the `--continuous` flag:
6064

6165
# Sender
62-
fastsync send 100.71.154.83:4440 --incremental file.tar.gz
66+
fastsync send 100.71.154.83:4440 --continuous ./data
6367

6468
# Receiver
65-
fastsync recv 100.71.154.83:4440 32 --incremental
69+
fastsync recv 100.71.154.83:4440 32 --continuous
70+
71+
This mode is particularly useful for:
72+
- Syncing directories where files are still being written
73+
- Live database migrations where you sync while the database is running
74+
- Backup scenarios where files are being actively modified
75+
- Any situation where you need to minimize downtime during a transfer
76+
77+
Typical workflow with `--continuous`:
78+
1. Start continuous mode while the source system is live
79+
2. Monitor the output - transfers will become smaller each round
80+
3. When transfers are minimal, stop the source application
81+
4. Let fastsync complete the final round
82+
5. You now have a complete, consistent copy
6683

6784
## Testing
6885

src/main.rs

Lines changed: 179 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ Usage:
3232
fastsync recv <server-addr> <num-streams> [options]
3333
3434
Common options:
35-
--incremental Transfer only files that have not been transferred already.
35+
--continuous Keep syncing until no changes detected, skipping unchanged files
3636
3737
Sender options:
3838
<listen-addr> Address (IP and port) for the sending side to bind to and
@@ -151,13 +151,13 @@ fn main() {
151151
match args.first().map(|s| &s[..]) {
152152
Some("send") if args.len() >= 3 => {
153153
let addr = &args[1];
154-
let mut incremental = false;
154+
let mut continuous = false;
155155
let mut max_bandwidth = None;
156156
let mut i = 2;
157157

158158
while i < args.len() && args[i].starts_with('-') {
159159
match args[i].as_str() {
160-
"--incremental" => incremental = true,
160+
"--continuous" => continuous = true,
161161
"--max-bandwidth-mbps" => {
162162
i += 1;
163163
max_bandwidth = Some(
@@ -182,25 +182,36 @@ fn main() {
182182
return;
183183
}
184184

185-
main_send(
186-
SocketAddr::from_str(addr).expect("Invalid send address"),
187-
fnames,
188-
WIRE_PROTO_VERSION,
189-
events_tx,
190-
max_bandwidth,
191-
incremental,
192-
)
193-
.expect("Failed to send.");
185+
if continuous {
186+
main_send_continuous(
187+
SocketAddr::from_str(addr).expect("Invalid send address"),
188+
fnames,
189+
WIRE_PROTO_VERSION,
190+
events_tx,
191+
max_bandwidth,
192+
)
193+
.expect("Failed to send in continuous mode.");
194+
} else {
195+
main_send(
196+
SocketAddr::from_str(addr).expect("Invalid send address"),
197+
fnames,
198+
WIRE_PROTO_VERSION,
199+
events_tx,
200+
max_bandwidth,
201+
continuous,
202+
)
203+
.expect("Failed to send.");
204+
}
194205
}
195206
Some("recv") if args.len() >= 3 => {
196207
let addr = &args[1];
197208
let n_conn = &args[2];
198-
let mut incremental = false;
209+
let mut continuous = false;
199210
let mut i = 3;
200211

201212
while i < args.len() && args[i].starts_with('-') {
202213
match args[i].as_str() {
203-
"--incremental" => incremental = true,
214+
"--continuous" => continuous = true,
204215
_ => {
205216
eprintln!("Unknown option: {}", args[i]);
206217
eprintln!("{}", USAGE);
@@ -210,14 +221,37 @@ fn main() {
210221
i += 1;
211222
}
212223

213-
main_recv(
214-
SocketAddr::from_str(addr).expect("Invalid recv address"),
215-
n_conn,
216-
WriteMode::AskConfirm,
217-
WIRE_PROTO_VERSION,
218-
incremental,
219-
)
220-
.expect("Failed to receive.");
224+
if continuous {
225+
// In continuous mode, keep retrying on errors
226+
loop {
227+
match main_recv(
228+
SocketAddr::from_str(addr).expect("Invalid recv address"),
229+
n_conn,
230+
WriteMode::AskConfirm,
231+
WIRE_PROTO_VERSION,
232+
continuous,
233+
) {
234+
Ok(()) => {
235+
// Should never reach here in continuous mode
236+
println!("Continuous mode unexpectedly exited");
237+
break;
238+
}
239+
Err(e) => {
240+
eprintln!("Error in continuous mode: {}, retrying in 1 second...", e);
241+
std::thread::sleep(std::time::Duration::from_secs(1));
242+
}
243+
}
244+
}
245+
} else {
246+
main_recv(
247+
SocketAddr::from_str(addr).expect("Invalid recv address"),
248+
n_conn,
249+
WriteMode::AskConfirm,
250+
WIRE_PROTO_VERSION,
251+
continuous,
252+
)
253+
.expect("Failed to receive.");
254+
}
221255
}
222256
_ => eprintln!("{}", USAGE),
223257
}
@@ -382,13 +416,44 @@ fn all_filenames_from_path_names(fnames: &[String]) -> Result<Vec<String>> {
382416
Ok(all_files)
383417
}
384418

419+
fn main_send_continuous(
420+
addr: SocketAddr,
421+
fnames: &[String],
422+
protocol_version: u16,
423+
sender_events: std::sync::mpsc::Sender<SenderEvent>,
424+
max_bandwidth_mbps: Option<u64>,
425+
) -> Result<()> {
426+
let mut round = 1;
427+
428+
loop {
429+
println!("\n=== Continuous sync round {} ===", round);
430+
431+
// For continuous mode, always use the standard main_send which already handles
432+
// the manifest-based sync properly
433+
main_send(
434+
addr,
435+
fnames,
436+
protocol_version,
437+
sender_events.clone(),
438+
max_bandwidth_mbps,
439+
true, // continuous flag
440+
)?;
441+
442+
// Wait a bit before next round
443+
std::thread::sleep(std::time::Duration::from_secs(1));
444+
round += 1;
445+
446+
println!("\nWaiting for changes...");
447+
}
448+
}
449+
385450
fn main_send(
386451
addr: SocketAddr,
387452
fnames: &[String],
388453
protocol_version: u16,
389454
sender_events: std::sync::mpsc::Sender<SenderEvent>,
390455
max_bandwidth_mbps: Option<u64>,
391-
incremental: bool,
456+
continuous: bool,
392457
) -> Result<()> {
393458
let mut plan = TransferPlan {
394459
proto_version: protocol_version,
@@ -453,8 +518,8 @@ fn main_send(
453518
stream.write_all(&buffer[..])?;
454519
println!("Waiting for the receiver to accept ...");
455520

456-
// In incremental mode, receive manifest reply and build actions
457-
if incremental {
521+
// In continuous mode, receive manifest reply and build actions
522+
if continuous {
458523
let manifest: ManifestReply = ManifestReply::deserialize_reader(&mut stream)?;
459524
let mut actions = Vec::new();
460525

@@ -479,7 +544,7 @@ fn main_send(
479544
}
480545

481546
// Update send states based on actions
482-
println!("\nIncremental sync actions:");
547+
println!("\nContinuous mode sync actions:");
483548
for (i, action) in actions.iter().enumerate() {
484549
match action {
485550
Action::Skip => {
@@ -682,30 +747,60 @@ fn main_recv(
682747
n_conn: &str,
683748
write_mode: WriteMode,
684749
protocol_version: u16,
685-
incremental: bool,
750+
continuous: bool,
686751
) -> Result<()> {
687752
let n_connections: u32 = u32::from_str(n_conn).expect("Failed to parse number of connections.");
753+
let mut round = 1;
754+
let mut ask_confirm = write_mode;
688755

689-
// First we initiate one connection. The sender will send the plan over
690-
// that. We read it. Unbuffered, because we want to skip the buffer for the
691-
// remaining reads, but the header is tiny so it should be okay.
692-
let mut stream = TcpStream::connect(addr)?;
693-
let plan = TransferPlan::deserialize_reader(&mut stream)?;
694-
if plan.proto_version != protocol_version {
695-
return Err(Error::new(
696-
ErrorKind::InvalidData,
697-
format!(
698-
"Sender is version {} and we only support {WIRE_PROTO_VERSION}",
699-
plan.proto_version
700-
),
701-
));
702-
}
703-
if write_mode == WriteMode::AskConfirm {
704-
plan.ask_confirm_receive()?;
705-
}
756+
loop {
757+
if continuous && round > 1 {
758+
println!("\n=== Continuous sync round {} ===", round);
759+
}
706760

707-
// Build manifest reply if incremental mode
708-
let manifest_reply = if incremental {
761+
// First we initiate one connection. The sender will send the plan over
762+
// that. We read it. Unbuffered, because we want to skip the buffer for the
763+
// remaining reads, but the header is tiny so it should be okay.
764+
let mut stream = match TcpStream::connect(addr) {
765+
Ok(s) => s,
766+
Err(e) if e.kind() == ErrorKind::ConnectionRefused && continuous => {
767+
// In continuous mode, sender might be restarting between rounds
768+
println!("\nSender not available, retrying in 1 second...");
769+
std::thread::sleep(std::time::Duration::from_secs(1));
770+
continue;
771+
}
772+
Err(e) => return Err(e),
773+
};
774+
775+
let plan = match TransferPlan::deserialize_reader(&mut stream) {
776+
Ok(p) => p,
777+
Err(e) if e.kind() == ErrorKind::ConnectionReset && continuous => {
778+
// In continuous mode, connection reset means sender closed between rounds
779+
println!("\nConnection reset by sender, retrying in 1 second...");
780+
std::thread::sleep(std::time::Duration::from_secs(1));
781+
continue;
782+
}
783+
Err(e) => return Err(e),
784+
};
785+
if plan.proto_version != protocol_version {
786+
return Err(Error::new(
787+
ErrorKind::InvalidData,
788+
format!(
789+
"Sender is version {} and we only support {WIRE_PROTO_VERSION}",
790+
plan.proto_version
791+
),
792+
));
793+
}
794+
if ask_confirm == WriteMode::AskConfirm {
795+
plan.ask_confirm_receive()?;
796+
// Only ask confirmation on first round in continuous mode
797+
if continuous {
798+
ask_confirm = WriteMode::Force;
799+
}
800+
}
801+
802+
// Build manifest reply if continuous mode
803+
let manifest_reply = if continuous {
709804
let mut manifest = ManifestReply { files: Vec::new() };
710805

711806
for file_plan in &plan.files {
@@ -735,8 +830,24 @@ fn main_recv(
735830

736831
// Send manifest reply
737832
let manifest_data = borsh::to_vec(&manifest)?;
738-
stream.write_all(&manifest_data)?;
739-
stream.flush()?;
833+
if let Err(e) = stream.write_all(&manifest_data) {
834+
if e.kind() == ErrorKind::ConnectionReset || e.kind() == ErrorKind::BrokenPipe {
835+
// Connection closed by sender, retry
836+
println!("\nConnection closed while sending manifest, retrying in 1 second...");
837+
std::thread::sleep(std::time::Duration::from_secs(1));
838+
continue;
839+
}
840+
return Err(e);
841+
}
842+
if let Err(e) = stream.flush() {
843+
if e.kind() == ErrorKind::ConnectionReset || e.kind() == ErrorKind::BrokenPipe {
844+
// Connection closed by sender, retry
845+
println!("\nConnection closed while flushing, retrying in 1 second...");
846+
std::thread::sleep(std::time::Duration::from_secs(1));
847+
continue;
848+
}
849+
return Err(e);
850+
}
740851

741852
Some(manifest)
742853
} else {
@@ -751,8 +862,8 @@ fn main_recv(
751862
let (sender, receiver) = mpsc::sync_channel::<Chunk>(16);
752863

753864
let writer_thread = std::thread::spawn::<_, ()>(move || {
754-
let (total_len, is_incremental) = if let Some(_manifest) = manifest_reply {
755-
// For incremental mode, use original total as an estimate for progress
865+
let (total_len, is_continuous) = if let Some(_manifest) = manifest_reply {
866+
// For continuous mode, use original total as an estimate for progress
756867
// but don't enforce strict byte count at the end
757868
(plan.files.iter().map(|f| f.len).sum(), true)
758869
} else {
@@ -774,8 +885,8 @@ fn main_recv(
774885
print_progress(bytes_received, total_len, start_time);
775886
}
776887

777-
// Only check exact byte count in non-incremental mode
778-
if !is_incremental && bytes_received < total_len {
888+
// Only check exact byte count in non-continuous mode
889+
if !is_continuous && bytes_received < total_len {
779890
panic!("Transmission ended, but not all data was received.");
780891
}
781892
});
@@ -855,14 +966,25 @@ fn main_recv(
855966
// crates, and create gigabytes of build artifacts, just to do a clean exit.
856967
// So as a hack, just connect one more time to wake up the sender's accept()
857968
// loop. It will conclude there is nothing to send and then exit.
858-
match TcpStream::connect(addr) {
859-
Ok(stream) => std::mem::drop(stream),
860-
// Too bad if we can't wake up the sender, but it's not our problem.
861-
Err(_) => {}
969+
if let Ok(stream) = TcpStream::connect(addr) {
970+
std::mem::drop(stream)
862971
}
972+
// Too bad if we can't wake up the sender, but it's not our problem.
863973

864974
writer_thread.join().expect("Failed to join writer thread.");
865975

976+
// In continuous mode, loop back for next round
977+
if continuous {
978+
round += 1;
979+
// Add a small delay before next round
980+
std::thread::sleep(std::time::Duration::from_millis(100));
981+
continue;
982+
} else {
983+
// In normal mode, we're done
984+
break;
985+
}
986+
}
987+
866988
Ok(())
867989
}
868990

@@ -1038,7 +1160,7 @@ mod tests {
10381160
}
10391161

10401162
#[test]
1041-
fn test_incremental_sync() {
1163+
fn test_continuous_sync() {
10421164
let (events_tx, events_rx) = std::sync::mpsc::channel::<SenderEvent>();
10431165
env::set_current_dir("/tmp/").unwrap();
10441166
let cwd = env::current_dir().unwrap();

0 commit comments

Comments
 (0)