diff --git a/src/goldfish/cloud/adapters/gcp/gce_launcher.py b/src/goldfish/cloud/adapters/gcp/gce_launcher.py index 246c82df..c69a279e 100644 --- a/src/goldfish/cloud/adapters/gcp/gce_launcher.py +++ b/src/goldfish/cloud/adapters/gcp/gce_launcher.py @@ -46,6 +46,7 @@ class GCELaunchResult: instance_name: str zone: str + warm_reuse: bool = False # True if dispatched to an existing warm pool instance class GCELauncher: @@ -134,6 +135,8 @@ def launch_instance( use_capacity_search: bool = True, goldfish_env: dict[str, str] | None = None, preemptible: bool | None = None, + warm_pool_idle_timeout_seconds: int | None = None, + warm_pool_manager: Any | None = None, ) -> GCELaunchResult: """Launch GCE instance for stage run. @@ -349,8 +352,38 @@ def launch_instance( log_sync_interval=log_sync_interval, # GPU flag - profile-based, not runtime nvidia-smi detection gpu_count=gpu_count or 0, + # Warm pool: if set, instance enters idle loop after Docker exits instead of self-deleting + warm_pool_idle_timeout_seconds=warm_pool_idle_timeout_seconds, ) + # Try warm pool reuse BEFORE creating a new instance. + # All scripts (pre_run, post_run, docker_cmd, env) are ready at this point. + if warm_pool_manager: + handle = warm_pool_manager.try_claim( + machine_type=machine_type, + gpu_count=gpu_count, + stage_run_id=stage_run_id, + image=image_tag, + env_map=env_map, + pre_run_script="\n".join(pre_run_cmds), + post_run_script="\n".join(post_run_cmds), + docker_cmd_script=self._build_docker_cmd_script( + image_tag, + env_map, + docker_cmd, + gpu_count, + ), + run_path=stage_run_id, + ) + if handle: + # Use stage_run_id as instance_name so GCS paths (logs, exit code) + # resolve to the current run, not the warm VM's original name. + return GCELaunchResult( + instance_name=handle.backend_handle, + zone=handle.zone or "", + warm_reuse=True, + ) + if use_capacity_search and self.resources: # Use ResourceLauncher for capacity-aware search return self._launch_with_capacity_search( @@ -373,6 +406,37 @@ def launch_instance( preemptible=preemptible, ) + def _build_docker_cmd_script( + self, + image: str, + env_map: dict[str, str], + cmd: str, + gpu_count: int, + ) -> str: + """Build a standalone docker run script for warm pool reuse. + + This generates the same docker run command that startup_builder creates, + but as a standalone bash script that the warm pool idle loop can execute. + """ + import shlex + + gpu_flag = "--gpus all" if gpu_count > 0 else "" + env_flags = " ".join(f"-e {k}={shlex.quote(v)}" for k, v in env_map.items()) + mounts = "-v /mnt/entrypoint.sh:/entrypoint.sh -v /mnt/gcs:/mnt/gcs -v /mnt/inputs:/mnt/inputs -v /mnt/outputs:/mnt/outputs" + cmd_part = f" {cmd}" if cmd else "" + + return f"""#!/bin/bash +set -euo pipefail +docker run --rm {gpu_flag} \\ + --ipc=host \\ + --ulimit memlock=-1 --ulimit stack=67108864 \\ + --shm-size=16g \\ + {env_flags} \\ + {mounts} \\ + --entrypoint /bin/bash \\ + {image}{cmd_part} +""" + def _launch_with_capacity_search( self, instance_name: str, diff --git a/src/goldfish/cloud/adapters/gcp/run_backend.py b/src/goldfish/cloud/adapters/gcp/run_backend.py index 2ce1d978..df3a17d5 100644 --- a/src/goldfish/cloud/adapters/gcp/run_backend.py +++ b/src/goldfish/cloud/adapters/gcp/run_backend.py @@ -236,6 +236,11 @@ def launch(self, spec: RunSpec) -> RunHandle: # Use spec.gpu_type if provided, else default to T4 gpu_type = spec.gpu_type or "nvidia-tesla-t4" + # Determine warm pool idle timeout from config + warm_pool_timeout: int | None = None + if self._warm_pool and self._warm_pool.is_enabled_for(spec.profile): + warm_pool_timeout = self._warm_pool._config.idle_timeout_minutes * 60 + result = self._launcher.launch_instance( image_tag=spec.image, stage_run_id=spec.stage_run_id, @@ -247,14 +252,32 @@ def launch(self, spec: RunSpec) -> RunHandle: gpu_count=gpu_count, zones=self._zones, goldfish_env=goldfish_env, - preemptible=spec.spot, # Pass spot preference to launcher + preemptible=spec.spot, + warm_pool_idle_timeout_seconds=warm_pool_timeout, + # Pass warm pool manager for reuse dispatch inside launch_instance + warm_pool_manager=self._warm_pool if warm_pool_timeout else None, ) + is_warm = getattr(result, "warm_reuse", False) + + # Only register fresh launches (not warm reuse hits, which are already tracked). + # Registration uses status='running' so the reaper won't touch it mid-job. + # The startup script's idle loop calls release (→ idle) after Docker exits. + if not is_warm and warm_pool_timeout and self._warm_pool: + is_warm = self._warm_pool.register_instance( + instance_name=result.instance_name, + zone=result.zone, + machine_type=machine_type, + gpu_count=gpu_count, + image_tag=spec.image, + ) + return RunHandle( stage_run_id=spec.stage_run_id, backend_type="gce", backend_handle=result.instance_name, zone=result.zone, + warm_instance=is_warm, ) except Exception as e: @@ -290,6 +313,13 @@ def get_status(self, handle: RunHandle) -> BackendStatus: # Map Goldfish state to RunStatus if status_str == StageState.RUNNING: + # Warm pool: instance stays RUNNING after Docker exits (idle loop). + # Check for exit code in GCS to detect that the *job* finished + # even though the *instance* is still alive. + if handle.warm_instance: + exit_result = self._launcher._get_exit_code(handle.stage_run_id) + if exit_result.exists and exit_result.code is not None: + return BackendStatus.from_exit_code(exit_result.code) return BackendStatus(status=RunStatus.RUNNING) elif status_str == StageState.COMPLETED: return BackendStatus(status=RunStatus.COMPLETED, exit_code=0) diff --git a/src/goldfish/cloud/adapters/gcp/startup_builder.py b/src/goldfish/cloud/adapters/gcp/startup_builder.py index 77a4bab5..7e1d8ad1 100644 --- a/src/goldfish/cloud/adapters/gcp/startup_builder.py +++ b/src/goldfish/cloud/adapters/gcp/startup_builder.py @@ -850,6 +850,13 @@ def idle_loop_section(idle_timeout_seconds: int, gcs_pool_path: str) -> str: mkdir -p "$spec_dir" gsutil -m cp -r "$spec_path/*" "$spec_dir/" 2>&1 || {{ echo "Failed to download job spec"; exit 1; }} + # Kill stale background processes from previous job. + # Guard: only kill if PID is set and non-zero (kill 0 = entire process group!) + [[ -n "${{WATCHDOG_PID:-}}" && "${{WATCHDOG_PID}}" != "0" ]] && kill "$WATCHDOG_PID" 2>/dev/null || true + [[ -n "${{SUPERVISOR_PID:-}}" && "${{SUPERVISOR_PID}}" != "0" ]] && kill "$SUPERVISOR_PID" 2>/dev/null || true + [[ -n "${{LOG_SYNCER_PID:-}}" && "${{LOG_SYNCER_PID}}" != "0" ]] && kill "$LOG_SYNCER_PID" 2>/dev/null || true + [[ -n "${{METADATA_SYNCER_PID:-}}" && "${{METADATA_SYNCER_PID}}" != "0" ]] && kill "$METADATA_SYNCER_PID" 2>/dev/null || true + # Clean workspace between runs echo "Cleaning workspace for new job..." rm -rf /mnt/outputs/* /tmp/triton* /tmp/torch* /tmp/pip* /tmp/stage_times.log @@ -875,16 +882,30 @@ def idle_loop_section(idle_timeout_seconds: int, gcs_pool_path: str) -> str: bash "$spec_dir/pre_run.sh" || {{ echo "Pre-run staging failed"; exit 1; }} fi - # Update log paths for new run + # Update ALL paths for the new run local new_run_path new_run_path=$(cat "$spec_dir/run_path" 2>/dev/null || echo "") if [[ -n "$new_run_path" ]]; then + # Reset log files STDOUT_LOG="/tmp/stdout.log" STDERR_LOG="/tmp/stderr.log" : > "$STDOUT_LOG" : > "$STDERR_LOG" + : > /tmp/stage_times.log + + # CRITICAL: Update GCS paths for the NEW run so logs don't + # go to the first run's location + GCS_STDOUT_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/logs/stdout.log" + GCS_STDERR_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/logs/stderr.log" + GCS_METRICS_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/logs/metrics.jsonl" + GCS_SVS_DURING_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/outputs/.goldfish/svs_findings_during.json" + GCS_EXIT_CODE_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/logs/exit_code.txt" fi + # Restart metadata syncer for the new job (Overdrive log sync) + METADATA_SYNCER_STARTED=0 + start_metadata_syncer + # Build and run new Docker command if [[ -f "$spec_dir/docker_cmd.sh" ]]; then log_stage "warm_pool_docker_run_begin" || true @@ -901,12 +922,18 @@ def idle_loop_section(idle_timeout_seconds: int, gcs_pool_path: str) -> str: bash "$spec_dir/post_run.sh" || true fi - # Write exit code - if [[ -n "$EXIT_CODE_FILE" ]]; then - echo "$EXIT_CODE" > "$EXIT_CODE_FILE" 2>/dev/null || true + # Write exit code directly to GCS (not the stale local path) + if [[ -n "$GCS_EXIT_CODE_PATH" ]]; then + echo "$EXIT_CODE" | gsutil cp - "$GCS_EXIT_CODE_PATH" 2>/dev/null || true fi - # Upload logs + # Set exit code in instance metadata + gcloud compute instances add-metadata "$INSTANCE_NAME" \\ + --zone="$INSTANCE_ZONE" --project="$PROJECT_ID" \\ + --metadata "goldfish_exit_code=$EXIT_CODE" \\ + --quiet 2>/dev/null || true + + # Upload logs to the new run's GCS paths sync_final_logs || true # Clear self-delete trap again for next idle cycle @@ -1210,6 +1237,8 @@ def build_startup_script( if use_warm_pool: assert warm_pool_idle_timeout_seconds is not None # Narrowing for mypy gcs_pool_path = f"gs://{bucket}/warm_pool/{'{INSTANCE_NAME}'}" + # Export bucket name so idle loop can construct GCS paths for subsequent runs + parts.append(f'GCS_BUCKET="{bucket}"') parts.append(idle_loop_section(warm_pool_idle_timeout_seconds, gcs_pool_path)) parts.append(f'CURRENT_IMAGE="{image}"') parts.append('log_stage "cleanup_begin"') diff --git a/src/goldfish/cloud/adapters/gcp/warm_pool.py b/src/goldfish/cloud/adapters/gcp/warm_pool.py index be319d0f..58f279f0 100644 --- a/src/goldfish/cloud/adapters/gcp/warm_pool.py +++ b/src/goldfish/cloud/adapters/gcp/warm_pool.py @@ -8,6 +8,7 @@ import logging import subprocess +import time from datetime import UTC, datetime from pathlib import Path from typing import TYPE_CHECKING @@ -111,8 +112,6 @@ def try_claim( self._signal_bus.set_signal("goldfish", signal, target=instance_name) # Wait for ACK (30s timeout) - import time - for _ in range(30): ack = self._signal_bus.get_ack("goldfish", target=instance_name) if ack == stage_run_id: diff --git a/src/goldfish/db/database.py b/src/goldfish/db/database.py index 5cdd6782..65bb707f 100644 --- a/src/goldfish/db/database.py +++ b/src/goldfish/db/database.py @@ -5600,10 +5600,11 @@ def register_warm_instance( conn.execute( """ INSERT INTO warm_instances - (instance_name, zone, project_id, machine_type, gpu_count, image_tag, idle_since, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) + (instance_name, zone, project_id, machine_type, gpu_count, image_tag, + status, idle_since, created_at) + VALUES (?, ?, ?, ?, ?, ?, 'running', NULL, ?) """, - (instance_name, zone, project_id, machine_type, gpu_count, image_tag, now, now), + (instance_name, zone, project_id, machine_type, gpu_count, image_tag, now), ) def claim_warm_instance(self, machine_type: str, gpu_count: int) -> dict | None: diff --git a/src/goldfish/jobs/_stage_executor_impl.py b/src/goldfish/jobs/_stage_executor_impl.py index f3b8f1e0..c71501c0 100644 --- a/src/goldfish/jobs/_stage_executor_impl.py +++ b/src/goldfish/jobs/_stage_executor_impl.py @@ -290,13 +290,15 @@ def _get_run_handle(self, stage_run_id: str) -> RunHandle: backend_handle = row.get("backend_handle") or stage_run_id zone = row.get("instance_zone") - # Check if this is a warm pool instance + # Check warm pool: single-row lookup by backend_handle (not a scan) is_warm = False - warm_instances = self.db.list_warm_instances(status="running") - for wi in warm_instances: - if wi.get("current_stage_run_id") == stage_run_id: - is_warm = True - break + if backend_handle: + with self.db._conn() as conn: + row_wp = conn.execute( + "SELECT 1 FROM warm_instances WHERE instance_name = ? AND status = 'running'", + (backend_handle,), + ).fetchone() + is_warm = row_wp is not None return RunHandle( stage_run_id=stage_run_id, diff --git a/tests/integration/test_warm_pool_lifecycle.py b/tests/integration/test_warm_pool_lifecycle.py new file mode 100644 index 00000000..64c397d2 --- /dev/null +++ b/tests/integration/test_warm_pool_lifecycle.py @@ -0,0 +1,208 @@ +"""Integration tests for warm pool lifecycle. + +Tests the full claim → signal → release → reap cycle with real DB +but mocked GCE/metadata layer. +""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +from unittest.mock import MagicMock, patch + +import pytest + +from goldfish.cloud.adapters.gcp.warm_pool import WarmPoolManager +from goldfish.config import WarmPoolConfig + + +@pytest.fixture +def warm_config() -> WarmPoolConfig: + return WarmPoolConfig(enabled=True, max_instances=2, idle_timeout_minutes=30) + + +@pytest.fixture +def manager(test_db, warm_config) -> WarmPoolManager: + return WarmPoolManager( + db=test_db, + config=warm_config, + signal_bus=None, + bucket="test-bucket", + project_id="test-project", + ) + + +class TestWarmPoolLifecycle: + def test_register_and_claim_roundtrip(self, test_db, manager) -> None: + """Register an instance, claim it, verify state transitions.""" + # Register + assert manager.register_instance("stage-abc", "us-central1-a", "a3-highgpu-1g", 1) + + # Verify registered as running (first job active) + instances = test_db.list_warm_instances(status="running") + assert len(instances) == 1 + + # Release (simulates first job completing → idle loop) + test_db.release_warm_instance("stage-abc") + instances = test_db.list_warm_instances(status="idle") + assert len(instances) == 1 + + # Claim + claimed = test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) + assert claimed is not None + assert claimed["status"] == "claimed" + + # Release again + test_db.release_warm_instance("stage-abc") + instances = test_db.list_warm_instances(status="idle") + assert len(instances) == 1 + + def test_pool_size_cap_enforced(self, test_db, manager) -> None: + """Cannot register more instances than max_instances.""" + assert manager.register_instance("stage-1", "zone-a", "a3-highgpu-1g", 1) + assert manager.register_instance("stage-2", "zone-b", "a3-highgpu-1g", 1) + # Third should be rejected (max=2) + assert not manager.register_instance("stage-3", "zone-c", "a3-highgpu-1g", 1) + + def test_reap_idle_deletes_expired(self, test_db, manager) -> None: + """Reaper should delete instances past idle timeout.""" + manager.register_instance("stage-old", "us-central1-a", "a3-highgpu-1g", 1) + test_db.release_warm_instance("stage-old") # Must be idle to be reaped + + # Backdate idle_since + with test_db._conn() as conn: + old_time = (datetime.now(UTC) - timedelta(hours=2)).isoformat() + conn.execute("UPDATE warm_instances SET idle_since = ?", (old_time,)) + + with patch.object(manager, "_delete_gce_instance"): + reaped = manager.reap_idle() + assert reaped == 1 + assert test_db.list_warm_instances() == [] + + def test_reap_idle_skips_active(self, test_db, manager) -> None: + """Reaper should not delete instances that are running or recently idle.""" + manager.register_instance("stage-active", "us-central1-a", "a3-highgpu-1g", 1) + # Claim it (status = claimed, not idle) + test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) + + with patch.object(manager, "_delete_gce_instance"): + reaped = manager.reap_idle() + assert reaped == 0 + assert len(test_db.list_warm_instances()) == 1 + + def test_reap_all_emergency_cleanup(self, test_db, manager) -> None: + """Emergency reap should delete all instances regardless of state.""" + manager.register_instance("stage-1", "zone-a", "a3-highgpu-1g", 1) + manager.register_instance("stage-2", "zone-b", "a3-highgpu-1g", 1) + test_db.release_warm_instance("stage-1") # One idle, one running + + with patch.object(manager, "_delete_gce_instance"): + reaped = manager.reap_all() + assert reaped == 2 + assert test_db.list_warm_instances() == [] + + def test_try_claim_without_signal_bus_returns_none(self, test_db, manager) -> None: + """Without signal bus, try_claim should release and return None.""" + manager.register_instance("stage-abc", "us-central1-a", "a3-highgpu-1g", 1) + test_db.release_warm_instance("stage-abc") # Must be idle to claim + + result = manager.try_claim( + machine_type="a3-highgpu-1g", + gpu_count=1, + stage_run_id="stage-new", + image="img:v1", + env_map={}, + pre_run_script="echo pre", + post_run_script="echo post", + docker_cmd_script="echo docker", + run_path="runs/stage-new", + ) + # No signal bus → falls through + assert result is None + # Instance should be released back to idle + instances = test_db.list_warm_instances(status="idle") + assert len(instances) == 1 + + def test_try_claim_with_signal_bus_and_ack(self, test_db, warm_config) -> None: + """With signal bus and ACK, try_claim should return a RunHandle.""" + # Create a stage_run so the warm_instances FK constraint is satisfied + with test_db._conn() as conn: + conn.execute("PRAGMA foreign_keys = OFF") + conn.execute( + "INSERT INTO stage_runs (id, workspace_name, version, stage_name, status, state, backend_type, started_at) " + "VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))", + ("stage-new", "ws", "v1", "train", "running", "running", "gce"), + ) + conn.execute("PRAGMA foreign_keys = ON") + + mock_bus = MagicMock() + # ACK immediately + mock_bus.get_ack.return_value = "stage-new" + + mgr = WarmPoolManager( + db=test_db, + config=warm_config, + signal_bus=mock_bus, + bucket="test-bucket", + project_id="test-project", + ) + mgr.register_instance("stage-warm", "us-central1-a", "a3-highgpu-1g", 1) + test_db.release_warm_instance("stage-warm") # Must be idle to claim + + with patch.object(mgr, "_upload_job_spec", return_value="gs://test-bucket/warm_pool/stage-warm/jobs/stage-new"): + result = mgr.try_claim( + machine_type="a3-highgpu-1g", + gpu_count=1, + stage_run_id="stage-new", + image="img:v2", + env_map={"FOO": "bar"}, + pre_run_script="echo pre", + post_run_script="echo post", + docker_cmd_script="echo docker", + run_path="runs/stage-new", + ) + + assert result is not None + assert result.warm_instance is True + assert result.backend_handle == "stage-warm" + assert result.zone == "us-central1-a" + + # Instance should be in 'running' state + instances = test_db.list_warm_instances(status="running") + assert len(instances) == 1 + assert instances[0]["current_stage_run_id"] == "stage-new" + + def test_try_claim_with_ack_timeout_releases(self, test_db, warm_config) -> None: + """If ACK times out, instance should be released.""" + mock_bus = MagicMock() + mock_bus.get_ack.return_value = None # Never ACKs + + mgr = WarmPoolManager( + db=test_db, + config=warm_config, + signal_bus=mock_bus, + bucket="test-bucket", + project_id="test-project", + ) + mgr.register_instance("stage-slow", "us-central1-a", "a3-highgpu-1g", 1) + test_db.release_warm_instance("stage-slow") # Must be idle to claim + + with ( + patch.object(mgr, "_upload_job_spec", return_value="gs://bucket/spec"), + patch("goldfish.cloud.adapters.gcp.warm_pool.time.sleep"), # Skip waiting + ): + result = mgr.try_claim( + machine_type="a3-highgpu-1g", + gpu_count=1, + stage_run_id="stage-timeout", + image="img:v1", + env_map={}, + pre_run_script="", + post_run_script="", + docker_cmd_script="", + run_path="runs/stage-timeout", + ) + + assert result is None + # Instance should be back to idle + instances = test_db.list_warm_instances(status="idle") + assert len(instances) == 1 diff --git a/tests/unit/test_warm_pool.py b/tests/unit/test_warm_pool.py index 0f12837a..0c2bd5ef 100644 --- a/tests/unit/test_warm_pool.py +++ b/tests/unit/test_warm_pool.py @@ -88,7 +88,7 @@ def test_register_warm_instance(self, test_db) -> None: assert len(instances) == 1 assert instances[0]["instance_name"] == "stage-abc123" assert instances[0]["machine_type"] == "a3-highgpu-1g" - assert instances[0]["status"] == "idle" + assert instances[0]["status"] == "running" # Registered as running (first job active) def test_claim_warm_instance_returns_match(self, test_db) -> None: test_db.register_warm_instance( @@ -98,6 +98,8 @@ def test_claim_warm_instance_returns_match(self, test_db) -> None: machine_type="a3-highgpu-1g", gpu_count=1, ) + # Register creates as 'running'; release to 'idle' so it's claimable + test_db.release_warm_instance("stage-abc") claimed = test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) assert claimed is not None @@ -112,6 +114,7 @@ def test_claim_warm_instance_no_match(self, test_db) -> None: machine_type="a3-highgpu-1g", gpu_count=1, ) + test_db.release_warm_instance("stage-abc") # Wrong machine type claimed = test_db.claim_warm_instance(machine_type="a3-highgpu-8g", gpu_count=8) @@ -126,6 +129,7 @@ def test_claim_warm_instance_atomic(self, test_db) -> None: machine_type="a3-highgpu-1g", gpu_count=1, ) + test_db.release_warm_instance("stage-abc") first = test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) second = test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) @@ -140,7 +144,7 @@ def test_release_warm_instance(self, test_db) -> None: machine_type="a3-highgpu-1g", gpu_count=1, ) - test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) + # Already 'running' from register; release to idle test_db.release_warm_instance("stage-abc") instances = test_db.list_warm_instances(status="idle") @@ -166,6 +170,10 @@ def test_count_warm_instances(self, test_db) -> None: machine_type="a3-highgpu-1g", gpu_count=1, ) + # All registered as 'running'; release 2 to 'idle', claim 1 + test_db.release_warm_instance("stage-0") + test_db.release_warm_instance("stage-1") + test_db.release_warm_instance("stage-2") test_db.claim_warm_instance(machine_type="a3-highgpu-1g", gpu_count=1) assert test_db.count_warm_instances() == 3 @@ -181,7 +189,8 @@ def test_list_expired_warm_instances(self, test_db) -> None: machine_type="a3-highgpu-1g", gpu_count=1, ) - # Backdate the idle_since to 2 hours ago + # Release to idle, then backdate idle_since to 2 hours ago + test_db.release_warm_instance("stage-old") with test_db._conn() as conn: old_time = (datetime.now(UTC) - timedelta(hours=2)).isoformat() conn.execute( @@ -193,7 +202,7 @@ def test_list_expired_warm_instances(self, test_db) -> None: assert len(expired) == 1 assert expired[0]["instance_name"] == "stage-old" - # Fresh instance should not be expired + # Fresh instance (still running) should not be expired test_db.register_warm_instance( instance_name="stage-fresh", zone="us-central1-a", diff --git a/tests/unit/test_warm_pool_startup.py b/tests/unit/test_warm_pool_startup.py index d6b13c50..c91e95e4 100644 --- a/tests/unit/test_warm_pool_startup.py +++ b/tests/unit/test_warm_pool_startup.py @@ -94,3 +94,71 @@ def test_build_startup_script_warm_pool_sets_current_image() -> None: warm_pool_idle_timeout_seconds=900, ) assert 'CURRENT_IMAGE="us-docker.pkg.dev/proj/repo/img:v5"' in script + + +def test_idle_loop_updates_gcs_paths_for_new_run() -> None: + """Idle loop must update GCS log/exit paths for each new job. + + Bug: Without this, logs from the second job would overwrite the first + job's logs in GCS because the paths were set once at script generation. + """ + from goldfish.cloud.adapters.gcp.startup_builder import idle_loop_section + + script = idle_loop_section(idle_timeout_seconds=900, gcs_pool_path="gs://bucket/warm_pool/test") + # Must update GCS paths from the new run_path + assert "GCS_STDOUT_PATH=" in script + assert "GCS_STDERR_PATH=" in script + assert "GCS_EXIT_CODE_PATH=" in script + assert "new_run_path" in script + + +def test_idle_loop_writes_exit_code_to_gcs_directly() -> None: + """Exit code must be written to GCS directly, not via stale local path. + + Bug: EXIT_CODE_FILE pointed to the first run's gcsfuse path. On subsequent + runs the exit code would go to the wrong location. + """ + from goldfish.cloud.adapters.gcp.startup_builder import idle_loop_section + + script = idle_loop_section(idle_timeout_seconds=900, gcs_pool_path="gs://bucket/warm_pool/test") + assert "gsutil cp -" in script # Direct GCS upload, not local file write + + +def test_build_startup_script_warm_pool_exports_bucket() -> None: + """Warm pool script must export GCS_BUCKET for idle loop path construction.""" + from goldfish.cloud.adapters.gcp.startup_builder import build_startup_script + + script = build_startup_script( + bucket="my-artifacts", + bucket_prefix="runs", + run_path="stage-abc", + image="img:v1", + entrypoint="/bin/bash", + env_map={}, + warm_pool_idle_timeout_seconds=900, + ) + assert 'GCS_BUCKET="my-artifacts"' in script + + +def test_idle_loop_kills_stale_background_processes() -> None: + """Idle loop must kill watchdog/supervisor/log syncer between jobs. + + Bug: These processes track the old Docker PID. Without killing them, + the watchdog could kill the instance mid-job, and the log syncer + would upload to stale GCS paths. + """ + from goldfish.cloud.adapters.gcp.startup_builder import idle_loop_section + + script = idle_loop_section(idle_timeout_seconds=900, gcs_pool_path="gs://bucket/warm_pool/test") + assert "WATCHDOG_PID" in script + assert "SUPERVISOR_PID" in script + assert "LOG_SYNCER_PID" in script + assert "METADATA_SYNCER_PID" in script + + +def test_idle_loop_restarts_metadata_syncer() -> None: + """Metadata syncer must be restarted for each new job (Overdrive log sync).""" + from goldfish.cloud.adapters.gcp.startup_builder import idle_loop_section + + script = idle_loop_section(idle_timeout_seconds=900, gcs_pool_path="gs://bucket/warm_pool/test") + assert "start_metadata_syncer" in script