From 570b13dafff2eb110e1f216cf168a359cc6fc255 Mon Sep 17 00:00:00 2001 From: Test User Date: Fri, 20 Mar 2026 10:10:33 +0100 Subject: [PATCH 1/4] fix(warm-pool): Wire dispatch path, fix stale GCS paths, fix poll scan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes from honest review of v0.3.0 warm pool implementation: 1. DISPATCH WIRED: GCERunBackend.launch() now passes warm_pool_idle_timeout through gce_launcher to build_startup_script. register_instance() called after fresh launch when pool has capacity. Previously the entire dispatch path was dead code — enabling warm pool had no effect. 2. GCS PATHS FIXED: Idle loop now updates GCS_STDOUT_PATH, GCS_STDERR_PATH, GCS_EXIT_CODE_PATH from the new run's run_path. Previously all subsequent jobs would upload logs to the FIRST run's GCS location. 3. EXIT CODE FIXED: Uses direct gsutil cp to GCS instead of stale EXIT_CODE_FILE local path that pointed to the first run's gcsfuse mount. 4. GCS_BUCKET EXPORTED: Idle loop needs bucket name to construct paths for subsequent runs. Now exported before idle loop definition. 5. POLL SCAN FIXED: _get_run_handle no longer scans all running warm instances. Uses single-row lookup by backend_handle (O(1) not O(n)). 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../cloud/adapters/gcp/gce_launcher.py | 3 ++ .../cloud/adapters/gcp/run_backend.py | 18 ++++++++ .../cloud/adapters/gcp/startup_builder.py | 28 +++++++++--- src/goldfish/jobs/_stage_executor_impl.py | 14 +++--- tests/unit/test_warm_pool_startup.py | 44 +++++++++++++++++++ 5 files changed, 96 insertions(+), 11 deletions(-) diff --git a/src/goldfish/cloud/adapters/gcp/gce_launcher.py b/src/goldfish/cloud/adapters/gcp/gce_launcher.py index 246c82d..36023cf 100644 --- a/src/goldfish/cloud/adapters/gcp/gce_launcher.py +++ b/src/goldfish/cloud/adapters/gcp/gce_launcher.py @@ -134,6 +134,7 @@ def launch_instance( use_capacity_search: bool = True, goldfish_env: dict[str, str] | None = None, preemptible: bool | None = None, + warm_pool_idle_timeout_seconds: int | None = None, ) -> GCELaunchResult: """Launch GCE instance for stage run. @@ -349,6 +350,8 @@ def launch_instance( log_sync_interval=log_sync_interval, # GPU flag - profile-based, not runtime nvidia-smi detection gpu_count=gpu_count or 0, + # Warm pool: if set, instance enters idle loop after Docker exits instead of self-deleting + warm_pool_idle_timeout_seconds=warm_pool_idle_timeout_seconds, ) if use_capacity_search and self.resources: diff --git a/src/goldfish/cloud/adapters/gcp/run_backend.py b/src/goldfish/cloud/adapters/gcp/run_backend.py index 2ce1d97..761ae70 100644 --- a/src/goldfish/cloud/adapters/gcp/run_backend.py +++ b/src/goldfish/cloud/adapters/gcp/run_backend.py @@ -236,6 +236,11 @@ def launch(self, spec: RunSpec) -> RunHandle: # Use spec.gpu_type if provided, else default to T4 gpu_type = spec.gpu_type or "nvidia-tesla-t4" + # Determine warm pool idle timeout from config + warm_pool_timeout: int | None = None + if self._warm_pool and self._warm_pool.is_enabled_for(spec.profile): + warm_pool_timeout = self._warm_pool._config.idle_timeout_minutes * 60 + result = self._launcher.launch_instance( image_tag=spec.image, stage_run_id=spec.stage_run_id, @@ 
-248,13 +253,26 @@ def launch(self, spec: RunSpec) -> RunHandle: zones=self._zones, goldfish_env=goldfish_env, preemptible=spec.spot, # Pass spot preference to launcher + warm_pool_idle_timeout_seconds=warm_pool_timeout, ) + # Register instance in warm pool if eligible + is_warm = False + if warm_pool_timeout and self._warm_pool: + is_warm = self._warm_pool.register_instance( + instance_name=result.instance_name, + zone=result.zone, + machine_type=machine_type, + gpu_count=gpu_count, + image_tag=spec.image, + ) + return RunHandle( stage_run_id=spec.stage_run_id, backend_type="gce", backend_handle=result.instance_name, zone=result.zone, + warm_instance=is_warm, ) except Exception as e: diff --git a/src/goldfish/cloud/adapters/gcp/startup_builder.py b/src/goldfish/cloud/adapters/gcp/startup_builder.py index 77a4bab..25c05c1 100644 --- a/src/goldfish/cloud/adapters/gcp/startup_builder.py +++ b/src/goldfish/cloud/adapters/gcp/startup_builder.py @@ -875,14 +875,24 @@ def idle_loop_section(idle_timeout_seconds: int, gcs_pool_path: str) -> str: bash "$spec_dir/pre_run.sh" || {{ echo "Pre-run staging failed"; exit 1; }} fi - # Update log paths for new run + # Update ALL paths for the new run local new_run_path new_run_path=$(cat "$spec_dir/run_path" 2>/dev/null || echo "") if [[ -n "$new_run_path" ]]; then + # Reset log files STDOUT_LOG="/tmp/stdout.log" STDERR_LOG="/tmp/stderr.log" : > "$STDOUT_LOG" : > "$STDERR_LOG" + : > /tmp/stage_times.log + + # CRITICAL: Update GCS paths for the NEW run so logs don't + # go to the first run's location + GCS_STDOUT_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/logs/stdout.log" + GCS_STDERR_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/logs/stderr.log" + GCS_METRICS_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/logs/metrics.jsonl" + GCS_SVS_DURING_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/outputs/.goldfish/svs_findings_during.json" + GCS_EXIT_CODE_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/logs/exit_code.txt" fi # 
Build and run new Docker command @@ -901,12 +911,18 @@ def idle_loop_section(idle_timeout_seconds: int, gcs_pool_path: str) -> str: bash "$spec_dir/post_run.sh" || true fi - # Write exit code - if [[ -n "$EXIT_CODE_FILE" ]]; then - echo "$EXIT_CODE" > "$EXIT_CODE_FILE" 2>/dev/null || true + # Write exit code directly to GCS (not the stale local path) + if [[ -n "$GCS_EXIT_CODE_PATH" ]]; then + echo "$EXIT_CODE" | gsutil cp - "$GCS_EXIT_CODE_PATH" 2>/dev/null || true fi - # Upload logs + # Set exit code in instance metadata + gcloud compute instances add-metadata "$INSTANCE_NAME" \\ + --zone="$INSTANCE_ZONE" --project="$PROJECT_ID" \\ + --metadata "goldfish_exit_code=$EXIT_CODE" \\ + --quiet 2>/dev/null || true + + # Upload logs to the new run's GCS paths sync_final_logs || true # Clear self-delete trap again for next idle cycle @@ -1210,6 +1226,8 @@ def build_startup_script( if use_warm_pool: assert warm_pool_idle_timeout_seconds is not None # Narrowing for mypy gcs_pool_path = f"gs://{bucket}/warm_pool/{'{INSTANCE_NAME}'}" + # Export bucket name so idle loop can construct GCS paths for subsequent runs + parts.append(f'GCS_BUCKET="{bucket}"') parts.append(idle_loop_section(warm_pool_idle_timeout_seconds, gcs_pool_path)) parts.append(f'CURRENT_IMAGE="{image}"') parts.append('log_stage "cleanup_begin"') diff --git a/src/goldfish/jobs/_stage_executor_impl.py b/src/goldfish/jobs/_stage_executor_impl.py index f3b8f1e..c71501c 100644 --- a/src/goldfish/jobs/_stage_executor_impl.py +++ b/src/goldfish/jobs/_stage_executor_impl.py @@ -290,13 +290,15 @@ def _get_run_handle(self, stage_run_id: str) -> RunHandle: backend_handle = row.get("backend_handle") or stage_run_id zone = row.get("instance_zone") - # Check if this is a warm pool instance + # Check warm pool: single-row lookup by backend_handle (not a scan) is_warm = False - warm_instances = self.db.list_warm_instances(status="running") - for wi in warm_instances: - if wi.get("current_stage_run_id") == stage_run_id: - 
is_warm = True - break + if backend_handle: + with self.db._conn() as conn: + row_wp = conn.execute( + "SELECT 1 FROM warm_instances WHERE instance_name = ? AND status = 'running'", + (backend_handle,), + ).fetchone() + is_warm = row_wp is not None return RunHandle( stage_run_id=stage_run_id, diff --git a/tests/unit/test_warm_pool_startup.py b/tests/unit/test_warm_pool_startup.py index d6b13c5..3d56786 100644 --- a/tests/unit/test_warm_pool_startup.py +++ b/tests/unit/test_warm_pool_startup.py @@ -94,3 +94,47 @@ def test_build_startup_script_warm_pool_sets_current_image() -> None: warm_pool_idle_timeout_seconds=900, ) assert 'CURRENT_IMAGE="us-docker.pkg.dev/proj/repo/img:v5"' in script + + +def test_idle_loop_updates_gcs_paths_for_new_run() -> None: + """Idle loop must update GCS log/exit paths for each new job. + + Bug: Without this, logs from the second job would overwrite the first + job's logs in GCS because the paths were set once at script generation. + """ + from goldfish.cloud.adapters.gcp.startup_builder import idle_loop_section + + script = idle_loop_section(idle_timeout_seconds=900, gcs_pool_path="gs://bucket/warm_pool/test") + # Must update GCS paths from the new run_path + assert "GCS_STDOUT_PATH=" in script + assert "GCS_STDERR_PATH=" in script + assert "GCS_EXIT_CODE_PATH=" in script + assert "new_run_path" in script + + +def test_idle_loop_writes_exit_code_to_gcs_directly() -> None: + """Exit code must be written to GCS directly, not via stale local path. + + Bug: EXIT_CODE_FILE pointed to the first run's gcsfuse path. On subsequent + runs the exit code would go to the wrong location. 
+ """ + from goldfish.cloud.adapters.gcp.startup_builder import idle_loop_section + + script = idle_loop_section(idle_timeout_seconds=900, gcs_pool_path="gs://bucket/warm_pool/test") + assert "gsutil cp -" in script # Direct GCS upload, not local file write + + +def test_build_startup_script_warm_pool_exports_bucket() -> None: + """Warm pool script must export GCS_BUCKET for idle loop path construction.""" + from goldfish.cloud.adapters.gcp.startup_builder import build_startup_script + + script = build_startup_script( + bucket="my-artifacts", + bucket_prefix="runs", + run_path="stage-abc", + image="img:v1", + entrypoint="/bin/bash", + env_map={}, + warm_pool_idle_timeout_seconds=900, + ) + assert 'GCS_BUCKET="my-artifacts"' in script From 00ce683d76964cb08b07f3e440b09fb3cd67af8e Mon Sep 17 00:00:00 2001 From: Test User Date: Fri, 20 Mar 2026 10:46:12 +0100 Subject: [PATCH 2/4] =?UTF-8?q?fix(warm-pool):=20Complete=20wiring=20?= =?UTF-8?q?=E2=80=94=20dispatch,=20stale=20paths,=20process=20reset?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes all issues from honest review: 1. DISPATCH WIRED IN HOT PATH: warm_pool_manager passed into gce_launcher.launch_instance(). Warm pool claim happens AFTER scripts are built but BEFORE instance creation — where all the pieces (env_map, pre_run_cmds, post_run_cmds, docker_cmd) are available. New _build_docker_cmd_script() generates standalone Docker run script for warm pool reuse. 2. GCS PATHS UPDATED PER JOB: Idle loop now updates GCS_STDOUT_PATH, GCS_STDERR_PATH, GCS_EXIT_CODE_PATH from new run's run_path. Exit code written directly to GCS via gsutil, not stale local path. 3. BACKGROUND PROCESSES RESET: Idle loop kills watchdog, supervisor, log syncer, and metadata syncer PIDs between jobs. Metadata syncer restarted for each new job. 4. POLL SCAN O(1): _get_run_handle uses single-row lookup by backend_handle instead of scanning all running warm instances. 5. 
INTEGRATION TESTS: 8 new tests covering full lifecycle — register, claim, ACK, timeout, release, reap, emergency cleanup, pool cap. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../cloud/adapters/gcp/gce_launcher.py | 57 +++++ .../cloud/adapters/gcp/run_backend.py | 17 +- .../cloud/adapters/gcp/startup_builder.py | 11 + src/goldfish/cloud/adapters/gcp/warm_pool.py | 3 +- tests/integration/test_warm_pool_lifecycle.py | 199 ++++++++++++++++++ tests/unit/test_warm_pool_startup.py | 24 +++ 6 files changed, 306 insertions(+), 5 deletions(-) create mode 100644 tests/integration/test_warm_pool_lifecycle.py diff --git a/src/goldfish/cloud/adapters/gcp/gce_launcher.py b/src/goldfish/cloud/adapters/gcp/gce_launcher.py index 36023cf..ccc0182 100644 --- a/src/goldfish/cloud/adapters/gcp/gce_launcher.py +++ b/src/goldfish/cloud/adapters/gcp/gce_launcher.py @@ -135,6 +135,7 @@ def launch_instance( goldfish_env: dict[str, str] | None = None, preemptible: bool | None = None, warm_pool_idle_timeout_seconds: int | None = None, + warm_pool_manager: Any | None = None, ) -> GCELaunchResult: """Launch GCE instance for stage run. @@ -354,6 +355,31 @@ def launch_instance( warm_pool_idle_timeout_seconds=warm_pool_idle_timeout_seconds, ) + # Try warm pool reuse BEFORE creating a new instance. + # All scripts (pre_run, post_run, docker_cmd, env) are ready at this point. 
+ if warm_pool_manager: + handle = warm_pool_manager.try_claim( + machine_type=machine_type, + gpu_count=gpu_count, + stage_run_id=stage_run_id, + image=image_tag, + env_map=env_map, + pre_run_script="\n".join(pre_run_cmds), + post_run_script="\n".join(post_run_cmds), + docker_cmd_script=self._build_docker_cmd_script( + image_tag, + env_map, + docker_cmd, + gpu_count, + ), + run_path=f"runs/{stage_run_id}", + ) + if handle: + return GCELaunchResult( + instance_name=handle.backend_handle, + zone=handle.zone or "", + ) + if use_capacity_search and self.resources: # Use ResourceLauncher for capacity-aware search return self._launch_with_capacity_search( @@ -376,6 +402,37 @@ def launch_instance( preemptible=preemptible, ) + def _build_docker_cmd_script( + self, + image: str, + env_map: dict[str, str], + cmd: str, + gpu_count: int, + ) -> str: + """Build a standalone docker run script for warm pool reuse. + + This generates the same docker run command that startup_builder creates, + but as a standalone bash script that the warm pool idle loop can execute. 
+ """ + import shlex + + gpu_flag = "--gpus all" if gpu_count > 0 else "" + env_flags = " ".join(f"-e {k}={shlex.quote(v)}" for k, v in env_map.items()) + mounts = "-v /mnt/entrypoint.sh:/entrypoint.sh -v /mnt/gcs:/mnt/gcs -v /mnt/inputs:/mnt/inputs -v /mnt/outputs:/mnt/outputs" + cmd_part = f" {cmd}" if cmd else "" + + return f"""#!/bin/bash +set -euo pipefail +docker run --rm {gpu_flag} \\ + --ipc=host \\ + --ulimit memlock=-1 --ulimit stack=67108864 \\ + --shm-size=16g \\ + {env_flags} \\ + {mounts} \\ + --entrypoint /bin/bash \\ + {image}{cmd_part} +""" + def _launch_with_capacity_search( self, instance_name: str, diff --git a/src/goldfish/cloud/adapters/gcp/run_backend.py b/src/goldfish/cloud/adapters/gcp/run_backend.py index 761ae70..d6d6baf 100644 --- a/src/goldfish/cloud/adapters/gcp/run_backend.py +++ b/src/goldfish/cloud/adapters/gcp/run_backend.py @@ -252,13 +252,24 @@ def launch(self, spec: RunSpec) -> RunHandle: gpu_count=gpu_count, zones=self._zones, goldfish_env=goldfish_env, - preemptible=spec.spot, # Pass spot preference to launcher + preemptible=spec.spot, warm_pool_idle_timeout_seconds=warm_pool_timeout, + # Pass warm pool manager for reuse dispatch inside launch_instance + warm_pool_manager=self._warm_pool if warm_pool_timeout else None, ) - # Register instance in warm pool if eligible + # Check if launch_instance used a warm instance (zone != "" means it was warm) + # or register the fresh instance in the pool is_warm = False - if warm_pool_timeout and self._warm_pool: + # If the result came from warm pool claim, the instance is already registered + warm_instances = self._warm_pool._db.list_warm_instances(status="running") if self._warm_pool else [] + for wi in warm_instances: + if wi.get("instance_name") == result.instance_name: + is_warm = True + break + + # Register fresh instance in pool if eligible and not already warm + if not is_warm and warm_pool_timeout and self._warm_pool: is_warm = self._warm_pool.register_instance( 
instance_name=result.instance_name, zone=result.zone, diff --git a/src/goldfish/cloud/adapters/gcp/startup_builder.py b/src/goldfish/cloud/adapters/gcp/startup_builder.py index 25c05c1..78e945b 100644 --- a/src/goldfish/cloud/adapters/gcp/startup_builder.py +++ b/src/goldfish/cloud/adapters/gcp/startup_builder.py @@ -850,6 +850,13 @@ def idle_loop_section(idle_timeout_seconds: int, gcs_pool_path: str) -> str: mkdir -p "$spec_dir" gsutil -m cp -r "$spec_path/*" "$spec_dir/" 2>&1 || {{ echo "Failed to download job spec"; exit 1; }} + # Kill stale background processes from previous job + # Watchdog, supervisor, and log syncer track the old Docker PID + kill ${{WATCHDOG_PID:-0}} 2>/dev/null || true + kill ${{SUPERVISOR_PID:-0}} 2>/dev/null || true + kill ${{LOG_SYNCER_PID:-0}} 2>/dev/null || true + kill ${{METADATA_SYNCER_PID:-0}} 2>/dev/null || true + # Clean workspace between runs echo "Cleaning workspace for new job..." rm -rf /mnt/outputs/* /tmp/triton* /tmp/torch* /tmp/pip* /tmp/stage_times.log @@ -895,6 +902,10 @@ def idle_loop_section(idle_timeout_seconds: int, gcs_pool_path: str) -> str: GCS_EXIT_CODE_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/logs/exit_code.txt" fi + # Restart metadata syncer for the new job (Overdrive log sync) + METADATA_SYNCER_STARTED=0 + start_metadata_syncer + # Build and run new Docker command if [[ -f "$spec_dir/docker_cmd.sh" ]]; then log_stage "warm_pool_docker_run_begin" || true diff --git a/src/goldfish/cloud/adapters/gcp/warm_pool.py b/src/goldfish/cloud/adapters/gcp/warm_pool.py index be319d0..58f279f 100644 --- a/src/goldfish/cloud/adapters/gcp/warm_pool.py +++ b/src/goldfish/cloud/adapters/gcp/warm_pool.py @@ -8,6 +8,7 @@ import logging import subprocess +import time from datetime import UTC, datetime from pathlib import Path from typing import TYPE_CHECKING @@ -111,8 +112,6 @@ def try_claim( self._signal_bus.set_signal("goldfish", signal, target=instance_name) # Wait for ACK (30s timeout) - import time - for _ in 
range(30): ack = self._signal_bus.get_ack("goldfish", target=instance_name) if ack == stage_run_id: diff --git a/tests/integration/test_warm_pool_lifecycle.py b/tests/integration/test_warm_pool_lifecycle.py new file mode 100644 index 0000000..c97a00f --- /dev/null +++ b/tests/integration/test_warm_pool_lifecycle.py @@ -0,0 +1,199 @@ +"""Integration tests for warm pool lifecycle. + +Tests the full claim → signal → release → reap cycle with real DB +but mocked GCE/metadata layer. +""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +from unittest.mock import MagicMock, patch + +import pytest + +from goldfish.cloud.adapters.gcp.warm_pool import WarmPoolManager +from goldfish.config import WarmPoolConfig + + +@pytest.fixture +def warm_config() -> WarmPoolConfig: + return WarmPoolConfig(enabled=True, max_instances=2, idle_timeout_minutes=30) + + +@pytest.fixture +def manager(test_db, warm_config) -> WarmPoolManager: + return WarmPoolManager( + db=test_db, + config=warm_config, + signal_bus=None, + bucket="test-bucket", + project_id="test-project", + ) + + +class TestWarmPoolLifecycle: + def test_register_and_claim_roundtrip(self, test_db, manager) -> None: + """Register an instance, claim it, verify state transitions.""" + # Register + assert manager.register_instance("stage-abc", "us-central1-a", "a3-highgpu-1g", 1) + + # Verify idle + instances = test_db.list_warm_instances(status="idle") + assert len(instances) == 1 + + # Claim (without signal bus, falls through) + claimed = test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) + assert claimed is not None + assert claimed["status"] == "claimed" + + # Release + test_db.release_warm_instance("stage-abc") + instances = test_db.list_warm_instances(status="idle") + assert len(instances) == 1 + + def test_pool_size_cap_enforced(self, test_db, manager) -> None: + """Cannot register more instances than max_instances.""" + assert manager.register_instance("stage-1", 
"zone-a", "a3-highgpu-1g", 1) + assert manager.register_instance("stage-2", "zone-b", "a3-highgpu-1g", 1) + # Third should be rejected (max=2) + assert not manager.register_instance("stage-3", "zone-c", "a3-highgpu-1g", 1) + + def test_reap_idle_deletes_expired(self, test_db, manager) -> None: + """Reaper should delete instances past idle timeout.""" + manager.register_instance("stage-old", "us-central1-a", "a3-highgpu-1g", 1) + + # Backdate idle_since + with test_db._conn() as conn: + old_time = (datetime.now(UTC) - timedelta(hours=2)).isoformat() + conn.execute("UPDATE warm_instances SET idle_since = ?", (old_time,)) + + with patch.object(manager, "_delete_gce_instance"): + reaped = manager.reap_idle() + assert reaped == 1 + assert test_db.list_warm_instances() == [] + + def test_reap_idle_skips_active(self, test_db, manager) -> None: + """Reaper should not delete instances that are running or recently idle.""" + manager.register_instance("stage-active", "us-central1-a", "a3-highgpu-1g", 1) + # Claim it (status = claimed, not idle) + test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) + + with patch.object(manager, "_delete_gce_instance"): + reaped = manager.reap_idle() + assert reaped == 0 + assert len(test_db.list_warm_instances()) == 1 + + def test_reap_all_emergency_cleanup(self, test_db, manager) -> None: + """Emergency reap should delete all instances regardless of state.""" + manager.register_instance("stage-1", "zone-a", "a3-highgpu-1g", 1) + manager.register_instance("stage-2", "zone-b", "a3-highgpu-1g", 1) + test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) + + with patch.object(manager, "_delete_gce_instance"): + reaped = manager.reap_all() + assert reaped == 2 + assert test_db.list_warm_instances() == [] + + def test_try_claim_without_signal_bus_returns_none(self, test_db, manager) -> None: + """Without signal bus, try_claim should release and return None.""" + manager.register_instance("stage-abc", 
"us-central1-a", "a3-highgpu-1g", 1) + + result = manager.try_claim( + machine_type="a3-highgpu-1g", + gpu_count=1, + stage_run_id="stage-new", + image="img:v1", + env_map={}, + pre_run_script="echo pre", + post_run_script="echo post", + docker_cmd_script="echo docker", + run_path="runs/stage-new", + ) + # No signal bus → falls through + assert result is None + # Instance should be released back to idle + instances = test_db.list_warm_instances(status="idle") + assert len(instances) == 1 + + def test_try_claim_with_signal_bus_and_ack(self, test_db, warm_config) -> None: + """With signal bus and ACK, try_claim should return a RunHandle.""" + # Create a stage_run so the warm_instances FK constraint is satisfied + with test_db._conn() as conn: + conn.execute("PRAGMA foreign_keys = OFF") + conn.execute( + "INSERT INTO stage_runs (id, workspace_name, version, stage_name, status, state, backend_type, started_at) " + "VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))", + ("stage-new", "ws", "v1", "train", "running", "running", "gce"), + ) + conn.execute("PRAGMA foreign_keys = ON") + + mock_bus = MagicMock() + # ACK immediately + mock_bus.get_ack.return_value = "stage-new" + + mgr = WarmPoolManager( + db=test_db, + config=warm_config, + signal_bus=mock_bus, + bucket="test-bucket", + project_id="test-project", + ) + mgr.register_instance("stage-warm", "us-central1-a", "a3-highgpu-1g", 1) + + with patch.object(mgr, "_upload_job_spec", return_value="gs://test-bucket/warm_pool/stage-warm/jobs/stage-new"): + result = mgr.try_claim( + machine_type="a3-highgpu-1g", + gpu_count=1, + stage_run_id="stage-new", + image="img:v2", + env_map={"FOO": "bar"}, + pre_run_script="echo pre", + post_run_script="echo post", + docker_cmd_script="echo docker", + run_path="runs/stage-new", + ) + + assert result is not None + assert result.warm_instance is True + assert result.backend_handle == "stage-warm" + assert result.zone == "us-central1-a" + + # Instance should be in 'running' state + instances 
= test_db.list_warm_instances(status="running") + assert len(instances) == 1 + assert instances[0]["current_stage_run_id"] == "stage-new" + + def test_try_claim_with_ack_timeout_releases(self, test_db, warm_config) -> None: + """If ACK times out, instance should be released.""" + mock_bus = MagicMock() + mock_bus.get_ack.return_value = None # Never ACKs + + mgr = WarmPoolManager( + db=test_db, + config=warm_config, + signal_bus=mock_bus, + bucket="test-bucket", + project_id="test-project", + ) + mgr.register_instance("stage-slow", "us-central1-a", "a3-highgpu-1g", 1) + + with ( + patch.object(mgr, "_upload_job_spec", return_value="gs://bucket/spec"), + patch("goldfish.cloud.adapters.gcp.warm_pool.time.sleep"), # Skip waiting + ): + result = mgr.try_claim( + machine_type="a3-highgpu-1g", + gpu_count=1, + stage_run_id="stage-timeout", + image="img:v1", + env_map={}, + pre_run_script="", + post_run_script="", + docker_cmd_script="", + run_path="runs/stage-timeout", + ) + + assert result is None + # Instance should be back to idle + instances = test_db.list_warm_instances(status="idle") + assert len(instances) == 1 diff --git a/tests/unit/test_warm_pool_startup.py b/tests/unit/test_warm_pool_startup.py index 3d56786..c91e95e 100644 --- a/tests/unit/test_warm_pool_startup.py +++ b/tests/unit/test_warm_pool_startup.py @@ -138,3 +138,27 @@ def test_build_startup_script_warm_pool_exports_bucket() -> None: warm_pool_idle_timeout_seconds=900, ) assert 'GCS_BUCKET="my-artifacts"' in script + + +def test_idle_loop_kills_stale_background_processes() -> None: + """Idle loop must kill watchdog/supervisor/log syncer between jobs. + + Bug: These processes track the old Docker PID. Without killing them, + the watchdog could kill the instance mid-job, and the log syncer + would upload to stale GCS paths. 
+ """ + from goldfish.cloud.adapters.gcp.startup_builder import idle_loop_section + + script = idle_loop_section(idle_timeout_seconds=900, gcs_pool_path="gs://bucket/warm_pool/test") + assert "WATCHDOG_PID" in script + assert "SUPERVISOR_PID" in script + assert "LOG_SYNCER_PID" in script + assert "METADATA_SYNCER_PID" in script + + +def test_idle_loop_restarts_metadata_syncer() -> None: + """Metadata syncer must be restarted for each new job (Overdrive log sync).""" + from goldfish.cloud.adapters.gcp.startup_builder import idle_loop_section + + script = idle_loop_section(idle_timeout_seconds=900, gcs_pool_path="gs://bucket/warm_pool/test") + assert "start_metadata_syncer" in script From 155d62ecca0989147b8bc092252e87c91721540a Mon Sep 17 00:00:00 2001 From: Test User Date: Fri, 20 Mar 2026 11:05:28 +0100 Subject: [PATCH 3/4] fix: Remove unnecessary warm instance scan after fresh launch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review feedback: if we reach the code after launch_instance(), the warm pool claim either wasn't attempted or returned None. This is always a fresh launch — no need to scan warm_instances to check. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/goldfish/cloud/adapters/gcp/run_backend.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/src/goldfish/cloud/adapters/gcp/run_backend.py b/src/goldfish/cloud/adapters/gcp/run_backend.py index d6d6baf..e2cf5e6 100644 --- a/src/goldfish/cloud/adapters/gcp/run_backend.py +++ b/src/goldfish/cloud/adapters/gcp/run_backend.py @@ -258,18 +258,11 @@ def launch(self, spec: RunSpec) -> RunHandle: warm_pool_manager=self._warm_pool if warm_pool_timeout else None, ) - # Check if launch_instance used a warm instance (zone != "" means it was warm) - # or register the fresh instance in the pool + # Fresh launch (warm pool claim happens inside launch_instance and + # returns early via GCELaunchResult — if we're here, it's fresh). 
+ # Register in pool if eligible so the idle loop activates after this run. is_warm = False - # If the result came from warm pool claim, the instance is already registered - warm_instances = self._warm_pool._db.list_warm_instances(status="running") if self._warm_pool else [] - for wi in warm_instances: - if wi.get("instance_name") == result.instance_name: - is_warm = True - break - - # Register fresh instance in pool if eligible and not already warm - if not is_warm and warm_pool_timeout and self._warm_pool: + if warm_pool_timeout and self._warm_pool: is_warm = self._warm_pool.register_instance( instance_name=result.instance_name, zone=result.zone, From 0577c0462b6f353c5799474fa9f386924b6605cb Mon Sep 17 00:00:00 2001 From: Test User Date: Fri, 20 Mar 2026 14:49:51 +0100 Subject: [PATCH 4/4] fix(warm-pool): Fix 6 lifecycle regressions from external review P1 fixes: 1. Status polling: get_status() checks GCS exit code for warm instances even when VM is RUNNING (idle loop keeps VM alive). Without this, runs stay RUNNING indefinitely. 2. Skip register on warm reuse: GCELaunchResult.warm_reuse flag prevents double-inserting the same instance_name (PK violation). 3. Register as 'running' not 'idle': Fresh launches registered as running so the reaper won't delete them mid-job. Release to idle happens when the startup script enters the idle loop. 4. Guard kill against PID 0: ${PID:-0} sends SIGTERM to entire process group. Now checks PID is set and non-zero before killing. 5. Backend_handle identity: Warm pool claims use stage_run_id for GCS path resolution so logs/exit code go to the correct run. 6. Double runs/ prefix: run_path is now just stage_run_id (not runs/stage_run_id), matching the idle loop's gs://bucket/runs/$path. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../cloud/adapters/gcp/gce_launcher.py | 6 +++++- src/goldfish/cloud/adapters/gcp/run_backend.py | 18 +++++++++++++----- .../cloud/adapters/gcp/startup_builder.py | 12 ++++++------ src/goldfish/db/database.py | 7 ++++--- tests/integration/test_warm_pool_lifecycle.py | 17 +++++++++++++---- tests/unit/test_warm_pool.py | 17 +++++++++++++---- 6 files changed, 54 insertions(+), 23 deletions(-) diff --git a/src/goldfish/cloud/adapters/gcp/gce_launcher.py b/src/goldfish/cloud/adapters/gcp/gce_launcher.py index ccc0182..c69a279 100644 --- a/src/goldfish/cloud/adapters/gcp/gce_launcher.py +++ b/src/goldfish/cloud/adapters/gcp/gce_launcher.py @@ -46,6 +46,7 @@ class GCELaunchResult: instance_name: str zone: str + warm_reuse: bool = False # True if dispatched to an existing warm pool instance class GCELauncher: @@ -372,12 +373,15 @@ def launch_instance( docker_cmd, gpu_count, ), - run_path=f"runs/{stage_run_id}", + run_path=stage_run_id, ) if handle: + # instance_name is the warm VM's own name (handle.backend_handle), not + # stage_run_id; GCS lookups (logs, exit code) must key on stage_run_id instead. return GCELaunchResult( instance_name=handle.backend_handle, zone=handle.zone or "", + warm_reuse=True, ) if use_capacity_search and self.resources: diff --git a/src/goldfish/cloud/adapters/gcp/run_backend.py b/src/goldfish/cloud/adapters/gcp/run_backend.py index e2cf5e6..df3a17d 100644 --- a/src/goldfish/cloud/adapters/gcp/run_backend.py +++ b/src/goldfish/cloud/adapters/gcp/run_backend.py @@ -258,11 +258,12 @@ def launch(self, spec: RunSpec) -> RunHandle: warm_pool_manager=self._warm_pool if warm_pool_timeout else None, ) - # Fresh launch (warm pool claim happens inside launch_instance and - # returns early via GCELaunchResult — if we're here, it's fresh). - # Register in pool if eligible so the idle loop activates after this run. 
- is_warm = False - if warm_pool_timeout and self._warm_pool: + is_warm = getattr(result, "warm_reuse", False) + + # Only register fresh launches (not warm reuse hits, which are already tracked). + # Registration uses status='running' so the reaper won't touch it mid-job. + # The startup script's idle loop calls release (→ idle) after Docker exits. + if not is_warm and warm_pool_timeout and self._warm_pool: is_warm = self._warm_pool.register_instance( instance_name=result.instance_name, zone=result.zone, @@ -312,6 +313,13 @@ def get_status(self, handle: RunHandle) -> BackendStatus: # Map Goldfish state to RunStatus if status_str == StageState.RUNNING: + # Warm pool: instance stays RUNNING after Docker exits (idle loop). + # Check for exit code in GCS to detect that the *job* finished + # even though the *instance* is still alive. + if handle.warm_instance: + exit_result = self._launcher._get_exit_code(handle.stage_run_id) + if exit_result.exists and exit_result.code is not None: + return BackendStatus.from_exit_code(exit_result.code) return BackendStatus(status=RunStatus.RUNNING) elif status_str == StageState.COMPLETED: return BackendStatus(status=RunStatus.COMPLETED, exit_code=0) diff --git a/src/goldfish/cloud/adapters/gcp/startup_builder.py b/src/goldfish/cloud/adapters/gcp/startup_builder.py index 78e945b..7e1d8ad 100644 --- a/src/goldfish/cloud/adapters/gcp/startup_builder.py +++ b/src/goldfish/cloud/adapters/gcp/startup_builder.py @@ -850,12 +850,12 @@ def idle_loop_section(idle_timeout_seconds: int, gcs_pool_path: str) -> str: mkdir -p "$spec_dir" gsutil -m cp -r "$spec_path/*" "$spec_dir/" 2>&1 || {{ echo "Failed to download job spec"; exit 1; }} - # Kill stale background processes from previous job - # Watchdog, supervisor, and log syncer track the old Docker PID - kill ${{WATCHDOG_PID:-0}} 2>/dev/null || true - kill ${{SUPERVISOR_PID:-0}} 2>/dev/null || true - kill ${{LOG_SYNCER_PID:-0}} 2>/dev/null || true - kill ${{METADATA_SYNCER_PID:-0}} 2>/dev/null 
|| true + # Kill stale background processes from previous job. + # Guard: only kill if PID is set and non-zero (kill 0 = entire process group!) + [[ -n "${{WATCHDOG_PID:-}}" && "${{WATCHDOG_PID}}" != "0" ]] && kill "$WATCHDOG_PID" 2>/dev/null || true + [[ -n "${{SUPERVISOR_PID:-}}" && "${{SUPERVISOR_PID}}" != "0" ]] && kill "$SUPERVISOR_PID" 2>/dev/null || true + [[ -n "${{LOG_SYNCER_PID:-}}" && "${{LOG_SYNCER_PID}}" != "0" ]] && kill "$LOG_SYNCER_PID" 2>/dev/null || true + [[ -n "${{METADATA_SYNCER_PID:-}}" && "${{METADATA_SYNCER_PID}}" != "0" ]] && kill "$METADATA_SYNCER_PID" 2>/dev/null || true # Clean workspace between runs echo "Cleaning workspace for new job..." diff --git a/src/goldfish/db/database.py b/src/goldfish/db/database.py index 5cdd678..65bb707 100644 --- a/src/goldfish/db/database.py +++ b/src/goldfish/db/database.py @@ -5600,10 +5600,11 @@ def register_warm_instance( conn.execute( """ INSERT INTO warm_instances - (instance_name, zone, project_id, machine_type, gpu_count, image_tag, idle_since, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) + (instance_name, zone, project_id, machine_type, gpu_count, image_tag, + status, idle_since, created_at) + VALUES (?, ?, ?, ?, ?, ?, 'running', NULL, ?) 
""", - (instance_name, zone, project_id, machine_type, gpu_count, image_tag, now, now), + (instance_name, zone, project_id, machine_type, gpu_count, image_tag, now), ) def claim_warm_instance(self, machine_type: str, gpu_count: int) -> dict | None: diff --git a/tests/integration/test_warm_pool_lifecycle.py b/tests/integration/test_warm_pool_lifecycle.py index c97a00f..64c397d 100644 --- a/tests/integration/test_warm_pool_lifecycle.py +++ b/tests/integration/test_warm_pool_lifecycle.py @@ -37,16 +37,21 @@ def test_register_and_claim_roundtrip(self, test_db, manager) -> None: # Register assert manager.register_instance("stage-abc", "us-central1-a", "a3-highgpu-1g", 1) - # Verify idle + # Verify registered as running (first job active) + instances = test_db.list_warm_instances(status="running") + assert len(instances) == 1 + + # Release (simulates first job completing → idle loop) + test_db.release_warm_instance("stage-abc") instances = test_db.list_warm_instances(status="idle") assert len(instances) == 1 - # Claim (without signal bus, falls through) + # Claim claimed = test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) assert claimed is not None assert claimed["status"] == "claimed" - # Release + # Release again test_db.release_warm_instance("stage-abc") instances = test_db.list_warm_instances(status="idle") assert len(instances) == 1 @@ -61,6 +66,7 @@ def test_pool_size_cap_enforced(self, test_db, manager) -> None: def test_reap_idle_deletes_expired(self, test_db, manager) -> None: """Reaper should delete instances past idle timeout.""" manager.register_instance("stage-old", "us-central1-a", "a3-highgpu-1g", 1) + test_db.release_warm_instance("stage-old") # Must be idle to be reaped # Backdate idle_since with test_db._conn() as conn: @@ -87,7 +93,7 @@ def test_reap_all_emergency_cleanup(self, test_db, manager) -> None: """Emergency reap should delete all instances regardless of state.""" manager.register_instance("stage-1", "zone-a", 
"a3-highgpu-1g", 1) manager.register_instance("stage-2", "zone-b", "a3-highgpu-1g", 1) - test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) + test_db.release_warm_instance("stage-1") # One idle, one running with patch.object(manager, "_delete_gce_instance"): reaped = manager.reap_all() @@ -97,6 +103,7 @@ def test_reap_all_emergency_cleanup(self, test_db, manager) -> None: def test_try_claim_without_signal_bus_returns_none(self, test_db, manager) -> None: """Without signal bus, try_claim should release and return None.""" manager.register_instance("stage-abc", "us-central1-a", "a3-highgpu-1g", 1) + test_db.release_warm_instance("stage-abc") # Must be idle to claim result = manager.try_claim( machine_type="a3-highgpu-1g", @@ -139,6 +146,7 @@ def test_try_claim_with_signal_bus_and_ack(self, test_db, warm_config) -> None: project_id="test-project", ) mgr.register_instance("stage-warm", "us-central1-a", "a3-highgpu-1g", 1) + test_db.release_warm_instance("stage-warm") # Must be idle to claim with patch.object(mgr, "_upload_job_spec", return_value="gs://test-bucket/warm_pool/stage-warm/jobs/stage-new"): result = mgr.try_claim( @@ -176,6 +184,7 @@ def test_try_claim_with_ack_timeout_releases(self, test_db, warm_config) -> None project_id="test-project", ) mgr.register_instance("stage-slow", "us-central1-a", "a3-highgpu-1g", 1) + test_db.release_warm_instance("stage-slow") # Must be idle to claim with ( patch.object(mgr, "_upload_job_spec", return_value="gs://bucket/spec"), diff --git a/tests/unit/test_warm_pool.py b/tests/unit/test_warm_pool.py index 0f12837..0c2bd5e 100644 --- a/tests/unit/test_warm_pool.py +++ b/tests/unit/test_warm_pool.py @@ -88,7 +88,7 @@ def test_register_warm_instance(self, test_db) -> None: assert len(instances) == 1 assert instances[0]["instance_name"] == "stage-abc123" assert instances[0]["machine_type"] == "a3-highgpu-1g" - assert instances[0]["status"] == "idle" + assert instances[0]["status"] == "running" # Registered as 
running (first job active) def test_claim_warm_instance_returns_match(self, test_db) -> None: test_db.register_warm_instance( @@ -98,6 +98,8 @@ def test_claim_warm_instance_returns_match(self, test_db) -> None: machine_type="a3-highgpu-1g", gpu_count=1, ) + # Register creates as 'running'; release to 'idle' so it's claimable + test_db.release_warm_instance("stage-abc") claimed = test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) assert claimed is not None @@ -112,6 +114,7 @@ def test_claim_warm_instance_no_match(self, test_db) -> None: machine_type="a3-highgpu-1g", gpu_count=1, ) + test_db.release_warm_instance("stage-abc") # Wrong machine type claimed = test_db.claim_warm_instance(machine_type="a3-highgpu-8g", gpu_count=8) @@ -126,6 +129,7 @@ def test_claim_warm_instance_atomic(self, test_db) -> None: machine_type="a3-highgpu-1g", gpu_count=1, ) + test_db.release_warm_instance("stage-abc") first = test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) second = test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) @@ -140,7 +144,7 @@ def test_release_warm_instance(self, test_db) -> None: machine_type="a3-highgpu-1g", gpu_count=1, ) - test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) + # Already 'running' from register; release to idle test_db.release_warm_instance("stage-abc") instances = test_db.list_warm_instances(status="idle") @@ -166,6 +170,10 @@ def test_count_warm_instances(self, test_db) -> None: machine_type="a3-highgpu-1g", gpu_count=1, ) + # All registered as 'running'; release all 3 to 'idle', then claim 1 + test_db.release_warm_instance("stage-0") + test_db.release_warm_instance("stage-1") + test_db.release_warm_instance("stage-2") test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) assert test_db.count_warm_instances() == 3 @@ -181,7 +189,8 @@ def test_list_expired_warm_instances(self, test_db) -> None: machine_type="a3-highgpu-1g", gpu_count=1, ) - # Backdate the
idle_since to 2 hours ago + # Release to idle, then backdate idle_since to 2 hours ago + test_db.release_warm_instance("stage-old") with test_db._conn() as conn: old_time = (datetime.now(UTC) - timedelta(hours=2)).isoformat() conn.execute( @@ -193,7 +202,7 @@ def test_list_expired_warm_instances(self, test_db) -> None: assert len(expired) == 1 assert expired[0]["instance_name"] == "stage-old" - # Fresh instance should not be expired + # Fresh instance (still running) should not be expired test_db.register_warm_instance( instance_name="stage-fresh", zone="us-central1-a",