Skip to content

Commit

Permalink
Handle SIGINT and SIGTERM gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
martindisch committed Nov 21, 2019
1 parent cef82d2 commit 34e2769
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 27 deletions.
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ dirs = "2.0.2"
crossbeam = "0.7.3"
log = "0.4.8"
simplelog = "0.7.4"
ctrlc = { version = "3.1.3", features = ["termination"] }
2 changes: 2 additions & 0 deletions LICENSE-3RD-PARTY
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ applies to:
- crossbeam, Copyright (c) 2019 The Crossbeam Project Developers
- log, Copyright (c) 2014 The Rust Project Developers
- simplelog, Copyright (c) 2015 Victor Brekenfeld
- ctrlc
-------------------------------------------------------------------------------
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -34,6 +35,7 @@ applies to:
- crossbeam
- log
- simplelog
- ctrlc
-------------------------------------------------------------------------------
Apache License
Version 2.0, January 2004
Expand Down
68 changes: 56 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use std::{
fs,
path::{Path, PathBuf},
process::Command,
sync::atomic::{AtomicBool, Ordering},
sync::Arc,
thread,
time::Duration,
};
Expand Down Expand Up @@ -39,25 +41,47 @@ pub fn run(
output: impl AsRef<Path>,
hosts: Vec<&str>,
) -> Result<()> {
// Set up a shared boolean to check whether the user has aborted
let running = Arc::new(AtomicBool::new(true));
let r = running.clone();
ctrlc::set_handler(move || {
r.store(false, Ordering::SeqCst);
info!(
"Abort signal received. Waiting for remote encoders to finish the \
current chunk and quit gracefully."
);
})
.expect("Error setting Ctrl-C handler");

// Create our local temporary directory
let mut tmp_dir = dirs::home_dir().ok_or("Home directory not found")?;
tmp_dir.push(TMP_DIR);
fs::create_dir(&tmp_dir)?;

// Start the operation
let result = run_local(input.as_ref(), output.as_ref(), &tmp_dir, &hosts);
let result = run_local(
input.as_ref(),
output.as_ref(),
&tmp_dir,
&hosts,
running.clone(),
);

// Clean up infallibly
info!("Cleaning up");
// TODO: make sure this also happens when stopped with Ctrl + C
// Remove remote temporary directories
for &host in &hosts {
// Clean up temporary directory on host
let output = Command::new("ssh")
.args(&[&host, "rm", "-r", remote::TMP_DIR])
.output()
.expect("Failed executing ssh command");
if !output.status.success() {
// These checks for `running` are necessary, because Ctrl + C also
// seems to terminate the commands we launch, which means they'll
// return unsuccessfully. With this check we prevent an error message
// in this case, because that's what the user wants. Unfortunately this
// also means we have to litter the `running` variable almost
// everyhwere.
if !output.status.success() && running.load(Ordering::SeqCst) {
error!("Failed removing remote temporary directory on {}", host);
}
}
Expand All @@ -76,21 +100,29 @@ fn run_local(
output: &Path,
tmp_dir: &Path,
hosts: &[&str],
running: Arc<AtomicBool>,
) -> Result<()> {
// Build path to audio file
let mut audio = tmp_dir.to_path_buf();
audio.push(AUDIO);
// Start the extraction
info!("Extracting audio");
local::extract_audio(input, &audio)?;
local::extract_audio(input, &audio, &running)?;

// We check whether the user has aborted before every time-intensive task.
// It's a better experience, but a bit ugly code-wise.
if !running.load(Ordering::SeqCst) {
// Abort early
return Ok(());
}

// Create directory for video chunks
let mut chunk_dir = tmp_dir.to_path_buf();
chunk_dir.push("chunks");
fs::create_dir(&chunk_dir)?;
// Split the video
info!("Splitting video into chunks");
local::split_video(input, &chunk_dir, SEGMENT_LENGTH)?;
local::split_video(input, &chunk_dir, SEGMENT_LENGTH, &running)?;
// Get the list of created chunks
let mut chunks = fs::read_dir(&chunk_dir)?
.map(|res| res.and_then(|readdir| Ok(readdir.path())))
Expand All @@ -99,6 +131,11 @@ fn run_local(
// for the user to watch since it allows seeing the progress at a glance.
chunks.sort();

if !running.load(Ordering::SeqCst) {
// Abort early
return Ok(());
}

// Initialize the global channel for chunks
let (sender, receiver) = channel::unbounded();
// Send all chunks into it
Expand All @@ -122,24 +159,31 @@ fn run_local(
let thread_receiver = receiver.clone();
// Create owned encoded_dir for the thread
let enc = encoded_dir.clone();
// Create copy of running indicator for the thread
let r = running.clone();
// Start it
let handle = thread::spawn(|| {
remote::host_thread(host, thread_receiver, enc);
});
let handle =
thread::Builder::new().name(host.clone()).spawn(|| {
remote::host_thread(host, thread_receiver, enc, r);
})?;
host_threads.push(handle);
}

// Wait for all hosts to finish
for handle in host_threads {
if let Err(e) = handle.join() {
error!("Thread for a host panicked: {:?}", e);
if handle.join().is_err() {
return Err("A host thread panicked".into());
}
}

if !running.load(Ordering::SeqCst) {
// We aborted early
return Ok(());
}

// Combine encoded chunks and audio
info!("Combining encoded chunks into final video");
local::combine(&encoded_dir, &audio, output)?;
local::combine(&encoded_dir, &audio, output, &running)?;

Ok(())
}
29 changes: 23 additions & 6 deletions src/local.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
//! Functions for operations on the local host.
use std::{fs, path::Path, process::Command, time::Duration};
use std::{
fs,
path::Path,
process::Command,
sync::atomic::{AtomicBool, Ordering},
sync::Arc,
time::Duration,
};

use super::Result;

/// Uses `ffmpeg` to locally extract and encode the audio.
pub fn extract_audio(input: &Path, output: &Path) -> Result<()> {
pub fn extract_audio(
input: &Path,
output: &Path,
running: &Arc<AtomicBool>,
) -> Result<()> {
// Convert input and output to &str
let input = input.to_str().ok_or("Input invalid Unicode")?;
let output = output.to_str().ok_or("Output invalid Unicode")?;
Expand All @@ -15,7 +26,7 @@ pub fn extract_audio(input: &Path, output: &Path) -> Result<()> {
"-y", "-i", input, "-vn", "-c:a", "aac", "-b:a", "192k", output,
])
.output()?;
if !output.status.success() {
if !output.status.success() && running.load(Ordering::SeqCst) {
return Err("Failed extracting audio".into());
}

Expand All @@ -27,6 +38,7 @@ pub fn split_video(
input: &Path,
output_dir: &Path,
segment_length: Duration,
running: &Arc<AtomicBool>,
) -> Result<()> {
// Convert input and output to &str
let input = input.to_str().ok_or("Input invalid Unicode")?;
Expand All @@ -49,15 +61,20 @@ pub fn split_video(
output,
])
.output()?;
if !output.status.success() {
if !output.status.success() && running.load(Ordering::SeqCst) {
return Err("Failed splitting video".into());
}

Ok(())
}

/// Uses `ffmpeg` to locally combine the encoded chunks and audio.
pub fn combine(encoded_dir: &Path, audio: &Path, output: &Path) -> Result<()> {
pub fn combine(
encoded_dir: &Path,
audio: &Path,
output: &Path,
running: &Arc<AtomicBool>,
) -> Result<()> {
// Create list of encoded chunks
let mut chunks = fs::read_dir(&encoded_dir)?
.map(|res| res.and_then(|readdir| Ok(readdir.path())))
Expand Down Expand Up @@ -104,7 +121,7 @@ pub fn combine(encoded_dir: &Path, audio: &Path, output: &Path) -> Result<()> {
output,
])
.output()?;
if !output.status.success() {
if !output.status.success() && running.load(Ordering::SeqCst) {
return Err("Failed combining video".into());
}

Expand Down
Loading

0 comments on commit 34e2769

Please sign in to comment.