Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions src/goldfish/cloud/adapters/gcp/gce_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class GCELaunchResult:

instance_name: str
zone: str
warm_reuse: bool = False # True if dispatched to an existing warm pool instance


class GCELauncher:
Expand Down Expand Up @@ -134,6 +135,8 @@ def launch_instance(
use_capacity_search: bool = True,
goldfish_env: dict[str, str] | None = None,
preemptible: bool | None = None,
warm_pool_idle_timeout_seconds: int | None = None,
warm_pool_manager: Any | None = None,
) -> GCELaunchResult:
"""Launch GCE instance for stage run.

Expand Down Expand Up @@ -349,8 +352,38 @@ def launch_instance(
log_sync_interval=log_sync_interval,
# GPU flag - profile-based, not runtime nvidia-smi detection
gpu_count=gpu_count or 0,
# Warm pool: if set, instance enters idle loop after Docker exits instead of self-deleting
warm_pool_idle_timeout_seconds=warm_pool_idle_timeout_seconds,
)

# Try warm pool reuse BEFORE creating a new instance.
# All scripts (pre_run, post_run, docker_cmd, env) are ready at this point.
if warm_pool_manager:
handle = warm_pool_manager.try_claim(
machine_type=machine_type,
gpu_count=gpu_count,
stage_run_id=stage_run_id,
image=image_tag,
env_map=env_map,
pre_run_script="\n".join(pre_run_cmds),
post_run_script="\n".join(post_run_cmds),
docker_cmd_script=self._build_docker_cmd_script(
image_tag,
env_map,
docker_cmd,
gpu_count,
),
run_path=stage_run_id,
)
if handle:
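# NOTE(review): the stated intent is to report stage_run_id as instance_name
# so GCS paths (logs, exit code) resolve to the current run, not the warm VM's
# original name. The code below returns handle.backend_handle instead —
# confirm that try_claim populates backend_handle accordingly.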
return GCELaunchResult(
instance_name=handle.backend_handle,
zone=handle.zone or "",
warm_reuse=True,
)

if use_capacity_search and self.resources:
# Use ResourceLauncher for capacity-aware search
return self._launch_with_capacity_search(
Expand All @@ -373,6 +406,37 @@ def launch_instance(
preemptible=preemptible,
)

def _build_docker_cmd_script(
self,
image: str,
env_map: dict[str, str],
cmd: str,
gpu_count: int,
) -> str:
"""Build a standalone docker run script for warm pool reuse.

This generates the same docker run command that startup_builder creates,
but as a standalone bash script that the warm pool idle loop can execute.
"""
import shlex

gpu_flag = "--gpus all" if gpu_count > 0 else ""
env_flags = " ".join(f"-e {k}={shlex.quote(v)}" for k, v in env_map.items())
mounts = "-v /mnt/entrypoint.sh:/entrypoint.sh -v /mnt/gcs:/mnt/gcs -v /mnt/inputs:/mnt/inputs -v /mnt/outputs:/mnt/outputs"
cmd_part = f" {cmd}" if cmd else ""

return f"""#!/bin/bash
set -euo pipefail
docker run --rm {gpu_flag} \\
--ipc=host \\
--ulimit memlock=-1 --ulimit stack=67108864 \\
--shm-size=16g \\
{env_flags} \\
{mounts} \\
--entrypoint /bin/bash \\
{image}{cmd_part}
"""

def _launch_with_capacity_search(
self,
instance_name: str,
Expand Down
32 changes: 31 additions & 1 deletion src/goldfish/cloud/adapters/gcp/run_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ def launch(self, spec: RunSpec) -> RunHandle:
# Use spec.gpu_type if provided, else default to T4
gpu_type = spec.gpu_type or "nvidia-tesla-t4"

# Determine warm pool idle timeout from config
warm_pool_timeout: int | None = None
if self._warm_pool and self._warm_pool.is_enabled_for(spec.profile):
warm_pool_timeout = self._warm_pool._config.idle_timeout_minutes * 60

result = self._launcher.launch_instance(
image_tag=spec.image,
stage_run_id=spec.stage_run_id,
Expand All @@ -247,14 +252,32 @@ def launch(self, spec: RunSpec) -> RunHandle:
gpu_count=gpu_count,
zones=self._zones,
goldfish_env=goldfish_env,
preemptible=spec.spot, # Pass spot preference to launcher
preemptible=spec.spot,
warm_pool_idle_timeout_seconds=warm_pool_timeout,
# Pass warm pool manager for reuse dispatch inside launch_instance
warm_pool_manager=self._warm_pool if warm_pool_timeout else None,
)

is_warm = getattr(result, "warm_reuse", False)

# Only register fresh launches (not warm reuse hits, which are already tracked).
# Registration uses status='running' so the reaper won't touch it mid-job.
# The startup script's idle loop calls release (→ idle) after Docker exits.
if not is_warm and warm_pool_timeout and self._warm_pool:
is_warm = self._warm_pool.register_instance(
instance_name=result.instance_name,
zone=result.zone,
machine_type=machine_type,
gpu_count=gpu_count,
image_tag=spec.image,
)

return RunHandle(
stage_run_id=spec.stage_run_id,
backend_type="gce",
backend_handle=result.instance_name,
zone=result.zone,
warm_instance=is_warm,
)

except Exception as e:
Expand Down Expand Up @@ -290,6 +313,13 @@ def get_status(self, handle: RunHandle) -> BackendStatus:

# Map Goldfish state to RunStatus
if status_str == StageState.RUNNING:
# Warm pool: instance stays RUNNING after Docker exits (idle loop).
# Check for exit code in GCS to detect that the *job* finished
# even though the *instance* is still alive.
if handle.warm_instance:
exit_result = self._launcher._get_exit_code(handle.stage_run_id)
if exit_result.exists and exit_result.code is not None:
return BackendStatus.from_exit_code(exit_result.code)
return BackendStatus(status=RunStatus.RUNNING)
elif status_str == StageState.COMPLETED:
return BackendStatus(status=RunStatus.COMPLETED, exit_code=0)
Expand Down
39 changes: 34 additions & 5 deletions src/goldfish/cloud/adapters/gcp/startup_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,13 @@ def idle_loop_section(idle_timeout_seconds: int, gcs_pool_path: str) -> str:
mkdir -p "$spec_dir"
gsutil -m cp -r "$spec_path/*" "$spec_dir/" 2>&1 || {{ echo "Failed to download job spec"; exit 1; }}

# Kill stale background processes from previous job.
# Guard: only kill if PID is set and non-zero (kill 0 = entire process group!)
[[ -n "${{WATCHDOG_PID:-}}" && "${{WATCHDOG_PID}}" != "0" ]] && kill "$WATCHDOG_PID" 2>/dev/null || true
[[ -n "${{SUPERVISOR_PID:-}}" && "${{SUPERVISOR_PID}}" != "0" ]] && kill "$SUPERVISOR_PID" 2>/dev/null || true
[[ -n "${{LOG_SYNCER_PID:-}}" && "${{LOG_SYNCER_PID}}" != "0" ]] && kill "$LOG_SYNCER_PID" 2>/dev/null || true
[[ -n "${{METADATA_SYNCER_PID:-}}" && "${{METADATA_SYNCER_PID}}" != "0" ]] && kill "$METADATA_SYNCER_PID" 2>/dev/null || true

# Clean workspace between runs
echo "Cleaning workspace for new job..."
rm -rf /mnt/outputs/* /tmp/triton* /tmp/torch* /tmp/pip* /tmp/stage_times.log
Expand All @@ -875,16 +882,30 @@ def idle_loop_section(idle_timeout_seconds: int, gcs_pool_path: str) -> str:
bash "$spec_dir/pre_run.sh" || {{ echo "Pre-run staging failed"; exit 1; }}
fi

# Update log paths for new run
# Update ALL paths for the new run
local new_run_path
new_run_path=$(cat "$spec_dir/run_path" 2>/dev/null || echo "")
if [[ -n "$new_run_path" ]]; then
# Reset log files
STDOUT_LOG="/tmp/stdout.log"
STDERR_LOG="/tmp/stderr.log"
: > "$STDOUT_LOG"
: > "$STDERR_LOG"
: > /tmp/stage_times.log

# CRITICAL: Update GCS paths for the NEW run so logs don't
# go to the first run's location
GCS_STDOUT_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/logs/stdout.log"
GCS_STDERR_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/logs/stderr.log"
GCS_METRICS_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/logs/metrics.jsonl"
GCS_SVS_DURING_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/outputs/.goldfish/svs_findings_during.json"
GCS_EXIT_CODE_PATH="gs://${{GCS_BUCKET:-}}/runs/$new_run_path/logs/exit_code.txt"
fi

# Restart metadata syncer for the new job (Overdrive log sync)
METADATA_SYNCER_STARTED=0
start_metadata_syncer

# Build and run new Docker command
if [[ -f "$spec_dir/docker_cmd.sh" ]]; then
log_stage "warm_pool_docker_run_begin" || true
Expand All @@ -901,12 +922,18 @@ def idle_loop_section(idle_timeout_seconds: int, gcs_pool_path: str) -> str:
bash "$spec_dir/post_run.sh" || true
fi

# Write exit code
if [[ -n "$EXIT_CODE_FILE" ]]; then
echo "$EXIT_CODE" > "$EXIT_CODE_FILE" 2>/dev/null || true
# Write exit code directly to GCS (not the stale local path)
if [[ -n "$GCS_EXIT_CODE_PATH" ]]; then
echo "$EXIT_CODE" | gsutil cp - "$GCS_EXIT_CODE_PATH" 2>/dev/null || true
fi

# Upload logs
# Set exit code in instance metadata
gcloud compute instances add-metadata "$INSTANCE_NAME" \\
--zone="$INSTANCE_ZONE" --project="$PROJECT_ID" \\
--metadata "goldfish_exit_code=$EXIT_CODE" \\
--quiet 2>/dev/null || true

# Upload logs to the new run's GCS paths
sync_final_logs || true

# Clear self-delete trap again for next idle cycle
Expand Down Expand Up @@ -1210,6 +1237,8 @@ def build_startup_script(
if use_warm_pool:
assert warm_pool_idle_timeout_seconds is not None # Narrowing for mypy
gcs_pool_path = f"gs://{bucket}/warm_pool/{'{INSTANCE_NAME}'}"
# Export bucket name so idle loop can construct GCS paths for subsequent runs
parts.append(f'GCS_BUCKET="{bucket}"')
parts.append(idle_loop_section(warm_pool_idle_timeout_seconds, gcs_pool_path))
parts.append(f'CURRENT_IMAGE="{image}"')
parts.append('log_stage "cleanup_begin"')
Expand Down
3 changes: 1 addition & 2 deletions src/goldfish/cloud/adapters/gcp/warm_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import logging
import subprocess
import time
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -111,8 +112,6 @@ def try_claim(
self._signal_bus.set_signal("goldfish", signal, target=instance_name)

# Wait for ACK (30s timeout)
import time

for _ in range(30):
ack = self._signal_bus.get_ack("goldfish", target=instance_name)
if ack == stage_run_id:
Expand Down
7 changes: 4 additions & 3 deletions src/goldfish/db/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -5600,10 +5600,11 @@ def register_warm_instance(
conn.execute(
"""
INSERT INTO warm_instances
(instance_name, zone, project_id, machine_type, gpu_count, image_tag, idle_since, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
(instance_name, zone, project_id, machine_type, gpu_count, image_tag,
status, idle_since, created_at)
VALUES (?, ?, ?, ?, ?, ?, 'running', NULL, ?)
""",
(instance_name, zone, project_id, machine_type, gpu_count, image_tag, now, now),
(instance_name, zone, project_id, machine_type, gpu_count, image_tag, now),
)

def claim_warm_instance(self, machine_type: str, gpu_count: int) -> dict | None:
Expand Down
14 changes: 8 additions & 6 deletions src/goldfish/jobs/_stage_executor_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,15 @@ def _get_run_handle(self, stage_run_id: str) -> RunHandle:
backend_handle = row.get("backend_handle") or stage_run_id
zone = row.get("instance_zone")

# Check if this is a warm pool instance
# Check warm pool: single-row lookup by backend_handle (not a scan)
is_warm = False
warm_instances = self.db.list_warm_instances(status="running")
for wi in warm_instances:
if wi.get("current_stage_run_id") == stage_run_id:
is_warm = True
break
if backend_handle:
with self.db._conn() as conn:
row_wp = conn.execute(
"SELECT 1 FROM warm_instances WHERE instance_name = ? AND status = 'running'",
(backend_handle,),
).fetchone()
is_warm = row_wp is not None

return RunHandle(
stage_run_id=stage_run_id,
Expand Down
Loading
Loading