diff --git a/bin/eth-proofs/Cargo.toml b/bin/eth-proofs/Cargo.toml
index aee9fd6..b0e36ac 100644
--- a/bin/eth-proofs/Cargo.toml
+++ b/bin/eth-proofs/Cargo.toml
@@ -40,3 +40,7 @@ latest-execution-witness = ["fetcher/latest-execution-witness"]
 # skip-pending-on-retry feature skips pending blocks during retry attempts
 # this is useful when proving the latest blocks if a block failed to prove and triggered a retry
 skip-pending-on-retry = ["proving-client/skip-pending-on-retry"]
+
+# fast-retry feature enables faster retry with no log saving, no CHUNK_SIZE change,
+# and always skips replaying failed blocks (resumes from latest)
+fast-retry = ["proving-client/fast-retry"]
diff --git a/bin/eth-proofs/src/main.rs b/bin/eth-proofs/src/main.rs
index e0b6c55..be752ab 100644
--- a/bin/eth-proofs/src/main.rs
+++ b/bin/eth-proofs/src/main.rs
@@ -135,6 +142,13 @@ struct Args {
     )]
     pub proving_subblock_urls: Option<Vec<String>>,
 
+    #[clap(
+        long,
+        env = "PROVING_TIMEOUT_SECONDS",
+        help = "Timeout for proving operations in seconds (default: 30 for fast-retry, 120 otherwise)"
+    )]
+    pub proving_timeout_seconds: Option<u64>,
+
     #[clap(
         long,
         default_value = "false",
