Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/basilica-sdk-python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "basilica-sdk-python"
version = "0.29.4"
version = "0.29.5"
edition = "2021"
authors = ["Basilica Team"]
description = "Python bindings for the Basilica SDK"
Expand Down
2 changes: 1 addition & 1 deletion crates/basilica-sdk-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "maturin"

[project]
name = "basilica-sdk"
version = "0.29.4"
version = "0.29.5"
description = "Python SDK for deploying containerized applications on the Basilica GPU cloud"
readme = "README.md"
license = { text = "MIT OR Apache-2.0" }
Expand Down
60 changes: 52 additions & 8 deletions crates/basilica-sdk-python/python/basilica/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,39 @@ def __init__(
_SHELL_SCRIPT_LAUNCHERS = frozenset({"bash", "sh", "/bin/bash", "/bin/sh"})


def _normalize_bench_param(bench: Any) -> str:
    """
    Translate the user-facing ``bench`` argument into the operator wire token.

    basilica-backend#661 / SDK-S2 narrows the public parameter to a bool
    opt-in while the wire format stays the canonical string pair:

    - ``True``  -> ``"on-start"`` (probe scheduled).
    - ``False`` -> ``"off"`` (no probe).
    - ``"on-start"`` / ``"off"`` -> returned unchanged for backward
      compatibility, after emitting a ``DeprecationWarning`` that names
      the equivalent bool.

    Anything else is handed back untouched. This helper does not validate;
    rejecting an invalid mode is left to whatever consumes the returned
    value, which is why the ``str`` return annotation is only nominal for
    that path.
    """
    if isinstance(bench, bool):
        return "on-start" if bench else "off"

    is_legacy_token = isinstance(bench, str) and bench in ("on-start", "off")
    if is_legacy_token:
        bool_spelling = "True" if bench == "on-start" else "False"
        warnings.warn(
            f"bench={bench!r} (str) is deprecated and will be removed "
            f"in the next major. Use bench="
            f"{bool_spelling} instead. "
            f"See basilica-backend#661 / SDK-S2.",
            DeprecationWarning,
            # Attribute the warning to the caller of the public deploy
            # method that invoked this helper, not to the SDK frames.
            stacklevel=3,
        )

    # Legacy tokens and unrecognised values alike leave here unchanged.
    return bench  # type: ignore[return-value]


def _shell_join_preserving_vars(command: List[str]) -> str:
"""
Join an argv list into a single shell command string for the operator's
Expand Down Expand Up @@ -1115,7 +1148,7 @@ def deploy_distributed(
provider_filter: Optional[ProviderFilter] = None,
topology_spread: str = "provider-aware",
nccl_env: Optional[Dict[str, str]] = None,
bench: str = "off",
bench: Union[bool, str] = False,
bench_placement: str = "preferred",
rendezvous_backend: str = "etcd-v2",
command: Optional[List[str]] = None,
Expand Down Expand Up @@ -1156,10 +1189,14 @@ def deploy_distributed(
| none`. Default `provider-aware`. SDK arch § 4.
nccl_env: NCCL env vars merged on top of operator defaults.
User values win on collision.
bench: `"on-start"` to schedule a 2-rank NCCL bench probe in the
user's namespace alongside workers (counts against the
namespace rank budget; result lands on `training.bench`).
`"off"` (default) skips the probe. SDK arch § 7.
bench: ``True`` to opt in to the per-UD NCCL bench probe;
``False`` (default) skips the probe. Reads back as
``training.bench`` (``BenchResult | None``) after the UD
reaches a terminal state. Replaces the ``"on-start"`` /
``"off"`` string modes (still accepted with
``DeprecationWarning``; removed in the next major). See
``basilica-backend#661`` / SDK-S2 for the rationale.
SDK arch § 7.
bench_placement: Placement policy for the bench Pod pair on
multi-tenant clusters. `"preferred"` (default) lets the
bench fall back off the worker pair when those nodes have
Expand Down Expand Up @@ -1225,6 +1262,11 @@ def deploy_distributed(
field="wait_for_bench",
value=wait_for_bench,
)
# basilica-backend#661 / SDK-S2: collapse bench API to bool.
# ``True`` -> "on-start"; ``False`` -> "off"; legacy str modes
# remain accepted with DeprecationWarning, normalized to the
# wire token here.
bench = _normalize_bench_param(bench)

request_dict = self._build_distributed_request(
name=name,
Expand Down Expand Up @@ -1295,7 +1337,7 @@ def deploy_distributed_managed(
provider_filter: Optional[ProviderFilter] = None,
topology_spread: str = "provider-aware",
nccl_env: Optional[Dict[str, str]] = None,
bench: str = "off",
bench: Union[bool, str] = False,
bench_placement: str = "preferred",
rendezvous_backend: str = "etcd-v2",
command: Optional[List[str]] = None,
Expand Down Expand Up @@ -2321,7 +2363,7 @@ async def deploy_distributed_async(
provider_filter: Optional[ProviderFilter] = None,
topology_spread: str = "provider-aware",
nccl_env: Optional[Dict[str, str]] = None,
bench: str = "off",
bench: Union[bool, str] = False,
bench_placement: str = "preferred",
rendezvous_backend: str = "etcd-v2",
command: Optional[List[str]] = None,
Expand Down Expand Up @@ -2350,6 +2392,8 @@ async def deploy_distributed_async(
field="wait_for_bench",
value=wait_for_bench,
)
# basilica-backend#661 / SDK-S2: collapse bench API to bool.
bench = _normalize_bench_param(bench)

request_dict = self._build_distributed_request(
name=name,
Expand Down Expand Up @@ -2414,7 +2458,7 @@ def deploy_distributed_managed_async(
provider_filter: Optional[ProviderFilter] = None,
topology_spread: str = "provider-aware",
nccl_env: Optional[Dict[str, str]] = None,
bench: str = "off",
bench: Union[bool, str] = False,
bench_placement: str = "preferred",
rendezvous_backend: str = "etcd-v2",
command: Optional[List[str]] = None,
Expand Down
9 changes: 7 additions & 2 deletions crates/basilica-sdk-python/python/basilica/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ def distributed(
provider_filter: Optional[Union[ProviderFilter, Dict[str, List[str]]]] = None,
topology_spread: str = "provider-aware",
nccl_env: Optional[Dict[str, str]] = None,
bench: str = "off",
bench: Union[bool, str] = False,
rendezvous_backend: str = "etcd-v2",
env: Optional[Dict[str, str]] = None,
pip_packages: Optional[List[str]] = None,
Expand Down Expand Up @@ -512,7 +512,12 @@ def distributed(
provider_filter: ProviderFilter or `{"include": [...], "exclude": [...]}` dict.
topology_spread: One of `pack | provider-aware | region-aware | none`.
nccl_env: NCCL env vars merged on top of operator defaults.
bench: `on-start` to schedule a 2-rank NCCL bench probe; `off` (default).
bench: ``True`` to opt in to the per-UD NCCL bench probe; ``False``
(default) skips it. Reads back as ``training.bench``
(``BenchResult | None``) post-terminal. The legacy str modes
``"on-start"`` / ``"off"`` remain accepted with
``DeprecationWarning``; removed in the next major. See
``basilica-backend#661`` / SDK-S2.
rendezvous_backend: `etcd-v2` (default) | `c10d` | `static`.
env: Environment variables passed to the worker pods.
pip_packages: Additional pip packages to install.
Expand Down
167 changes: 117 additions & 50 deletions crates/basilica-sdk-python/python/basilica/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import asyncio
import time
import warnings
from dataclasses import dataclass, field
from datetime import datetime
from types import TracebackType
Expand Down Expand Up @@ -510,26 +511,90 @@ def bench(self) -> Optional[BenchResult]:
@property
def bench_status(self) -> Optional[BenchStatus]:
    """
    DEPRECATED (basilica-backend#661, SDK-S2): full bench probe state.

    Prefer ``training.bench`` (``BenchResult | None``) for the result
    payload and ``training.bench_diagnostics`` (``dict | None``) for the
    rarely-needed debug detail (phase / message / timings). The
    :class:`BenchStatus` surface (``is_terminal`` / ``is_successful`` /
    ``is_failed`` / ``is_skipped``) is being collapsed because the
    diagnostic is opt-in: most callers want a yes/no answer rather than
    a four-phase lifecycle.

    Every access emits a ``DeprecationWarning`` attributed to the
    caller (``stacklevel=2``) and then delegates to the warning-free
    internal accessor ``_bench_status_no_warn``.

    Returns:
        Whatever ``_bench_status_no_warn`` yields: ``None`` when no
        bench state is available, otherwise the :class:`BenchStatus`
        built from the operator's ``status.distributed.bench`` block.
        Scheduled for removal in the next major.
    """
    warnings.warn(
        "DistributedTraining.bench_status is deprecated and will be "
        "removed in the next major. Use training.bench (BenchResult | "
        "None) for the result and training.bench_diagnostics (dict | "
        "None) for the rare debug detail. See "
        "basilica-backend#661 / SDK-S2.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self._bench_status_no_warn

@property
def _bench_status_no_warn(self) -> Optional[BenchStatus]:
    """
    Internal: read the full :class:`BenchStatus` without a deprecation warning.

    Shared by ``bench_diagnostics``, ``bench_status`` and the legacy
    ``wait_until_bench_complete`` waiters so they can inspect bench state
    without double-warning the user. Calls ``refresh()`` first when no
    status has been cached yet.

    Returns:
        ``None`` when no status is cached even after the refresh, when
        the status carries no ``distributed.bench`` block, or when that
        block reports ``mode == "off"``. Otherwise the
        :class:`BenchStatus` parsed from the block.
    """
    if self._cached_status is None:
        self.refresh()
    # ``or {}`` also covers a present-but-null "distributed" entry, which
    # ``.get("distributed", {})`` would pass through as None.
    distributed = (self._cached_status or {}).get("distributed") or {}
    bench_raw = distributed.get("bench")
    if not bench_raw:
        return None
    # The operator can publish a bookkeeping block even when the user opted
    # out (e.g. {"mode": "off", "phase": "Skipped"}). Treat that as "no
    # bench" so callers keep their None-on-bench-off contract and a
    # disabled probe is never mistaken for a skipped or failed one.
    if bench_raw.get("mode") == "off":
        return None
    return BenchStatus.from_status_dict(bench_raw)
Comment on lines +541 to 551
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Collapse mode="off" to None in the internal bench accessor.

The operator can still publish a bookkeeping bench block when the user opted out, e.g. {"mode": "off", "phase": "Skipped"}. Returning a BenchStatus here breaks the documented None-on-bench-off behavior for bench_status and wait_until_bench_complete[_async], and it can make wait_for_bench="required" fail even though bench was explicitly disabled.

💡 Suggested fix
     def _bench_status_no_warn(self) -> Optional[BenchStatus]:
         """Internal: read the full BenchStatus without emitting the
         deprecation warning. Used by ``bench_diagnostics`` and the
         legacy ``wait_until_bench_complete`` waiter so they remain
         callable without double-warning the user."""
         if self._cached_status is None:
             self.refresh()
         bench_raw = (self._cached_status or {}).get("distributed", {}).get("bench")
         if not bench_raw:
             return None
+        if bench_raw.get("mode", "off") != "on-start":
+            return None
         return BenchStatus.from_status_dict(bench_raw)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _bench_status_no_warn(self) -> Optional[BenchStatus]:
"""Internal: read the full BenchStatus without emitting the
deprecation warning. Used by ``bench_diagnostics`` and the
legacy ``wait_until_bench_complete`` waiter so they remain
callable without double-warning the user."""
if self._cached_status is None:
self.refresh()
bench_raw = (self._cached_status or {}).get("distributed", {}).get("bench")
if not bench_raw:
return None
return BenchStatus.from_status_dict(bench_raw)
def _bench_status_no_warn(self) -> Optional[BenchStatus]:
"""Internal: read the full BenchStatus without emitting the
deprecation warning. Used by ``bench_diagnostics`` and the
legacy ``wait_until_bench_complete`` waiter so they remain
callable without double-warning the user."""
if self._cached_status is None:
self.refresh()
bench_raw = (self._cached_status or {}).get("distributed", {}).get("bench")
if not bench_raw:
return None
if bench_raw.get("mode", "off") != "on-start":
return None
return BenchStatus.from_status_dict(bench_raw)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/basilica-sdk-python/python/basilica/distributed.py` around lines 541 -
551, The internal accessor _bench_status_no_warn should treat a published bench
block with "mode": "off" as no bench; after retrieving bench_raw in
_bench_status_no_warn, check if bench_raw.get("mode") == "off" (or equivalent
string) and return None instead of calling BenchStatus.from_status_dict, so
callers like bench_status and wait_until_bench_complete[_async] see None when
benching was opted out; keep the existing behavior for other modes and only
collapse the "off" case to None.


@property
def bench_diagnostics(self) -> Optional[Dict[str, Any]]:
    """
    basilica-backend#661 / SDK-S2: simplified bench debug surface.

    Escape hatch for the rare case where ``training.bench`` is ``None``
    and the caller wants to know why the probe produced no measurement.
    Replaces the multi-property ``BenchStatus`` surface exposed through
    ``bench_status``; reads the same state via the warning-free internal
    accessor, so no ``DeprecationWarning`` is emitted here.

    Returns:
        ``None`` when no bench state is available or when the bench mode
        is ``"off"`` (nothing to debug). Otherwise a ``dict`` with these
        keys, copied from the bench status:

        - ``mode``: the bench mode reported by the operator.
        - ``phase``: bench-Job lifecycle phase (``"Pending"`` /
          ``"Running"`` / ``"Succeeded"`` / ``"Failed"`` /
          ``"TimedOut"`` / ``"Skipped"``).
        - ``message``: human-readable reason from the operator.
        - ``started_at`` / ``completed_at``: timing.
        - ``last_attempt_at`` / ``last_attempt_outcome``: most recent
          attempt and its outcome (e.g. ``"skipped"``).
    """
    status = self._bench_status_no_warn
    # A missing block and an explicit opt-out both mean there is nothing
    # worth diagnosing, so they collapse to the same answer.
    if status is None or status.mode == "off":
        return None

    exported_fields = (
        "mode",
        "phase",
        "message",
        "started_at",
        "completed_at",
        "last_attempt_at",
        "last_attempt_outcome",
    )
    return {name: getattr(status, name) for name in exported_fields}

def metrics(self) -> DistributedMetrics:
"""Platform-side metric snapshot for this UD (SDK arch § 6)."""
if self._cached_status is None:
Expand Down Expand Up @@ -709,44 +774,36 @@ async def wait_until_min_world_async(self, timeout: int = 300) -> None:

def wait_until_bench_complete(self, timeout: int = 1500) -> Optional[BenchStatus]:
"""
Issue B/N (refs #506): OPT-IN waiter for the bench probe.

Most callers do NOT need this; bench is best-effort per
`basilica_bench.py:75-78` and the world is considered ready
when workers reach Ready (see :py:meth:`wait_until_min_world`).
This method exists for the small set of callers that DO want
the bench measurement before continuing (e.g. a research run
that records busbw metadata into checkpoint headers).

Returns the terminal :py:class:`BenchStatus` once `phase`
enters a terminal state. Closes #480: the four terminal phases
are:

- `Succeeded` -- bench probe produced a measurement; the result
payload is on `BenchStatus.result` (mirrored on
`DistributedTraining.bench` for backward compatibility).
- `Failed` -- bench probe ran but errored. See `message`.
- `TimedOut` -- bench probe's own deadline elapsed before
completion. See `message`.
- `Skipped` -- the operator decided not to run the bench probe
(e.g. workers exited before the bench-controller observed
them). See `message` for the reason. NOT a failure; the
workload itself may have completed cleanly.

If `bench.mode != on-start` returns `None` immediately (nothing
to wait on). Polls every 5s.

Default timeout matches the operator's `BENCH_ACTIVE_DEADLINE_SECONDS`
(1500s = 25 min). Raises `TimeoutError` past the deadline if the
bench has not reached a terminal phase. Callers handling a
terminal `Skipped` should branch on `bs.is_skipped` (or
`bs.phase != "Succeeded"`); see `examples/20_distributed_diloco.py`
and `examples/22_distributed_with_bench.py`.
DEPRECATED (basilica-backend#661, SDK-S2): OPT-IN waiter for the bench probe.

Use ``training.bench`` (returns ``BenchResult | None``) after the
UD reaches a terminal state. The four-phase explicit-wait
ceremony is being collapsed: most callers want to know whether
the probe measured (``bench is not None``), not which of the
four terminal phases it landed in. For the rare debug path use
``training.bench_diagnostics`` (dict with phase / message /
timings).

Remains functional for two minor versions. Returns the terminal
:class:`BenchStatus` once ``phase`` enters a terminal state
(``Succeeded`` / ``Failed`` / ``TimedOut`` / ``Skipped``).
``mode != on-start`` returns ``None`` immediately. Raises
``TimeoutError`` if the bench has not reached a terminal phase
within ``timeout`` seconds. Polls every 5s.
"""
warnings.warn(
"DistributedTraining.wait_until_bench_complete is deprecated "
"and will be removed in the next major. Use training.bench "
"(BenchResult | None) after the UD reaches a terminal state, "
"and training.bench_diagnostics for debug detail. See "
"basilica-backend#661 / SDK-S2.",
DeprecationWarning,
stacklevel=2,
)
deadline = time.monotonic() + max(timeout, 0)
while time.monotonic() < deadline:
self.refresh()
bs = self.bench_status
bs = self._bench_status_no_warn
if bs is None:
# bench is Off — nothing to wait for.
return None
Expand All @@ -755,7 +812,7 @@ def wait_until_bench_complete(self, timeout: int = 1500) -> Optional[BenchStatus
time.sleep(min(5, max(timeout // 10, 1)))
# One last refresh.
self.refresh()
bs = self.bench_status
bs = self._bench_status_no_warn
if bs is not None and bs.is_terminal:
return bs
raise TimeoutError(
Expand All @@ -767,22 +824,32 @@ async def wait_until_bench_complete_async(
self, timeout: int = 1500
) -> Optional[BenchStatus]:
"""
Async variant of :py:meth:`wait_until_bench_complete`.
DEPRECATED (basilica-backend#661, SDK-S2): async variant of
:py:meth:`wait_until_bench_complete`.

Same terminal-phase set: `Succeeded` | `Failed` | `TimedOut` |
`Skipped` (closes #480). Same `None`-on-bench-off semantics.
Use ``training.bench`` post-terminal instead. Same
``None``-on-bench-off semantics; removed in the next major.
"""
warnings.warn(
"DistributedTraining.wait_until_bench_complete_async is "
"deprecated and will be removed in the next major. Use "
"training.bench (BenchResult | None) after the UD reaches "
"a terminal state, and training.bench_diagnostics for debug "
"detail. See basilica-backend#661 / SDK-S2.",
DeprecationWarning,
stacklevel=2,
)
deadline = asyncio.get_event_loop().time() + max(timeout, 0)
while asyncio.get_event_loop().time() < deadline:
await self.refresh_async()
bs = self.bench_status
bs = self._bench_status_no_warn
if bs is None:
return None
if bs.is_terminal:
return bs
await asyncio.sleep(min(5, max(timeout // 10, 1)))
await self.refresh_async()
bs = self.bench_status
bs = self._bench_status_no_warn
if bs is not None and bs.is_terminal:
return bs
raise TimeoutError(
Expand Down
Loading
Loading