diff --git a/Cargo.lock b/Cargo.lock index d96af80..4874ac4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -198,6 +198,15 @@ dependencies = [ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ctrlc" +version = "3.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "nix 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "dirs" version = "2.0.2" @@ -269,6 +278,18 @@ dependencies = [ "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "nix" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "cc 1.0.47 (registry+https://github.com/rust-lang/crates.io-index)", + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)", + "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "num-integer" version = "0.1.41" @@ -399,6 +420,7 @@ version = "0.1.0" dependencies = [ "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", + "ctrlc 3.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "dirs 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "simplelog 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -482,6 +504,11 @@ name = "vec_map" version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "winapi" version = "0.3.8" @@ -526,6 +553,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum crossbeam-queue 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "dfd6515864a82d2f877b42813d4553292c6659498c9a2aa31bab5a15243c2700" "checksum crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6" "checksum crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce446db02cdc3165b94ae73111e570793400d0794e46125cc4056c81cbb039f4" +"checksum ctrlc 3.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c7dfd2d8b4c82121dfdff120f818e09fc4380b0b7e17a742081a89b94853e87f" "checksum dirs 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "13aea89a5c93364a98e9b37b2fa237effbb694d5cfe01c5b70941f7eb087d5e3" "checksum dirs-sys 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "afa0b23de8fd801745c471deffa6e12d248f962c9fd4b4c33787b055599bde7b" "checksum failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "f8273f13c977665c5db7eb2b99ae520952fe5ac831ae4cd09d80c4c7042b5ed9" @@ -535,6 +563,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)" = "1a31a0627fdf1f6a39ec0dd577e101440b7db22672c0901fe00a9a6fbb5c24e8" "checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" "checksum memoffset 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "75189eb85871ea5c2e2c15abbdd541185f63b408415e5051f5cac122d8c774b9" +"checksum nix 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6c722bee1037d430d0f8e687bbdbf222f27cc6e4e68d5caf630857bb2b6dbdce" "checksum num-integer 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)" = "b85e541ef8255f6cf42bbfe4ef361305c6c135d10919ecc26126c4e5ae94bc09" "checksum num-traits 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)" = "443c53b3c3531dfcbfa499d8893944db78474ad7a1d87fa2d94d1a2231693ac6" "checksum proc-macro2 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "9c9e470a8dc4aeae2dee2f335e8f533e2d4b347e1434e5671afc49b054592f27" @@ -561,6 +590,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum unicode-width 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7007dbd421b92cc6e28410fe7362e2e0a2503394908f417b68ec8d1c364c4e20" "checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" "checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a" +"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" "checksum winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6" "checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" "checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml index 296dcfa..ee1b1e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,4 @@ dirs = "2.0.2" crossbeam = "0.7.3" log = "0.4.8" simplelog = "0.7.4" +ctrlc = { version = "3.1.3", features = ["termination"] } diff --git a/LICENSE-3RD-PARTY b/LICENSE-3RD-PARTY index ad92451..fb4c6d7 100644 --- a/LICENSE-3RD-PARTY +++ b/LICENSE-3RD-PARTY @@ -7,6 +7,7 @@ applies to: - crossbeam, Copyright (c) 2019 The Crossbeam Project Developers - log, Copyright (c) 2014 The Rust Project Developers - simplelog, Copyright (c) 2015 Victor Brekenfeld +- ctrlc ------------------------------------------------------------------------------- Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -34,6 +35,7 @@ applies to: - crossbeam - log - simplelog +- ctrlc ------------------------------------------------------------------------------- Apache License Version 2.0, January 2004 diff --git a/src/lib.rs b/src/lib.rs index ae63720..912b542 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,8 @@ use std::{ fs, path::{Path, PathBuf}, process::Command, + sync::atomic::{AtomicBool, Ordering}, + sync::Arc, thread, time::Duration, }; @@ -39,17 +41,33 @@ pub fn run( output: impl AsRef, hosts: Vec<&str>, ) -> Result<()> { + // Set up a shared boolean to check whether the user has aborted + let running = Arc::new(AtomicBool::new(true)); + let r = running.clone(); + ctrlc::set_handler(move || { + r.store(false, Ordering::SeqCst); + info!( + "Abort signal received. Waiting for remote encoders to finish the \ + current chunk and quit gracefully." + ); + }) + .expect("Error setting Ctrl-C handler"); + // Create our local temporary directory let mut tmp_dir = dirs::home_dir().ok_or("Home directory not found")?; tmp_dir.push(TMP_DIR); fs::create_dir(&tmp_dir)?; // Start the operation - let result = run_local(input.as_ref(), output.as_ref(), &tmp_dir, &hosts); + let result = run_local( + input.as_ref(), + output.as_ref(), + &tmp_dir, + &hosts, + running.clone(), + ); - // Clean up infallibly info!("Cleaning up"); - // TODO: make sure this also happens when stopped with Ctrl + C // Remove remote temporary directories for &host in &hosts { // Clean up temporary directory on host @@ -57,7 +75,13 @@ pub fn run( .args(&[&host, "rm", "-r", remote::TMP_DIR]) .output() .expect("Failed executing ssh command"); - if !output.status.success() { + // These checks for `running` are necessary, because Ctrl + C also + // seems to terminate the commands we launch, which means they'll + // return unsuccessfully. With this check we prevent an error message + // in this case, because that's what the user wants. Unfortunately this + // also means we have to litter the `running` variable almost + // everyhwere. + if !output.status.success() && running.load(Ordering::SeqCst) { error!("Failed removing remote temporary directory on {}", host); } } @@ -76,13 +100,21 @@ fn run_local( output: &Path, tmp_dir: &Path, hosts: &[&str], + running: Arc, ) -> Result<()> { // Build path to audio file let mut audio = tmp_dir.to_path_buf(); audio.push(AUDIO); // Start the extraction info!("Extracting audio"); - local::extract_audio(input, &audio)?; + local::extract_audio(input, &audio, &running)?; + + // We check whether the user has aborted before every time-intensive task. + // It's a better experience, but a bit ugly code-wise. + if !running.load(Ordering::SeqCst) { + // Abort early + return Ok(()); + } // Create directory for video chunks let mut chunk_dir = tmp_dir.to_path_buf(); @@ -90,7 +122,7 @@ fn run_local( fs::create_dir(&chunk_dir)?; // Split the video info!("Splitting video into chunks"); - local::split_video(input, &chunk_dir, SEGMENT_LENGTH)?; + local::split_video(input, &chunk_dir, SEGMENT_LENGTH, &running)?; // Get the list of created chunks let mut chunks = fs::read_dir(&chunk_dir)? .map(|res| res.and_then(|readdir| Ok(readdir.path()))) @@ -99,6 +131,11 @@ fn run_local( // for the user to watch since it allows seeing the progress at a glance. chunks.sort(); + if !running.load(Ordering::SeqCst) { + // Abort early + return Ok(()); + } + // Initialize the global channel for chunks let (sender, receiver) = channel::unbounded(); // Send all chunks into it @@ -122,24 +159,31 @@ fn run_local( let thread_receiver = receiver.clone(); // Create owned encoded_dir for the thread let enc = encoded_dir.clone(); + // Create copy of running indicator for the thread + let r = running.clone(); // Start it - let handle = thread::spawn(|| { - remote::host_thread(host, thread_receiver, enc); - }); + let handle = + thread::Builder::new().name(host.clone()).spawn(|| { + remote::host_thread(host, thread_receiver, enc, r); + })?; host_threads.push(handle); } // Wait for all hosts to finish for handle in host_threads { - if let Err(e) = handle.join() { - error!("Thread for a host panicked: {:?}", e); + if handle.join().is_err() { return Err("A host thread panicked".into()); } } + if !running.load(Ordering::SeqCst) { + // We aborted early + return Ok(()); + } + // Combine encoded chunks and audio info!("Combining encoded chunks into final video"); - local::combine(&encoded_dir, &audio, output)?; + local::combine(&encoded_dir, &audio, output, &running)?; Ok(()) } diff --git a/src/local.rs b/src/local.rs index 64e0970..3cd9d5c 100644 --- a/src/local.rs +++ b/src/local.rs @@ -1,11 +1,22 @@ //! Functions for operations on the local host. -use std::{fs, path::Path, process::Command, time::Duration}; +use std::{ + fs, + path::Path, + process::Command, + sync::atomic::{AtomicBool, Ordering}, + sync::Arc, + time::Duration, +}; use super::Result; /// Uses `ffmpeg` to locally extract and encode the audio. -pub fn extract_audio(input: &Path, output: &Path) -> Result<()> { +pub fn extract_audio( + input: &Path, + output: &Path, + running: &Arc, +) -> Result<()> { // Convert input and output to &str let input = input.to_str().ok_or("Input invalid Unicode")?; let output = output.to_str().ok_or("Output invalid Unicode")?; @@ -15,7 +26,7 @@ pub fn extract_audio(input: &Path, output: &Path) -> Result<()> { "-y", "-i", input, "-vn", "-c:a", "aac", "-b:a", "192k", output, ]) .output()?; - if !output.status.success() { + if !output.status.success() && running.load(Ordering::SeqCst) { return Err("Failed extracting audio".into()); } @@ -27,6 +38,7 @@ pub fn split_video( input: &Path, output_dir: &Path, segment_length: Duration, + running: &Arc, ) -> Result<()> { // Convert input and output to &str let input = input.to_str().ok_or("Input invalid Unicode")?; @@ -49,7 +61,7 @@ pub fn split_video( output, ]) .output()?; - if !output.status.success() { + if !output.status.success() && running.load(Ordering::SeqCst) { return Err("Failed splitting video".into()); } @@ -57,7 +69,12 @@ pub fn split_video( } /// Uses `ffmpeg` to locally combine the encoded chunks and audio. -pub fn combine(encoded_dir: &Path, audio: &Path, output: &Path) -> Result<()> { +pub fn combine( + encoded_dir: &Path, + audio: &Path, + output: &Path, + running: &Arc, +) -> Result<()> { // Create list of encoded chunks let mut chunks = fs::read_dir(&encoded_dir)? .map(|res| res.and_then(|readdir| Ok(readdir.path()))) @@ -104,7 +121,7 @@ pub fn combine(encoded_dir: &Path, audio: &Path, output: &Path) -> Result<()> { output, ]) .output()?; - if !output.status.success() { + if !output.status.success() && running.load(Ordering::SeqCst) { return Err("Failed combining video".into()); } diff --git a/src/remote.rs b/src/remote.rs index 31cab52..7bef0b8 100644 --- a/src/remote.rs +++ b/src/remote.rs @@ -2,7 +2,13 @@ use crossbeam::channel::{self, Receiver}; use log::{debug, info}; -use std::{path::PathBuf, process::Command, thread}; +use std::{ + path::PathBuf, + process::Command, + sync::atomic::{AtomicBool, Ordering}, + sync::Arc, + thread, +}; /// The name of the temporary directory in the home directory of remote hosts. pub static TMP_DIR: &str = "shepherd_tmp_remote"; @@ -12,6 +18,7 @@ pub fn host_thread( host: String, global_receiver: Receiver, encoded_dir: PathBuf, + running: Arc, ) { debug!("Spawned host thread {}", host); @@ -21,7 +28,7 @@ pub fn host_thread( .output() .expect("Failed executing ssh command"); assert!( - output.status.success(), + output.status.success() || !running.load(Ordering::SeqCst), "Failed creating remote temporary directory" ); @@ -29,8 +36,13 @@ pub fn host_thread( let (sender, receiver) = channel::bounded(0); // Create copy of host for thread let host_cpy = host.clone(); + // Create copy of running indicator for thread + let r = running.clone(); // Start the encoder thread - let handle = thread::spawn(move || encoder_thread(host_cpy, receiver)); + let handle = thread::Builder::new() + .name(format!("{}-encoder", host)) + .spawn(move || encoder_thread(host_cpy, receiver, r)) + .expect("Failed spawning thread"); // Try to fetch a chunk from the global channel while let Ok(chunk) = global_receiver.recv() { @@ -43,9 +55,17 @@ pub fn host_thread( ]) .output() .expect("Failed executing scp command"); - assert!(output.status.success(), "Failed transferring chunk"); - // Pass the chunk to the encoder thread (blocks until channel empty) - sender.send(chunk).expect("Failed sending chunk in channel"); + assert!( + output.status.success() || !running.load(Ordering::SeqCst), + "Failed transferring chunk" + ); + + // Pass the chunk to the encoder thread (blocks until encoder is ready + // to receive and fails if it terminated prematurely) + if sender.send(chunk).is_err() { + // Encoder stopped, so quit early + break; + } } // Since the global channel is empty, drop our sender to disconnect the // local channel @@ -54,6 +74,11 @@ pub fn host_thread( // Wait for the encoder let encoded = handle.join().expect("Encoder thread panicked"); + // Abort early if signal was sent + if !running.load(Ordering::SeqCst) { + info!("{} exiting", host); + return; + } debug!("Host thread {} got encoded chunks {:?}", host, encoded); // Get a &str from encoded_dir PathBuf @@ -64,7 +89,10 @@ pub fn host_thread( .args(&[&format!("{}:{}", host, chunk), encoded_dir]) .output() .expect("Failed executing scp command"); - assert!(output.status.success(), "Failed transferring encoded chunk"); + assert!( + output.status.success() || !running.load(Ordering::SeqCst), + "Failed transferring encoded chunk" + ); info!("{} returned encoded chunk {}", host, chunk); } @@ -72,11 +100,20 @@ pub fn host_thread( } /// Encodes chunks on a host and returns the encoded remote file names. -fn encoder_thread(host: String, receiver: Receiver) -> Vec { +fn encoder_thread( + host: String, + receiver: Receiver, + running: Arc, +) -> Vec { // We'll use this to store the encoded chunks' remote file names. let mut encoded = Vec::new(); while let Ok(chunk) = receiver.recv() { + // Abort early if signal was sent + if !running.load(Ordering::SeqCst) { + break; + } + debug!("Encoder thread {} received chunk {:?}", host, chunk); // Construct the chunk's remote file name let chunk_name = format!( @@ -125,7 +162,10 @@ fn encoder_thread(host: String, receiver: Receiver) -> Vec { ]) .output() .expect("Failed executing ssh command"); - assert!(output.status.success(), "Failed encoding"); + assert!( + output.status.success() || !running.load(Ordering::SeqCst), + "Failed encoding" + ); // Remember the encoded chunk encoded.push(enc_name);