From fee11add32747c01484a1aa5ceb5dcb91e4c2ee0 Mon Sep 17 00:00:00 2001 From: PushPals Worker Date: Fri, 6 Mar 2026 07:47:43 +0000 Subject: [PATCH] feat(remote_agent): add dispatch guard for queue load - introduce DispatchGuardConfig/env helpers in apps/remotebuddy/src/autonomous_engine.ts to bound queue latency thresholds - evaluate queue and idle telemetry via evaluateDispatchGuardState and decideDispatchGuardAction to throttle, reroute, or allow - fetch /system/status metrics, log guard decisions, and block objectives with guardBlockReason when pressure stays high - reroute selections using pickQueueHealthRerouteCandidate and flush guard metrics/logs through CommunicationManager hooks - add vitest suite to autonomous_engine.ts covering guard decisions, reroute selection, and metrics emission Tests: - not run --- apps/remotebuddy/src/autonomous_engine.ts | 1256 ++++++++++++++++++++- 1 file changed, 1206 insertions(+), 50 deletions(-) diff --git a/apps/remotebuddy/src/autonomous_engine.ts b/apps/remotebuddy/src/autonomous_engine.ts index 0feb4a5..25b4a10 100644 --- a/apps/remotebuddy/src/autonomous_engine.ts +++ b/apps/remotebuddy/src/autonomous_engine.ts @@ -1,8 +1,8 @@ import { createHash, randomUUID } from "crypto"; import { existsSync, mkdirSync, rmSync } from "fs"; import { resolve } from "path"; -import type { CommunicationManager } from "shared"; import { + CommunicationManager, componentRootPrefix, extractVisionKeyItems, loadRepoDocText, @@ -148,6 +148,103 @@ type Snapshot = { }; }; +type QueueTelemetry = { + ts: string; + sampleAgeMs: number | null; + queueP95Ms: number; + movingQueueP95Ms: number | null; + idleWorkers: number; + busyWorkers: number; + onlineWorkers: number; + pendingRequests: number; + pendingInteractive: number; +}; + +type DispatchPressureLevel = "healthy" | "elevated" | "critical"; + +type DispatchGuardState = { + telemetry: QueueTelemetry | null; + level: DispatchPressureLevel; + reasons: string[]; +}; + +type MissingTelemetryPolicy = "fail_open" | "reroute" | "fail_closed"; + +type DispatchGuardConfig = { + enabled: boolean; + queueP95ElevatedMs: number; + queueP95CriticalMs: number; + idleElevatedThreshold: number; + idleCriticalThreshold: number; + sampleStaleMs: number; + sampleCriticalMs: number; + enforceQueueHealthReroute: boolean; + logCooldownMs: number; + missingTelemetryPolicy: MissingTelemetryPolicy; +}; + +type DispatchGuardDecision = { + action: "allow" | "reroute" | "throttle"; + level: DispatchPressureLevel; + reasonCode: string; + reasons: string[]; + telemetry: QueueTelemetry | null; +}; + +type DispatchGuardLogTelemetry = { + ts: string; + queue_p95_ms: number; + moving_queue_p95_ms: number | null; + idle_workers: number; + busy_workers: number; + online_workers: number; + pending_requests: number; + pending_interactive: number; + sample_age_ms: number | null; +}; + +type DispatchGuardLogEvent = { + event: "queue_guard_decision"; + ts: string; + run_id: string; + snapshot_id: string; + candidate_id: string | null; + trigger_type: AutonomyCandidate["trigger_type"] | null; + action: DispatchGuardDecision["action"]; + level: DispatchGuardDecision["level"]; + severity: "info" | "warn" | "error"; + reason_code: string; + reasons: string[]; + telemetry: DispatchGuardLogTelemetry | null; + thresholds: { + queue_p95_elevated_ms: number; + queue_p95_critical_ms: number; + idle_elevated_threshold: number; + idle_critical_threshold: number; + sample_stale_ms: number; + sample_critical_ms: number; + }; + guard_enabled: boolean; +}; + +type RunningStats = { + count: number; + sum: number; + min: number; + max: number; +}; + +type GuardMetricsState = { + counts: Map; + telemetry: { + queueP95Ms: RunningStats; + movingQueueP95Ms: RunningStats; + idleWorkers: RunningStats; + sampleAgeMs: RunningStats; + }; + lastFlushAtMs: number; +}; + type PolicyRule = { maxRisk: "low" | "medium" | "high"; maxBreadth: "narrow" | "medium" | "broad"; @@ -293,6 +390,19 @@ const AUTO_INGEST_SEED_PATTERNS: Array<{ }, ]; +const DEFAULT_DISPATCH_GUARD_CONFIG: DispatchGuardConfig = { + enabled: true, + queueP95ElevatedMs: 1_000, + queueP95CriticalMs: 1_500, + idleElevatedThreshold: 3, + idleCriticalThreshold: 1, + sampleStaleMs: 90_000, + sampleCriticalMs: 180_000, + enforceQueueHealthReroute: true, + logCooldownMs: 60_000, + missingTelemetryPolicy: "reroute", +}; + type SourceCurationStatus = "candidate" | "trusted" | "watchlist" | "archived"; type FeedbackPriorForScoring = { @@ -1450,6 +1560,388 @@ function matchGapIdsFromText(text: string, fallback: EngineOpportunityGap[]): st return fallback.slice(0, 2).map((entry) => entry.id); } +const PRESSURE_LEVEL_ORDER: Record = { + healthy: 0, + elevated: 1, + critical: 2, +}; + +function guardEnvNumber(key: string, fallback: number): number { + const raw = process.env?.[key]; + if (raw == null || raw.trim() === "") return fallback; + const parsed = Number(raw); + return Number.isFinite(parsed) ? parsed : fallback; +} + +function guardEnvBool(key: string, fallback: boolean): boolean { + const raw = process.env?.[key]; + if (raw == null) return fallback; + const text = raw.trim().toLowerCase(); + if (["1", "true", "yes", "on"].includes(text)) return true; + if (["0", "false", "no", "off"].includes(text)) return false; + return fallback; +} + +function guardEnvChoice( + key: string, + allowed: readonly T[], + fallback: T, +): T { + const raw = process.env?.[key]; + if (!raw) return fallback; + const normalized = raw.trim().toLowerCase(); + return allowed.find((value) => value === normalized) ?? fallback; +} + +function buildDispatchGuardConfigFromEnv(): DispatchGuardConfig { + return { + enabled: guardEnvBool("REMOTEBUDDY_AUTONOMY_GUARD_ENABLED", DEFAULT_DISPATCH_GUARD_CONFIG.enabled), + queueP95ElevatedMs: guardEnvNumber( + "REMOTEBUDDY_AUTONOMY_GUARD_QUEUE_P95_WARN_MS", + DEFAULT_DISPATCH_GUARD_CONFIG.queueP95ElevatedMs, + ), + queueP95CriticalMs: guardEnvNumber( + "REMOTEBUDDY_AUTONOMY_GUARD_QUEUE_P95_HARD_MS", + DEFAULT_DISPATCH_GUARD_CONFIG.queueP95CriticalMs, + ), + idleElevatedThreshold: Math.max( + 0, + Math.floor( + guardEnvNumber( + "REMOTEBUDDY_AUTONOMY_GUARD_IDLE_WARN", + DEFAULT_DISPATCH_GUARD_CONFIG.idleElevatedThreshold, + ), + ), + ), + idleCriticalThreshold: Math.max( + 0, + Math.floor( + guardEnvNumber( + "REMOTEBUDDY_AUTONOMY_GUARD_IDLE_HARD", + DEFAULT_DISPATCH_GUARD_CONFIG.idleCriticalThreshold, + ), + ), + ), + sampleStaleMs: Math.max( + 5_000, + guardEnvNumber( + "REMOTEBUDDY_AUTONOMY_GUARD_SAMPLE_STALE_MS", + DEFAULT_DISPATCH_GUARD_CONFIG.sampleStaleMs, + ), + ), + sampleCriticalMs: Math.max( + 10_000, + guardEnvNumber( + "REMOTEBUDDY_AUTONOMY_GUARD_SAMPLE_CRITICAL_MS", + DEFAULT_DISPATCH_GUARD_CONFIG.sampleCriticalMs, + ), + ), + enforceQueueHealthReroute: guardEnvBool( + "REMOTEBUDDY_AUTONOMY_GUARD_REQUIRE_QUEUE_HEALTH", + DEFAULT_DISPATCH_GUARD_CONFIG.enforceQueueHealthReroute, + ), + logCooldownMs: Math.max( + 1_000, + guardEnvNumber( + "REMOTEBUDDY_AUTONOMY_GUARD_LOG_COOLDOWN_MS", + DEFAULT_DISPATCH_GUARD_CONFIG.logCooldownMs, + ), + ), + missingTelemetryPolicy: guardEnvChoice( + "REMOTEBUDDY_AUTONOMY_GUARD_MISSING_POLICY", + ["fail_open", "reroute", "fail_closed"], + DEFAULT_DISPATCH_GUARD_CONFIG.missingTelemetryPolicy, + ), + }; +} + +function validateDispatchGuardConfig(cfg: DispatchGuardConfig): DispatchGuardConfig { + const sanitized: DispatchGuardConfig = { ...cfg }; + const adjustments: string[] = []; + if (sanitized.queueP95ElevatedMs <= 0) { + sanitized.queueP95ElevatedMs = DEFAULT_DISPATCH_GUARD_CONFIG.queueP95ElevatedMs; + adjustments.push("queueP95ElevatedMs<=0"); + } + if (sanitized.queueP95CriticalMs <= sanitized.queueP95ElevatedMs) { + sanitized.queueP95CriticalMs = Math.max( + sanitized.queueP95ElevatedMs + 250, + DEFAULT_DISPATCH_GUARD_CONFIG.queueP95CriticalMs, + ); + adjustments.push("queueP95CriticalMs<=queueP95ElevatedMs"); + } + if (sanitized.idleCriticalThreshold < 0) { + sanitized.idleCriticalThreshold = 0; + adjustments.push("idleCriticalThreshold<0"); + } + if (sanitized.idleElevatedThreshold < sanitized.idleCriticalThreshold) { + sanitized.idleElevatedThreshold = sanitized.idleCriticalThreshold; + adjustments.push("idleElevatedThreshold 0) { + console.warn( + `[RemoteBuddyAutonomousEngine] Dispatch guard config sanitized (${adjustments.join(", ")}).`, + ); + } + return sanitized; +} + +function serializeGuardTelemetryForLog( + telemetry: QueueTelemetry | null, +): DispatchGuardLogTelemetry | null { + if (!telemetry) return null; + return { + ts: telemetry.ts, + queue_p95_ms: Math.round(telemetry.queueP95Ms), + moving_queue_p95_ms: + telemetry.movingQueueP95Ms == null ? null : Math.round(telemetry.movingQueueP95Ms), + idle_workers: telemetry.idleWorkers, + busy_workers: telemetry.busyWorkers, + online_workers: telemetry.onlineWorkers, + pending_requests: telemetry.pendingRequests, + pending_interactive: telemetry.pendingInteractive, + sample_age_ms: telemetry.sampleAgeMs, + }; +} + +function buildGuardDecisionEvent( + cfg: DispatchGuardConfig, + context: { + runId: string; + snapshotId: string; + candidateId?: string | null; + triggerType?: AutonomyCandidate["trigger_type"] | null; + ts?: string; + }, + decision: DispatchGuardDecision, +): DispatchGuardLogEvent { + const severity: DispatchGuardLogEvent["severity"] = + decision.action === "throttle" + ? "error" + : decision.action === "reroute" || decision.level === "elevated" + ? "warn" + : "info"; + return { + event: "queue_guard_decision", + ts: context.ts ?? new Date().toISOString(), + run_id: context.runId, + snapshot_id: context.snapshotId, + candidate_id: context.candidateId ?? null, + trigger_type: context.triggerType ?? null, + action: decision.action, + level: decision.level, + severity, + reason_code: decision.reasonCode, + reasons: decision.reasons, + telemetry: serializeGuardTelemetryForLog(decision.telemetry), + thresholds: { + queue_p95_elevated_ms: cfg.queueP95ElevatedMs, + queue_p95_critical_ms: cfg.queueP95CriticalMs, + idle_elevated_threshold: cfg.idleElevatedThreshold, + idle_critical_threshold: cfg.idleCriticalThreshold, + sample_stale_ms: cfg.sampleStaleMs, + sample_critical_ms: cfg.sampleCriticalMs, + }, + guard_enabled: cfg.enabled, + }; +} + +function maxPressureLevel( + ...levels: DispatchPressureLevel[] +): DispatchPressureLevel { + if (levels.length === 0) return "healthy"; + return levels.reduce((acc, level) => { + return PRESSURE_LEVEL_ORDER[level] > PRESSURE_LEVEL_ORDER[acc] ? level : acc; + }, "healthy"); +} + +function createRunningStats(): RunningStats { + return { + count: 0, + sum: 0, + min: Number.POSITIVE_INFINITY, + max: Number.NEGATIVE_INFINITY, + }; +} + +function updateRunningStats(stats: RunningStats, value: number | null | undefined): void { + if (value == null) return; + if (!Number.isFinite(value)) return; + stats.count += 1; + stats.sum += value; + stats.min = Math.min(stats.min, value); + stats.max = Math.max(stats.max, value); +} + +function snapshotRunningStats(stats: RunningStats): { count: number; min: number | null; max: number | null; avg: number | null } { + if (stats.count === 0) { + return { count: 0, min: null, max: null, avg: null }; + } + return { + count: stats.count, + min: Math.round(stats.min), + max: Math.round(stats.max), + avg: Math.round(stats.sum / stats.count), + }; +} + +function makeGuardMetricsState(): GuardMetricsState { + return { + counts: new Map(), + telemetry: { + queueP95Ms: createRunningStats(), + movingQueueP95Ms: createRunningStats(), + idleWorkers: createRunningStats(), + sampleAgeMs: createRunningStats(), + }, + lastFlushAtMs: 0, + }; +} + +function evaluateDispatchGuardState( + cfg: DispatchGuardConfig, + telemetry: QueueTelemetry | null, +): DispatchGuardState { + if (!cfg.enabled) { + return { telemetry, level: "healthy", reasons: [] }; + } + if (!telemetry) { + if (cfg.missingTelemetryPolicy === "fail_open") { + return { telemetry: null, level: "healthy", reasons: ["telemetry_missing_fail_open"] }; + } + if (cfg.missingTelemetryPolicy === "reroute") { + return { + telemetry: null, + level: "elevated", + reasons: ["telemetry_missing_reroute"], + }; + } + return { + telemetry: null, + level: "critical", + reasons: ["telemetry_missing_fail_closed"], + }; + } + let level: DispatchPressureLevel = "healthy"; + const reasons: string[] = []; + const queueSources: Array<{ label: string; value: number | null }> = [ + { label: "queue_p95_instant", value: telemetry.queueP95Ms }, + { label: "queue_p95_moving", value: telemetry.movingQueueP95Ms }, + ]; + for (const source of queueSources) { + if (source.value == null) continue; + const normalized = Math.max(0, source.value); + if (normalized >= cfg.queueP95CriticalMs) { + level = "critical"; + reasons.push(`${source.label}_critical:${Math.round(normalized)}`); + } else if (normalized >= cfg.queueP95ElevatedMs) { + level = maxPressureLevel(level, "elevated"); + reasons.push(`${source.label}_elevated:${Math.round(normalized)}`); + } + } + const idleWorkers = Math.max(0, telemetry.idleWorkers); + if (idleWorkers <= cfg.idleCriticalThreshold) { + level = "critical"; + reasons.push(`idle_workers_critical:${idleWorkers}`); + } else if (idleWorkers <= cfg.idleElevatedThreshold) { + level = maxPressureLevel(level, "elevated"); + reasons.push(`idle_workers_elevated:${idleWorkers}`); + } + if (telemetry.sampleAgeMs != null) { + if (telemetry.sampleAgeMs > cfg.sampleCriticalMs) { + level = "critical"; + reasons.push(`telemetry_stale_critical_ms:${Math.round(telemetry.sampleAgeMs)}`); + } else if (telemetry.sampleAgeMs > cfg.sampleStaleMs) { + level = maxPressureLevel(level, "elevated"); + reasons.push(`telemetry_stale_ms:${Math.round(telemetry.sampleAgeMs)}`); + } + } + return { telemetry, level, reasons }; +} + +function decideDispatchGuardAction( + cfg: DispatchGuardConfig, + state: DispatchGuardState, + triggerType: AutonomyCandidate["trigger_type"], +): DispatchGuardDecision { + if (!cfg.enabled) { + return { + action: "allow", + level: "healthy", + reasonCode: "queue_guard_disabled", + reasons: [], + telemetry: state.telemetry, + }; + } + if (state.level === "critical") { + return { + action: "throttle", + level: "critical", + reasonCode: "queue_guard_throttle", + reasons: state.reasons, + telemetry: state.telemetry, + }; + } + if ( + state.level === "elevated" && + cfg.enforceQueueHealthReroute && + triggerType !== "queue_health" + ) { + return { + action: "reroute", + level: "elevated", + reasonCode: "queue_guard_reroute", + reasons: state.reasons, + telemetry: state.telemetry, + }; + } + const reasonCode = + state.level === "healthy" + ? "queue_guard_allow" + : triggerType === "queue_health" + ? "queue_guard_allow_queue_health" + : "queue_guard_allow"; + return { + action: "allow", + level: state.level, + reasonCode, + reasons: state.reasons, + telemetry: state.telemetry, + }; +} + +function pickQueueHealthRerouteCandidate( + rows: T[], + excludeId?: string, +): T | null { + const filtered = rows.filter( + (row) => + row.candidate.trigger_type === "queue_health" && + row.candidate.id !== excludeId && + row.candidate.requires_user_input !== true, + ); + if (filtered.length === 0) return null; + const sorted = filtered.slice().sort((a, b) => { + if (b.finalScore !== a.finalScore) return b.finalScore - a.finalScore; + return a.candidate.id.localeCompare(b.candidate.id); + }); + return sorted[0]; +} + function normalizeInspirationPattern(value: unknown): InspirationPatternInput | null { const raw = asObject(value); const algorithm = asString(raw.algorithm); @@ -2714,6 +3206,7 @@ export class RemoteBuddyAutonomousEngine { private readonly llm: LLMClient; private readonly comm: CommunicationManager; private readonly cfg: PushPalsConfig["remotebuddy"]["autonomy"]; + private readonly dispatchGuardCfg: DispatchGuardConfig; private timer: ReturnType | null = null; private heartbeatTimer: ReturnType | null = null; private inFlight = false; @@ -2725,6 +3218,8 @@ export class RemoteBuddyAutonomousEngine { private lastOutcome: "none" | "success" | "skipped" | "failed" = "none"; private lastDetail = "not_started"; private lastCompletedAtMs = 0; + private lastGuardLogAtMs = 0; + private guardMetricsState: GuardMetricsState = makeGuardMetricsState(); constructor(opts: { server: string; @@ -2749,6 +3244,7 @@ export class RemoteBuddyAutonomousEngine { this.llm = opts.llm; this.comm = opts.comm; this.cfg = opts.config.remotebuddy.autonomy; + this.dispatchGuardCfg = validateDispatchGuardConfig(buildDispatchGuardConfigFromEnv()); } private setPhase(phase: string): void { @@ -3262,6 +3758,175 @@ export class RemoteBuddyAutonomousEngine { return data.ok && data.requestId ? data.requestId : null; } + private async fetchQueueTelemetry(): Promise { + try { + const res = await fetch(`${this.server}/system/status`, { + method: "GET", + headers: this.headers(), + }); + if (!res.ok) return null; + const payload = (await res.json()) as Record; + if (!asBoolean(payload.ok, false)) return null; + const ts = asString(payload.ts) || new Date().toISOString(); + const workers = asObject(payload.workers); + const queues = asObject(payload.queues); + const slo = asObject(payload.slo); + const requestSlo = asObject(slo.requests); + const queueWait = asObject(asObject(requestSlo.queueWaitMs)); + const queueP95Ms = Math.max( + 0, + asNumber(queueWait.p95 ?? queueWait.P95 ?? queueWait["p_95"], 0), + ); + const movingQueueP95Raw = asNumber( + queueWait.movingP95 ?? + queueWait.moving_p95 ?? + queueWait.p95Moving ?? + queueWait.p95_moving ?? + queueWait.rollingP95 ?? + queueWait.rolling_p95, + Number.NaN, + ); + const movingQueueP95Ms = Number.isFinite(movingQueueP95Raw) + ? Math.max(0, movingQueueP95Raw) + : null; + const idleWorkers = Math.max(0, Math.floor(asNumber(workers.idle, 0))); + const busyWorkers = Math.max(0, Math.floor(asNumber(workers.busy, 0))); + const onlineWorkers = Math.max( + 0, + Math.floor(asNumber(workers.online, idleWorkers + busyWorkers)), + ); + const requestCounts = asObject(asObject(queues).requests); + const pendingRequests = Math.max(0, Math.floor(asNumber(requestCounts.pending, 0))); + const priorityCounts = asObject(asObject(queues).requestPriorities); + const pendingInteractive = Math.max( + 0, + Math.floor(asNumber(priorityCounts.interactive, 0)), + ); + const sampleMs = Date.parse(ts); + const sampleAgeMs = Number.isFinite(sampleMs) ? Math.max(0, Date.now() - sampleMs) : null; + return { + ts, + sampleAgeMs, + queueP95Ms, + movingQueueP95Ms, + idleWorkers, + busyWorkers, + onlineWorkers, + pendingRequests, + pendingInteractive, + }; + } catch (error) { + console.warn( + `[RemoteBuddyAutonomousEngine] queue telemetry fetch failed: ${error instanceof Error ? error.message : String(error)}`, + ); + return null; + } + } + + private evaluateDispatchGuardForCandidate( + candidate: AutonomyCandidate, + telemetry: QueueTelemetry | null, + ): DispatchGuardDecision { + const state = evaluateDispatchGuardState(this.dispatchGuardCfg, telemetry); + return decideDispatchGuardAction(this.dispatchGuardCfg, state, candidate.trigger_type); + } + + private logDispatchGuardDecision( + runId: string, + snapshotId: string, + decision: DispatchGuardDecision, + context?: { candidateId?: string; triggerType?: AutonomyCandidate["trigger_type"] }, + ): void { + if (!this.dispatchGuardCfg.enabled) return; + this.recordDispatchGuardMetrics(decision); + if (decision.reasonCode === "queue_guard_disabled") return; + const eventPayload = buildGuardDecisionEvent( + this.dispatchGuardCfg, + { + runId, + snapshotId, + candidateId: context?.candidateId ?? null, + triggerType: context?.triggerType ?? null, + }, + decision, + ); + const serialized = JSON.stringify(eventPayload); + const now = Date.now(); + const shouldLogImmediately = decision.action !== "allow" || decision.level !== "healthy"; + if (shouldLogImmediately) { + console.log(`[RemoteBuddyAutonomousEngine] ${serialized}`); + void this.comm.emit("log", { + level: eventPayload.severity, + message: serialized, + }); + this.lastGuardLogAtMs = now; + return; + } + if (now - this.lastGuardLogAtMs < this.dispatchGuardCfg.logCooldownMs) { + return; + } + console.log(`[RemoteBuddyAutonomousEngine] ${serialized}`); + void this.comm.emit("log", { level: "info", message: serialized }); + this.lastGuardLogAtMs = now; + } + + private recordDispatchGuardMetrics(decision: DispatchGuardDecision): void { + if (!this.dispatchGuardCfg.enabled) return; + const state = this.guardMetricsState; + const key = `${decision.action}.${decision.level}.${decision.reasonCode}`; + state.counts.set(key, (state.counts.get(key) ?? 0) + 1); + const telemetry = decision.telemetry; + if (telemetry) { + updateRunningStats(state.telemetry.queueP95Ms, telemetry.queueP95Ms); + updateRunningStats(state.telemetry.movingQueueP95Ms, telemetry.movingQueueP95Ms); + updateRunningStats(state.telemetry.idleWorkers, telemetry.idleWorkers); + updateRunningStats(state.telemetry.sampleAgeMs, telemetry.sampleAgeMs); + } + this.flushDispatchGuardMetrics(false); + } + + private flushDispatchGuardMetrics(force: boolean): void { + const now = Date.now(); + if ( + !force && + now - this.guardMetricsState.lastFlushAtMs < this.dispatchGuardCfg.logCooldownMs + ) { + return; + } + if (this.guardMetricsState.counts.size === 0) return; + const payload = { + event: "queue_guard_metrics", + ts: new Date().toISOString(), + counts: Object.fromEntries(this.guardMetricsState.counts), + telemetry: { + queue_p95_ms: snapshotRunningStats(this.guardMetricsState.telemetry.queueP95Ms), + moving_queue_p95_ms: snapshotRunningStats( + this.guardMetricsState.telemetry.movingQueueP95Ms, + ), + idle_workers: snapshotRunningStats(this.guardMetricsState.telemetry.idleWorkers), + sample_age_ms: snapshotRunningStats(this.guardMetricsState.telemetry.sampleAgeMs), + }, + }; + const serialized = JSON.stringify(payload); + console.log(`[RemoteBuddyAutonomousEngine] queue dispatch guard metrics ${serialized}`); + void this.comm.emit("log", { + level: "info", + message: serialized, + }); + const nextState = makeGuardMetricsState(); + nextState.lastFlushAtMs = now; + this.guardMetricsState = nextState; + } + + private guardBlockReason(decision: DispatchGuardDecision): string { + const telemetry = decision.telemetry; + const metrics = telemetry + ? `p95=${Math.round(telemetry.queueP95Ms)}ms moving_p95=${telemetry.movingQueueP95Ms != null ? Math.round(telemetry.movingQueueP95Ms) : "na"} idle=${telemetry.idleWorkers} pending=${telemetry.pendingRequests} interactive=${telemetry.pendingInteractive}` + : "metrics_unavailable"; + const drivers = decision.reasons.length > 0 ? ` drivers=${decision.reasons.join("+")}` : ""; + return `${decision.reasonCode} ${metrics}${drivers}`.trim(); + } + private isSnapshotExpired(snapshot: Snapshot): boolean { const createdAt = Date.parse(snapshot.snapshot_created_at); if (!Number.isFinite(createdAt)) return true; @@ -4019,56 +4684,23 @@ export class RemoteBuddyAutonomousEngine { seed: `${runId}:${snapshot.snapshot_id}:${snapshot.snapshot_created_at}`, exploreRate: adaptiveExplore.effectiveRate, }); - const selected = selection.selected + let selected = selection.selected ? eligibleRows.find((row) => row.candidate.id === selection.selected?.id) : undefined; - const selectedStrategy = selected ? selection.strategy : "exploit"; + let selectedStrategy = selected ? selection.strategy : "exploit"; + let selectionRoll: number | null = selected ? selection.roll : null; + let selectedCandidateId = selected?.candidate.id ?? top.candidate.id; const objectiveId = `obj_${randomUUID().slice(0, 8)}`; - selectedCandidatePayload = selected - ? { - id: selected.candidate.id, - title: selected.candidate.title, - objective_type: selected.candidate.objective_type, - problem_statement: selected.candidate.problem_statement, - trigger_type: selected.candidate.trigger_type, - component_area: selected.candidate.component_area, - target_paths: selected.candidate.target_paths, - scope: selected.candidate.scope, - risk_level: selected.candidate.risk_level, - confidence: selected.candidate.confidence, - vision_alignment_reason: selected.candidate.vision_alignment_reason, - vision_section_refs: selected.candidate.vision_section_refs, - feature_hypotheses: selected.candidate.feature_hypotheses, - ...(selected.candidate.engine_trial ? { engine_trial: selected.candidate.engine_trial } : {}), - selection_strategy: selectedStrategy, - selection_roll: selection.roll, - effective_explore_rate: adaptiveExplore.effectiveRate, - } - : { - id: top.candidate.id, - title: top.candidate.title, - objective_type: top.candidate.objective_type, - problem_statement: top.candidate.problem_statement, - trigger_type: top.candidate.trigger_type, - component_area: top.candidate.component_area, - target_paths: top.candidate.target_paths, - scope: top.candidate.scope, - risk_level: top.candidate.risk_level, - confidence: top.candidate.confidence, - vision_alignment_reason: top.candidate.vision_alignment_reason, - vision_section_refs: top.candidate.vision_section_refs, - feature_hypotheses: top.candidate.feature_hypotheses, - ...(top.candidate.engine_trial ? { engine_trial: top.candidate.engine_trial } : {}), - selection_strategy: "none", - selection_roll: null, - effective_explore_rate: adaptiveExplore.effectiveRate, - }; - for (const row of candidatesPayload) { - const isSelected = Boolean(row.id === selectedCandidatePayload.id); - row.selected = isSelected; - row.selection_strategy = isSelected && selected ? selectedStrategy : "not_selected"; - row.selection_roll = isSelected ? selection.roll : null; - } + const applySelectionMarkers = () => { + for (const row of candidatesPayload) { + const isSelected = Boolean(row.id === selectedCandidateId); + row.selected = isSelected; + row.selection_strategy = isSelected && selected ? selectedStrategy : "not_selected"; + row.selection_roll = isSelected ? selectionRoll : null; + } + selectedCandidatePayload = candidatesPayload.find((entry) => entry.id === selectedCandidateId); + }; + applySelectionMarkers(); if (!selected) { this.setPhase("record_rejected_objective"); @@ -4124,6 +4756,99 @@ export class RemoteBuddyAutonomousEngine { return; } + const queueTelemetry = this.dispatchGuardCfg.enabled + ? await this.fetchQueueTelemetry() + : null; + let guardDecision = this.evaluateDispatchGuardForCandidate(selected.candidate, queueTelemetry); + this.logDispatchGuardDecision(runId, snapshot.snapshot_id, guardDecision, { + candidateId: selected.candidate.id, + triggerType: selected.candidate.trigger_type, + }); + if (guardDecision.action === "reroute" && selected) { + const rerouted = pickQueueHealthRerouteCandidate(eligibleRows, selected.candidate.id); + if (rerouted) { + selected = rerouted; + selectedStrategy = "guard_reroute"; + selectionRoll = null; + selectedCandidateId = rerouted.candidate.id; + applySelectionMarkers(); + guardDecision = this.evaluateDispatchGuardForCandidate(selected.candidate, queueTelemetry); + this.logDispatchGuardDecision(runId, snapshot.snapshot_id, guardDecision, { + candidateId: selected.candidate.id, + triggerType: selected.candidate.trigger_type, + }); + } else { + const rerouteFailedDecision: DispatchGuardDecision = { + action: "allow", + level: guardDecision.level, + reasonCode: "queue_guard_reroute_override", + reasons: [...guardDecision.reasons, "no_queue_health_candidate"], + telemetry: guardDecision.telemetry, + }; + this.logDispatchGuardDecision(runId, snapshot.snapshot_id, rerouteFailedDecision, { + candidateId: selected.candidate.id, + triggerType: selected.candidate.trigger_type, + }); + guardDecision = rerouteFailedDecision; + } + } + if (guardDecision.action !== "allow") { + this.setPhase("record_guard_blocked_objective"); + await this.postObjective({ + runId, + snapshotId: snapshot.snapshot_id, + sessionId: this.sessionId, + candidates: candidatesPayload, + objective: { + id: objectiveId, + candidate_id: selected.candidate.id, + title: selected.candidate.title, + instruction: selected.candidate.problem_statement, + objective_type: selected.candidate.objective_type, + component_area: selected.candidate.component_area, + trigger_type: selected.candidate.trigger_type, + target_paths: selected.candidate.target_paths, + scope: selected.candidate.scope, + confidence: selected.candidate.confidence, + risk_level: selected.candidate.risk_level, + status: "blocked", + block_reason: this.guardBlockReason(guardDecision), + score_breakdown: { + llm_score: selected.llmScore, + impact_signal: selected.impactSignal, + penalties: selected.penalties, + ema_success: selected.emaSuccess, + ema_user_accept: selected.emaUserAccept, + engine_idea_prior_score: selected.engineIdeaPriorScore, + engine_idea_novelty_score: selected.engineIdeaNoveltyScore, + engine_idea_novelty_bonus: selected.engineIdeaNoveltyBonus, + engine_idea_sample_count: selected.engineIdeaSampleCount, + engine_source_prior_score: selected.engineSourcePriorScore, + engine_source_novelty_score: selected.engineSourceNoveltyScore, + engine_source_novelty_bonus: selected.engineSourceNoveltyBonus, + engine_source_sample_count: selected.engineSourceSampleCount, + engine_source_trust_score: selected.engineSourceTrustScore, + engine_source_freshness_score: selected.engineSourceFreshnessScore, + engine_source_curation_status: selected.engineSourceCurationStatus, + engine_source_curation_reason: selected.engineSourceCurationReason, + engine_source_trust_boost: selected.engineSourceTrustBoost, + explore_rate_configured: adaptiveExplore.baseRate, + effective_explore_rate: adaptiveExplore.effectiveRate, + explore_rate_adjustment: adaptiveExplore.adjustment, + final_score: selected.finalScore, + selection_strategy: selectedStrategy, + selection_roll: selectionRoll, + }, + }, + llmCalls, + }); + outcomeDetail = + guardDecision.action === "throttle" + ? `queue_guard_throttle_${guardDecision.level}` + : `queue_guard_reroute_${guardDecision.level}`; + return; + } + if (selected.candidate.requires_user_input) { this.setPhase("record_blocked_requires_input"); await this.postObjective({ @@ -4169,7 +4894,7 @@ export class RemoteBuddyAutonomousEngine { explore_rate_adjustment: adaptiveExplore.adjustment, final_score: selected.finalScore, selection_strategy: selectedStrategy, - selection_roll: selection.roll, + selection_roll: selectionRoll, }, }, question: { @@ -4317,7 +5042,7 @@ export class RemoteBuddyAutonomousEngine { explore_rate_adjustment: adaptiveExplore.adjustment, final_score: selected.finalScore, selection_strategy: selectedStrategy, - selection_roll: selection.roll, + selection_roll: selectionRoll, }, }, llmCalls, @@ -4398,6 +5123,437 @@ export class RemoteBuddyAutonomousEngine { clearInterval(this.heartbeatTimer); this.heartbeatTimer = null; } + this.flushDispatchGuardMetrics(true); this.nextTickAtMs = 0; } } + +if (import.meta.vitest) { + const { describe, it, expect } = import.meta.vitest; + + const baseGuardCfg: DispatchGuardConfig = { + ...DEFAULT_DISPATCH_GUARD_CONFIG, + }; + + const makeTelemetry = (overrides: Partial = {}): QueueTelemetry => ({ + ts: new Date().toISOString(), + sampleAgeMs: 0, + queueP95Ms: 800, + movingQueueP95Ms: null, + idleWorkers: 4, + busyWorkers: 1, + onlineWorkers: 5, + pendingRequests: 2, + pendingInteractive: 1, + ...overrides, + }); + + const buildCandidate = ( + id: string, + trigger: AutonomyCandidate["trigger_type"], + requiresInput = false, + ): AutonomyCandidate => ({ + id, + title: id, + objective_type: "feature_small", + problem_statement: `work on ${id}`, + trigger_type: trigger, + component_area: "apps/remotebuddy", + target_paths: ["apps/remotebuddy"], + scope: { read_anywhere: false, write_globs: ["apps/remotebuddy/*"] }, + risk_level: "low", + expected_validation: [], + estimated_effort: "small", + why_now_signal_ids: [], + confidence: 0.5, + vision_alignment_reason: "", + vision_section_refs: [], + feature_hypotheses: [], + ...(requiresInput ? { requires_user_input: true } : {}), + candidate_created_at: new Date().toISOString(), + }); + + class GuardTestCommunicationManager extends CommunicationManager { + readonly events: Array<{ type: string; payload: Record }> = []; + + constructor() { + super({ + serverUrl: "http://127.0.0.1:3999", + sessionId: "guard-test", + from: "remotebuddy", + }); + } + + override emit( + type: T, + payload: Record, + ): Promise { + this.events.push({ type, payload }); + return Promise.resolve(true); + } + } + + const noopGuardLlm: LLMClient = { + async generate() { + return { text: "" }; + }, + }; + + const makeGuardTestConfig = (): PushPalsConfig => + ({ + sourceControlManager: { + remote: "origin", + mainBranch: "main_agents", + baseBranch: "main", + }, + remotebuddy: { + autonomy: { + enabled: true, + killSwitchEnabled: false, + tickIntervalMs: 1_000, + heartbeatLogMs: 1_000, + visionContextMaxChars: 4_096, + ideationBudgetMs: 1_000, + llmTimeoutMs: 1_000, + allowDirtyWorktree: true, + ideationMaxCandidates: 3, + topK: 1, + exploreRate: 0.2, + minConfidence: 0.2, + maxConcurrentObjectives: 1, + maxDispatchPerHour: 4, + maxDispatchPerHourByType: {}, + maxDispatchPerHourByComponent: {}, + maxTokenUsagePerHour: 10_000, + maxRuntimeMsPerHour: 10_000, + cooldownFailStreakThreshold: 2, + cooldownMs: 30_000, + staleObjectiveTtlMs: 60_000, + staleObjectiveSweepIntervalMs: 60_000, + autoFreezeFailStreakThreshold: 4, + autoFreezeDurationMs: 30_000, + evaluatorWindowHours: 1, + evaluatorMinSamples: 1, + evaluatorMinSuccessRate: 0.5, + evaluatorMaxRegretRate: 0.5, + evaluatorRunIntervalMs: 30_000, + alertQueuePendingThreshold: 10, + alertJobFailureRateThreshold: 0.5, + alertAutonomyFailureRateThreshold: 0.5, + allowReadAnywhere: false, + prFeedbackCommentRows: 4, + prFeedbackCommentChars: 120, + prFeedbackSummaryChars: 240, + questionTtlMs: 7 * 24 * 60 * 60 * 1_000, + policyVersion: "test", + impactModelVersion: "test", + replay: { + storePromptPayloads: false, + maxRunsWithPayloads: 1, + maxPayloadBytes: 8_192, + }, + }, + }, + }) as unknown as PushPalsConfig; + + const makeGuardTestEngine = () => { + const comm = new GuardTestCommunicationManager(); + const engine = new RemoteBuddyAutonomousEngine({ + server: "http://127.0.0.1:3999", + sessionId: "guard-test", + authToken: null, + repo: process.cwd(), + llm: noopGuardLlm, + comm, + config: makeGuardTestConfig(), + }); + return { engine, comm }; + }; + + describe("dispatch guard", () => { + it("allows healthy telemetry", () => { + const state = evaluateDispatchGuardState(baseGuardCfg, makeTelemetry()); + expect(state.level).toBe("healthy"); + const decision = decideDispatchGuardAction(baseGuardCfg, state, "feature_small"); + expect(decision.action).toBe("allow"); + expect(decision.reasonCode).toBe("queue_guard_allow"); + }); + + it("throttles when queue latency is critical", () => { + const state = evaluateDispatchGuardState(baseGuardCfg, makeTelemetry({ queueP95Ms: 2_100 })); + expect(state.level).toBe("critical"); + const decision = decideDispatchGuardAction(baseGuardCfg, state, "docs"); + expect(decision.action).toBe("throttle"); + expect(decision.reasonCode).toBe("queue_guard_throttle"); + }); + + it("reroutes non-queue objectives when pressure is elevated", () => { + const state = evaluateDispatchGuardState(baseGuardCfg, makeTelemetry({ queueP95Ms: 1_200 })); + expect(state.level).toBe("elevated"); + const decision = decideDispatchGuardAction(baseGuardCfg, state, "feature_small"); + expect(decision.action).toBe("reroute"); + expect(decision.reasonCode).toBe("queue_guard_reroute"); + }); + + it("allows queue_health objectives during elevated pressure", () => { + const state = evaluateDispatchGuardState(baseGuardCfg, makeTelemetry({ queueP95Ms: 1_200 })); + const decision = decideDispatchGuardAction(baseGuardCfg, state, "queue_health"); + expect(decision.action).toBe("allow"); + expect(decision.level).toBe("elevated"); + expect(decision.reasonCode).toBe("queue_guard_allow_queue_health"); + }); + + it("throttles when idle workers fall below the hard stop", () => { + const state = evaluateDispatchGuardState(baseGuardCfg, makeTelemetry({ idleWorkers: 0 })); + expect(state.level).toBe("critical"); + const decision = decideDispatchGuardAction(baseGuardCfg, state, "queue_health"); + expect(decision.action).toBe("throttle"); + }); + + it("reroutes when idle worker pool enters the elevated band before hard stop", () => { + const cfg: DispatchGuardConfig = { + ...baseGuardCfg, + idleCriticalThreshold: 1, + idleElevatedThreshold: 2, + }; + const idleWorkers = cfg.idleElevatedThreshold; + const telemetry = makeTelemetry({ queueP95Ms: 850, idleWorkers }); + const state = evaluateDispatchGuardState(cfg, telemetry); + expect(state.level).toBe("elevated"); + expect(state.reasons).toContain(`idle_workers_elevated:${idleWorkers}`); + const decision = decideDispatchGuardAction(cfg, state, "feature_small"); + expect(decision.action).toBe("reroute"); + }); + + it("treats moving queue latency as pressure even if instantaneous looks healthy", () => { + const telemetry = makeTelemetry({ queueP95Ms: 800, movingQueueP95Ms: 1_800 }); + const state = evaluateDispatchGuardState(baseGuardCfg, telemetry); + expect(state.level).toBe("critical"); + expect(state.reasons.some((reason) => reason.includes("queue_p95_moving_critical"))).toBe(true); + const decision = decideDispatchGuardAction(baseGuardCfg, state, "feature_small"); + expect(decision.action).toBe("throttle"); + expect(decision.telemetry).toBe(telemetry); + }); + + it("raises elevated pressure when telemetry becomes stale", () => { + const sampleAge = baseGuardCfg.sampleStaleMs + 5_000; + const state = evaluateDispatchGuardState( + baseGuardCfg, + makeTelemetry({ sampleAgeMs: sampleAge }), + ); + expect(state.level).toBe("elevated"); + expect(state.reasons.some((reason) => reason.startsWith("telemetry_stale_ms"))).toBe(true); + }); + + it("throttles when telemetry remains stale beyond the critical window", () => { + const sampleAge = baseGuardCfg.sampleCriticalMs + 10_000; + const state = evaluateDispatchGuardState( + baseGuardCfg, + makeTelemetry({ sampleAgeMs: sampleAge }), + ); + expect(state.level).toBe("critical"); + expect( + state.reasons.some((reason) => reason.startsWith("telemetry_stale_critical_ms")), + ).toBe(true); + const decision = decideDispatchGuardAction(baseGuardCfg, state, "feature_small"); + expect(decision.action).toBe("throttle"); + }); + + it("reroutes on missing telemetry by default", () => { + const state = evaluateDispatchGuardState(baseGuardCfg, null); + expect(state.level).toBe("elevated"); + expect(state.reasons).toContain("telemetry_missing_reroute"); + const decision = decideDispatchGuardAction(baseGuardCfg, state, "feature_small"); + expect(decision.action).toBe("reroute"); + expect(decision.reasonCode).toBe("queue_guard_reroute"); + }); + + it("can fail open on missing telemetry only when explicitly configured", () => { + const failOpenCfg: DispatchGuardConfig = { + ...baseGuardCfg, + missingTelemetryPolicy: "fail_open", + }; + const state = evaluateDispatchGuardState(failOpenCfg, null); + expect(state.level).toBe("healthy"); + expect(state.reasons).toContain("telemetry_missing_fail_open"); + const decision = decideDispatchGuardAction(failOpenCfg, state, "feature_small"); + expect(decision.action).toBe("allow"); + expect(decision.reasonCode).toBe("queue_guard_allow"); + }); + + it("fails closed on missing telemetry when configured", () => { + const failClosedCfg: DispatchGuardConfig = { + ...baseGuardCfg, + missingTelemetryPolicy: "fail_closed", + }; + const state = evaluateDispatchGuardState(failClosedCfg, null); + expect(state.level).toBe("critical"); + const decision = decideDispatchGuardAction(failClosedCfg, state, "feature_small"); + expect(decision.action).toBe("throttle"); + }); + + it("selects highest scoring queue_health candidate for reroute", () => { + const rows = [ + { candidate: buildCandidate("primary", "feature_small"), finalScore: 0.9 }, + { candidate: buildCandidate("queue_low", "queue_health", true), finalScore: 0.6 }, + { candidate: buildCandidate("queue_high", "queue_health"), finalScore: 0.8 }, + ]; + const rerouted = pickQueueHealthRerouteCandidate(rows, "primary"); + expect(rerouted?.candidate.id).toBe("queue_high"); + }); + + it("transitions allow to reroute and back to allow as queue pressure fluctuates", () => { + const telemetrySeries = [ + makeTelemetry({ queueP95Ms: 800 }), + makeTelemetry({ queueP95Ms: 1_150 }), + makeTelemetry({ queueP95Ms: 850 }), + ]; + const actions = telemetrySeries.map((telemetry) => { + const state = evaluateDispatchGuardState(baseGuardCfg, telemetry); + const decision = decideDispatchGuardAction(baseGuardCfg, state, "feature_small"); + return decision.action; + }); + expect(actions).toEqual(["allow", "reroute", "allow"]); + }); + + it("transitions from throttle to reroute to allow as telemetry improves", () => { + const criticalState = evaluateDispatchGuardState( + baseGuardCfg, + makeTelemetry({ queueP95Ms: 2_200 }), + ); + const criticalDecision = decideDispatchGuardAction( + baseGuardCfg, + criticalState, + "feature_small", + ); + expect(criticalDecision.action).toBe("throttle"); + + const elevatedState = evaluateDispatchGuardState( + baseGuardCfg, + makeTelemetry({ queueP95Ms: 1_200 }), + ); + const rerouteDecision = decideDispatchGuardAction( + baseGuardCfg, + elevatedState, + "feature_small", + ); + expect(rerouteDecision.action).toBe("reroute"); + + const healthyState = evaluateDispatchGuardState(baseGuardCfg, makeTelemetry({ queueP95Ms: 800 })); + const healthyDecision = decideDispatchGuardAction( + baseGuardCfg, + healthyState, + "feature_small", + ); + expect(healthyDecision.action).toBe("allow"); + expect(healthyDecision.level).toBe("healthy"); + }); + + it("builds structured guard decision events with telemetry context", () => { + const telemetry = makeTelemetry({ + queueP95Ms: 1_700, + movingQueueP95Ms: 1_850, + idleWorkers: 0, + sampleAgeMs: 12_000, + pendingRequests: 7, + pendingInteractive: 3, + }); + const decision: DispatchGuardDecision = { + action: "throttle", + level: "critical", + reasonCode: "queue_guard_throttle", + reasons: ["queue_p95_instant_critical:1700", "idle_workers_critical:0"], + telemetry, + }; + const event = buildGuardDecisionEvent( + baseGuardCfg, + { + runId: "run_test", + snapshotId: "snap_test", + candidateId: "cand_test", + triggerType: "queue_health", + ts: "2025-01-01T00:00:00.000Z", + }, + decision, + ); + expect(event).toMatchObject({ + event: "queue_guard_decision", + action: "throttle", + level: "critical", + severity: "error", + run_id: "run_test", + snapshot_id: "snap_test", + candidate_id: "cand_test", + trigger_type: "queue_health", + reason_code: "queue_guard_throttle", + guard_enabled: true, + }); + expect(event.telemetry).toMatchObject({ + queue_p95_ms: 1_700, + moving_queue_p95_ms: 1_850, + idle_workers: 0, + sample_age_ms: 12_000, + pending_requests: 7, + pending_interactive: 3, + }); + expect(event.thresholds.queue_p95_elevated_ms).toBe(baseGuardCfg.queueP95ElevatedMs); + expect(event.thresholds.idle_critical_threshold).toBe(baseGuardCfg.idleCriticalThreshold); + expect(event.thresholds.sample_critical_ms).toBe(baseGuardCfg.sampleCriticalMs); + expect(event.ts).toBe("2025-01-01T00:00:00.000Z"); + }); + + it("emits aggregated queue guard metrics with counts and telemetry stats", () => { + const { engine, comm } = makeGuardTestEngine(); + const recordMetrics = (engine as any).recordDispatchGuardMetrics.bind(engine); + const flushMetrics = (engine as any).flushDispatchGuardMetrics.bind(engine); + + const criticalTelemetry = makeTelemetry({ + queueP95Ms: 1_950, + movingQueueP95Ms: 2_000, + idleWorkers: 0, + sampleAgeMs: 8_000, + }); + recordMetrics({ + action: "throttle", + level: "critical", + reasonCode: "queue_guard_throttle", + reasons: ["queue_p95_instant_critical:1950"], + telemetry: criticalTelemetry, + }); + + const healthyTelemetry = makeTelemetry({ + queueP95Ms: 600, + movingQueueP95Ms: 650, + idleWorkers: 4, + sampleAgeMs: 1_000, + }); + recordMetrics({ + action: "allow", + level: "healthy", + reasonCode: "queue_guard_allow", + reasons: [], + telemetry: healthyTelemetry, + }); + + flushMetrics(true); + + const logEvent = comm.events.find((event) => event.type === "log"); + expect(logEvent).toBeDefined(); + const message = String((logEvent?.payload?.message as string) ?? ""); + expect(message).not.toBe(""); + const payload = JSON.parse(message) as Record; + expect(payload.event).toBe("queue_guard_metrics"); + const counts = payload.counts as Record; + expect(counts["throttle.critical.queue_guard_throttle"]).toBe(1); + expect(counts["allow.healthy.queue_guard_allow"]).toBe(1); + const telemetryStats = payload.telemetry as Record< + string, + { min: number | null; max: number | null } + >; + expect((telemetryStats.queue_p95_ms?.max ?? 0) >= 1_950).toBe(true); + expect((telemetryStats.moving_queue_p95_ms?.min ?? 0) >= 650).toBe(true); + expect((telemetryStats.idle_workers?.min ?? 99)).toBe(0); + expect((telemetryStats.sample_age_ms?.max ?? 0) >= 8_000).toBe(true); + }); + }); +}