@@ -302,6 +309,14 @@ fn init_proving_client(args: &Args) -> (Arc, Arc
 (Arc, Arc = None;
         // variable for saving the last proving inputs (for retry on timeout)
+        #[cfg(not(feature = "fast-retry"))]
         let mut last_proving_inputs: Option = None;
         // queue for saving the pending messages when a block is proving
         let mut pending_msgs = VecDeque::new();
@@ -78,7 +82,7 @@ impl ProvingClient {
         loop {
             // try to receive a proving or proved message with a timeout
             let msg = timeout(
-                Duration::from_secs(MAX_PROVING_WAITING_SECONDS),
+                Duration::from_secs(max_proving_waiting_seconds),
                 self.comm_endpoint.recv(),
             )
             .await;
@@ -99,47 +103,76 @@ impl ProvingClient {
                             .as_millis() as u64;
                         let block_number = report.block_number;
                         if current_time - start_timestamps[&block_number]
-                            >= MAX_PROVING_WAITING_SECONDS * 1000
+                            >= max_proving_waiting_seconds * 1000
                         {
                             warn!(
                                 "proving-client: proving timeout for block {block_number}, attempting to restart docker containers and retry",
                             );
-                            restart_proving_clients().await;
-                            #[cfg(feature = "skip-pending-on-retry")]
-                            pending_msgs.clear();
-
-                            // re-initialize aggregator and subblock clients
-                            info!(
-                                "proving-client: reinitializing aggregator and subblock clients"
-                            );
-                            agg_client = self.init_agg_proving_client(&token).await;
-                            subblock_clients = self.init_subblock_proving_clients(&token).await;
+                            #[cfg(feature = "fast-retry")]
+                            {
+                                restart_proving_clients_fast().await;
 
-                            // re-send the last proving inputs to retry the failed block
-                            if let Some(ref inputs) = last_proving_inputs {
+                                // re-initialize aggregator and subblock clients
                                 info!(
-                                    "proving-client: resending proving inputs for block {}",
+                                    "proving-client: reinitializing aggregator and subblock clients"
+                                );
+                                agg_client = self.init_agg_proving_client(&token).await;
+                                subblock_clients =
+                                    self.init_subblock_proving_clients(&token).await;
+
+                                // fast-retry: clear pending messages and skip replay
+                                pending_msgs.clear();
+                                start_timestamps.remove(&block_number);
+                                proving_block_report = None;
+                                info!(
+                                    "proving-client: fast retry complete, skipping block {} and resuming from latest",
                                     block_number
                                 );
-                                let start_time = SystemTime::now()
-                                    .duration_since(UNIX_EPOCH)
-                                    .unwrap()
-                                    .as_millis()
-                                    as u64;
-                                start_timestamps.insert(block_number, start_time);
-                                send_proving_inputs(
-                                    inputs.clone(),
-                                    &mut agg_client,
-                                    &mut subblock_clients,
-                                )
-                                .await;
+                            }
+
+                            #[cfg(not(feature = "fast-retry"))]
"fast-retry"))] + { + restart_proving_clients().await; + + #[cfg(feature = "skip-pending-on-retry")] + pending_msgs.clear(); + + // re-initialize aggregator and subblock clients info!( - "proving-client: proving inputs resent, continuing to wait for proof" + "proving-client: reinitializing aggregator and subblock clients" ); - } else { - error!("proving-client: no proving inputs saved for retry"); - panic!("proving-client: cannot retry without proving inputs"); + agg_client = self.init_agg_proving_client(&token).await; + subblock_clients = + self.init_subblock_proving_clients(&token).await; + + // re-send the last proving inputs to retry the failed block + if let Some(ref inputs) = last_proving_inputs { + info!( + "proving-client: resending proving inputs for block {}", + block_number + ); + let start_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() + as u64; + start_timestamps.insert(block_number, start_time); + send_proving_inputs( + inputs.clone(), + &mut agg_client, + &mut subblock_clients, + ) + .await; + info!( + "proving-client: proving inputs resent, continuing to wait for proof" + ); + } else { + error!("proving-client: no proving inputs saved for retry"); + panic!( + "proving-client: cannot retry without proving inputs" + ); + } } } } else { @@ -171,7 +204,10 @@ impl ProvingClient { report.block_number, ); // save the proving inputs for potential retry on timeout - last_proving_inputs = Some(proving_msg.proving_inputs); + #[cfg(not(feature = "fast-retry"))] + { + last_proving_inputs = Some(proving_msg.proving_inputs); + } proving_block_report = Some(report); } } @@ -257,7 +293,10 @@ impl ProvingClient { report.block_number, ); // save the proving inputs for potential retry on timeout - last_proving_inputs = Some(proving_msg.proving_inputs); + #[cfg(not(feature = "fast-retry"))] + { + last_proving_inputs = Some(proving_msg.proving_inputs); + } proving_block_report = Some(report); } } @@ -268,40 +307,67 @@ impl ProvingClient { warn!( "proving-client: proving timeout for block {block_number}, attempting to restart docker containers and retry", ); - restart_proving_clients().await; - #[cfg(feature = "skip-pending-on-retry")] - pending_msgs.clear(); + #[cfg(feature = "fast-retry")] + { + restart_proving_clients_fast().await; - // re-initialize aggregator and subblock clients - info!("proving-client: reinitializing aggregator and subblock clients"); - agg_client = self.init_agg_proving_client(&token).await; - subblock_clients = self.init_subblock_proving_clients(&token).await; + // re-initialize aggregator and subblock clients + info!( + "proving-client: reinitializing aggregator and subblock clients" + ); + agg_client = self.init_agg_proving_client(&token).await; + subblock_clients = self.init_subblock_proving_clients(&token).await; - // re-send the last proving inputs to retry the failed block - if let Some(ref inputs) = last_proving_inputs { + // fast-retry: clear pending messages and skip replay + pending_msgs.clear(); + start_timestamps.remove(&block_number); + proving_block_report = None; info!( - "proving-client: resending proving inputs for block {}", + "proving-client: fast retry complete, skipping block {} and resuming from latest", block_number ); - let start_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() - as u64; - start_timestamps.insert(block_number, start_time); - send_proving_inputs( - inputs.clone(), - &mut agg_client, - &mut subblock_clients, - ) - .await; + } + + #[cfg(not(feature = "fast-retry"))] 
+                    {
+                        restart_proving_clients().await;
+
+                        #[cfg(feature = "skip-pending-on-retry")]
+                        pending_msgs.clear();
+
+                        // re-initialize aggregator and subblock clients
                         info!(
-                            "proving-client: proving inputs resent, continuing to wait for proof"
+                            "proving-client: reinitializing aggregator and subblock clients"
                         );
-                    } else {
-                        error!("proving-client: no proving inputs saved for retry");
-                        panic!("proving-client: cannot retry without proving inputs");
+                        agg_client = self.init_agg_proving_client(&token).await;
+                        subblock_clients = self.init_subblock_proving_clients(&token).await;
+
+                        // re-send the last proving inputs to retry the failed block
+                        if let Some(ref inputs) = last_proving_inputs {
+                            info!(
+                                "proving-client: resending proving inputs for block {}",
+                                block_number
+                            );
+                            let start_time = SystemTime::now()
+                                .duration_since(UNIX_EPOCH)
+                                .unwrap()
+                                .as_millis()
+                                as u64;
+                            start_timestamps.insert(block_number, start_time);
+                            send_proving_inputs(
+                                inputs.clone(),
+                                &mut agg_client,
+                                &mut subblock_clients,
+                            )
+                            .await;
+                            info!(
+                                "proving-client: proving inputs resent, continuing to wait for proof"
+                            );
+                        } else {
+                            error!("proving-client: no proving inputs saved for retry");
+                            panic!("proving-client: cannot retry without proving inputs");
+                        }
                     }
                 }
             }
@@ -531,7 +597,8 @@ async fn send_proving_inputs(
     }
 }
 
-// restart proving clients
+// restart proving clients (legacy retry with CHUNK_SIZE reduction)
+#[cfg(not(feature = "fast-retry"))]
 async fn restart_proving_clients() {
     // restart docker containers using the retry script
     let retry_result = Command::new("./scripts/docker-multi-control.sh")
@@ -570,3 +637,52 @@ async fn restart_proving_clients() {
     );
     sleep(Duration::from_secs(DOCKER_RETRY_WAIT_SECONDS)).await;
 }
+
+// fast restart proving clients (no log saving, no CHUNK_SIZE changes)
+#[cfg(feature = "fast-retry")]
+#[allow(dead_code)]
+async fn restart_proving_clients_fast() {
+    // restart docker containers using the fast-retry script
+    let retry_result = Command::new("./scripts/docker-multi-control.sh")
+        .arg("fast-retry")
+        .status()
+        .await;
+
+    match retry_result {
+        Ok(status) if status.success() => {
+            info!("proving-client: docker containers fast-restarted successfully");
+        }
+        Ok(status) => {
+            error!(
+                "proving-client: docker fast-retry script failed with exit code: {:?}",
+                status.code()
+            );
+            panic!(
+                "proving-client: cannot recover from docker restart failure - manual intervention required"
+            );
+        }
+        Err(e) => {
+            error!(
+                "proving-client: failed to execute docker fast-retry script: {}",
+                e
+            );
+            panic!(
+                "proving-client: cannot recover from docker restart failure - manual intervention required"
+            );
+        }
+    }
+
+    // wait for containers to fully initialize
+    info!(
+        "proving-client: waiting {}s for docker containers to initialize",
+        DOCKER_RETRY_WAIT_SECONDS
+    );
+    sleep(Duration::from_secs(DOCKER_RETRY_WAIT_SECONDS)).await;
+}
+
+// fallback: use legacy retry if fast-retry feature is not enabled
+#[cfg(not(feature = "fast-retry"))]
+#[allow(dead_code)]
+async fn restart_proving_clients_fast() {
+    restart_proving_clients().await;
+}
diff --git a/crates/proving-client/src/config.rs b/crates/proving-client/src/config.rs
index e3be13a..7597332 100644
--- a/crates/proving-client/src/config.rs
+++ b/crates/proving-client/src/config.rs
@@ -12,4 +12,7 @@ pub struct ProvingClientConfig {
 
     // subbblock proving grpc urls
     pub subblock_urls: Vec<String>,
+
+    // proving timeout in seconds
+    pub proving_timeout_seconds: u64,
 }
diff --git a/scripts/docker-common.sh b/scripts/docker-common.sh
index 69ba7b7..a877679 100755
--- a/scripts/docker-common.sh
+++ b/scripts/docker-common.sh
@@ -277,6 +277,209 @@ validate_config() {
     return $errors
 }
 
+# =============================================================================
+# PARALLEL EXECUTION HELPERS
+# =============================================================================
+
+# Run a function in parallel for all workers
+# Usage: run_parallel_workers <func> [extra_args...]
+# The function will be called with worker index as first arg, then extra_args
+# Each worker operation runs in background, then we wait for all to complete
+run_parallel_workers() {
+    local func="$1"
+    shift
+    local extra_args=("$@")
+    local pids=()
+    local temp_files=()
+
+    # Create temp directory for each worker's output to avoid interleaving
+    local base_temp_dir
+    base_temp_dir=$(mktemp -d) || {
+        error "Failed to create temp directory"
+        return 1
+    }
+
+    log "Running ${#WORKERS[@]} worker operations in parallel..."
+
+    # Launch all worker operations in background
+    for i in "${!WORKERS[@]}"; do
+        local worker_spec="${WORKERS[$i]}"
+        read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" || {
+            error "Failed to parse worker spec for index $i"
+            continue
+        }
+
+        # Create temp file for this worker's output
+        local worker_temp="${base_temp_dir}/worker_${i}.log"
+        temp_files+=("$worker_temp")
+
+        # Run function in background with proper error handling
+        {
+            # Execute in a subshell that won't exit on errors
+            (
+                set +e
+                # Call the function and capture all output
+                if [[ ${#extra_args[@]} -gt 0 ]]; then
+                    result=$("$func" "$i" "${extra_args[@]}" 2>&1)
+                else
+                    result=$("$func" "$i" 2>&1)
+                fi
+                # Prefix each line with worker ID
+                echo "$result" | while IFS= read -r line || [[ -n "$line" ]]; do
+                    echo "[Worker $wid] $line"
+                done
+            )
+        } > "$worker_temp" 2>&1 &
+
+        pids+=($!)
+    done
+
+    # Verify we have PIDs to wait for
+    if [[ ${#pids[@]} -eq 0 ]]; then
+        error "No worker operations were started"
+        rm -rf "$base_temp_dir" 2>/dev/null || true
+        return 1
+    fi
+
+    # Wait for all background jobs to complete
+    local failures=0
+    local completed=0
+    for i in "${!pids[@]}"; do
+        local pid="${pids[$i]}"
+        if wait "$pid" 2>/dev/null; then
+            completed=$((completed + 1))
+        else
+            failures=$((failures + 1))
+        fi
+
+        # Output worker's log file (preserves ordering and prevents interleaving)
+        if [[ -f "${temp_files[$i]}" ]] && [[ -s "${temp_files[$i]}" ]]; then
+            cat "${temp_files[$i]}"
+        elif [[ -f "${temp_files[$i]}" ]]; then
+            echo "[Worker ${i}] (no output)"
+        fi
+        rm -f "${temp_files[$i]}" 2>/dev/null || true
+    done
+
+    # Clean up temp directory
+    rm -rf "$base_temp_dir" 2>/dev/null || true
+
+    if [[ $failures -eq 0 ]]; then
+        log "All ${completed} worker operations completed successfully"
+        return 0
+    else
+        warn "$failures worker operation(s) failed out of ${#WORKERS[@]}"
+        return 1
+    fi
+}
+
+# Run aggregator and all workers in parallel (when order doesn't matter)
+# Usage: run_parallel_all <agg_func> <worker_func> [extra_args...]
+run_parallel_all() {
+    local agg_func="$1"
+    local worker_func="$2"
+    shift 2
+    local extra_args=("$@")
+
+    log "Running aggregator and ${#WORKERS[@]} workers in parallel..."
+
+    local pids=()
+    local agg_temp
+    agg_temp=$(mktemp) || {
+        error "Failed to create temp file for aggregator"
+        return 1
+    }
+
+    local base_temp_dir
+    base_temp_dir=$(mktemp -d) || {
+        error "Failed to create temp directory"
+        rm -f "$agg_temp"
+        return 1
+    }
+    local worker_temps=()
+
+    # Start aggregator in background
+    {
+        (
+            set +e
+            if [[ ${#extra_args[@]} -gt 0 ]]; then
+                result=$("$agg_func" "${extra_args[@]}" 2>&1)
+            else
+                result=$("$agg_func" 2>&1)
+            fi
+            echo "$result" | while IFS= read -r line || [[ -n "$line" ]]; do
+                echo "[Aggregator] $line"
+            done
+        )
+    } > "$agg_temp" 2>&1 &
+    local agg_pid=$!
+    pids+=($agg_pid)
+
+    # Start all workers in background
+    for i in "${!WORKERS[@]}"; do
+        local worker_spec="${WORKERS[$i]}"
+        read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" || {
+            error "Failed to parse worker spec for index $i"
+            continue
+        }
+
+        local worker_temp="${base_temp_dir}/worker_${i}.log"
+        worker_temps+=("$worker_temp")
+
+        {
+            (
+                set +e
+                if [[ ${#extra_args[@]} -gt 0 ]]; then
+                    result=$("$worker_func" "$i" "${extra_args[@]}" 2>&1)
+                else
+                    result=$("$worker_func" "$i" 2>&1)
+                fi
+                echo "$result" | while IFS= read -r line || [[ -n "$line" ]]; do
+                    echo "[Worker $wid] $line"
+                done
+            )
+        } > "$worker_temp" 2>&1 &
+        pids+=($!)
+    done
+
+    # Wait for all to complete
+    local failures=0
+    for pid in "${pids[@]}"; do
+        if ! wait "$pid" 2>/dev/null; then
+            failures=$((failures + 1))
+        fi
+    done
+
+    # Output aggregator log first
+    if [[ -f "$agg_temp" ]] && [[ -s "$agg_temp" ]]; then
+        cat "$agg_temp"
+    elif [[ -f "$agg_temp" ]]; then
+        echo "[Aggregator] (no output)"
+    fi
+    rm -f "$agg_temp" 2>/dev/null || true
+
+    # Output worker logs
+    for worker_temp in "${worker_temps[@]}"; do
+        if [[ -f "$worker_temp" ]] && [[ -s "$worker_temp" ]]; then
+            cat "$worker_temp"
+        elif [[ -f "$worker_temp" ]]; then
+            echo "[Worker] (no output)"
+        fi
+        rm -f "$worker_temp" 2>/dev/null || true
+    done
+
+    # Clean up
+    rm -rf "$base_temp_dir" 2>/dev/null || true
+
+    if [[ $failures -eq 0 ]]; then
+        log "All operations completed successfully"
+        return 0
+    else
+        warn "$failures operation(s) failed"
+        return 1
+    fi
+}
+
 # =============================================================================
 # CONTAINER MANAGEMENT FUNCTIONS
 # =============================================================================
@@ -635,6 +838,17 @@ stop_worker() {
     stop_and_remove_container "$host" "$user" "$CONTAINER_NAME_WORKER" "$port"
 }
 
+# Internal wrapper for parallel stop_worker execution
+_stop_worker_by_index() {
+    local idx="$1"
+    local save_logs="${2:-false}"
+
+    local worker_spec="${WORKERS[$idx]}"
+    read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
+
+    stop_worker "$host" "$user" "$port" "$wid" "$save_logs" "$remote_dir"
+}
+
 # Start a single worker container
 start_worker() {
     local host="$1"
@@ -656,6 +870,17 @@ start_worker() {
     log "Worker $wid started successfully"
 }
 
+# Internal wrapper for parallel start_worker execution
+_start_worker_by_index() {
+    local idx="$1"
+    local env_file="${2:-$ENV_FILE_WORKER}"
+
+    local worker_spec="${WORKERS[$idx]}"
+    read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
+
+    start_worker "$host" "$user" "$port" "$wid" "$remote_dir" "$cpuset_cpus" "$cpuset_mems" "$env_file"
+}
+
 # Cleanup a single worker container
 cleanup_worker() {
     local host="$1"
@@ -667,32 +892,42 @@ cleanup_worker() {
     force_kill_container "$host" "$user" "$CONTAINER_NAME_WORKER" "$port"
 }
 
+# Internal wrapper for parallel cleanup_worker execution
+_cleanup_worker_by_index() {
+    local idx="$1"
+
+    local worker_spec="${WORKERS[$idx]}"
+    read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
+
+    cleanup_worker "$host" "$user" "$port" "$wid"
+}
+
 # Stop all worker containers
 stop_all_workers() {
     local save_logs="${1:-false}"
 
     log "Stopping all ${#WORKERS[@]} workers..."
 
-    for worker_spec in "${WORKERS[@]}"; do
-        read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
-        stop_worker "$host" "$user" "$port" "$wid" "$save_logs" "$remote_dir"
-        apply_worker_delay
-    done
-
-    log "All workers stopped"
+    if run_parallel_workers _stop_worker_by_index "$save_logs"; then
+        log "All workers stopped"
+        return 0
+    else
+        error "Some workers failed to stop"
+        return 1
+    fi
 }
 
 # Start all worker containers
 start_all_workers() {
     log "Starting all ${#WORKERS[@]} workers..."
 
-    for worker_spec in "${WORKERS[@]}"; do
-        read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
-        start_worker "$host" "$user" "$port" "$wid" "$remote_dir" "$cpuset_cpus" "$cpuset_mems"
-        apply_worker_delay
-    done
-
-    log "All workers started"
+    if run_parallel_workers _start_worker_by_index; then
+        log "All workers started"
+        return 0
+    else
+        error "Some workers failed to start"
+        return 1
+    fi
 }
 
 # Get status of all worker containers
@@ -714,62 +949,70 @@ start_all_workers() {
 #    done
 # }
 
+# Internal wrapper for parallel get_worker_status execution
+_get_worker_status_by_index() {
+    local idx="$1"
+
+    local worker_spec="${WORKERS[$idx]}"
+    read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
+
+    log "Worker $wid status on ${user}@${host}:"
+
+    # Step 1: Get running containers
+    local running_containers
+    running_containers=$(ssh_exec "$user" "$host" "$port" "sudo docker ps --format '{{.Names}}'")
+
+    if [[ $? -ne 0 ]]; then
+        error "Failed to connect to worker $wid at ${user}@${host}"
+        echo "  Status: CONNECTION FAILED"
+        return 1
+    fi
+
+    echo "  [✓] SSH connected"
+
+    # Step 2: Check if container is running (locally)
+    if echo "$running_containers" | grep -q '^pico-subblock-worker$'; then
+        echo "  Status: RUNNING"
+        ssh_exec "$user" "$host" "$port" "sudo docker ps | grep pico-subblock-worker"
+        return 0
+    fi
+
+    # Step 3: Check if container exists but stopped
+    local all_containers
+    all_containers=$(ssh_exec "$user" "$host" "$port" "sudo docker ps -a --format '{{.Names}}'")
+
+    if [[ $? -ne 0 ]]; then
+        error "Failed to connect to worker $wid at ${user}@${host}"
+        echo "  Status: CONNECTION FAILED"
+        return 1
+    fi
+
+    if echo "$all_containers" | grep -q '^pico-subblock-worker$'; then
+        echo "  Status: STOPPED (container exists but not running)"
+        ssh_exec "$user" "$host" "$port" "sudo docker ps -a | grep pico-subblock-worker"
+        return 0
+    fi
+
+    echo "  Status: DOES NOT EXIST"
+    return 0
+}
+
 # Get status of all worker containers
 get_all_worker_status() {
-    for worker_spec in "${WORKERS[@]}"; do
-        read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
-        log "Worker $wid status on ${user}@${host}:"
-
-        # Step 1: Get running containers
-        local running_containers
-        running_containers=$(ssh_exec "$user" "$host" "$port" "sudo docker ps --format '{{.Names}}'")
-
-        if [[ $? -ne 0 ]]; then
-            error "Failed to connect to worker $wid at ${user}@${host}"
-            echo "  Status: CONNECTION FAILED"
-            continue
-        fi
-
-        echo "  [✓] SSH connected"
-
-        # Step 2: Check if container is running (locally)
-        if echo "$running_containers" | grep -q '^pico-subblock-worker$'; then
-            echo "  Status: RUNNING"
-            ssh_exec "$user" "$host" "$port" "sudo docker ps | grep pico-subblock-worker"
-            continue
-        fi
-
-        # Step 3: Check if container exists but stopped
-        local all_containers
-        all_containers=$(ssh_exec "$user" "$host" "$port" "sudo docker ps -a --format '{{.Names}}'")
-
-        if [[ $? -ne 0 ]]; then
-            error "Failed to connect to worker $wid at ${user}@${host}"
-            echo "  Status: CONNECTION FAILED"
-            continue
-        fi
-
-        if echo "$all_containers" | grep -q '^pico-subblock-worker$'; then
-            echo "  Status: STOPPED (container exists but not running)"
-            ssh_exec "$user" "$host" "$port" "sudo docker ps -a | grep pico-subblock-worker"
-            continue
-        fi
-
-        echo "  Status: DOES NOT EXIST"
-    done
+    run_parallel_workers _get_worker_status_by_index
 }
 
 # Cleanup all worker containers
 cleanup_all_workers() {
     log "Cleaning up all ${#WORKERS[@]} workers..."
 
-    for worker_spec in "${WORKERS[@]}"; do
-        read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
-        cleanup_worker "$host" "$user" "$port" "$wid"
-        apply_worker_delay
-    done
-
-    log "All workers cleaned up"
+    if run_parallel_workers _cleanup_worker_by_index; then
+        log "All workers cleaned up"
+        return 0
+    else
+        error "Some workers failed to cleanup"
+        return 1
+    fi
 }
 
 # Force kill a single worker container (immediate termination)
@@ -784,24 +1027,25 @@ force_kill_worker() {
     return $?
 }
 
+# Internal wrapper for parallel force_kill_worker execution
+_force_kill_worker_by_index() {
+    local idx="$1"
+
+    local worker_spec="${WORKERS[$idx]}"
+    read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
+
+    force_kill_worker "$host" "$user" "$port" "$wid"
+}
+
 # Force kill all worker containers (immediate termination)
 force_kill_all_workers() {
     log "Force killing all ${#WORKERS[@]} workers..."
 
-    local failures=0
-    for worker_spec in "${WORKERS[@]}"; do
-        read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
-        if ! force_kill_worker "$host" "$user" "$port" "$wid"; then
-            ((failures++))
-        fi
-        apply_worker_delay
-    done
-
-    if [[ $failures -eq 0 ]]; then
+    if run_parallel_workers _force_kill_worker_by_index; then
         log "All workers force killed successfully"
         return 0
     else
-        error "$failures worker(s) failed to be removed"
+        error "Some workers failed to be removed"
         return 1
     fi
 }
@@ -815,9 +1059,40 @@ stop_all() {
     local save_logs="${1:-true}"
 
     log "=== Stopping all containers ==="
-    stop_aggregator "$save_logs"
-    stop_all_workers "$save_logs"
-    log "=== All containers stopped ==="
+
+    # Run aggregator and workers in parallel
+    local agg_pid
+    local workers_pid
+
+    # Start aggregator stop in background
+    (
+        stop_aggregator "$save_logs" 2>&1 | while IFS= read -r line; do
+            echo "[Aggregator] $line"
+        done
+    ) &
+    agg_pid=$!
+
+    # Start workers stop in background
+    (
+        stop_all_workers "$save_logs" 2>&1 | while IFS= read -r line; do
+            echo "$line"
+        done
+    ) &
+    workers_pid=$!
+
+    # Wait for both to complete
+    local agg_result=0
+    local workers_result=0
+    wait "$agg_pid" || agg_result=$?
+    wait "$workers_pid" || workers_result=$?
+
+    if [[ $agg_result -eq 0 ]] && [[ $workers_result -eq 0 ]]; then
+        log "=== All containers stopped ==="
+        return 0
+    else
+        error "=== Some containers failed to stop ==="
+        return 1
+    fi
 }
 
 # Start all containers (aggregator + workers)
@@ -851,11 +1126,57 @@ show_all_status() {
 # Cleanup all containers (force remove without logs)
 cleanup_all() {
     log "=== Cleaning up all containers ==="
-    cleanup_aggregator
-    cleanup_all_workers
+
+    # Run aggregator and workers in parallel
+    local agg_pid
+    local workers_pid
+
+    # Start aggregator cleanup in background
+    (
+        cleanup_aggregator 2>&1 | while IFS= read -r line; do
+            echo "[Aggregator] $line"
+        done
+    ) &
+    agg_pid=$!
+
+    # Start workers cleanup in background
+    (
+        cleanup_all_workers 2>&1 | while IFS= read -r line; do
+            echo "$line"
+        done
+    ) &
+    workers_pid=$!
+
+    # Wait for both to complete
+    local agg_result=0
+    local workers_result=0
+    wait "$agg_pid" || agg_result=$?
+    wait "$workers_pid" || workers_result=$?
+
     # Wait a moment to ensure cleanup completes
     sleep 2
-    log "=== All containers cleaned up ==="
+
+    if [[ $agg_result -eq 0 ]] && [[ $workers_result -eq 0 ]]; then
+        log "=== All containers cleaned up ==="
+        return 0
+    else
+        error "=== Some containers failed to cleanup ==="
+        return 1
+    fi
+}
+
+# Internal wrapper for parallel verify_worker_gone execution
+_verify_worker_gone_by_index() {
+    local idx="$1"
+
+    local worker_spec="${WORKERS[$idx]}"
+    read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
+
+    if ! is_container_gone "$host" "$user" "$CONTAINER_NAME_WORKER" "$port"; then
+        error "Worker $wid container still exists on ${user}@${host}"
+        return 1
+    fi
+    return 0
 }
 
 # Verify all containers are completely removed
@@ -870,14 +1191,10 @@ verify_all_containers_gone() {
         ((failures++))
     fi
 
-    # Check workers
-    for worker_spec in "${WORKERS[@]}"; do
-        read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
-        if ! is_container_gone "$host" "$user" "$CONTAINER_NAME_WORKER" "$port"; then
-            error "Worker $wid container still exists on ${user}@${host}"
-            ((failures++))
-        fi
-    done
+    # Check workers in parallel
+    if ! run_parallel_workers _verify_worker_gone_by_index; then
+        ((failures++))
+    fi
 
     if [[ $failures -eq 0 ]]; then
         log "All containers verified as removed"
@@ -892,11 +1209,31 @@ verify_all_containers_gone() {
 force_kill_all() {
     log "=== Force killing all containers ==="
 
+    # Run aggregator and workers in parallel
+    local agg_pid
+    local workers_pid
+
+    # Start aggregator force kill in background
+    (
+        force_kill_aggregator 2>&1 | while IFS= read -r line; do
+            echo "[Aggregator] $line"
+        done
+    ) &
+    agg_pid=$!
+
+    # Start workers force kill in background
+    (
+        force_kill_all_workers 2>&1 | while IFS= read -r line; do
+            echo "$line"
+        done
+    ) &
+    workers_pid=$!
+
+    # Wait for both to complete
     local agg_result=0
     local workers_result=0
-
-    force_kill_aggregator || agg_result=$?
-    force_kill_all_workers || workers_result=$?
+    wait "$agg_pid" || agg_result=$?
+    wait "$workers_pid" || workers_result=$?
 
     # Wait a moment to ensure kill completes
     sleep 2
@@ -960,6 +1297,9 @@ export -f init_all_ssh_connections close_all_ssh_connections
 # Worker management functions
 export -f get_worker parse_worker_spec build_worker_lists apply_worker_delay
 
+# Parallel execution helpers
+export -f run_parallel_workers run_parallel_all
+
 # Configuration validation
 export -f validate_config
 
@@ -979,6 +1319,11 @@ export -f stop_worker start_worker cleanup_worker force_kill_worker
 export -f stop_all_workers start_all_workers get_all_worker_status
 export -f cleanup_all_workers force_kill_all_workers
 
+# Internal worker wrapper functions (need to be exported for background jobs)
+export -f _stop_worker_by_index _start_worker_by_index _cleanup_worker_by_index
+export -f _force_kill_worker_by_index _get_worker_status_by_index
+export -f _verify_worker_gone_by_index
+
 # Combined functions
 export -f stop_all start_all restart_all show_all_status
 export -f cleanup_all force_kill_all
diff --git a/scripts/docker-multi-control.sh b/scripts/docker-multi-control.sh
index 6516426..814dad8 100755
--- a/scripts/docker-multi-control.sh
+++ b/scripts/docker-multi-control.sh
@@ -24,6 +24,7 @@ Commands:
   force-kill        Force kill all containers (immediate)
   restart           Restart all containers
   retry             Retry with smaller CHUNK_SIZE (force kill mode)
+  fast-retry        Fast retry without logs or CHUNK_SIZE changes
   reset-chunk-size  Reset CHUNK_SIZE to normal value
   status            Show container status
   logs              Show container logs
@@ -58,9 +59,12 @@ Examples:
   # Restart only aggregator
   $0 restart --agg-only
 
-  # Retry failed block
+  # Retry failed block (with CHUNK_SIZE reduction)
   $0 retry --chunk-size 2097152
 
+  # Fast retry without logs or CHUNK_SIZE changes
+  $0 fast-retry
+
   # Reset CHUNK_SIZE to normal
   $0 reset-chunk-size
 
@@ -167,12 +171,17 @@ EOF
 
     case "$mode" in
         all|workers)
-            for worker_spec in "${WORKERS[@]}"; do
-                read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
+            # Internal wrapper for parallel remove_image execution
+            _remove_worker_image_by_index() {
+                local idx="$1"
+                local worker_spec="${WORKERS[$idx]}"
+                read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
                 log "Removing worker image on ${user}@${host} (worker $wid)..."
                 remove_image_with_dependencies "$host" "$user" "$port" "$CONTAINER_NAME_WORKER" "$IMAGE_NAME_WORKER" || true
-                apply_worker_delay
-            done
+            }
+            export -f _remove_worker_image_by_index
+
+            run_parallel_workers _remove_worker_image_by_index || true
             ;;
     esac
 
@@ -195,6 +204,10 @@ cmd_retry() {
     "${SCRIPT_DIR}/docker-multi-retry.sh" "$@"
 }
 
+cmd_fast_retry() {
+    "${SCRIPT_DIR}/docker-multi-fast-retry.sh" "$@"
+}
+
 cmd_reset_chunk_size() {
     "${SCRIPT_DIR}/docker-multi-reset-chunk-size.sh" "$@"
 }
@@ -263,16 +276,29 @@ cmd_save_logs() {
 
     log "=== Saving All Logs ==="
 
-    # Save aggregator logs
-    local agg_log="${AGG_REMOTE_DIR}/${LOGS_DIR}/aggregator-manual-${timestamp}.log"
-    save_container_logs "$AGG_HOST" "$AGG_USER" "$CONTAINER_NAME_AGGREGATOR" "$agg_log" "$AGG_PORT" || true
-
-    # Save worker logs
-    for worker_spec in "${WORKERS[@]}"; do
-        read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
+    # Internal wrapper for parallel save_worker_logs execution
+    _save_worker_logs_by_index() {
+        local idx="$1"
+        local timestamp="$2"
+        local worker_spec="${WORKERS[$idx]}"
+        read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
         local worker_log="${remote_dir}/${LOGS_DIR}/subblock-${wid}-manual-${timestamp}.log"
         save_container_logs "$host" "$user" "$CONTAINER_NAME_WORKER" "$worker_log" "$port" || true
-    done
+    }
+    export -f _save_worker_logs_by_index
+
+    # Save aggregator logs in background
+    local agg_log="${AGG_REMOTE_DIR}/${LOGS_DIR}/aggregator-manual-${timestamp}.log"
+    (
+        save_container_logs "$AGG_HOST" "$AGG_USER" "$CONTAINER_NAME_AGGREGATOR" "$agg_log" "$AGG_PORT" || true
+    ) &
+    local agg_pid=$!
+
+    # Save worker logs in parallel
+    run_parallel_workers _save_worker_logs_by_index "$timestamp" || true
+
+    # Wait for aggregator log save
+    wait "$agg_pid" || true
 
     log "=== Logs Saved ==="
 }
@@ -430,6 +456,9 @@ main() {
         retry)
             cmd_retry "$@"
             ;;
+        fast-retry)
+            cmd_fast_retry "$@"
+            ;;
         reset-chunk-size)
             cmd_reset_chunk_size "$@"
             ;;
diff --git a/scripts/docker-multi-deploy.sh b/scripts/docker-multi-deploy.sh
index 64bd905..11dd30b 100755
--- a/scripts/docker-multi-deploy.sh
+++ b/scripts/docker-multi-deploy.sh
@@ -199,6 +199,42 @@ deploy_aggregator() {
     return 0
 }
 
+# Internal wrapper for parallel deploy_worker execution
+_deploy_worker_by_index() {
+    local idx="$1"
+    local worker_image="$2"
+    local skip_cleanup="$3"
+    local keep_tar="$4"
+
+    local worker_spec="${WORKERS[$idx]}"
+    read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec"
+
+    log "Deploying to worker $wid..."
+
+    if ! deploy_image_to_machine \
+        "$host" \
+        "$user" \
+        "$worker_image" \
+        "$remote_dir" \
+        "$CONTAINER_NAME_WORKER" \
+        "$IMAGE_NAME_WORKER" \
+        "worker $wid" \
+        "$port"; then
+        error "Worker $wid deployment failed"
+        return 1
+    else
+        # Remove tar file if requested
+        if [[ "$keep_tar" != "true" ]]; then
+            local image_filename=$(basename "$worker_image")
+            log "Removing tar file from worker $wid..."
+ ssh_exec "$user" "$host" "$port" "rm -f '${remote_dir}/${image_filename}'" || true + fi + fi + + return 0 +} +export -f _deploy_worker_by_index + # Deploy to all workers deploy_workers() { local worker_image="$1" @@ -212,48 +248,34 @@ deploy_workers() { return 1 fi - local failed_workers=() - - # Deploy to each worker - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" - + # Deploy to all workers in parallel + if run_parallel_workers _deploy_worker_by_index "$worker_image" "$skip_cleanup" "$keep_tar"; then log "" - log "Deploying to worker $wid..." - - if ! deploy_image_to_machine \ - "$host" \ - "$user" \ - "$worker_image" \ - "$remote_dir" \ - "$CONTAINER_NAME_WORKER" \ - "$IMAGE_NAME_WORKER" \ - "worker $wid" \ - "$port"; then - error "Worker $wid deployment failed" - failed_workers+=("$wid") - else - # Remove tar file if requested - if [[ "$keep_tar" != "true" ]]; then - local image_filename=$(basename "$worker_image") - log "Removing tar file from worker $wid..." - ssh_exec "$user" "$host" "$port" "rm -f '${remote_dir}/${image_filename}'" || true - fi - fi - - apply_worker_delay - done - - # Check for failures - if [[ ${#failed_workers[@]} -gt 0 ]]; then - error "Failed to deploy to ${#failed_workers[@]} worker(s): ${failed_workers[*]}" + log "=== All Workers Deployed Successfully ===" + return 0 + else + error "=== Some workers failed to deploy ===" return 1 fi +} + +# Internal wrapper for parallel verify_worker_deployment execution +_verify_worker_deployment_by_index() { + local idx="$1" - log "" - log "=== All Workers Deployed Successfully ===" - return 0 + local worker_spec="${WORKERS[$idx]}" + read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + + log "Verifying worker $wid image..." + if ssh_exec "$user" "$host" "$port" "$DOCKER_PREFIX images | grep -q '${IMAGE_NAME_WORKER%:*}'"; then + log "✓ Worker $wid image verified: $IMAGE_NAME_WORKER" + return 0 + else + error "✗ Worker $wid image not found!" + return 1 + fi } +export -f _verify_worker_deployment_by_index # Verify deployment on all machines verify_deployment() { @@ -274,18 +296,11 @@ verify_deployment() { fi fi - # Verify workers + # Verify workers in parallel if [[ "$mode" == "all" ]] || [[ "$mode" == "workers" ]]; then - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" - log "Verifying worker $wid image..." - if ssh_exec "$user" "$host" "$port" "$DOCKER_PREFIX images | grep -q '${IMAGE_NAME_WORKER%:*}'"; then - log "✓ Worker $wid image verified: $IMAGE_NAME_WORKER" - else - error "✗ Worker $wid image not found!" - ((failures++)) - fi - done + if ! run_parallel_workers _verify_worker_deployment_by_index; then + ((failures++)) + fi fi if [[ $failures -eq 0 ]]; then diff --git a/scripts/docker-multi-fast-retry.sh b/scripts/docker-multi-fast-retry.sh new file mode 100755 index 0000000..9acd045 --- /dev/null +++ b/scripts/docker-multi-fast-retry.sh @@ -0,0 +1,168 @@ +#!/usr/bin/env bash +# ============================================================================= +# Docker Multi-Machine Fast Retry Script +# ============================================================================= +# Fast retry for proving failures - no log saving, no CHUNK_SIZE changes +# This script: +# 1. Force kills all containers with verification +# 2. Waits for cleanup +# 3. 
+#
+# Unlike docker-multi-retry.sh, this script:
+# - Does NOT save logs (faster)
+# - Does NOT modify CHUNK_SIZE (keeps current value)
+# - Minimizes downtime for quick recovery
+# =============================================================================
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+source "${SCRIPT_DIR}/docker-common.sh"
+
+usage() {
+    cat <