|
17 | 17 |
|
18 | 18 | DEFAULT_GRID_SEARCH_SCRIPT = "/app/run_grid_search.py" |
19 | 19 | WORKSPACE_GRID_SEARCH_SCRIPT = "/home/dev/workspace/run_grid_search.py" |
| 20 | +PROCESS_SHUTDOWN_TIMEOUT_SECONDS = 10 |
| 21 | +TEE_THREAD_JOIN_TIMEOUT_SECONDS = 5 |
20 | 22 |
|
21 | 23 |
|
22 | 24 | @dataclass(frozen=True) |
@@ -172,9 +174,7 @@ def _validate_gpu_assignments(invocations: list[JobInvocation]) -> None: |
172 | 174 | return |
173 | 175 |
|
174 | 176 | available_set = set(available) |
175 | | - unavailable = { |
176 | | - gpu: names for gpu, names in requested.items() if gpu not in available_set |
177 | | - } |
| 177 | + unavailable = {gpu: names for gpu, names in requested.items() if gpu not in available_set} |
178 | 178 | if unavailable: |
179 | 179 | details = ", ".join( |
180 | 180 | f"GPU {gpu} requested by {', '.join(names)}" |
@@ -413,6 +413,7 @@ def run(preset: Preset, *, results_dir: Path, dry_run: bool = False) -> int: |
413 | 413 | int |
414 | 414 | ``0`` if all jobs exited 0 (or ``dry_run`` was set), ``1`` otherwise. |
415 | 415 | """ |
| 416 | + results_dir = results_dir.resolve() |
416 | 417 | results_dir.mkdir(parents=True, exist_ok=True) |
417 | 418 | invocations = build_invocations(preset, results_dir=results_dir) |
418 | 419 | _validate_gpu_assignments(invocations) |
@@ -444,23 +445,33 @@ def _terminate_all(jobs: list[_RunningJob]) -> None: |
444 | 445 | Parameters |
445 | 446 | ---------- |
446 | 447 | jobs : list of _RunningJob |
447 | | - Jobs whose subprocesses should be SIGTERM'd, waited on, and whose tee |
448 | | - threads should be joined. |
| 448 | + Jobs whose subprocesses should be SIGTERM'd, escalated to SIGKILL if |
| 449 | + needed, and whose tee threads should be joined with bounded waits. |
449 | 450 | """ |
450 | 451 | for j in jobs: |
451 | 452 | if j.proc.poll() is None: |
452 | 453 | j.proc.terminate() |
453 | 454 | for j in jobs: |
454 | | - j.proc.wait() |
455 | | - j.tee_thread.join() |
| 455 | + try: |
| 456 | + j.proc.wait(timeout=PROCESS_SHUTDOWN_TIMEOUT_SECONDS) |
| 457 | + except subprocess.TimeoutExpired: |
| 458 | + j.proc.kill() |
| 459 | + try: |
| 460 | + j.proc.wait(timeout=PROCESS_SHUTDOWN_TIMEOUT_SECONDS) |
| 461 | + except subprocess.TimeoutExpired: |
| 462 | + print( |
| 463 | + f"[{_ts()}] {j.inv.job.name} did not exit after SIGKILL", |
| 464 | + file=sys.stderr, |
| 465 | + ) |
| 466 | + j.tee_thread.join(timeout=TEE_THREAD_JOIN_TIMEOUT_SECONDS) |
456 | 467 |
|
457 | 468 |
|
458 | 469 | def _prepare_pixi_env(pixi_env: str) -> None: |
459 | 470 | """Prepare a pixi environment before parallel job launch. |
460 | 471 |
|
461 | | - ``pixi run`` is deliberately called once per env even when the interpreter |
462 | | - directory already exists, because pixi may still need to materialize PyPI |
463 | | - packages into that environment after image startup. |
| 472 | + Preparation is skipped when a baked interpreter is already available, when |
| 473 | + prebuilt environments are required, or when ``SAMPLEWORKS_SKIP_ENV_PREPARE`` |
| 474 | + is truthy. Otherwise, ``pixi run`` is called once for the environment. |
464 | 475 |
|
465 | 476 | Parameters |
466 | 477 | ---------- |
@@ -576,24 +587,39 @@ def _spawn(inv: JobInvocation) -> _RunningJob: |
576 | 587 | inv.log_path.parent.mkdir(parents=True, exist_ok=True) |
577 | 588 | inv.output_dir.mkdir(parents=True, exist_ok=True) |
578 | 589 | log_file = open(inv.log_path, "wb") |
| 590 | + proc: subprocess.Popen[bytes] | None = None |
| 591 | + thread: threading.Thread | None = None |
579 | 592 | try: |
580 | 593 | proc = subprocess.Popen( |
581 | 594 | inv.argv, |
582 | 595 | env=inv.env, |
| 596 | + cwd=str(_pixi_project_dir()), |
583 | 597 | stdout=subprocess.PIPE, |
584 | 598 | stderr=subprocess.STDOUT, |
585 | 599 | bufsize=0, |
586 | 600 | ) |
| 601 | + if proc.stdout is None: |
| 602 | + raise RuntimeError(f"Job {inv.job.name!r} started without a stdout pipe") |
| 603 | + thread = threading.Thread( |
| 604 | + target=_tee, |
| 605 | + args=(inv.job.name, proc.stdout, log_file), |
| 606 | + daemon=True, |
| 607 | + ) |
| 608 | + thread.start() |
587 | 609 | except BaseException: |
588 | 610 | log_file.close() |
| 611 | + if proc is not None and proc.poll() is None: |
| 612 | + proc.kill() |
| 613 | + try: |
| 614 | + proc.wait(timeout=PROCESS_SHUTDOWN_TIMEOUT_SECONDS) |
| 615 | + except subprocess.TimeoutExpired: |
| 616 | + print( |
| 617 | + f"[{_ts()}] {inv.job.name} did not exit after failed spawn cleanup", |
| 618 | + file=sys.stderr, |
| 619 | + ) |
589 | 620 | raise |
590 | | - assert proc.stdout is not None |
591 | | - thread = threading.Thread( |
592 | | - target=_tee, |
593 | | - args=(inv.job.name, proc.stdout, log_file), |
594 | | - daemon=True, |
595 | | - ) |
596 | | - thread.start() |
| 621 | + if proc is None or thread is None: |
| 622 | + raise RuntimeError(f"Job {inv.job.name!r} failed to initialize") |
597 | 623 | print(f"[{_ts()}] launched {inv.job.name} (pid {proc.pid})", file=sys.stderr) |
598 | 624 | return _RunningJob(inv=inv, proc=proc, tee_thread=thread) |
599 | 625 |
|
|
0 commit comments