From 6eadbc01fddb847551db5916a113f972702a8c07 Mon Sep 17 00:00:00 2001 From: zktommy Date: Wed, 10 Dec 2025 14:48:29 +0800 Subject: [PATCH 1/5] feat: implement parallel execution helpers for worker management - Added functions to run worker operations in parallel, improving efficiency for starting, stopping, cleaning up, and verifying worker containers. - Updated existing worker management functions to utilize parallel execution, enhancing performance during bulk operations. - Introduced internal wrappers for parallel execution of various worker tasks, including deployment, environment variable updates, and log saving. --- scripts/docker-common.sh | 463 ++++++++++++++++++----- scripts/docker-multi-control.sh | 40 +- scripts/docker-multi-deploy.sh | 109 +++--- scripts/docker-multi-reset-chunk-size.sh | 28 +- scripts/docker-multi-retry.sh | 56 ++- 5 files changed, 523 insertions(+), 173 deletions(-) diff --git a/scripts/docker-common.sh b/scripts/docker-common.sh index 69ba7b7..bbf3f58 100755 --- a/scripts/docker-common.sh +++ b/scripts/docker-common.sh @@ -277,6 +277,152 @@ validate_config() { return $errors } +# ============================================================================= +# PARALLEL EXECUTION HELPERS +# ============================================================================= + +# Run a function in parallel for all workers +# Usage: run_parallel_workers [extra_args...] +# The function will be called with worker index as first arg, then extra_args +# Each worker operation runs in background, then we wait for all to complete +run_parallel_workers() { + local func="$1" + shift + local pids=() + local temp_dirs=() + + # Create temp directory for each worker's output to avoid interleaving + local base_temp_dir=$(mktemp -d) + + log "Running ${#WORKERS[@]} worker operations in parallel..." + + # Launch all worker operations in background + for i in "${!WORKERS[@]}"; do + local worker_spec="${WORKERS[$i]}" + read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + + # Create temp file for this worker's output + local worker_temp="${base_temp_dir}/worker_${i}.log" + temp_dirs+=("$worker_temp") + + # Capture wid for use in subshell + local worker_id="$wid" + + # Run function in background, redirecting output to temp file + ( + "$func" "$i" "$@" 2>&1 | while IFS= read -r line; do + echo "[Worker $worker_id] $line" + done > "$worker_temp" + ) & + pids+=($!) + done + + # Wait for all background jobs to complete + local failures=0 + local completed=0 + for i in "${!pids[@]}"; do + local pid="${pids[$i]}" + if wait "$pid"; then + ((completed++)) + else + ((failures++)) + fi + + # Output worker's log file (preserves ordering and prevents interleaving) + if [[ -f "${temp_dirs[$i]}" ]]; then + cat "${temp_dirs[$i]}" + rm -f "${temp_dirs[$i]}" + fi + done + + # Clean up temp directory + rm -rf "$base_temp_dir" 2>/dev/null || true + + if [[ $failures -eq 0 ]]; then + log "All ${completed} worker operations completed successfully" + return 0 + else + error "$failures worker operation(s) failed out of ${#WORKERS[@]}" + return 1 + fi +} + +# Run aggregator and all workers in parallel (when order doesn't matter) +# Usage: run_parallel_all [extra_args...] +run_parallel_all() { + local agg_func="$1" + local worker_func="$2" + shift 2 + + log "Running aggregator and ${#WORKERS[@]} workers in parallel..." + + local pids=() + local agg_temp=$(mktemp) + local base_temp_dir=$(mktemp -d) + local worker_temps=() + + # Start aggregator in background + ( + "$agg_func" "$@" 2>&1 | while IFS= read -r line; do + echo "[Aggregator] $line" + done > "$agg_temp" + ) & + local agg_pid=$! + pids+=($agg_pid) + + # Start all workers in background + for i in "${!WORKERS[@]}"; do + local worker_spec="${WORKERS[$i]}" + read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + + local worker_temp="${base_temp_dir}/worker_${i}.log" + worker_temps+=("$worker_temp") + + # Capture wid for use in subshell + local worker_id="$wid" + + ( + "$worker_func" "$i" "$@" 2>&1 | while IFS= read -r line; do + echo "[Worker $worker_id] $line" + done > "$worker_temp" + ) & + pids+=($!) + done + + # Wait for all to complete + local failures=0 + for pid in "${pids[@]}"; do + if ! wait "$pid"; then + ((failures++)) + fi + done + + # Output aggregator log first + if [[ -f "$agg_temp" ]]; then + cat "$agg_temp" + rm -f "$agg_temp" + fi + + # Output worker logs + for worker_temp in "${worker_temps[@]}"; do + if [[ -f "$worker_temp" ]]; then + cat "$worker_temp" + rm -f "$worker_temp" + fi + done + + # Clean up + rm -rf "$base_temp_dir" 2>/dev/null || true + + if [[ $failures -eq 0 ]]; then + log "All operations completed successfully" + return 0 + else + error "$failures operation(s) failed" + return 1 + fi +} + # ============================================================================= # CONTAINER MANAGEMENT FUNCTIONS # ============================================================================= @@ -635,6 +781,17 @@ stop_worker() { stop_and_remove_container "$host" "$user" "$CONTAINER_NAME_WORKER" "$port" } +# Internal wrapper for parallel stop_worker execution +_stop_worker_by_index() { + local idx="$1" + local save_logs="${2:-false}" + + local worker_spec="${WORKERS[$idx]}" + read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + + stop_worker "$host" "$user" "$port" "$wid" "$save_logs" "$remote_dir" +} + # Start a single worker container start_worker() { local host="$1" @@ -656,6 +813,17 @@ start_worker() { log "Worker $wid started successfully" } +# Internal wrapper for parallel start_worker execution +_start_worker_by_index() { + local idx="$1" + local env_file="${2:-$ENV_FILE_WORKER}" + + local worker_spec="${WORKERS[$idx]}" + read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + + start_worker "$host" "$user" "$port" "$wid" "$remote_dir" "$cpuset_cpus" "$cpuset_mems" "$env_file" +} + # Cleanup a single worker container cleanup_worker() { local host="$1" @@ -667,32 +835,42 @@ cleanup_worker() { force_kill_container "$host" "$user" "$CONTAINER_NAME_WORKER" "$port" } +# Internal wrapper for parallel cleanup_worker execution +_cleanup_worker_by_index() { + local idx="$1" + + local worker_spec="${WORKERS[$idx]}" + read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + + cleanup_worker "$host" "$user" "$port" "$wid" +} + # Stop all worker containers stop_all_workers() { local save_logs="${1:-false}" log "Stopping all ${#WORKERS[@]} workers..." - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" - stop_worker "$host" "$user" "$port" "$wid" "$save_logs" "$remote_dir" - apply_worker_delay - done - - log "All workers stopped" + if run_parallel_workers _stop_worker_by_index "$save_logs"; then + log "All workers stopped" + return 0 + else + error "Some workers failed to stop" + return 1 + fi } # Start all worker containers start_all_workers() { log "Starting all ${#WORKERS[@]} workers..." - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" - start_worker "$host" "$user" "$port" "$wid" "$remote_dir" "$cpuset_cpus" "$cpuset_mems" - apply_worker_delay - done - - log "All workers started" + if run_parallel_workers _start_worker_by_index; then + log "All workers started" + return 0 + else + error "Some workers failed to start" + return 1 + fi } # Get status of all worker containers @@ -714,62 +892,70 @@ start_all_workers() { # done # } +# Internal wrapper for parallel get_worker_status execution +_get_worker_status_by_index() { + local idx="$1" + + local worker_spec="${WORKERS[$idx]}" + read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + + log "Worker $wid status on ${user}@${host}:" + + # Step 1: Get running containers + local running_containers + running_containers=$(ssh_exec "$user" "$host" "$port" "sudo docker ps --format '{{.Names}}'") + + if [[ $? -ne 0 ]]; then + error "Failed to connect to worker $wid at ${user}@${host}" + echo " Status: CONNECTION FAILED" + return 1 + fi + + echo " [✓] SSH connected" + + # Step 2: Check if container is running (locally) + if echo "$running_containers" | grep -q '^pico-subblock-worker$'; then + echo " Status: RUNNING" + ssh_exec "$user" "$host" "$port" "sudo docker ps | grep pico-subblock-worker" + return 0 + fi + + # Step 3: Check if container exists but stopped + local all_containers + all_containers=$(ssh_exec "$user" "$host" "$port" "sudo docker ps -a --format '{{.Names}}'") + + if [[ $? -ne 0 ]]; then + error "Failed to connect to worker $wid at ${user}@${host}" + echo " Status: CONNECTION FAILED" + return 1 + fi + + if echo "$all_containers" | grep -q '^pico-subblock-worker$'; then + echo " Status: STOPPED (container exists but not running)" + ssh_exec "$user" "$host" "$port" "sudo docker ps -a | grep pico-subblock-worker" + return 0 + fi + + echo " Status: DOES NOT EXIST" + return 0 +} + # Get status of all worker containers get_all_worker_status() { - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" - log "Worker $wid status on ${user}@${host}:" - - # Step 1: Get running containers - local running_containers - running_containers=$(ssh_exec "$user" "$host" "$port" "sudo docker ps --format '{{.Names}}'") - - if [[ $? -ne 0 ]]; then - error "Failed to connect to worker $wid at ${user}@${host}" - echo " Status: CONNECTION FAILED" - continue - fi - - echo " [✓] SSH connected" - - # Step 2: Check if container is running (locally) - if echo "$running_containers" | grep -q '^pico-subblock-worker$'; then - echo " Status: RUNNING" - ssh_exec "$user" "$host" "$port" "sudo docker ps | grep pico-subblock-worker" - continue - fi - - # Step 3: Check if container exists but stopped - local all_containers - all_containers=$(ssh_exec "$user" "$host" "$port" "sudo docker ps -a --format '{{.Names}}'") - - if [[ $? -ne 0 ]]; then - error "Failed to connect to worker $wid at ${user}@${host}" - echo " Status: CONNECTION FAILED" - continue - fi - - if echo "$all_containers" | grep -q '^pico-subblock-worker$'; then - echo " Status: STOPPED (container exists but not running)" - ssh_exec "$user" "$host" "$port" "sudo docker ps -a | grep pico-subblock-worker" - continue - fi - - echo " Status: DOES NOT EXIST" - done + run_parallel_workers _get_worker_status_by_index } # Cleanup all worker containers cleanup_all_workers() { log "Cleaning up all ${#WORKERS[@]} workers..." - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" - cleanup_worker "$host" "$user" "$port" "$wid" - apply_worker_delay - done - - log "All workers cleaned up" + if run_parallel_workers _cleanup_worker_by_index; then + log "All workers cleaned up" + return 0 + else + error "Some workers failed to cleanup" + return 1 + fi } # Force kill a single worker container (immediate termination) @@ -784,24 +970,25 @@ force_kill_worker() { return $? } +# Internal wrapper for parallel force_kill_worker execution +_force_kill_worker_by_index() { + local idx="$1" + + local worker_spec="${WORKERS[$idx]}" + read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + + force_kill_worker "$host" "$user" "$port" "$wid" +} + # Force kill all worker containers (immediate termination) force_kill_all_workers() { log "Force killing all ${#WORKERS[@]} workers..." - local failures=0 - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" - if ! force_kill_worker "$host" "$user" "$port" "$wid"; then - ((failures++)) - fi - apply_worker_delay - done - - if [[ $failures -eq 0 ]]; then + if run_parallel_workers _force_kill_worker_by_index; then log "All workers force killed successfully" return 0 else - error "$failures worker(s) failed to be removed" + error "Some workers failed to be removed" return 1 fi } @@ -815,9 +1002,40 @@ stop_all() { local save_logs="${1:-true}" log "=== Stopping all containers ===" - stop_aggregator "$save_logs" - stop_all_workers "$save_logs" - log "=== All containers stopped ===" + + # Run aggregator and workers in parallel + local agg_pid + local workers_pid + + # Start aggregator stop in background + ( + stop_aggregator "$save_logs" 2>&1 | while IFS= read -r line; do + echo "[Aggregator] $line" + done + ) & + agg_pid=$! + + # Start workers stop in background + ( + stop_all_workers "$save_logs" 2>&1 | while IFS= read -r line; do + echo "$line" + done + ) & + workers_pid=$! + + # Wait for both to complete + local agg_result=0 + local workers_result=0 + wait "$agg_pid" || agg_result=$? + wait "$workers_pid" || workers_result=$? + + if [[ $agg_result -eq 0 ]] && [[ $workers_result -eq 0 ]]; then + log "=== All containers stopped ===" + return 0 + else + error "=== Some containers failed to stop ===" + return 1 + fi } # Start all containers (aggregator + workers) @@ -851,11 +1069,57 @@ show_all_status() { # Cleanup all containers (force remove without logs) cleanup_all() { log "=== Cleaning up all containers ===" - cleanup_aggregator - cleanup_all_workers + + # Run aggregator and workers in parallel + local agg_pid + local workers_pid + + # Start aggregator cleanup in background + ( + cleanup_aggregator 2>&1 | while IFS= read -r line; do + echo "[Aggregator] $line" + done + ) & + agg_pid=$! + + # Start workers cleanup in background + ( + cleanup_all_workers 2>&1 | while IFS= read -r line; do + echo "$line" + done + ) & + workers_pid=$! + + # Wait for both to complete + local agg_result=0 + local workers_result=0 + wait "$agg_pid" || agg_result=$? + wait "$workers_pid" || workers_result=$? + # Wait a moment to ensure cleanup completes sleep 2 - log "=== All containers cleaned up ===" + + if [[ $agg_result -eq 0 ]] && [[ $workers_result -eq 0 ]]; then + log "=== All containers cleaned up ===" + return 0 + else + error "=== Some containers failed to cleanup ===" + return 1 + fi +} + +# Internal wrapper for parallel verify_worker_gone execution +_verify_worker_gone_by_index() { + local idx="$1" + + local worker_spec="${WORKERS[$idx]}" + read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + + if ! is_container_gone "$host" "$user" "$CONTAINER_NAME_WORKER" "$port"; then + error "Worker $wid container still exists on ${user}@${host}" + return 1 + fi + return 0 } # Verify all containers are completely removed @@ -870,14 +1134,10 @@ verify_all_containers_gone() { ((failures++)) fi - # Check workers - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" - if ! is_container_gone "$host" "$user" "$CONTAINER_NAME_WORKER" "$port"; then - error "Worker $wid container still exists on ${user}@${host}" - ((failures++)) - fi - done + # Check workers in parallel + if ! run_parallel_workers _verify_worker_gone_by_index; then + ((failures++)) + fi if [[ $failures -eq 0 ]]; then log "All containers verified as removed" @@ -892,11 +1152,31 @@ verify_all_containers_gone() { force_kill_all() { log "=== Force killing all containers ===" + # Run aggregator and workers in parallel + local agg_pid + local workers_pid + + # Start aggregator force kill in background + ( + force_kill_aggregator 2>&1 | while IFS= read -r line; do + echo "[Aggregator] $line" + done + ) & + agg_pid=$! + + # Start workers force kill in background + ( + force_kill_all_workers 2>&1 | while IFS= read -r line; do + echo "$line" + done + ) & + workers_pid=$! + + # Wait for both to complete local agg_result=0 local workers_result=0 - - force_kill_aggregator || agg_result=$? - force_kill_all_workers || workers_result=$? + wait "$agg_pid" || agg_result=$? + wait "$workers_pid" || workers_result=$? # Wait a moment to ensure kill completes sleep 2 @@ -960,6 +1240,9 @@ export -f init_all_ssh_connections close_all_ssh_connections # Worker management functions export -f get_worker parse_worker_spec build_worker_lists apply_worker_delay +# Parallel execution helpers +export -f run_parallel_workers run_parallel_all + # Configuration validation export -f validate_config diff --git a/scripts/docker-multi-control.sh b/scripts/docker-multi-control.sh index 6516426..7a9cc1a 100755 --- a/scripts/docker-multi-control.sh +++ b/scripts/docker-multi-control.sh @@ -167,12 +167,16 @@ EOF case "$mode" in all|workers) - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + # Internal wrapper for parallel remove_image execution + _remove_worker_image_by_index() { + local idx="$1" + local worker_spec="${WORKERS[$idx]}" + read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" log "Removing worker image on ${user}@${host} (worker $wid)..." remove_image_with_dependencies "$host" "$user" "$port" "$CONTAINER_NAME_WORKER" "$IMAGE_NAME_WORKER" || true - apply_worker_delay - done + } + + run_parallel_workers _remove_worker_image_by_index || true ;; esac @@ -263,16 +267,28 @@ cmd_save_logs() { log "=== Saving All Logs ===" - # Save aggregator logs - local agg_log="${AGG_REMOTE_DIR}/${LOGS_DIR}/aggregator-manual-${timestamp}.log" - save_container_logs "$AGG_HOST" "$AGG_USER" "$CONTAINER_NAME_AGGREGATOR" "$agg_log" "$AGG_PORT" || true - - # Save worker logs - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + # Internal wrapper for parallel save_worker_logs execution + _save_worker_logs_by_index() { + local idx="$1" + local timestamp="$2" + local worker_spec="${WORKERS[$idx]}" + read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" local worker_log="${remote_dir}/${LOGS_DIR}/subblock-${wid}-manual-${timestamp}.log" save_container_logs "$host" "$user" "$CONTAINER_NAME_WORKER" "$worker_log" "$port" || true - done + } + + # Save aggregator logs in background + local agg_log="${AGG_REMOTE_DIR}/${LOGS_DIR}/aggregator-manual-${timestamp}.log" + ( + save_container_logs "$AGG_HOST" "$AGG_USER" "$CONTAINER_NAME_AGGREGATOR" "$agg_log" "$AGG_PORT" || true + ) & + local agg_pid=$! + + # Save worker logs in parallel + run_parallel_workers _save_worker_logs_by_index "$timestamp" || true + + # Wait for aggregator log save + wait "$agg_pid" || true log "=== Logs Saved ===" } diff --git a/scripts/docker-multi-deploy.sh b/scripts/docker-multi-deploy.sh index 64bd905..e682fd3 100755 --- a/scripts/docker-multi-deploy.sh +++ b/scripts/docker-multi-deploy.sh @@ -199,6 +199,41 @@ deploy_aggregator() { return 0 } +# Internal wrapper for parallel deploy_worker execution +_deploy_worker_by_index() { + local idx="$1" + local worker_image="$2" + local skip_cleanup="$3" + local keep_tar="$4" + + local worker_spec="${WORKERS[$idx]}" + read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + + log "Deploying to worker $wid..." + + if ! deploy_image_to_machine \ + "$host" \ + "$user" \ + "$worker_image" \ + "$remote_dir" \ + "$CONTAINER_NAME_WORKER" \ + "$IMAGE_NAME_WORKER" \ + "worker $wid" \ + "$port"; then + error "Worker $wid deployment failed" + return 1 + else + # Remove tar file if requested + if [[ "$keep_tar" != "true" ]]; then + local image_filename=$(basename "$worker_image") + log "Removing tar file from worker $wid..." + ssh_exec "$user" "$host" "$port" "rm -f '${remote_dir}/${image_filename}'" || true + fi + fi + + return 0 +} + # Deploy to all workers deploy_workers() { local worker_image="$1" @@ -212,47 +247,32 @@ deploy_workers() { return 1 fi - local failed_workers=() - - # Deploy to each worker - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" - + # Deploy to all workers in parallel + if run_parallel_workers _deploy_worker_by_index "$worker_image" "$skip_cleanup" "$keep_tar"; then log "" - log "Deploying to worker $wid..." - - if ! deploy_image_to_machine \ - "$host" \ - "$user" \ - "$worker_image" \ - "$remote_dir" \ - "$CONTAINER_NAME_WORKER" \ - "$IMAGE_NAME_WORKER" \ - "worker $wid" \ - "$port"; then - error "Worker $wid deployment failed" - failed_workers+=("$wid") - else - # Remove tar file if requested - if [[ "$keep_tar" != "true" ]]; then - local image_filename=$(basename "$worker_image") - log "Removing tar file from worker $wid..." - ssh_exec "$user" "$host" "$port" "rm -f '${remote_dir}/${image_filename}'" || true - fi - fi - - apply_worker_delay - done - - # Check for failures - if [[ ${#failed_workers[@]} -gt 0 ]]; then - error "Failed to deploy to ${#failed_workers[@]} worker(s): ${failed_workers[*]}" + log "=== All Workers Deployed Successfully ===" + return 0 + else + error "=== Some workers failed to deploy ===" return 1 fi +} + +# Internal wrapper for parallel verify_worker_deployment execution +_verify_worker_deployment_by_index() { + local idx="$1" - log "" - log "=== All Workers Deployed Successfully ===" - return 0 + local worker_spec="${WORKERS[$idx]}" + read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + + log "Verifying worker $wid image..." + if ssh_exec "$user" "$host" "$port" "$DOCKER_PREFIX images | grep -q '${IMAGE_NAME_WORKER%:*}'"; then + log "✓ Worker $wid image verified: $IMAGE_NAME_WORKER" + return 0 + else + error "✗ Worker $wid image not found!" + return 1 + fi } # Verify deployment on all machines @@ -274,18 +294,11 @@ verify_deployment() { fi fi - # Verify workers + # Verify workers in parallel if [[ "$mode" == "all" ]] || [[ "$mode" == "workers" ]]; then - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" - log "Verifying worker $wid image..." - if ssh_exec "$user" "$host" "$port" "$DOCKER_PREFIX images | grep -q '${IMAGE_NAME_WORKER%:*}'"; then - log "✓ Worker $wid image verified: $IMAGE_NAME_WORKER" - else - error "✗ Worker $wid image not found!" - ((failures++)) - fi - done + if ! run_parallel_workers _verify_worker_deployment_by_index; then + ((failures++)) + fi fi if [[ $failures -eq 0 ]]; then diff --git a/scripts/docker-multi-reset-chunk-size.sh b/scripts/docker-multi-reset-chunk-size.sh index e2051ac..1234a78 100755 --- a/scripts/docker-multi-reset-chunk-size.sh +++ b/scripts/docker-multi-reset-chunk-size.sh @@ -66,6 +66,18 @@ reset_env_chunk_size() { " } +# Internal wrapper for parallel reset_env_chunk_size execution +_reset_worker_env_by_index() { + local idx="$1" + local chunk_size="$2" + + local worker_spec="${WORKERS[$idx]}" + read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + + local worker_env="${remote_dir}/${ENV_FILE_WORKER}" + reset_env_chunk_size "$host" "$user" "$worker_env" "$chunk_size" "$port" +} + reset_all_env_files() { local chunk_size="$1" @@ -75,14 +87,14 @@ reset_all_env_files() { local agg_env="${AGG_REMOTE_DIR}/${ENV_FILE_AGGREGATOR}" reset_env_chunk_size "$AGG_HOST" "$AGG_USER" "$agg_env" "$chunk_size" "$AGG_PORT" - # Update worker envs - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" - local worker_env="${remote_dir}/${ENV_FILE_WORKER}" - reset_env_chunk_size "$host" "$user" "$worker_env" "$chunk_size" "$port" - done - - log "All .env files reset" + # Update worker envs in parallel + if run_parallel_workers _reset_worker_env_by_index "$chunk_size"; then + log "All .env files reset" + return 0 + else + error "Some worker .env files failed to reset" + return 1 + fi } main() { diff --git a/scripts/docker-multi-retry.sh b/scripts/docker-multi-retry.sh index 6363cb0..da355cc 100755 --- a/scripts/docker-multi-retry.sh +++ b/scripts/docker-multi-retry.sh @@ -93,6 +93,18 @@ update_env_chunk_size() { " } +# Internal wrapper for parallel update_env_chunk_size execution +_update_worker_env_by_index() { + local idx="$1" + local chunk_size="$2" + + local worker_spec="${WORKERS[$idx]}" + read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + + local worker_env="${remote_dir}/${ENV_FILE_WORKER}" + update_env_chunk_size "$host" "$user" "$worker_env" "$chunk_size" "$port" +} + update_all_env_files() { local chunk_size="$1" @@ -102,14 +114,26 @@ update_all_env_files() { local agg_env="${AGG_REMOTE_DIR}/${ENV_FILE_AGGREGATOR}" update_env_chunk_size "$AGG_HOST" "$AGG_USER" "$agg_env" "$chunk_size" "$AGG_PORT" - # Update worker envs - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" - local worker_env="${remote_dir}/${ENV_FILE_WORKER}" - update_env_chunk_size "$host" "$user" "$worker_env" "$chunk_size" "$port" - done + # Update worker envs in parallel + if run_parallel_workers _update_worker_env_by_index "$chunk_size"; then + log "All .env files updated" + return 0 + else + error "Some worker .env files failed to update" + return 1 + fi +} + +# Internal wrapper for parallel save_worker_logs execution +_save_worker_logs_by_index() { + local idx="$1" + local timestamp="$2" + local log_prefix="$3" - log "All .env files updated" + local worker_spec="${WORKERS[$idx]}" + read -r host user port wid idx_val remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + local worker_log="${remote_dir}/${LOGS_DIR}/subblock-${wid}-${log_prefix}-${timestamp}.log" + save_container_logs "$host" "$user" "$CONTAINER_NAME_WORKER" "$worker_log" "$port" || true } save_all_logs() { @@ -118,16 +142,18 @@ save_all_logs() { log "Saving logs from failed run..." - # Save aggregator logs + # Save aggregator logs in background local agg_log="${AGG_REMOTE_DIR}/${LOGS_DIR}/aggregator-${log_prefix}-${timestamp}.log" - save_container_logs "$AGG_HOST" "$AGG_USER" "$CONTAINER_NAME_AGGREGATOR" "$agg_log" "$AGG_PORT" || true + ( + save_container_logs "$AGG_HOST" "$AGG_USER" "$CONTAINER_NAME_AGGREGATOR" "$agg_log" "$AGG_PORT" || true + ) & + local agg_pid=$! - # Optionally save worker logs - for worker_spec in "${WORKERS[@]}"; do - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" - local worker_log="${remote_dir}/${LOGS_DIR}/subblock-${wid}-${log_prefix}-${timestamp}.log" - save_container_logs "$host" "$user" "$CONTAINER_NAME_WORKER" "$worker_log" "$port" || true - done + # Save worker logs in parallel + run_parallel_workers _save_worker_logs_by_index "$timestamp" "$log_prefix" || true + + # Wait for aggregator log save + wait "$agg_pid" || true } main() { From 810d20efd463b27d8397a0e3e4a0147aea146779 Mon Sep 17 00:00:00 2001 From: zktommy Date: Wed, 10 Dec 2025 16:04:13 +0800 Subject: [PATCH 2/5] feat: add fast-retry feature and configurable proving timeout - Introduced a new `fast-retry` feature that enables quicker retries without log saving or changes to CHUNK_SIZE, allowing for faster recovery from proving failures. - Added a `proving_timeout_seconds` configuration option to set custom timeout durations for proving operations, defaulting to 30 seconds for fast-retry and 120 seconds otherwise. - Updated the proving client logic to handle the new timeout settings and retry mechanisms, improving overall efficiency and error handling during proving operations. - Enhanced Docker control scripts to support the new fast-retry functionality. --- bin/eth-proofs/Cargo.toml | 4 + bin/eth-proofs/src/main.rs | 16 ++ crates/proving-client/Cargo.toml | 4 + crates/proving-client/src/client.rs | 232 ++++++++++++++++++++-------- crates/proving-client/src/config.rs | 3 + scripts/docker-multi-control.sh | 13 +- scripts/docker-multi-fast-retry.sh | 168 ++++++++++++++++++++ 7 files changed, 378 insertions(+), 62 deletions(-) create mode 100644 scripts/docker-multi-fast-retry.sh diff --git a/bin/eth-proofs/Cargo.toml b/bin/eth-proofs/Cargo.toml index aee9fd6..b0e36ac 100644 --- a/bin/eth-proofs/Cargo.toml +++ b/bin/eth-proofs/Cargo.toml @@ -40,3 +40,7 @@ latest-execution-witness = ["fetcher/latest-execution-witness"] # skip-pending-on-retry feature skips pending blocks during retry attempts # this is useful when proving the latest blocks if a block failed to prove and triggered a retry skip-pending-on-retry = ["proving-client/skip-pending-on-retry"] + +# fast-retry feature enables faster retry with no log saving, no CHUNK_SIZE change, +# and always skips replaying failed blocks (resumes from latest) +fast-retry = ["proving-client/fast-retry"] diff --git a/bin/eth-proofs/src/main.rs b/bin/eth-proofs/src/main.rs index e0b6c55..be752ab 100644 --- a/bin/eth-proofs/src/main.rs +++ b/bin/eth-proofs/src/main.rs @@ -135,6 +135,13 @@ struct Args { )] pub proving_subblock_urls: Option>, + #[clap( + long, + env = "PROVING_TIMEOUT_SECONDS", + help = "Timeout for proving operations in seconds (default: 30 for fast-retry, 120 otherwise)" + )] + pub proving_timeout_seconds: Option, + #[clap( long, default_value = "false", @@ -302,6 +309,14 @@ fn init_proving_client(args: &Args) -> (Arc, Arc (Arc, Arc = None; // variable for saving the last proving inputs (for retry on timeout) @@ -78,7 +81,7 @@ impl ProvingClient { loop { // try to receive a proving or proved message with a timeout let msg = timeout( - Duration::from_secs(MAX_PROVING_WAITING_SECONDS), + Duration::from_secs(max_proving_waiting_seconds), self.comm_endpoint.recv(), ) .await; @@ -99,47 +102,77 @@ impl ProvingClient { .as_millis() as u64; let block_number = report.block_number; if current_time - start_timestamps[&block_number] - >= MAX_PROVING_WAITING_SECONDS * 1000 + >= max_proving_waiting_seconds * 1000 { warn!( "proving-client: proving timeout for block {block_number}, attempting to restart docker containers and retry", ); - restart_proving_clients().await; - #[cfg(feature = "skip-pending-on-retry")] - pending_msgs.clear(); + #[cfg(feature = "fast-retry")] + { + restart_proving_clients_fast().await; - // re-initialize aggregator and subblock clients - info!( - "proving-client: reinitializing aggregator and subblock clients" - ); - agg_client = self.init_agg_proving_client(&token).await; - subblock_clients = self.init_subblock_proving_clients(&token).await; - - // re-send the last proving inputs to retry the failed block - if let Some(ref inputs) = last_proving_inputs { + // re-initialize aggregator and subblock clients info!( - "proving-client: resending proving inputs for block {}", + "proving-client: reinitializing aggregator and subblock clients" + ); + agg_client = self.init_agg_proving_client(&token).await; + subblock_clients = + self.init_subblock_proving_clients(&token).await; + + // fast-retry: clear pending messages and skip replay + pending_msgs.clear(); + start_timestamps.remove(&block_number); + proving_block_report = None; + last_proving_inputs = None; + info!( + "proving-client: fast retry complete, skipping block {} and resuming from latest", block_number ); - let start_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() - as u64; - start_timestamps.insert(block_number, start_time); - send_proving_inputs( - inputs.clone(), - &mut agg_client, - &mut subblock_clients, - ) - .await; + } + + #[cfg(not(feature = "fast-retry"))] + { + restart_proving_clients().await; + + #[cfg(feature = "skip-pending-on-retry")] + pending_msgs.clear(); + + // re-initialize aggregator and subblock clients info!( - "proving-client: proving inputs resent, continuing to wait for proof" + "proving-client: reinitializing aggregator and subblock clients" ); - } else { - error!("proving-client: no proving inputs saved for retry"); - panic!("proving-client: cannot retry without proving inputs"); + agg_client = self.init_agg_proving_client(&token).await; + subblock_clients = + self.init_subblock_proving_clients(&token).await; + + // re-send the last proving inputs to retry the failed block + if let Some(ref inputs) = last_proving_inputs { + info!( + "proving-client: resending proving inputs for block {}", + block_number + ); + let start_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() + as u64; + start_timestamps.insert(block_number, start_time); + send_proving_inputs( + inputs.clone(), + &mut agg_client, + &mut subblock_clients, + ) + .await; + info!( + "proving-client: proving inputs resent, continuing to wait for proof" + ); + } else { + error!("proving-client: no proving inputs saved for retry"); + panic!( + "proving-client: cannot retry without proving inputs" + ); + } } } } else { @@ -268,40 +301,68 @@ impl ProvingClient { warn!( "proving-client: proving timeout for block {block_number}, attempting to restart docker containers and retry", ); - restart_proving_clients().await; - #[cfg(feature = "skip-pending-on-retry")] - pending_msgs.clear(); + #[cfg(feature = "fast-retry")] + { + restart_proving_clients_fast().await; - // re-initialize aggregator and subblock clients - info!("proving-client: reinitializing aggregator and subblock clients"); - agg_client = self.init_agg_proving_client(&token).await; - subblock_clients = self.init_subblock_proving_clients(&token).await; + // re-initialize aggregator and subblock clients + info!( + "proving-client: reinitializing aggregator and subblock clients" + ); + agg_client = self.init_agg_proving_client(&token).await; + subblock_clients = self.init_subblock_proving_clients(&token).await; - // re-send the last proving inputs to retry the failed block - if let Some(ref inputs) = last_proving_inputs { + // fast-retry: clear pending messages and skip replay + pending_msgs.clear(); + start_timestamps.remove(&block_number); + proving_block_report = None; + last_proving_inputs = None; info!( - "proving-client: resending proving inputs for block {}", + "proving-client: fast retry complete, skipping block {} and resuming from latest", block_number ); - let start_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() - as u64; - start_timestamps.insert(block_number, start_time); - send_proving_inputs( - inputs.clone(), - &mut agg_client, - &mut subblock_clients, - ) - .await; + } + + #[cfg(not(feature = "fast-retry"))] + { + restart_proving_clients().await; + + #[cfg(feature = "skip-pending-on-retry")] + pending_msgs.clear(); + + // re-initialize aggregator and subblock clients info!( - "proving-client: proving inputs resent, continuing to wait for proof" + "proving-client: reinitializing aggregator and subblock clients" ); - } else { - error!("proving-client: no proving inputs saved for retry"); - panic!("proving-client: cannot retry without proving inputs"); + agg_client = self.init_agg_proving_client(&token).await; + subblock_clients = self.init_subblock_proving_clients(&token).await; + + // re-send the last proving inputs to retry the failed block + if let Some(ref inputs) = last_proving_inputs { + info!( + "proving-client: resending proving inputs for block {}", + block_number + ); + let start_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() + as u64; + start_timestamps.insert(block_number, start_time); + send_proving_inputs( + inputs.clone(), + &mut agg_client, + &mut subblock_clients, + ) + .await; + info!( + "proving-client: proving inputs resent, continuing to wait for proof" + ); + } else { + error!("proving-client: no proving inputs saved for retry"); + panic!("proving-client: cannot retry without proving inputs"); + } } } } @@ -531,7 +592,7 @@ async fn send_proving_inputs( } } -// restart proving clients +// restart proving clients (legacy retry with CHUNK_SIZE reduction) async fn restart_proving_clients() { // restart docker containers using the retry script let retry_result = Command::new("./scripts/docker-multi-control.sh") @@ -570,3 +631,52 @@ async fn restart_proving_clients() { ); sleep(Duration::from_secs(DOCKER_RETRY_WAIT_SECONDS)).await; } + +// fast restart proving clients (no log saving, no CHUNK_SIZE changes) +#[cfg(feature = "fast-retry")] +#[allow(dead_code)] +async fn restart_proving_clients_fast() { + // restart docker containers using the fast-retry script + let retry_result = Command::new("./scripts/docker-multi-control.sh") + .arg("fast-retry") + .status() + .await; + + match retry_result { + Ok(status) if status.success() => { + info!("proving-client: docker containers fast-restarted successfully"); + } + Ok(status) => { + error!( + "proving-client: docker fast-retry script failed with exit code: {:?}", + status.code() + ); + panic!( + "proving-client: cannot recover from docker restart failure - manual intervention required" + ); + } + Err(e) => { + error!( + "proving-client: failed to execute docker fast-retry script: {}", + e + ); + panic!( + "proving-client: cannot recover from docker restart failure - manual intervention required" + ); + } + } + + // wait for containers to fully initialize + info!( + "proving-client: waiting {}s for docker containers to initialize", + DOCKER_RETRY_WAIT_SECONDS + ); + sleep(Duration::from_secs(DOCKER_RETRY_WAIT_SECONDS)).await; +} + +// fallback: use legacy retry if fast-retry feature is not enabled +#[cfg(not(feature = "fast-retry"))] +#[allow(dead_code)] +async fn restart_proving_clients_fast() { + restart_proving_clients().await; +} diff --git a/crates/proving-client/src/config.rs b/crates/proving-client/src/config.rs index e3be13a..7597332 100644 --- a/crates/proving-client/src/config.rs +++ b/crates/proving-client/src/config.rs @@ -12,4 +12,7 @@ pub struct ProvingClientConfig { // subbblock proving grpc urls pub subblock_urls: Vec, + + // proving timeout in seconds + pub proving_timeout_seconds: u64, } diff --git a/scripts/docker-multi-control.sh b/scripts/docker-multi-control.sh index 7a9cc1a..4dc1177 100755 --- a/scripts/docker-multi-control.sh +++ b/scripts/docker-multi-control.sh @@ -24,6 +24,7 @@ Commands: force-kill Force kill all containers (immediate) restart Restart all containers retry Retry with smaller CHUNK_SIZE (force kill mode) + fast-retry Fast retry without logs or CHUNK_SIZE changes reset-chunk-size Reset CHUNK_SIZE to normal value status Show container status logs Show container logs @@ -58,9 +59,12 @@ Examples: # Restart only aggregator $0 restart --agg-only - # Retry failed block + # Retry failed block (with CHUNK_SIZE reduction) $0 retry --chunk-size 2097152 + # Fast retry without logs or CHUNK_SIZE changes + $0 fast-retry + # Reset CHUNK_SIZE to normal $0 reset-chunk-size @@ -199,6 +203,10 @@ cmd_retry() { "${SCRIPT_DIR}/docker-multi-retry.sh" "$@" } +cmd_fast_retry() { + "${SCRIPT_DIR}/docker-multi-fast-retry.sh" "$@" +} + cmd_reset_chunk_size() { "${SCRIPT_DIR}/docker-multi-reset-chunk-size.sh" "$@" } @@ -446,6 +454,9 @@ main() { retry) cmd_retry "$@" ;; + fast-retry) + cmd_fast_retry "$@" + ;; reset-chunk-size) cmd_reset_chunk_size "$@" ;; diff --git a/scripts/docker-multi-fast-retry.sh b/scripts/docker-multi-fast-retry.sh new file mode 100644 index 0000000..9acd045 --- /dev/null +++ b/scripts/docker-multi-fast-retry.sh @@ -0,0 +1,168 @@ +#!/usr/bin/env bash +# ============================================================================= +# Docker Multi-Machine Fast Retry Script +# ============================================================================= +# Fast retry for proving failures - no log saving, no CHUNK_SIZE changes +# This script: +# 1. Force kills all containers with verification +# 2. Waits for cleanup +# 3. Restarts all containers with existing configuration +# +# Unlike docker-multi-retry.sh, this script: +# - Does NOT save logs (faster) +# - Does NOT modify CHUNK_SIZE (keeps current value) +# - Minimizes downtime for quick recovery +# ============================================================================= + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +source "${SCRIPT_DIR}/docker-common.sh" + +usage() { + cat < Date: Wed, 10 Dec 2025 17:17:21 +0800 Subject: [PATCH 3/5] refactor: enhance parallel worker execution and error handling - Improved the `run_parallel_workers` and `run_parallel_all` functions to include better error handling and logging for worker operations. - Added checks for successful creation of temporary directories and files, ensuring robust execution. - Updated output handling to include messages for workers with no output, enhancing clarity in logs. - Exported additional internal worker functions for better modularity and reusability across scripts. --- scripts/docker-common.sh | 146 ++++++++++++++++------- scripts/docker-multi-control.sh | 2 + scripts/docker-multi-deploy.sh | 2 + scripts/docker-multi-reset-chunk-size.sh | 1 + scripts/docker-multi-retry.sh | 2 + 5 files changed, 111 insertions(+), 42 deletions(-) diff --git a/scripts/docker-common.sh b/scripts/docker-common.sh index bbf3f58..a877679 100755 --- a/scripts/docker-common.sh +++ b/scripts/docker-common.sh @@ -288,51 +288,77 @@ validate_config() { run_parallel_workers() { local func="$1" shift + local extra_args=("$@") local pids=() - local temp_dirs=() + local temp_files=() # Create temp directory for each worker's output to avoid interleaving - local base_temp_dir=$(mktemp -d) + local base_temp_dir + base_temp_dir=$(mktemp -d) || { + error "Failed to create temp directory" + return 1 + } log "Running ${#WORKERS[@]} worker operations in parallel..." # Launch all worker operations in background for i in "${!WORKERS[@]}"; do local worker_spec="${WORKERS[$i]}" - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" || { + error "Failed to parse worker spec for index $i" + continue + } # Create temp file for this worker's output local worker_temp="${base_temp_dir}/worker_${i}.log" - temp_dirs+=("$worker_temp") + temp_files+=("$worker_temp") - # Capture wid for use in subshell - local worker_id="$wid" + # Run function in background with proper error handling + { + # Execute in a subshell that won't exit on errors + ( + set +e + # Call the function and capture all output + if [[ ${#extra_args[@]} -gt 0 ]]; then + result=$("$func" "$i" "${extra_args[@]}" 2>&1) + else + result=$("$func" "$i" 2>&1) + fi + # Prefix each line with worker ID + echo "$result" | while IFS= read -r line || [[ -n "$line" ]]; do + echo "[Worker $wid] $line" + done + ) + } > "$worker_temp" 2>&1 & - # Run function in background, redirecting output to temp file - ( - "$func" "$i" "$@" 2>&1 | while IFS= read -r line; do - echo "[Worker $worker_id] $line" - done > "$worker_temp" - ) & pids+=($!) done + # Verify we have PIDs to wait for + if [[ ${#pids[@]} -eq 0 ]]; then + error "No worker operations were started" + rm -rf "$base_temp_dir" 2>/dev/null || true + return 1 + fi + # Wait for all background jobs to complete local failures=0 local completed=0 for i in "${!pids[@]}"; do local pid="${pids[$i]}" - if wait "$pid"; then - ((completed++)) + if wait "$pid" 2>/dev/null; then + completed=$((completed + 1)) else - ((failures++)) + failures=$((failures + 1)) fi # Output worker's log file (preserves ordering and prevents interleaving) - if [[ -f "${temp_dirs[$i]}" ]]; then - cat "${temp_dirs[$i]}" - rm -f "${temp_dirs[$i]}" + if [[ -f "${temp_files[$i]}" ]] && [[ -s "${temp_files[$i]}" ]]; then + cat "${temp_files[$i]}" + elif [[ -f "${temp_files[$i]}" ]]; then + echo "[Worker ${i}] (no output)" fi + rm -f "${temp_files[$i]}" 2>/dev/null || true done # Clean up temp directory @@ -342,7 +368,7 @@ run_parallel_workers() { log "All ${completed} worker operations completed successfully" return 0 else - error "$failures worker operation(s) failed out of ${#WORKERS[@]}" + warn "$failures worker operation(s) failed out of ${#WORKERS[@]}" return 1 fi } @@ -353,62 +379,93 @@ run_parallel_all() { local agg_func="$1" local worker_func="$2" shift 2 + local extra_args=("$@") log "Running aggregator and ${#WORKERS[@]} workers in parallel..." local pids=() - local agg_temp=$(mktemp) - local base_temp_dir=$(mktemp -d) + local agg_temp + agg_temp=$(mktemp) || { + error "Failed to create temp file for aggregator" + return 1 + } + + local base_temp_dir + base_temp_dir=$(mktemp -d) || { + error "Failed to create temp directory" + rm -f "$agg_temp" + return 1 + } local worker_temps=() # Start aggregator in background - ( - "$agg_func" "$@" 2>&1 | while IFS= read -r line; do - echo "[Aggregator] $line" - done > "$agg_temp" - ) & + { + ( + set +e + if [[ ${#extra_args[@]} -gt 0 ]]; then + result=$("$agg_func" "${extra_args[@]}" 2>&1) + else + result=$("$agg_func" 2>&1) + fi + echo "$result" | while IFS= read -r line || [[ -n "$line" ]]; do + echo "[Aggregator] $line" + done + ) + } > "$agg_temp" 2>&1 & local agg_pid=$! pids+=($agg_pid) # Start all workers in background for i in "${!WORKERS[@]}"; do local worker_spec="${WORKERS[$i]}" - read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" + read -r host user port wid idx remote_dir cpuset_cpus cpuset_mems <<< "$worker_spec" || { + error "Failed to parse worker spec for index $i" + continue + } local worker_temp="${base_temp_dir}/worker_${i}.log" worker_temps+=("$worker_temp") - # Capture wid for use in subshell - local worker_id="$wid" - - ( - "$worker_func" "$i" "$@" 2>&1 | while IFS= read -r line; do - echo "[Worker $worker_id] $line" - done > "$worker_temp" - ) & + { + ( + set +e + if [[ ${#extra_args[@]} -gt 0 ]]; then + result=$("$worker_func" "$i" "${extra_args[@]}" 2>&1) + else + result=$("$worker_func" "$i" 2>&1) + fi + echo "$result" | while IFS= read -r line || [[ -n "$line" ]]; do + echo "[Worker $wid] $line" + done + ) + } > "$worker_temp" 2>&1 & pids+=($!) done # Wait for all to complete local failures=0 for pid in "${pids[@]}"; do - if ! wait "$pid"; then - ((failures++)) + if ! wait "$pid" 2>/dev/null; then + failures=$((failures + 1)) fi done # Output aggregator log first - if [[ -f "$agg_temp" ]]; then + if [[ -f "$agg_temp" ]] && [[ -s "$agg_temp" ]]; then cat "$agg_temp" - rm -f "$agg_temp" + elif [[ -f "$agg_temp" ]]; then + echo "[Aggregator] (no output)" fi + rm -f "$agg_temp" 2>/dev/null || true # Output worker logs for worker_temp in "${worker_temps[@]}"; do - if [[ -f "$worker_temp" ]]; then + if [[ -f "$worker_temp" ]] && [[ -s "$worker_temp" ]]; then cat "$worker_temp" - rm -f "$worker_temp" + elif [[ -f "$worker_temp" ]]; then + echo "[Worker] (no output)" fi + rm -f "$worker_temp" 2>/dev/null || true done # Clean up @@ -418,7 +475,7 @@ run_parallel_all() { log "All operations completed successfully" return 0 else - error "$failures operation(s) failed" + warn "$failures operation(s) failed" return 1 fi } @@ -1262,6 +1319,11 @@ export -f stop_worker start_worker cleanup_worker force_kill_worker export -f stop_all_workers start_all_workers get_all_worker_status export -f cleanup_all_workers force_kill_all_workers +# Internal worker wrapper functions (need to be exported for background jobs) +export -f _stop_worker_by_index _start_worker_by_index _cleanup_worker_by_index +export -f _force_kill_worker_by_index _get_worker_status_by_index +export -f _verify_worker_gone_by_index + # Combined functions export -f stop_all start_all restart_all show_all_status export -f cleanup_all force_kill_all diff --git a/scripts/docker-multi-control.sh b/scripts/docker-multi-control.sh index 4dc1177..814dad8 100755 --- a/scripts/docker-multi-control.sh +++ b/scripts/docker-multi-control.sh @@ -179,6 +179,7 @@ EOF log "Removing worker image on ${user}@${host} (worker $wid)..." remove_image_with_dependencies "$host" "$user" "$port" "$CONTAINER_NAME_WORKER" "$IMAGE_NAME_WORKER" || true } + export -f _remove_worker_image_by_index run_parallel_workers _remove_worker_image_by_index || true ;; @@ -284,6 +285,7 @@ cmd_save_logs() { local worker_log="${remote_dir}/${LOGS_DIR}/subblock-${wid}-manual-${timestamp}.log" save_container_logs "$host" "$user" "$CONTAINER_NAME_WORKER" "$worker_log" "$port" || true } + export -f _save_worker_logs_by_index # Save aggregator logs in background local agg_log="${AGG_REMOTE_DIR}/${LOGS_DIR}/aggregator-manual-${timestamp}.log" diff --git a/scripts/docker-multi-deploy.sh b/scripts/docker-multi-deploy.sh index e682fd3..11dd30b 100755 --- a/scripts/docker-multi-deploy.sh +++ b/scripts/docker-multi-deploy.sh @@ -233,6 +233,7 @@ _deploy_worker_by_index() { return 0 } +export -f _deploy_worker_by_index # Deploy to all workers deploy_workers() { @@ -274,6 +275,7 @@ _verify_worker_deployment_by_index() { return 1 fi } +export -f _verify_worker_deployment_by_index # Verify deployment on all machines verify_deployment() { diff --git a/scripts/docker-multi-reset-chunk-size.sh b/scripts/docker-multi-reset-chunk-size.sh index 1234a78..4579ffc 100755 --- a/scripts/docker-multi-reset-chunk-size.sh +++ b/scripts/docker-multi-reset-chunk-size.sh @@ -77,6 +77,7 @@ _reset_worker_env_by_index() { local worker_env="${remote_dir}/${ENV_FILE_WORKER}" reset_env_chunk_size "$host" "$user" "$worker_env" "$chunk_size" "$port" } +export -f _reset_worker_env_by_index reset_all_env_files() { local chunk_size="$1" diff --git a/scripts/docker-multi-retry.sh b/scripts/docker-multi-retry.sh index da355cc..1b30991 100755 --- a/scripts/docker-multi-retry.sh +++ b/scripts/docker-multi-retry.sh @@ -104,6 +104,7 @@ _update_worker_env_by_index() { local worker_env="${remote_dir}/${ENV_FILE_WORKER}" update_env_chunk_size "$host" "$user" "$worker_env" "$chunk_size" "$port" } +export -f _update_worker_env_by_index update_all_env_files() { local chunk_size="$1" @@ -135,6 +136,7 @@ _save_worker_logs_by_index() { local worker_log="${remote_dir}/${LOGS_DIR}/subblock-${wid}-${log_prefix}-${timestamp}.log" save_container_logs "$host" "$user" "$CONTAINER_NAME_WORKER" "$worker_log" "$port" || true } +export -f _save_worker_logs_by_index save_all_logs() { local timestamp=$(date +"$TIMESTAMP_FORMAT") From 00110cfc43941c666d6c5f3791f6036b2302f968 Mon Sep 17 00:00:00 2001 From: zktommy Date: Wed, 10 Dec 2025 18:12:15 +0800 Subject: [PATCH 4/5] fix: update permissions for docker-multi-fast-retry script --- scripts/docker-multi-fast-retry.sh | 0 1 file changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 scripts/docker-multi-fast-retry.sh diff --git a/scripts/docker-multi-fast-retry.sh b/scripts/docker-multi-fast-retry.sh old mode 100644 new mode 100755 From 3f2f912721d96e39c0271a8d51aeea31587613bf Mon Sep 17 00:00:00 2001 From: zktommy Date: Thu, 11 Dec 2025 12:32:02 +0800 Subject: [PATCH 5/5] refactor: resolve clippy issues --- crates/proving-client/src/client.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/crates/proving-client/src/client.rs b/crates/proving-client/src/client.rs index f37c3aa..3789cdb 100644 --- a/crates/proving-client/src/client.rs +++ b/crates/proving-client/src/client.rs @@ -73,6 +73,7 @@ impl ProvingClient { // variable for saving the block number proving in progress let mut proving_block_report: Option = None; // variable for saving the last proving inputs (for retry on timeout) + #[cfg(not(feature = "fast-retry"))] let mut last_proving_inputs: Option = None; // queue for saving the pending messages when a block is proving let mut pending_msgs = VecDeque::new(); @@ -124,7 +125,6 @@ impl ProvingClient { pending_msgs.clear(); start_timestamps.remove(&block_number); proving_block_report = None; - last_proving_inputs = None; info!( "proving-client: fast retry complete, skipping block {} and resuming from latest", block_number @@ -204,7 +204,10 @@ impl ProvingClient { report.block_number, ); // save the proving inputs for potential retry on timeout - last_proving_inputs = Some(proving_msg.proving_inputs); + #[cfg(not(feature = "fast-retry"))] + { + last_proving_inputs = Some(proving_msg.proving_inputs); + } proving_block_report = Some(report); } } @@ -290,7 +293,10 @@ impl ProvingClient { report.block_number, ); // save the proving inputs for potential retry on timeout - last_proving_inputs = Some(proving_msg.proving_inputs); + #[cfg(not(feature = "fast-retry"))] + { + last_proving_inputs = Some(proving_msg.proving_inputs); + } proving_block_report = Some(report); } } @@ -317,7 +323,6 @@ impl ProvingClient { pending_msgs.clear(); start_timestamps.remove(&block_number); proving_block_report = None; - last_proving_inputs = None; info!( "proving-client: fast retry complete, skipping block {} and resuming from latest", block_number @@ -593,6 +598,7 @@ async fn send_proving_inputs( } // restart proving clients (legacy retry with CHUNK_SIZE reduction) +#[cfg(not(feature = "fast-retry"))] async fn restart_proving_clients() { // restart docker containers using the retry script let retry_result = Command::new("./scripts/docker-multi-control.sh")