Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 39 additions & 4 deletions docs/src/core/concepts/job-runners.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,49 @@ branches above:
`compute_node_min_time_for_new_jobs_seconds`.
8. **Start jobs** — call `start_job`, assign a GPU device if needed, then spawn the command via
`AsyncCliCommand`, recording stdout/stderr to per-job log files.
9. **Wait** — sleep with `SIGCHLD`-aware wakeup so subprocess exits are observed promptly. Adaptive
backoff doubles the interval toward `claim_backoff_max_secs` when an iteration produces no
progress; any completion, claim, or early wakeup resets it to the base interval. Pinned to the
base interval while offline so the drain-ping cadence is preserved.
9. **Wait** — sleep with `SIGCHLD`-aware wakeup so subprocess exits are observed promptly. The sleep
length is set by the adaptive-backoff rules described in [Adaptive Backoff](#adaptive-backoff).
While offline, the adaptive ramp is bypassed so drain-ping cadence is preserved (the drain ping
itself self-rate-limits via `drain_ping_interval_secs`).
10. **Exit checks** — break out on `SIGTERM`, on the end-of-allocation deadline (direct mode applies
a pre-deadline window so `SIGTERM` has time to land before `SIGKILL`), or after an idle interval
past `compute_node_wait_for_new_jobs_seconds` with no pending actions that could add capacity.

### Adaptive Backoff

The wait between iterations adapts to what the runner is doing. Three regimes:

| State | Wait |
| ----------------------------- | ---------------------------------------- |
| Making progress | `job_completion_poll_interval` (base) |
| Busy at capacity, no progress | doubles toward `claim_backoff_max_secs` |
| Idle (no children to reap) | `min(job_completion_poll_interval, 30s)` |

**Busy-at-capacity case (long-running workflows).** When the runner is fully loaded and nothing is
completing or being claimed, polling at the base interval would generate unnecessary requests for
hours. Each iteration with no progress doubles the wait, capped at `claim_backoff_max_secs` (default
300s). The wait resets to base immediately on any progress: a local completion, a successful claim,
or a `SIGCHLD` wake-up.
Comment on lines +248 to +258

**Closing case (workflow tail or pre-idle-exit).** When the runner has no tracked children,
`SIGCHLD` cannot fire — every wake-up has to come from the timer, and the only things the loop can
react to are workflow-complete, idle-exit (`compute_node_wait_for_new_jobs_seconds`), and the
`end_time` deadline. Letting the wait grow toward the busy cap would only delay these checks. The
idle wait therefore clamps to `min(job_completion_poll_interval, 30s)`: at most 30s, so a long
configured base does not delay closing-case detection; and never longer than the configured base, so
a user who chose a base shorter than 30s keeps that tighter cadence.

**Example.** A 5000-job × 5-day workflow run with `job_completion_poll_interval = 300s` and
`claim_backoff_max_secs = 3600s`:

- During the busy phase, each runner polls at 300s — about 12 requests/hour.
- After a runner finishes its last local job but the workflow is not yet done, it polls at 30s until
the workflow is observed complete or the idle-exit timer fires.

Without the idle clamp, a node that drained near the start of a 1-hour backoff window could sit
asleep for nearly the full hour after the workflow completed elsewhere. The clamp bounds that
worst-case waste to ~30s.

## Surviving Server Outages (Offline Drain)

Job runners need the server to claim work, report completions, and unblock dependents. But a running
Expand Down
144 changes: 125 additions & 19 deletions src/client/job_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,50 @@ impl Default for Wakeup {
}
}

/// Upper bound on the idle-wait interval when the runner has no tracked
/// children. In that state every wake-up has to come from the timer
/// (SIGCHLD can't fire), and the only things we'd react to are
/// workflow-complete and idle-exit (`compute_node_wait_for_new_jobs_seconds`)
/// — neither needs slower than 30s resolution.
const IDLE_BACKOFF_CAP_SECS: f64 = 30.0;

