feat(sdk): expose UD phase + wait_until_complete + UDTerminalState (refs #445)#461
Conversation
…efs #445) SDK-side surface for the distributed UD completion semantics shipped in basilica-backend (#445). Exposes the operator's terminal phase machine to Python users: `Pending -> Ready -> {Succeeded | Failed | Cancelled}`. Rust wire types (crates/basilica-sdk/src/types.rs): - DistributedRankExit struct mirrors the operator's RankExit, populated by basilica-api on `status.distributed.rankExits` when a UD reaches a terminal state. - DistributedStatus.rank_exits field carries them end-to-end. Python facade (crates/basilica-sdk-python/python/basilica/distributed.py): - New `RankExit` dataclass mirrors the wire type. - `DistributedTraining.phase` property reads `DeploymentResponse.phase` as a snake_case string (succeeded | failed | cancelled | ...). - `DistributedTraining.is_terminal` convenience derives membership in the terminal triple. - `DistributedTraining.rank_exits` reads the camelCase `rankExits` JSON list as `List[RankExit]`. Empty while the UD is non-terminal. - `wait_until_complete(timeout)` blocks until any terminal state. Returns the final `WorldStatus` on `succeeded`. Raises `BelowMinimumWorld` on `failed` (with per-rank exit context). Raises `UDTerminalState` if the UD is already terminal at entry. - `wait_until_target_world(timeout)` returns cleanly on `succeeded` (training is done by definition). Raises `BelowMinimumWorld` on `failed` / `cancelled`. - `scale(target)` raises `UDTerminalState` BEFORE the API call when the UD is in a terminal phase. Operator-side defense in depth (UDTerminalState Warning Event) covers kubectl-edit mutations that bypass the SDK. - Async parity: `wait_until_complete_async`. Exceptions (python/basilica/exceptions.py): - New `UDTerminalState(DistributedError)` carrying `phase` and `requested_target` attributes. Re-exports updated in `python/basilica/__init__.py`. Tests (crates/basilica-sdk-python/tests/test_distributed.py): - 11 new tests in `TestPhase5bCompletionSemantics` cover: phase property reads succeeded / running, rank_exits camelCase parsing, scale rejects on succeeded / failed, wait_until_complete entry-guard, wait_until_complete flip mid-wait, wait_until_complete failed-mid-wait carries OOMKilled detail, wait_until_target_world returns clean on succeeded, async parity. - Full distributed test suite passes (60 tests). Pairs with `feat(operator,api,billing-daemon): Succeeded terminal state for distributed UDs (closes #445)` in basilica-backend.
WalkthroughThis PR adds Phase 5b terminal-phase support to the distributed training SDK by introducing terminal phase detection, rank exit diagnostics, and updated wait-until-complete semantics. It includes new ChangesPhase 5b Terminal-Phase Support
Sequence DiagramsequenceDiagram
participant Client
participant DistributedTraining
participant StatusCache
participant PhaseMonitor
participant ExceptionHandler
Client->>DistributedTraining: wait_until_complete()
DistributedTraining->>StatusCache: refresh()
StatusCache-->>DistributedTraining: cached status
DistributedTraining->>PhaseMonitor: check phase
alt Already Terminal
PhaseMonitor-->>DistributedTraining: phase ∈ {succeeded, failed, cancelled}
DistributedTraining->>ExceptionHandler: raise UDTerminalState
ExceptionHandler-->>Client: UDTerminalState
else Poll for Completion
loop Until Terminal or Timeout
PhaseMonitor->>StatusCache: refresh status
StatusCache-->>PhaseMonitor: updated phase
alt Phase == "succeeded"
PhaseMonitor-->>DistributedTraining: return WorldStatus
DistributedTraining-->>Client: WorldStatus
else Phase == "failed" or "cancelled"
PhaseMonitor->>ExceptionHandler: extract rank_exits
ExceptionHandler-->>DistributedTraining: raise BelowMinimumWorld(with rank diagnostics)
DistributedTraining-->>Client: BelowMinimumWorld
else Non-Terminal
Note over PhaseMonitor: continue polling
end
end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/basilica-sdk-python/python/basilica/distributed.py (1)
575-584:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winPreserve
failed/cancelledsemantics after the final refresh.The loop handles terminal failure states correctly, but the post-deadline path only checks for
"succeeded". If the UD flips to"failed"or"cancelled"on the last refresh — ortimeout=0skips the loop entirely — callers get a generic timeout message instead of the terminal-state error this method promises.Suggested fix
self.refresh() ws = self.world if self.phase == "succeeded": return + if self.phase in ("failed", "cancelled"): + raise BelowMinimumWorld( + f"wait_until_target_world: UD '{self.name}' reached " + f"terminal phase '{self.phase}' before world reached target", + ready=ws.ready, + required_min=ws.target, + timeout=timeout, + ) if ws.ready < ws.target or ws.target == 0: raise BelowMinimumWorld( f"wait_until_target_world timed out after {timeout}s " f"(ready={ws.ready}, required_min={ws.target})", ready=ws.ready, required_min=ws.target, timeout=timeout, )Apply the same final-state branch to
wait_until_target_world_async().Also applies to: 609-618
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/basilica-sdk-python/python/basilica/distributed.py` around lines 575 - 584, The post-timeout path in wait_until_target_world_async (and the analogous block at the other location) only checks for "succeeded" and therefore returns a generic timeout even if the last refresh set self.phase to "failed" or "cancelled"; update the final-state handling so after the loop (or when timeout=0) you re-check self.phase and raise the same terminal-state exceptions used in the loop for "failed" and "cancelled" (the same branches/exception types used earlier in wait_until_target_world_async/ wait_until_target_world to preserve semantics) before falling back to the BelowMinimumWorld timeout error.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/basilica-sdk-python/python/basilica/distributed.py`:
- Around line 680-691: The timeout path in wait_until_complete (and the async
variant wait_until_complete_async) only treats non-succeeded final states as a
timeout, losing terminal failure diagnostics; modify the final refresh branch in
wait_until_complete (and mirror the change in wait_until_complete_async) to
check for terminal phases like "failed" or "cancelled" after self.refresh() and
raise the corresponding RankExit or Cancelled exception (or raise
BelowMinimumWorld only when truly timed out), including the live world
(self.world), ready/required_min/timeout and any rank-exit context so terminal
failure details are surfaced instead of being reported as a generic timeout.
In `@crates/basilica-sdk-python/tests/test_distributed.py`:
- Line 1275: The fixture helper currently uses "rank_exits or [...]" which
treats an explicitly passed empty list as falsy and replaces it with defaults;
change the assignment to use an explicit None check (e.g., use "rank_exits if
rank_exits is not None else [...]") so that an empty list passed into the
fixture helper is preserved; adjust the initialization where "rankExits":
rank_exits or [...] appears to use the None-check form and keep the default list
only when rank_exits is actually None.
- Line 1254: Change the implicit optional annotation for the parameter/variable
named rank_exits from "rank_exits: list = None" to an explicit optional type
like "rank_exits: list | None = None" so the type clearly indicates
Optional[List] (satisfying Ruff RUF013); locate the declaration of rank_exits in
the affected function or test and update the type annotation accordingly.
In `@crates/basilica-sdk/src/types.rs`:
- Around line 1611-1617: The DistributedStatus struct gained a new field
rank_exits so any manual DistributedStatus literal must include it; update the
_sample_deployment_with_distributed() test/sample initializer to set rank_exits
(e.g., an empty Vec::new()) or replace the explicit DistributedStatus literal
with DistributedStatus { ..Default::default() } (or otherwise fill the new
field) so the crate compiles; target the initializer in the
_sample_deployment_with_distributed() function and ensure the DistributedStatus
literal includes rank_exits: Vec::new().
---
Outside diff comments:
In `@crates/basilica-sdk-python/python/basilica/distributed.py`:
- Around line 575-584: The post-timeout path in wait_until_target_world_async
(and the analogous block at the other location) only checks for "succeeded" and
therefore returns a generic timeout even if the last refresh set self.phase to
"failed" or "cancelled"; update the final-state handling so after the loop (or
when timeout=0) you re-check self.phase and raise the same terminal-state
exceptions used in the loop for "failed" and "cancelled" (the same
branches/exception types used earlier in wait_until_target_world_async/
wait_until_target_world to preserve semantics) before falling back to the
BelowMinimumWorld timeout error.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 15d6dca2-e7bd-4312-9331-20a13fd6221b
📒 Files selected for processing (5)
crates/basilica-sdk-python/python/basilica/__init__.pycrates/basilica-sdk-python/python/basilica/distributed.pycrates/basilica-sdk-python/python/basilica/exceptions.pycrates/basilica-sdk-python/tests/test_distributed.pycrates/basilica-sdk/src/types.rs
| # Timeout: surface as `BelowMinimumWorld` with the live shape. | ||
| self.refresh() | ||
| ws = self.world | ||
| if self.phase == "succeeded": | ||
| return ws | ||
| raise BelowMinimumWorld( | ||
| f"wait_until_complete: timed out after {timeout}s " | ||
| f"(phase={self.phase}, ready={ws.ready}, target={ws.target})", | ||
| ready=ws.ready, | ||
| required_min=ws.min, | ||
| timeout=timeout, | ||
| ) |
There was a problem hiding this comment.
Return terminal failure details here too, not just inside the polling loop.
wait_until_complete*() promises to surface failed / cancelled with rank-exit context, but the final refresh path only checks for "succeeded". A terminal failure that lands on the last refresh is currently reported as a timeout, which loses the new diagnostics this PR adds.
Suggested fix
self.refresh()
ws = self.world
if self.phase == "succeeded":
return ws
+ if self.phase in ("failed", "cancelled"):
+ exits = self.rank_exits
+ bad = [
+ f"rank {e.rank}: exit_code={e.exit_code}"
+ + (f" reason={e.termination_reason}" if e.termination_reason else "")
+ for e in exits
+ if e.exit_code != 0
+ ]
+ detail = "; ".join(bad) if bad else "no per-rank exits recorded"
+ raise BelowMinimumWorld(
+ f"wait_until_complete: UD '{self.name}' reached "
+ f"terminal phase '{self.phase}' ({detail})",
+ ready=ws.ready,
+ required_min=ws.min,
+ timeout=timeout,
+ )
raise BelowMinimumWorld(
f"wait_until_complete: timed out after {timeout}s "
f"(phase={self.phase}, ready={ws.ready}, target={ws.target})",
ready=ws.ready,
required_min=ws.min,
timeout=timeout,
)Mirror the same branch in wait_until_complete_async().
Also applies to: 730-740
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/basilica-sdk-python/python/basilica/distributed.py` around lines 680 -
691, The timeout path in wait_until_complete (and the async variant
wait_until_complete_async) only treats non-succeeded final states as a timeout,
losing terminal failure diagnostics; modify the final refresh branch in
wait_until_complete (and mirror the change in wait_until_complete_async) to
check for terminal phases like "failed" or "cancelled" after self.refresh() and
raise the corresponding RankExit or Cancelled exception (or raise
BelowMinimumWorld only when truly timed out), including the live world
(self.world), ready/required_min/timeout and any rank-exit context so terminal
failure details are surfaced instead of being reported as a generic timeout.
| name: str = "dlc-done", | ||
| namespace: str = "u-test", | ||
| phase: str = "succeeded", | ||
| rank_exits: list = None, |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify the implicit Optional occurrence and whether Ruff config enables RUF013.
rg -n 'rank_exits:\s*list\s*=\s*None' crates/basilica-sdk-python/tests/test_distributed.py
fd -HI 'pyproject.toml' | xargs -r rg -n 'ruff|RUF013|select|extend-select|ignore'Repository: one-covenant/basilica
Length of output: 161
🏁 Script executed:
fd -HI 'pyproject.toml' --max-results 5 | head -10Repository: one-covenant/basilica
Length of output: 147
🏁 Script executed:
cat -n pyproject.toml | head -100Repository: one-covenant/basilica
Length of output: 112
🏁 Script executed:
cat -n crates/basilica-sdk-python/pyproject.toml | grep -A 20 -B 5 'ruff\|RUF013'Repository: one-covenant/basilica
Length of output: 47
🏁 Script executed:
cat crates/basilica-sdk-python/pyproject.tomlRepository: one-covenant/basilica
Length of output: 1949
🏁 Script executed:
find . -name '.ruff.toml' -o -name 'ruff.toml' 2>/dev/nullRepository: one-covenant/basilica
Length of output: 77
🏁 Script executed:
cd crates/basilica-sdk-python && find . -name '*.py' -path '*/tests/*' | head -5 && cd - && echo "---" && rg 'from typing import|Optional' crates/basilica-sdk-python/tests/test_distributed.py | head -20Repository: one-covenant/basilica
Length of output: 283
🏁 Script executed:
cat -n ./ruff.tomlRepository: one-covenant/basilica
Length of output: 1404
🏁 Script executed:
sed -n '1250,1280p' crates/basilica-sdk-python/tests/test_distributed.pyRepository: one-covenant/basilica
Length of output: 1055
🏁 Script executed:
grep 'requires-python' crates/basilica-sdk-python/pyproject.tomlRepository: one-covenant/basilica
Length of output: 92
🏁 Script executed:
sed -n '1250,1310p' crates/basilica-sdk-python/tests/test_distributed.py | cat -nRepository: one-covenant/basilica
Length of output: 2424
🏁 Script executed:
rg -n '_make_terminal_training' crates/basilica-sdk-python/tests/test_distributed.py | head -20Repository: one-covenant/basilica
Length of output: 481
Use explicit optional typing for rank_exits.
rank_exits: list = None violates Ruff RUF013 (implicit Optional). Change to rank_exits: list | None = None to make the optionality explicit in the type annotation.
🧰 Tools
🪛 Ruff (0.15.12)
[warning] 1254-1254: PEP 484 prohibits implicit Optional
Convert to T | None
(RUF013)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/basilica-sdk-python/tests/test_distributed.py` at line 1254, Change
the implicit optional annotation for the parameter/variable named rank_exits
from "rank_exits: list = None" to an explicit optional type like "rank_exits:
list | None = None" so the type clearly indicates Optional[List] (satisfying
Ruff RUF013); locate the declaration of rank_exits in the affected function or
test and update the type annotation accordingly.
| "max": 2, | ||
| "belowMinimum": True, | ||
| }, | ||
| "rankExits": rank_exits or [ |
There was a problem hiding this comment.
Preserve explicit empty rank_exits in the fixture helper.
rank_exits or [...] replaces an intentionally passed empty list with defaults. That makes it impossible to model terminal states with zero exits.
Suggested fix
- "rankExits": rank_exits or [
+ "rankExits": rank_exits if rank_exits is not None else [📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| "rankExits": rank_exits or [ | |
| "rankExits": rank_exits if rank_exits is not None else [ |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/basilica-sdk-python/tests/test_distributed.py` at line 1275, The
fixture helper currently uses "rank_exits or [...]" which treats an explicitly
passed empty list as falsy and replaces it with defaults; change the assignment
to use an explicit None check (e.g., use "rank_exits if rank_exits is not None
else [...]") so that an empty list passed into the fixture helper is preserved;
adjust the initialization where "rankExits": rank_exits or [...] appears to use
the None-check form and keep the default list only when rank_exits is actually
None.
| /// Phase 5b (#445): per-rank exit diagnostics. Empty while the UD is | ||
| /// non-terminal. On transition to `Succeeded` / `Failed` the operator | ||
| /// snapshots each worker pod's container `terminated` state and | ||
| /// persists it here so the SDK can surface them after the worker | ||
| /// StatefulSet has been scaled to `replicas: 0`. | ||
| #[serde(default, skip_serializing_if = "Vec::is_empty")] | ||
| pub rank_exits: Vec<DistributedRankExit>, |
There was a problem hiding this comment.
Update explicit DistributedStatus literals for the new field.
Adding rank_exits here makes every manual DistributedStatus { ... } initializer require it. In this file, Line 2730’s _sample_deployment_with_distributed() literal is still missing rank_exits, so this crate will not compile until that initializer is updated (or switched to ..Default::default() where appropriate).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/basilica-sdk/src/types.rs` around lines 1611 - 1617, The
DistributedStatus struct gained a new field rank_exits so any manual
DistributedStatus literal must include it; update the
_sample_deployment_with_distributed() test/sample initializer to set rank_exits
(e.g., an empty Vec::new()) or replace the explicit DistributedStatus literal
with DistributedStatus { ..Default::default() } (or otherwise fill the new
field) so the crate compiles; target the initializer in the
_sample_deployment_with_distributed() function and ensure the DistributedStatus
literal includes rank_exits: Vec::new().
PR #461 added rank_exits: Vec<DistributedRankExit> to DistributedStatus but missed initializing the field in the _sample_deployment_with_distributed test fixture, breaking `cargo check --tests` on every PR opened against main. The other DistributedStatus initializer in the same test module uses ..Default::default() and is unaffected.
Summary
Refs #445 (closed by the matching basilica-backend PR). Exposes the operator's
Succeededterminal state to Python users via:DistributedTraining.phase— operator-driven phase string (pending...succeeded|failed|cancelled).DistributedTraining.is_terminal— convenience for the terminal triple.DistributedTraining.rank_exits: List[RankExit]— per-rank exit diagnostics, populated when the UD reaches a terminal state.wait_until_complete(timeout)— sync + async; blocks until any terminal state, returnsWorldStatusonsucceeded, raisesBelowMinimumWorld(with per-rank exit detail) onfailed, raisesUDTerminalStateif the UD is already terminal at entry.wait_until_target_world(timeout)— returns cleanly onsucceeded(training is done by definition); raisesBelowMinimumWorldonfailed/cancelled.t.scale(target=N)— raisesUDTerminalStateBEFORE the API call when the UD is already terminal. The operator-side defense in depth (UDTerminalStateWarning Event) coverskubectl editmutations that bypass the SDK.UDTerminalState(DistributedError)exception withphaseandrequested_targetattributes.Per-deliverable status
Rust wire types
DistributedRankExitstruct mirrors the operator'sRankExit.DistributedStatus.rank_exits: Vec<DistributedRankExit>field.cargo check -p basilica-sdkclean.Python facade
RankExitdataclass.phase/is_terminal/rank_exitsproperties onDistributedTraining.wait_until_complete(sync + async) withBelowMinimumWorld(carrying per-rank exit context) onfailed,UDTerminalStateon already-terminal-at-entry,BelowMinimumWorldon timeout.wait_until_target_world(sync + async) returns clean onsucceeded.scaleraisesUDTerminalStateon terminal phase BEFORE the API call.UDTerminalStateexception added tobasilica.exceptions.basilica/__init__.py.Tests
tests/test_distributed.py::TestPhase5bCompletionSemantics:phaseproperty readssucceededfrom cached status.phaseproperty reads non-terminalready;is_terminalFalse.rank_exitsparses camelCase JSON correctly.rank_exitsempty when norankExitskey in distributed status.scale(target=N)onsucceededraisesUDTerminalState; PyO3 patch NOT issued.scale(target=N)onfailedraisesUDTerminalState.wait_until_completeon UD already terminal at entry raisesUDTerminalState.wait_until_completereturnsWorldStatuswhen phase flips tosucceededmid-wait.wait_until_completeraisesBelowMinimumWorldwithrank 1: exit_code=137 reason=OOMKilleddetail on mid-waitfailed.wait_until_target_worldreturns clean onsucceededeven withready=0(StatefulSet scaled to 0 by operator).wait_until_complete_asyncexists and is a coroutine function (SDK arch § 9 parity).Architecture doc
§ 6/§ 8/§ 11updates land in the matching backend PR (the doc lives inbasilica-private/docs/architecture/, not in this repo).Test plan
pytest tests/test_distributed.py.examples/22_distributed_with_bench.pyexits 0 unmodified -- the load-bearing user-facing test referenced by issue feat(cli): show resume link for pending card payments in fund list #445. Live-verify transcript on the backend PR.What is NOT in this PR
basilica-backendPR).Phaseenum on the Python side. We use the snake_case string straight from the operator'sstatus.phasebecause the SDK's existingphase: Optional[String]field onDeploymentResponseis already string-typed; introducing a new enum would have churned existing callers without buying type safety the existing surface lacks.SucceededUDs in the SDK. The user's only entry point for cleanup ist.delete(), matching the operator-side contract.