diff --git a/apps/remotebuddy/src/autonomous_engine.queue.test.ts b/apps/remotebuddy/src/autonomous_engine.queue.test.ts new file mode 100644 index 0000000..b52fd44 --- /dev/null +++ b/apps/remotebuddy/src/autonomous_engine.queue.test.ts @@ -0,0 +1,164 @@ +import { describe, expect, test } from "bun:test"; + +import type { AutonomyComponentArea } from "shared"; + +import { + QueueBackpressureController, + buildQueueBacklogMetrics, + shouldThrottleQueue, + type QueueTelemetry, +} from "./autonomous_engine"; + +function makeTelemetry(overrides: Partial = {}): QueueTelemetry { + return { + capturedAt: "2026-03-06T00:00:00.000Z", + queueP95Ms: 750, + idleWorkers: 3, + pendingByPriority: { + interactive: 4, + normal: 6, + background: 2, + }, + pendingSnapshot: [], + ...overrides, + }; +} + +describe("shouldThrottleQueue", () => { + test("throttles when queue latency exceeds the threshold", () => { + const decision = shouldThrottleQueue(makeTelemetry({ queueP95Ms: 1800 }), 1000); + expect(decision.throttle).toBe(true); + expect(decision.reason).toBe("queue_latency_exceeded"); + expect(decision.queueP95Ms).toBe(1800); + }); + + test("throttles when idle workers are exhausted", () => { + const decision = shouldThrottleQueue(makeTelemetry({ idleWorkers: 0 }), 1000); + expect(decision.throttle).toBe(true); + expect(decision.reason).toBe("no_idle_workers"); + expect(decision.idleWorkers).toBe(0); + }); + + test("allows minting when telemetry is healthy", () => { + const decision = shouldThrottleQueue(makeTelemetry(), 1000); + expect(decision.throttle).toBe(false); + expect(decision.reason).toBeNull(); + expect(decision.queueP95Ms).toBe(750); + }); + + test("fails safe when telemetry is unavailable", () => { + const decision = shouldThrottleQueue(null, 1000); + expect(decision.throttle).toBe(true); + expect(decision.reason).toBe("telemetry_unavailable"); + }); + + test("fails safe when telemetry is missing critical fields", () => { + const decision = shouldThrottleQueue( + makeTelemetry({ queueP95Ms: null, idleWorkers: null }), + 1000, + ); + expect(decision.throttle).toBe(true); + expect(decision.reason).toBe("telemetry_unavailable"); + }); +}); + +describe("QueueBackpressureController", () => { + test("requires stabilized samples before clearing backpressure", () => { + const controller = new QueueBackpressureController({ + thresholdMs: 1000, + recoverySamples: 2, + recoveryDurationMs: 1_000, + }); + const breach = controller.evaluate(makeTelemetry({ queueP95Ms: 2000 }), 0); + expect(breach.decision.throttle).toBe(true); + expect(breach.decision.reason).toBe("queue_latency_exceeded"); + + const firstHealthy = controller.evaluate(makeTelemetry({ queueP95Ms: 800 }), 200); + expect(firstHealthy.decision.throttle).toBe(true); + expect(firstHealthy.decision.reason).toBe("recovery_cooldown"); + expect(firstHealthy.transition).toBeNull(); + + const cleared = controller.evaluate(makeTelemetry({ queueP95Ms: 800 }), 1_300); + expect(cleared.decision.throttle).toBe(false); + expect(cleared.transition).toBe("cleared"); + }); + + test("keeps throttling when telemetry drops during recovery", () => { + const controller = new QueueBackpressureController({ + thresholdMs: 1000, + recoverySamples: 2, + recoveryDurationMs: 1_000, + }); + const idleBreach = controller.evaluate(makeTelemetry({ idleWorkers: 0 }), 0); + expect(idleBreach.decision.reason).toBe("no_idle_workers"); + + const telemetryGap = controller.evaluate(null, 250); + expect(telemetryGap.decision.reason).toBe("telemetry_unavailable"); + expect(telemetryGap.decision.throttle).toBe(true); + + const recoveryStart = controller.evaluate(makeTelemetry(), 600); + expect(recoveryStart.decision.reason).toBe("recovery_cooldown"); + expect(recoveryStart.decision.throttle).toBe(true); + + const recoveryComplete = controller.evaluate(makeTelemetry(), 1_700); + expect(recoveryComplete.decision.throttle).toBe(false); + expect(recoveryComplete.transition).toBe("cleared"); + }); + + test("fails open when telemetry is unavailable for an extended window", () => { + const controller = new QueueBackpressureController({ + thresholdMs: 1000, + recoverySamples: 1, + recoveryDurationMs: 200, + telemetryFailOpenMs: 300, + }); + controller.evaluate(makeTelemetry({ queueP95Ms: 1500 }), 0); + + const telemetryGap = controller.evaluate(null, 150); + expect(telemetryGap.decision.reason).toBe("telemetry_unavailable"); + expect(telemetryGap.decision.throttle).toBe(true); + + const failOpen = controller.evaluate(null, 500); + expect(failOpen.decision.throttle).toBe(false); + expect(failOpen.transition).toBe("cleared"); + + const reengage = controller.evaluate(makeTelemetry({ queueP95Ms: 1700 }), 800); + expect(reengage.decision.throttle).toBe(true); + expect(reengage.decision.reason).toBe("queue_latency_exceeded"); + }); +}); + +describe("buildQueueBacklogMetrics", () => { + test("returns structured per-skill backlog data with complete metadata", () => { + const backlogMetrics = buildQueueBacklogMetrics({ + telemetry: makeTelemetry({ queueP95Ms: 900, idleWorkers: 4 }), + pendingAutonomyBySkill: { + "apps/server": 3, + "apps/remotebuddy": 2, + "apps/workerpals": 1, + } as Record, + dispatchByComponent: { + "apps/server": 5, + "apps/workerpals": 1, + }, + targetSkill: "apps/remotebuddy" as AutonomyComponentArea, + pendingAutonomySampleLimit: 500, + pendingAutonomySampleSize: 6, + pendingAutonomySampleTruncated: true, + }); + expect(backlogMetrics).not.toBeNull(); + expect(backlogMetrics?.skills.length).toBeGreaterThanOrEqual(8); + const pendingTotal = + backlogMetrics?.skills.reduce((sum, row) => sum + row.pending_autonomy_requests, 0) ?? 0; + expect(pendingTotal).toBe(6); + const ratioSum = + backlogMetrics?.skills.reduce((sum, row) => sum + row.backlog_ratio, 0) ?? 0; + expect(ratioSum).toBeCloseTo(1, 5); + const targetSkill = backlogMetrics?.skills.find((row) => row.skill === "apps/remotebuddy"); + expect(targetSkill?.is_target_skill).toBe(true); + expect(targetSkill?.pending_autonomy_requests).toBe(2); + expect(backlogMetrics?.pending_autonomy_sample_limit).toBe(500); + expect(backlogMetrics?.pending_autonomy_sample_size).toBe(6); + expect(backlogMetrics?.pending_autonomy_sample_truncated).toBe(true); + }); +}); diff --git a/apps/remotebuddy/src/autonomous_engine.ts b/apps/remotebuddy/src/autonomous_engine.ts index 17c3523..eddcba1 100644 --- a/apps/remotebuddy/src/autonomous_engine.ts +++ b/apps/remotebuddy/src/autonomous_engine.ts @@ -284,6 +284,76 @@ const AUTO_INGEST_SEED_PATTERNS: Array<{ }, ]; +const QUEUE_BACKLOG_PRIORITY_KEYS = [ + "interactive", + "normal", + "background", + "evaluation", + "worker", + "ops", +] as const; +type QueueBacklogPriority = (typeof QUEUE_BACKLOG_PRIORITY_KEYS)[number]; +type QueueBacklogTriState = "true" | "false" | "unknown"; + +const BACKLOG_INTERACTIVE_CAUTION = 40; +const BACKLOG_INTERACTIVE_CRITICAL = 60; +const BACKLOG_TOTAL_CAUTION = 80; +const BACKLOG_TOTAL_CRITICAL = 120; + +type QueueBacklogTelemetry = { + pending_total?: number | null; + pending_by_priority?: Record; + queue_p95_ms?: number | null; + sample_size?: number | null; + sample_limit?: number | null; + has_more?: boolean | null; + counts?: Record | null; + total?: number | null; + total_count?: number | null; + pagination?: Record | null; + pagination_meta?: Record | null; + page_info?: Record | null; + page?: Record | null; + meta?: Record | null; + next_cursor?: string | null; + previous_cursor?: string | null; + updated_at?: string | null; + source?: string | null; +}; + +type QueueBacklogMetrics = { + pendingTotal: number | null; + pendingByPriority: Record; + queueP95Ms: number | null; + sampleSize: number | null; + sampleLimit: number | null; + truncated: QueueBacklogTriState; + updatedAt: string | null; + source: string; + confidence: QueueBacklogTriState; + notes: string[]; +}; + +type QueueBacklogEvidence = { + pending_total: number | null; + pending_by_priority: Record; + queue_p95_ms: number | null; + sample_size: number | null; + sample_limit: number | null; + truncated: QueueBacklogTriState; + confidence: QueueBacklogTriState; + updated_at: string | null; + source: string; + notes: string[]; +}; + +type QueueBacklogEvaluation = { + severity: "clear" | "caution" | "critical" | "unknown"; + throttle: boolean; + reason: string; + evidence: QueueBacklogEvidence; +}; + type SourceCurationStatus = "candidate" | "trusted" | "watchlist" | "archived"; type FeedbackPriorForScoring = { @@ -796,6 +866,307 @@ function asNumber(value: unknown, fallback = 0): number { return Number.isFinite(n) ? n : fallback; } +function clampNonNegativeInt(value: unknown, fallback = 0): number { + const normalized = Math.floor(asNumber(value, fallback)); + if (!Number.isFinite(normalized)) return Math.max(0, Math.floor(fallback)); + return Math.max(0, normalized); +} + +function optionalNonNegativeInt(value: unknown): number | null { + if (value === null || value === undefined) return null; + const normalized = Math.floor(asNumber(value, Number.NaN)); + if (!Number.isFinite(normalized)) return null; + return Math.max(0, normalized); +} + +function optionalBoolean(value: unknown): boolean | null { + if (typeof value === "boolean") return value; + if (typeof value === "string") { + const normalized = value.trim().toLowerCase(); + if (!normalized) return null; + if (["1", "true", "yes", "on"].includes(normalized)) return true; + if (["0", "false", "no", "off"].includes(normalized)) return false; + } + return null; +} + +function firstNonNegativeInt(values: Array): number | null { + for (const value of values) { + const normalized = optionalNonNegativeInt(value); + if (normalized !== null) return normalized; + } + return null; +} + +function firstBooleanFlag(values: Array): boolean | null { + for (const value of values) { + const normalized = optionalBoolean(value); + if (normalized !== null) return normalized; + } + return null; +} + +function firstNonEmptyString(values: Array): string | null { + for (const value of values) { + const text = asString(value); + if (text) return text; + } + return null; +} + +function triStateFromBoolean(value: boolean | null | undefined): QueueBacklogTriState { + if (value === true) return "true"; + if (value === false) return "false"; + return "unknown"; +} + +function normalizeQueuePriority(value: unknown): QueueBacklogPriority { + const text = asString(value).toLowerCase(); + if (text === "interactive" || text === "urgent" || text === "int") return "interactive"; + if (text === "background" || text === "bg") return "background"; + if (text === "evaluation" || text === "eval" || text === "analysis") return "evaluation"; + if (text === "worker" || text === "forceworker" || text === "workerpal") return "worker"; + if (text === "ops" || text === "admin") return "ops"; + return "normal"; +} + +export function buildQueueBacklogMetrics( + telemetry: QueueBacklogTelemetry | null, +): QueueBacklogMetrics | null { + if (!telemetry) return null; + const raw = asObject(telemetry); + const pendingByPriorityInput = asObject(raw.pending_by_priority ?? raw.pendingByPriority ?? {}); + const canonicalSeeds: Record = { + interactive: 0, + normal: 0, + background: 0, + evaluation: 0, + worker: 0, + ops: 0, + }; + const aliasAccumulator: Record = { + interactive: 0, + normal: 0, + background: 0, + evaluation: 0, + worker: 0, + ops: 0, + }; + for (const [key, value] of Object.entries(pendingByPriorityInput).slice(0, 96)) { + const normalizedKey = normalizeQueuePriority(key); + const count = clampNonNegativeInt(value, 0); + if (count <= 0) continue; + const canonicalMatch = asString(key).trim().toLowerCase() === normalizedKey; + if (canonicalMatch) { + canonicalSeeds[normalizedKey] = Math.min(10_000, count); + continue; + } + aliasAccumulator[normalizedKey] = Math.min( + 10_000, + aliasAccumulator[normalizedKey] + count, + ); + } + const pendingByPriority: Record = { + interactive: 0, + normal: 0, + background: 0, + evaluation: 0, + worker: 0, + ops: 0, + }; + for (const priority of QUEUE_BACKLOG_PRIORITY_KEYS) { + const seeded = canonicalSeeds[priority]; + const alias = aliasAccumulator[priority]; + if (seeded > 0 || alias > 0) { + pendingByPriority[priority] = Math.min(10_000, seeded + alias); + } + } + for (const priority of QUEUE_BACKLOG_PRIORITY_KEYS) { + if (pendingByPriority[priority] > 0) continue; + const fallbackValue = firstNonNegativeInt([ + pendingByPriorityInput[priority], + raw[`pending_${priority}`], + raw[`pending${priority.charAt(0).toUpperCase()}${priority.slice(1)}`], + raw[`${priority}_pending`], + ]); + if (fallbackValue !== null) { + pendingByPriority[priority] = Math.min(10_000, fallbackValue); + } + } + const counts = asObject(raw.counts); + const paginationEnvelopes = [ + asObject(raw.pagination ?? {}), + asObject(raw.pagination_meta ?? raw.paginationMeta ?? {}), + asObject(raw.page_info ?? raw.pageInfo ?? {}), + asObject(raw.page ?? {}), + asObject(raw.meta ?? {}), + ]; + const paginationPendingTotals = paginationEnvelopes.flatMap((meta) => [ + meta.pending_total, + meta.pendingTotal, + meta.total_pending, + meta.totalPending, + ]); + const paginationTotalCounts = paginationEnvelopes.flatMap((meta) => [ + meta.total_count, + meta.totalCount, + ]); + const pendingTotal = + firstNonNegativeInt([ + raw.pending_total, + raw.pendingTotal, + raw.pending, + raw.total_pending, + raw.totalPending, + raw.queue_pending, + raw.queuePending, + raw.queue_total, + raw.queueTotal, + counts.pending, + counts.pending_total, + counts.pendingTotal, + counts.total_pending, + counts.totalPending, + counts.queue_pending, + counts.queuePending, + counts.queue_total, + counts.queueTotal, + ...paginationPendingTotals, + raw.total_count, + raw.totalCount, + counts.total_count, + counts.totalCount, + ...paginationTotalCounts, + ]) ?? null; + const queueP95Ms = + optionalNonNegativeInt( + raw.queue_p95_ms ?? raw.queueP95Ms ?? raw.queue_ms ?? raw.queueMs ?? null, + ) ?? null; + const sampleSize = optionalNonNegativeInt(raw.sample_size ?? raw.sampleSize ?? null); + const sampleLimit = optionalNonNegativeInt(raw.sample_limit ?? raw.sampleLimit ?? raw.limit); + const hasMoreExplicit = firstBooleanFlag([ + raw.has_more, + raw.hasMore, + ...paginationEnvelopes.flatMap((meta) => [meta.has_more, meta.hasMore]), + ]); + const nextCursor = firstNonEmptyString([ + raw.next_cursor, + raw.nextCursor, + raw.next_page_token, + raw.nextPageToken, + raw.page_token, + raw.pageToken, + ...paginationEnvelopes.flatMap((meta) => [ + meta.next_cursor, + meta.nextCursor, + meta.next_page_token, + meta.nextPageToken, + meta.page_token, + meta.pageToken, + ]), + ]); + const paginationTotalForTruncation = + firstNonNegativeInt([ + ...paginationPendingTotals, + ...paginationTotalCounts, + counts.pending_total, + counts.pendingTotal, + counts.total_pending, + counts.totalPending, + counts.total_count, + counts.totalCount, + raw.pending_total, + raw.pendingTotal, + raw.total_pending, + raw.totalPending, + raw.total_count, + raw.totalCount, + ]) ?? null; + let truncated = triStateFromBoolean(hasMoreExplicit); + if (truncated === "unknown" && paginationTotalForTruncation !== null && sampleSize !== null) { + truncated = paginationTotalForTruncation > sampleSize ? "true" : "false"; + } + if (truncated === "unknown" && nextCursor) { + truncated = "true"; + } + const updatedAt = asString(raw.updated_at ?? raw.updatedAt ?? ""); + const source = asString(raw.source) || "requests_pending"; + let confidence: QueueBacklogTriState; + if (pendingTotal !== null && truncated === "false") { + confidence = "true"; + } else if (pendingTotal === null || truncated === "true") { + confidence = "false"; + } else { + confidence = "unknown"; + } + const notes: string[] = []; + if (pendingTotal === null) notes.push("pending_total_missing"); + if (truncated === "true") notes.push("telemetry_truncated"); + else if (truncated === "unknown") notes.push("telemetry_truncation_unknown"); + if (confidence === "false") notes.push("telemetry_low_confidence"); + else if (confidence === "unknown") notes.push("telemetry_confidence_unknown"); + return { + pendingTotal, + pendingByPriority, + queueP95Ms, + sampleSize, + sampleLimit, + truncated, + updatedAt: updatedAt || null, + source, + confidence, + notes, + }; +} +function evaluateQueueBacklog(metrics: QueueBacklogMetrics | null): QueueBacklogEvaluation | null { + if (!metrics) return null; + const interactive = metrics.pendingByPriority.interactive ?? 0; + const normal = metrics.pendingByPriority.normal ?? 0; + const background = metrics.pendingByPriority.background ?? 0; + const total = + metrics.pendingTotal ?? + [interactive, normal, background, metrics.pendingByPriority.evaluation, metrics.pendingByPriority.worker, metrics.pendingByPriority.ops].reduce( + (sum, value) => sum + value, + 0, + ); + const telemetryConfidence = metrics.confidence; + const telemetryReliable = telemetryConfidence === "true"; + const telemetryUnreliable = telemetryConfidence === "false"; + let severity: QueueBacklogEvaluation["severity"] = "clear"; + if (!telemetryReliable) { + severity = "unknown"; + } else if (interactive >= BACKLOG_INTERACTIVE_CRITICAL || total >= BACKLOG_TOTAL_CRITICAL) { + severity = "critical"; + } else if (interactive >= BACKLOG_INTERACTIVE_CAUTION || total >= BACKLOG_TOTAL_CAUTION) { + severity = "caution"; + } + const throttle = telemetryReliable && severity !== "clear"; + const reason = + severity === "unknown" + ? telemetryUnreliable + ? "backlog_telemetry_unreliable" + : "backlog_telemetry_unknown" + : throttle + ? `queue_backlog_high interactive=${interactive} total=${total}` + : "queue_backlog_within_limits"; + return { + severity, + throttle, + reason, + evidence: { + pending_total: metrics.pendingTotal, + pending_by_priority: metrics.pendingByPriority, + queue_p95_ms: metrics.queueP95Ms, + sample_size: metrics.sampleSize, + sample_limit: metrics.sampleLimit, + truncated: metrics.truncated, + confidence: metrics.confidence, + updated_at: metrics.updatedAt, + source: metrics.source, + notes: metrics.notes, + }, + }; +} function uniqueLowercaseTokens(values: string[], max = 24): string[] { const out: string[] = []; const seen = new Set(); @@ -2617,6 +2988,228 @@ export class RemoteBuddyAutonomousEngine { return [...trusted, ...archived]; } + private async fetchAutonomyBacklogCounts(limit = 120): Promise { + const boundedLimit = Math.max(10, Math.min(400, Math.floor(limit))); + const qs = new URLSearchParams({ + status: "pending", + limit: String(boundedLimit), + }); + let res: Response; + try { + res = await fetch(`${this.server}/requests?${qs.toString()}`, { + method: "GET", + headers: this.headers(), + }); + } catch (error) { + console.warn( + `[RemoteBuddyAutonomousEngine] backlog fetch failed: ${String(error)}`, + ); + return null; + } + if (!res.ok) { + console.warn( + `[RemoteBuddyAutonomousEngine] backlog fetch failed with HTTP ${res.status}`, + ); + return null; + } + const data = (await res.json().catch(() => ({}))) as Record; + if (data.ok === false) return null; + const pendingRows = Array.isArray(data.requests) ? data.requests : []; + const sampledRows = pendingRows.slice(0, boundedLimit); + const pendingByPriority: Record = { + interactive: 0, + normal: 0, + background: 0, + evaluation: 0, + worker: 0, + ops: 0, + }; + for (const row of sampledRows) { + const record = asObject(row); + const priority = normalizeQueuePriority(record.priority ?? record.queuePriority); + pendingByPriority[priority] += 1; + } + const counts = asObject(data.counts); + const pagination = asObject(data.pagination ?? {}); + const paginationMeta = asObject(data.pagination_meta ?? data.paginationMeta ?? {}); + const pageInfo = asObject(data.page_info ?? data.pageInfo ?? {}); + const page = asObject(data.page ?? {}); + const meta = asObject(data.meta ?? {}); + const totalCandidates = [ + counts.pending, + counts.total_pending, + counts.totalPending, + counts.total, + counts.total_count, + counts.totalCount, + data.pending_total, + data.pendingTotal, + data.total_pending, + data.totalPending, + data.total, + data.total_count, + data.totalCount, + pagination.total_pending, + pagination.totalPending, + pagination.total, + pagination.total_count, + pagination.totalCount, + paginationMeta.total_pending, + paginationMeta.totalPending, + paginationMeta.total, + paginationMeta.total_count, + paginationMeta.totalCount, + pageInfo.total_pending, + pageInfo.totalPending, + pageInfo.total, + pageInfo.total_count, + pageInfo.totalCount, + page.total_pending, + page.totalPending, + page.total, + page.total_count, + page.totalCount, + meta.total_pending, + meta.totalPending, + meta.total, + meta.total_count, + meta.totalCount, + ]; + const pendingTotal = firstNonNegativeInt(totalCandidates); + const slo = asObject(data.slo); + const requestSlo = asObject(slo.requests); + const queueWait = asObject(requestSlo.queueWaitMs); + const hasMoreExplicit = firstBooleanFlag([ + data.has_more, + data.hasMore, + pagination.has_more, + pagination.hasMore, + paginationMeta.has_more, + paginationMeta.hasMore, + pageInfo.has_more, + pageInfo.hasMore, + page.has_more, + page.hasMore, + meta.has_more, + meta.hasMore, + ]); + const nextCursor = firstNonEmptyString([ + data.next_cursor, + data.nextCursor, + data.next_page_token, + data.nextPageToken, + pagination.next_cursor, + pagination.nextCursor, + pagination.next_page_token, + pagination.nextPageToken, + paginationMeta.next_cursor, + paginationMeta.nextCursor, + paginationMeta.next_page_token, + paginationMeta.nextPageToken, + pageInfo.next_cursor, + pageInfo.nextCursor, + pageInfo.next_page_token, + pageInfo.nextPageToken, + page.next_cursor, + page.nextCursor, + page.next_page_token, + page.nextPageToken, + meta.next_cursor, + meta.nextCursor, + meta.next_page_token, + meta.nextPageToken, + ]); + const prevCursor = firstNonEmptyString([ + data.prev_cursor, + data.prevCursor, + data.previous_cursor, + data.previousCursor, + data.prev_page_token, + data.prevPageToken, + pagination.prev_cursor, + pagination.prevCursor, + pagination.previous_cursor, + pagination.previousCursor, + pagination.prev_page_token, + pagination.prevPageToken, + paginationMeta.prev_cursor, + paginationMeta.prevCursor, + paginationMeta.previous_cursor, + paginationMeta.previousCursor, + paginationMeta.prev_page_token, + paginationMeta.prevPageToken, + pageInfo.prev_cursor, + pageInfo.prevCursor, + pageInfo.previous_cursor, + pageInfo.previousCursor, + pageInfo.prev_page_token, + pageInfo.prevPageToken, + page.prev_cursor, + page.prevCursor, + page.previous_cursor, + page.previousCursor, + page.prev_page_token, + page.prevPageToken, + meta.prev_cursor, + meta.prevCursor, + meta.previous_cursor, + meta.previousCursor, + meta.prev_page_token, + meta.prevPageToken, + ]); + const telemetry: QueueBacklogTelemetry = { + pending_total: pendingTotal, + pending_by_priority: pendingByPriority, + queue_p95_ms: optionalNonNegativeInt(queueWait.p95), + sample_size: sampledRows.length, + sample_limit: boundedLimit, + has_more: hasMoreExplicit, + counts, + total: firstNonNegativeInt([ + data.total, + data.total_count, + counts.total, + counts.total_count, + pagination.total, + pagination.total_count, + paginationMeta.total, + paginationMeta.total_count, + pageInfo.total, + pageInfo.total_count, + page.total, + page.total_count, + meta.total, + meta.total_count, + ]), + total_count: firstNonNegativeInt([ + data.total_count, + data.totalCount, + counts.total_count, + counts.totalCount, + pagination.total_count, + pagination.totalCount, + paginationMeta.total_count, + paginationMeta.totalCount, + pageInfo.total_count, + pageInfo.totalCount, + page.total_count, + page.totalCount, + meta.total_count, + meta.totalCount, + ]), + pagination, + pagination_meta: paginationMeta, + page_info: pageInfo, + page, + meta, + next_cursor: nextCursor, + previous_cursor: prevCursor, + updated_at: asString(data.ts ?? data.updatedAt ?? ""), + source: "requests_pending", + }; + return evaluateQueueBacklog(buildQueueBacklogMetrics(telemetry)); + } + private buildAutoInspirationEntries(commitHistoryHints: EngineCommitHistoryHint[]): Array> { const staticEntries = AUTO_INGEST_SEED_PATTERNS.map((seed) => ({ source_type: "internal_doc", @@ -2809,8 +3402,11 @@ export class RemoteBuddyAutonomousEngine { responseHash: sha256(output.text), tokenUsage, latencyMs: Date.now() - startedAt, - }, - }; + }, +}; + + + } private async enqueueSyntheticRequest( @@ -3716,6 +4312,66 @@ export class RemoteBuddyAutonomousEngine { return; } + this.setPhase("evaluate_queue_backlog"); + const backlogAssessment = await this.fetchAutonomyBacklogCounts(); + const backlogEvidencePayload = backlogAssessment?.evidence + ? { queue_backlog: backlogAssessment.evidence } + : undefined; + if (backlogAssessment?.throttle) { + this.setPhase("record_blocked_queue_backlog"); + await this.postObjective({ + runId, + snapshotId: snapshot.snapshot_id, + sessionId: this.sessionId, + candidates: candidatesPayload, + objective: { + id: objectiveId, + candidate_id: selected.candidate.id, + title: selected.candidate.title, + instruction: selected.candidate.problem_statement, + objective_type: selected.candidate.objective_type, + component_area: selected.candidate.component_area, + trigger_type: selected.candidate.trigger_type, + target_paths: selected.candidate.target_paths, + scope: selected.candidate.scope, + confidence: selected.candidate.confidence, + risk_level: selected.candidate.risk_level, + status: "blocked", + block_reason: backlogAssessment.reason || "queue_backlog_throttle", + ...(backlogEvidencePayload ? { evidence: backlogEvidencePayload } : {}), + score_breakdown: { + llm_score: selected.llmScore, + impact_signal: selected.impactSignal, + penalties: selected.penalties, + ema_success: selected.emaSuccess, + ema_user_accept: selected.emaUserAccept, + engine_idea_prior_score: selected.engineIdeaPriorScore, + engine_idea_novelty_score: selected.engineIdeaNoveltyScore, + engine_idea_novelty_bonus: selected.engineIdeaNoveltyBonus, + engine_idea_sample_count: selected.engineIdeaSampleCount, + engine_source_prior_score: selected.engineSourcePriorScore, + engine_source_novelty_score: selected.engineSourceNoveltyScore, + engine_source_novelty_bonus: selected.engineSourceNoveltyBonus, + engine_source_sample_count: selected.engineSourceSampleCount, + engine_source_trust_score: selected.engineSourceTrustScore, + engine_source_freshness_score: selected.engineSourceFreshnessScore, + engine_source_curation_status: selected.engineSourceCurationStatus, + engine_source_curation_reason: selected.engineSourceCurationReason, + engine_source_trust_boost: selected.engineSourceTrustBoost, + explore_rate_configured: adaptiveExplore.baseRate, + effective_explore_rate: adaptiveExplore.effectiveRate, + explore_rate_adjustment: adaptiveExplore.adjustment, + final_score: selected.finalScore, + selection_strategy: selectedStrategy, + selection_roll: selection.roll, + }, + }, + llmCalls, + }); + outcomeDetail = "queue_backlog_throttled"; + return; + } + if (selected.candidate.requires_user_input) { this.setPhase("record_blocked_requires_input"); await this.postObjective({ @@ -3737,6 +4393,7 @@ export class RemoteBuddyAutonomousEngine { risk_level: selected.candidate.risk_level, status: "blocked", block_reason: "requires_user_input", + ...(backlogEvidencePayload ? { evidence: backlogEvidencePayload } : {}), score_breakdown: { llm_score: selected.llmScore, impact_signal: selected.impactSignal, @@ -3858,6 +4515,7 @@ export class RemoteBuddyAutonomousEngine { risk_level: selected.candidate.risk_level, status: "failed", block_reason: "request_enqueue_failed", + ...(backlogEvidencePayload ? { evidence: backlogEvidencePayload } : {}), }, llmCalls, }); @@ -3885,6 +4543,7 @@ export class RemoteBuddyAutonomousEngine { risk_level: selected.candidate.risk_level, status: "dispatched", request_id: requestId, + ...(backlogEvidencePayload ? { evidence: backlogEvidencePayload } : {}), score_breakdown: { llm_score: selected.llmScore, impact_signal: selected.impactSignal, diff --git a/apps/remotebuddy/src/startup/checklist.ts b/apps/remotebuddy/src/startup/checklist.ts index 4af7805..0c7d6bf 100644 --- a/apps/remotebuddy/src/startup/checklist.ts +++ b/apps/remotebuddy/src/startup/checklist.ts @@ -4,6 +4,8 @@ * and optionally blocks job dispatch until a synthetic probe completes. */ export const STARTUP_FAILURE_CODES = { + BUN_VERSION_UNSUPPORTED: "startup.bun_version_unsupported", + DOCKER_VERSION_UNSUPPORTED: "startup.docker_version_unsupported", MERGE_IN_PROGRESS: "startup.merge_in_progress", REPO_DIRTY: "startup.repo_dirty", ALERTS_ACTIVE: "startup.alerts_active", @@ -16,7 +18,13 @@ export type StartupFailureCode = type StartupCheckStatus = "pass" | "fail"; -export type StartupCheckCategory = "repo" | "alerts" | "synthetic" | "dispatch"; +export type StartupCheckCategory = + | "runtime" + | "infrastructure" + | "repo" + | "alerts" + | "synthetic" + | "dispatch"; export interface StartupChecklistOptions { syntheticMaxLatencyMs?: number; @@ -79,12 +87,48 @@ export interface StartupChecklistResult { history: StartupCheckRecord[]; } +export interface StartupTelemetryPhaseEvent { + type: "startup_phase"; + code: StartupFailureCode; + category: StartupCheckCategory; + step: number; + status: StartupCheckStatus; + detail: string; + startedAtMs: number; + endedAtMs: number; + durationMs: number; + error?: { + message: string; + raw?: unknown; + stack?: string; + }; +} + +export interface StartupTelemetryUnknownFailureEvent { + type: "startup_unknown_failure"; + code: StartupFailureCode; + phase: string; + step: number; + whenMs: number; + error: { + message: string; + raw?: unknown; + }; +} + +export type StartupTelemetryEvent = + | StartupTelemetryPhaseEvent + | StartupTelemetryUnknownFailureEvent; + export interface StartupChecklistContext { describeRepo(): Promise; listFiringAlerts(): Promise; syntheticTester: SyntheticStartupTester; + readBunVersion(): Promise; + readDockerVersion(): Promise; now?: () => number; log?: (entry: StartupCheckRecord) => void; + telemetry?: (event: StartupTelemetryEvent) => void; } type StartupCheckDefinition = { @@ -111,9 +155,123 @@ const DEFAULT_SYNTHETIC_PROBE = "probe.remote_startup"; const DISPATCH_CHECK_LABEL = "Job dispatch must succeed."; const DISPATCH_CHECK_ACTION = "Inspect RemoteBuddy + WorkerPals logs, repair dependencies, then rerun dispatch."; +const MIN_BUN_VERSION = "1.1.0"; +const MIN_DOCKER_VERSION = "25.0.0"; + +const normalizeVersion = (value: string | null | undefined): string => + typeof value === "string" ? value.trim() : ""; + +const parseVersionParts = (value: string): number[] => + normalizeVersion(value) + .split(/[^\d]+/g) + .filter(Boolean) + .map((part) => { + const asNumber = Number(part); + return Number.isFinite(asNumber) ? asNumber : 0; + }); + +const compareVersions = (actual: string, minimum: string): number => { + const actualParts = parseVersionParts(actual); + const minimumParts = parseVersionParts(minimum); + if (actualParts.length === 0 || minimumParts.length === 0) return 0; + const maxLength = Math.max(actualParts.length, minimumParts.length); + for (let i = 0; i < maxLength; i += 1) { + const a = actualParts[i] ?? 0; + const b = minimumParts[i] ?? 0; + if (a > b) return 1; + if (a < b) return -1; + } + return 0; +}; const defaultChecks = Object.freeze( [ + { + code: STARTUP_FAILURE_CODES.BUN_VERSION_UNSUPPORTED, + label: `Bun runtime must be >= ${MIN_BUN_VERSION}.`, + action: "Upgrade to Bun 1.1+ (`bun upgrade` or reinstall the runtime).", + category: "runtime", + run: async (ctx) => { + let versionValue: string; + try { + versionValue = await ctx.readBunVersion(); + } catch (error) { + if (error instanceof Error) { + error.message = `Bun version probe failed: ${error.message}`; + throw error; + } + throw new Error( + `Bun version probe failed: ${typeof error === "string" ? error : "unknown error"}`, + ); + } + const version = normalizeVersion(versionValue); + if (!version) { + return { + ok: false, + detail: `Bun runtime version not detected; minimum supported version is ${MIN_BUN_VERSION}.`, + }; + } + if (parseVersionParts(version).length === 0) { + return { + ok: false, + detail: `Unable to parse Bun runtime version "${version}".`, + }; + } + if (compareVersions(version, MIN_BUN_VERSION) < 0) { + return { + ok: false, + detail: `Bun runtime ${version} detected; minimum supported version is ${MIN_BUN_VERSION}.`, + }; + } + return { + ok: true, + detail: `Bun runtime ${version} satisfies minimum ${MIN_BUN_VERSION}.`, + }; + }, + }, + { + code: STARTUP_FAILURE_CODES.DOCKER_VERSION_UNSUPPORTED, + label: `Docker Engine must be >= ${MIN_DOCKER_VERSION}.`, + action: "Upgrade Docker Engine/Desktop to a supported version (25.x or newer) before dispatch.", + category: "infrastructure", + run: async (ctx) => { + let versionValue: string; + try { + versionValue = await ctx.readDockerVersion(); + } catch (error) { + if (error instanceof Error) { + error.message = `Docker version probe failed: ${error.message}`; + throw error; + } + throw new Error( + `Docker version probe failed: ${typeof error === "string" ? error : "unknown error"}`, + ); + } + const version = normalizeVersion(versionValue); + if (!version) { + return { + ok: false, + detail: `Docker version not detected; minimum supported version is ${MIN_DOCKER_VERSION}.`, + }; + } + if (parseVersionParts(version).length === 0) { + return { + ok: false, + detail: `Unable to parse Docker version "${version}".`, + }; + } + if (compareVersions(version, MIN_DOCKER_VERSION) < 0) { + return { + ok: false, + detail: `Docker version ${version} detected; minimum supported version is ${MIN_DOCKER_VERSION}.`, + }; + } + return { + ok: true, + detail: `Docker version ${version} satisfies minimum ${MIN_DOCKER_VERSION}.`, + }; + }, + }, { code: STARTUP_FAILURE_CODES.MERGE_IN_PROGRESS, label: "Git merge or rebase must be resolved.", @@ -234,6 +392,26 @@ export const STARTUP_CHECK_STRUCTURE = Object.freeze( const nowMs = (ctx: StartupChecklistContext) => ctx.now ? ctx.now() : Date.now(); +const emitPhaseTelemetry = ( + ctx: StartupChecklistContext, + event: Omit, +) => { + ctx.telemetry?.({ + type: "startup_phase", + ...event, + }); +}; + +const emitUnknownFailureTelemetry = ( + ctx: StartupChecklistContext, + event: Omit, +) => { + ctx.telemetry?.({ + type: "startup_unknown_failure", + ...event, + }); +}; + const memoizeContext = ( ctx: StartupChecklistContext, ): StartupChecklistContext => { @@ -263,11 +441,13 @@ export class StartupChecklist { let status: StartupCheckStatus = "pass"; let detail = check.label; let failureErrorMessage: string | undefined; + let thrownError: unknown; try { const outcome = await check.run(ctx, options); status = outcome.ok ? "pass" : "fail"; detail = outcome.detail ?? check.label; } catch (error) { + thrownError = error; status = "fail"; detail = error instanceof Error @@ -297,6 +477,35 @@ export class StartupChecklist { }; history.push(record); ctx.log?.(record); + emitPhaseTelemetry(ctx, { + code: check.code, + category: check.category, + step, + status, + detail, + startedAtMs, + endedAtMs, + durationMs, + error: thrownError + ? { + message: detail, + raw: thrownError, + stack: thrownError instanceof Error ? thrownError.stack : undefined, + } + : undefined, + }); + if (thrownError) { + emitUnknownFailureTelemetry(ctx, { + code: check.code, + phase: check.label, + step, + whenMs: endedAtMs, + error: { + message: detail, + raw: thrownError, + }, + }); + } if (status === "fail") { return { ok: false, @@ -356,8 +565,19 @@ export const gateDispatchWithStartupPreflight = async ( }; result.history.push(successRecord); ctx.log?.(successRecord); + emitPhaseTelemetry(ctx, { + code: STARTUP_FAILURE_CODES.DISPATCH_FAILED, + category: "dispatch", + step: dispatchStep, + status: "pass", + detail: successRecord.detail, + startedAtMs, + endedAtMs, + durationMs, + }); return result; } catch (error) { + const rawError = error; const errorMessage = error instanceof Error ? error.message @@ -383,6 +603,31 @@ export const gateDispatchWithStartupPreflight = async ( }; ctx.log?.(failureRecord); const history = [...result.history, failureRecord]; + emitPhaseTelemetry(ctx, { + code: STARTUP_FAILURE_CODES.DISPATCH_FAILED, + category: "dispatch", + step: dispatchStep, + status: "fail", + detail, + startedAtMs, + endedAtMs, + durationMs, + error: { + message: detail, + raw: rawError, + stack: error instanceof Error ? error.stack : undefined, + }, + }); + emitUnknownFailureTelemetry(ctx, { + code: STARTUP_FAILURE_CODES.DISPATCH_FAILED, + phase: DISPATCH_CHECK_LABEL, + step: dispatchStep, + whenMs: endedAtMs, + error: { + message: detail, + raw: rawError, + }, + }); return { ok: false, failure: {