/// Effective sleep interval when the runner has no tracked children. The
/// configured base is the runner's preferred completion-check cadence; when
/// there are no children to check, that rationale doesn't apply and we'd
/// rather keep closing-case detection (workflow-complete, idle-exit,
/// `end_time`) responsive. So idle waits clamp at
/// `min(base, IDLE_BACKOFF_CAP_SECS)`: at most 30s so a long configured
/// base does not delay closing-case detection, and never longer than the
/// configured base so a user who chose a base shorter than 30s keeps that
/// tighter cadence.
fn idle_poll_interval(base: f64) -> f64 {
base.min(IDLE_BACKOFF_CAP_SECS)
}

/// Compute the next idle-wait interval for the runner's main loop given the
/// current wait, the configured base, the configured cap, and whether the
/// last iteration made any progress (a local completion was reaped, or a
/// claim returned jobs).
/// current wait, the configured base, the configured cap, whether the last
/// iteration made any progress (a local completion was reaped, or a claim
/// returned jobs), and whether the runner is currently idle (no children
/// being tracked).
///
/// Progress resets the wait to `base`. An idle iteration doubles the wait
/// toward `cap`. The cap is clamped to at least `base` so callers cannot
/// accidentally shrink the wait below the configured floor.
fn next_poll_interval(current: f64, base: f64, cap: f64, made_progress: bool) -> f64 {
/// In the non-idle path, progress resets the wait to `base`; an idle
/// iteration doubles the wait toward `cap`. The cap is clamped to at least
/// `base` so callers cannot accidentally shrink the wait below the
/// configured floor.
///
/// In the idle path, the runner is in a closing state (workflow tail or
/// pre-idle-exit) and the wait is simply [`idle_poll_interval`] — no ramp,
/// no base floor. See `idle_poll_interval` for the rationale.
fn next_poll_interval(
current: f64,
base: f64,
cap: f64,
made_progress: bool,
is_idle: bool,
) -> f64 {
if is_idle {
return idle_poll_interval(base);
}
let effective_cap = cap.max(base);
if made_progress {
base
Expand Down Expand Up @@ -1060,13 +1095,23 @@ impl JobRunner {
// interval too, so an eventual transition back online starts
// fresh rather than inheriting a stale long wait.
let pre_sleep_progress = completions > 0 || jobs_claimed;
let is_idle_now = self.running_jobs.is_empty();
if self.offline {
self.current_poll_interval = self.job_completion_poll_interval;
}
// When progress already happened this iteration, sleep the
// base interval rather than a previously-backed-off value so
// we react to the next change promptly.
let wait_secs = if pre_sleep_progress {
// When the runner has no children to reap, the configured base
// interval (which controls completion-check cadence) is
// irrelevant — every wake has to come from the timer, and the
// only things we'd react to are workflow-complete, idle-exit,
// and end_time. Use `idle_poll_interval` so closing-case
// detection stays responsive even when the user has set a
// long base for cost reasons. Otherwise, when progress already
// happened this iteration, sleep the base interval rather
// than a previously-backed-off value so we react to the next
// change promptly.
let wait_secs = if is_idle_now {
idle_poll_interval(self.job_completion_poll_interval)
} else if pre_sleep_progress {
self.job_completion_poll_interval
} else {
self.current_poll_interval
Expand All @@ -1088,15 +1133,17 @@ impl JobRunner {
self.job_completion_poll_interval,
self.claim_backoff_max_secs,
made_progress,
is_idle_now,
);
if (next - self.current_poll_interval).abs() > f64::EPSILON {
debug!(
"Adaptive backoff: poll interval {:.1}s -> {:.1}s (base {:.1}s, cap {:.1}s, made_progress={})",
"Adaptive backoff: poll interval {:.1}s -> {:.1}s (base {:.1}s, cap {:.1}s, made_progress={}, is_idle={})",
self.current_poll_interval,
next,
self.job_completion_poll_interval,
self.claim_backoff_max_secs,
made_progress
made_progress,
is_idle_now
);
self.current_poll_interval = next;
}
Expand Down Expand Up @@ -3673,7 +3720,7 @@ mod tests {
let mut current = base;
let steps = [60.0, 120.0, 240.0, 300.0, 300.0];
for expected in steps {
current = next_poll_interval(current, base, cap, false);
current = next_poll_interval(current, base, cap, false, false);
assert!(
(current - expected).abs() < f64::EPSILON,
"expected {expected}, got {current}"
Expand All @@ -3688,11 +3735,11 @@ mod tests {
// Wind up to the cap.
let mut current = base;
for _ in 0..10 {
current = next_poll_interval(current, base, cap, false);
current = next_poll_interval(current, base, cap, false, false);
}
assert!((current - cap).abs() < f64::EPSILON);
// Any progress resets immediately.
current = next_poll_interval(current, base, cap, true);
current = next_poll_interval(current, base, cap, true, false);
assert!((current - base).abs() < f64::EPSILON);
}

Expand All @@ -3702,7 +3749,7 @@ mod tests {
// the wait below base.
let base = 30.0;
let cap = 5.0;
let next = next_poll_interval(base, base, cap, false);
let next = next_poll_interval(base, base, cap, false, false);
assert!((next - base).abs() < f64::EPSILON);
}

Expand All @@ -3712,7 +3759,7 @@ mod tests {
// some reason.
let base = 30.0;
let cap = 300.0;
let next = next_poll_interval(10.0, base, cap, true);
let next = next_poll_interval(10.0, base, cap, true, false);
assert!((next - base).abs() < f64::EPSILON);
}

Expand All @@ -3721,10 +3768,69 @@ mod tests {
// An idle step from current=base must never return less than base.
let base = 30.0;
let cap = 300.0;
let next = next_poll_interval(base, base, cap, false);
let next = next_poll_interval(base, base, cap, false, false);
assert!(next >= base);
}

#[test]
fn next_poll_interval_is_idle_short_base_keeps_base() {
// When `base` is shorter than the idle cap, idle waits stay at
// base (don't slow down to 30s). Default-config case.
let base = 5.0;
let cap = 1800.0;
let next = next_poll_interval(cap, base, cap, false, true);
assert!(
(next - base).abs() < f64::EPSILON,
"expected base {base}, got {next}"
);
}

#[test]
fn next_poll_interval_is_idle_long_base_clamps_to_idle_cap() {
// When `base` is longer than the idle cap, idle waits clamp to
// 30s — the case the user with base=300s and cap=1hr cares about.
// Worst-case closing detection lag is bounded by the idle cap,
// independent of the configured base and cap.
let base = 300.0;
let cap = 3600.0;
let next = next_poll_interval(cap, base, cap, false, true);
assert!(
(next - IDLE_BACKOFF_CAP_SECS).abs() < f64::EPSILON,
"expected idle cap {IDLE_BACKOFF_CAP_SECS}, got {next}"
);
}

#[test]
fn next_poll_interval_is_idle_ignores_progress_and_current() {
// Idle wait is purely a function of (base, IDLE_BACKOFF_CAP_SECS):
// it does not ramp from `current` and is unaffected by progress.
let base = 60.0;
let cap = 1800.0;
let expected = idle_poll_interval(base);
for current in [base, 100.0, 900.0, cap] {
for made_progress in [true, false] {
let next = next_poll_interval(current, base, cap, made_progress, true);
assert!(
(next - expected).abs() < f64::EPSILON,
"current={current} made_progress={made_progress}: \
expected {expected}, got {next}",
);
}
}
}

#[test]
fn idle_poll_interval_returns_minimum() {
// Direct invariant test on the helper: idle wait is always the
// minimum of base and the constant cap.
assert!((idle_poll_interval(5.0) - 5.0).abs() < f64::EPSILON);
assert!((idle_poll_interval(300.0) - IDLE_BACKOFF_CAP_SECS).abs() < f64::EPSILON);
assert!(
(idle_poll_interval(IDLE_BACKOFF_CAP_SECS) - IDLE_BACKOFF_CAP_SECS).abs()
< f64::EPSILON
);
}

#[test]
fn wakeup_notify_before_wait_returns_immediately() {
let w = Wakeup::new();
Expand Down
Loading