diff --git a/apps/client/src/components/SystemPane.tsx b/apps/client/src/components/SystemPane.tsx
index 625bb21..ab4a9f0 100644
--- a/apps/client/src/components/SystemPane.tsx
+++ b/apps/client/src/components/SystemPane.tsx
@@ -13,6 +13,7 @@ import {
   clip,
   formatDuration,
   formatPercent,
+  formatTokenCount,
   prettyTs,
   queueValue,
   relativeMs,
@@ -66,6 +67,14 @@ function serializeUnknown(value: unknown, maxChars = 320): string {
   }
 }
 
+function serviceLabel(service: string): string {
+  const normalized = service.trim().toLowerCase();
+  if (normalized === "localbuddy") return "LocalBuddy";
+  if (normalized === "remotebuddy") return "RemoteBuddy";
+  if (normalized === "workerpals") return "WorkerPals";
+  return service || "Unknown";
+}
+
 function initialDraftForQuestion(question: AutonomyQuestionRow): string {
   const schema = question.expectedAnswerSchema ?? {};
   if (question.questionType === "single_choice") {
@@ -197,6 +206,7 @@ export function SystemPane({
   const recentEvents = useMemo(() => events.slice(-40).reverse(), [events]);
   const requestSlo = systemSummary.slo?.requests;
   const jobSlo = systemSummary.slo?.jobs;
+  const llmUsage = systemSummary.llmUsage;
   const autonomyOps = systemSummary.autonomy ?? autonomyInsights.opsSummary;
   const safety = autonomyOps?.safetyState ?? null;
   const evaluator = autonomyOps?.latestEvaluatorScorecard ?? autonomyInsights.latestEvaluatorScorecard;
@@ -220,6 +230,28 @@ export function SystemPane({
       .slice(0, 20),
     [autonomyQuestions],
   );
+  const llmUsageRows = useMemo(() => {
+    const defaults = ["localbuddy", "remotebuddy", "workerpals"];
+    const byService = new Map((llmUsage?.services ?? []).map((row) => [row.service.toLowerCase(), row]));
+    for (const service of defaults) {
+      if (!byService.has(service)) {
+        byService.set(service, {
+          service,
+          promptTokens: 0,
+          completionTokens: 0,
+          totalTokens: 0,
+          callCount: 0,
+          avgTokensPerCall: null,
+          estimatedCallCount: 0,
+          lastCallAt: null,
+        });
+      }
+    }
+    return [...byService.values()].sort((a, b) => {
+      if (b.totalTokens !== a.totalTokens) return b.totalTokens - a.totalTokens;
+      return serviceLabel(a.service).localeCompare(serviceLabel(b.service));
+    });
+  }, [llmUsage]);
 
   useEffect(() => {
     setQuestionDrafts((prev) => {
@@ -347,6 +379,13 @@ export function SystemPane({
           tone="warning"
           theme={theme}
         />
+          0 ? "accent" : "warning"}
+          theme={theme}
+        />
+
+
+
+            LLM Usage
+
+
+            {llmUsage?.windowHours ?? 24}h window | {llmUsage?.callCount ?? 0} calls
+
+
+
+            total {formatTokenCount(llmUsage?.totalTokens)} tokens | prompt{" "}
+            {formatTokenCount(llmUsage?.promptTokens)} | completion{" "}
+            {formatTokenCount(llmUsage?.completionTokens)} | avg{" "}
+            {formatTokenCount(llmUsage?.avgTokensPerCall)} per call
+
+          {llmUsage && llmUsage.estimatedCallCount > 0 ? (
+
+              {llmUsage.estimatedCallCount} call{llmUsage.estimatedCallCount === 1 ? "" : "s"} using estimated token counts.
+
+          ) : null}
+
+          {llmUsageRows.map((row) => (
+
+
+
+                  {serviceLabel(row.service)}
+
+                    0 ? theme.accent : theme.textMuted,
+                      fontFamily: theme.fontMono,
+                    },
+                  ]}
+                >
+                  {row.callCount} call{row.callCount === 1 ? "" : "s"}
+
+
+                  0 ? theme.accent : theme.textMuted,
+                    fontFamily: theme.fontSans,
+                  },
+                ]}
+              >
+                {formatTokenCount(row.totalTokens)}
+
+
+                avg {formatTokenCount(row.avgTokensPerCall)} / call
+
+
+                in {formatTokenCount(row.promptTokens)} | out {formatTokenCount(row.completionTokens)}
+
+
+                {row.lastCallAt ? `last call ${relativeMs(row.lastCallAt)}` : "no usage recorded"}
+
+              {row.estimatedCallCount > 0 ? (
+
+                  estimated {row.estimatedCallCount} call{row.estimatedCallCount === 1 ? "" : "s"}
+
+              ) : null}
+
+          ))}
+
+
@@ -1059,6 +1175,7 @@ const styles = StyleSheet.create({
     marginBottom: 8,
   },
   systemTitle: { fontSize: 14, fontWeight: "700" },
+  tokenValue: { fontSize: 24, fontWeight: "700", marginTop: 6 },
   systemDetail: { fontSize: 12, marginTop: 7 },
   systemMeta: { fontSize: 11, marginTop: 5 },
   workerPanel: {
diff --git a/apps/client/src/components/dashboardFormatters.ts b/apps/client/src/components/dashboardFormatters.ts
index bb463e0..7c008d6 100644
--- a/apps/client/src/components/dashboardFormatters.ts
+++ b/apps/client/src/components/dashboardFormatters.ts
@@ -57,6 +57,20 @@ export function formatDuration(valueMs: number | null | undefined): string {
   return `${Math.round(valueMs / 1000)}s`;
 }
 
+function trimTrailingZeroes(value: string): string {
+  return value.replace(/\.0+$/, "").replace(/(\.\d*[1-9])0+$/, "$1");
+}
+
+export function formatTokenCount(value: number | null | undefined): string {
+  if (typeof value !== "number" || !Number.isFinite(value) || value < 0) return "--";
+  if (value < 1_000) return Math.round(value).toLocaleString();
+  if (value < 1_000_000) return `${trimTrailingZeroes((value / 1_000).toFixed(value < 10_000 ? 1 : 0))}k`;
+  if (value < 1_000_000_000) {
+    return `${trimTrailingZeroes((value / 1_000_000).toFixed(value < 10_000_000 ? 1 : 0))}M`;
+  }
+  return value.toExponential(2).replace("+", "");
+}
+
 export function formatEtaMs(valueMs: number | null | undefined): string {
   if (typeof valueMs !== "number" || !Number.isFinite(valueMs) || valueMs <= 0) return "now";
   if (valueMs < 1_000) return `${Math.round(valueMs)}ms`;
diff --git a/apps/client/src/lib/pushpalsApi.ts b/apps/client/src/lib/pushpalsApi.ts
index 2103208..a09cdc6 100644
--- a/apps/client/src/lib/pushpalsApi.ts
+++ b/apps/client/src/lib/pushpalsApi.ts
@@ -460,6 +460,28 @@ export interface SystemRepoSummary {
   refreshedAt?: string;
 }
 
+export interface LlmUsageServiceSummary {
+  service: string;
+  promptTokens: number;
+  completionTokens: number;
+  totalTokens: number;
+  callCount: number;
+  avgTokensPerCall: number | null;
+  estimatedCallCount: number;
+  lastCallAt: string | null;
+}
+
+export interface LlmUsageSummary {
+  windowHours: number;
+  promptTokens: number;
+  completionTokens: number;
+  totalTokens: number;
+  callCount: number;
+  avgTokensPerCall: number | null;
+  estimatedCallCount: number;
+  services: LlmUsageServiceSummary[];
+}
+
 export interface SystemStatusSummary {
   workers?: { total: number; online: number; busy: number; idle: number };
   queues?: {
@@ -471,6 +493,7 @@ export interface SystemStatusSummary {
     requests?: RequestSloSummary;
     jobs?: JobSloSummary;
   };
+  llmUsage?: LlmUsageSummary;
   repo?: SystemRepoSummary;
   autonomy?: AutonomyOpsSummary;
   ts?: string;
@@ -746,6 +769,7 @@ export async function fetchSystemStatus(
     workers: payload.workers,
     queues: payload.queues,
     slo: payload.slo,
+    llmUsage: payload.llmUsage,
     autonomy: payload.autonomy,
     repo: payload.repo,
     ts: payload.ts,
diff --git a/apps/localbuddy/src/localbuddy_main.ts b/apps/localbuddy/src/localbuddy_main.ts
index 269bdee..a6c6de5 100644
--- a/apps/localbuddy/src/localbuddy_main.ts
+++ b/apps/localbuddy/src/localbuddy_main.ts
@@ -407,6 +407,8 @@ class LocalBuddyServer {
         endpoint: llmCfg.endpoint,
         model: llmCfg.model,
         apiKey: llmCfg.apiKey,
+        serverUrl: this.server,
+        authToken: this.authToken,
       });
       console.log(`[LocalBuddy] LLM client initialized`);
     }
diff --git a/apps/remotebuddy/src/autonomous_engine.adjacent_possible.test.ts b/apps/remotebuddy/src/autonomous_engine.adjacent_possible.test.ts
new file mode 100644
index 0000000..1579239
--- /dev/null
+++ b/apps/remotebuddy/src/autonomous_engine.adjacent_possible.test.ts
@@ -0,0 +1,247 @@
+import { describe, expect, test } from "bun:test";
+import {
+  adjacent_possible,
+  type EngineCommitHistoryHint,
+  type EngineOpportunityGap,
+} from "./autonomous_engine";
+
+const queueMotif: EngineCommitHistoryHint = {
+  motif_id: "queue_backpressure",
+  label: "Queue backpressure and throughput",
+  count: 4,
+  signal: 0.82,
+  objective_ids: ["workforce_scaling", "reliable_autonomous_delivery"],
+  gap_ids: ["workforce_throughput_gap", "delivery_reliability_gap"],
+  sample_subjects: ["queue saturation fix"],
+};
+
+const throughputGap: EngineOpportunityGap = {
+  id: "workforce_throughput_gap",
+  label: "Workforce throughput gap",
+  score: 0.76,
+  evidence: [],
+};
+
+const deliveryReliabilityGap: EngineOpportunityGap = {
+  id: "delivery_reliability_gap",
+  label: "Delivery reliability gap",
+  score: 0.6,
+  evidence: [],
+};
+
+const clamp01 = (value: number): number => Math.min(1, Math.max(0, value));
+
+describe("adjacent_possible", () => {
+  test("recombines motifs with bottlenecks and emits telemetry", () => {
+    const result = adjacent_possible({
+      hints: [queueMotif],
+      gaps: [throughputGap],
+    });
+
+    expect(result.ideas).toHaveLength(1);
+    const idea = result.ideas[0];
+    expect(idea.id).toBe("adjacent_possible_queue_backpressure_workforce_throughput_gap");
+    expect(idea.motif_label).toContain("Queue backpressure");
+    expect(idea.gap_label).toContain("Workforce throughput");
+    expect(idea.candidate_shape.component_area).toBe("apps/server");
+    expect(idea.evidence.some((entry) => entry.startsWith("gap_score="))).toBe(true);
+    const pairTelemetry = result.telemetry.find(
+      (event) =>
+        event.step === "pair_attempt" &&
+        event.motif_id === queueMotif.motif_id &&
+        event.gap_id === throughputGap.id,
+    );
+    expect(pairTelemetry?.accepted).toBe(true);
+    expect(pairTelemetry?.metrics?.score).toBeGreaterThan(0.5);
+    expect(pairTelemetry?.attempt_id).toBe(
+      `${queueMotif.motif_id}::${throughputGap.id}`,
+    );
+    const emissionTelemetry = result.telemetry.find(
+      (event) =>
+        event.step === "idea_emitted" &&
+        event.motif_id === queueMotif.motif_id &&
+        event.gap_id === throughputGap.id,
+    );
+    expect(emissionTelemetry?.metrics?.rank).toBe(1);
+    expect(emissionTelemetry?.attempt_id).toBe(pairTelemetry?.attempt_id);
+  });
+
+  test("guardrails drop unsupported motif-gap combinations", () => {
+    const governanceGap: EngineOpportunityGap = {
+      id: "governance_gap",
+      label: "Governance gap",
+      score: 0.7,
+      evidence: [],
+    };
+    const result = adjacent_possible({
+      hints: [queueMotif],
+      gaps: [governanceGap],
+    });
+
+    expect(result.ideas).toHaveLength(0);
+    const rejectionTelemetry = result.telemetry.find(
+      (event) =>
+        event.step === "pair_attempt" &&
+        event.reason === "gap_not_supported" &&
+        event.gap_id === governanceGap.id,
+    );
+    expect(rejectionTelemetry?.accepted).toBe(false);
+    expect(rejectionTelemetry?.attempt_id).toBe(
+      `${queueMotif.motif_id}::${governanceGap.id}`,
+    );
+    const guardrailEvent = result.telemetry.find(
+      (event) => event.step === "guardrail_drop" && event.reason === "gap_not_supported",
+    );
+    expect(guardrailEvent).toBeTruthy();
+    expect(guardrailEvent?.accepted).toBe(false);
+    expect(guardrailEvent?.attempt_id).toBe(rejectionTelemetry?.attempt_id);
+  });
+
+  test("enforces motif and gap thresholds before pairing", () => {
+    const weakMotif: EngineCommitHistoryHint = {
+      ...queueMotif,
+      motif_id: "queue_backpressure_weak",
+      signal: 0.05,
+    };
+    const weakGap: EngineOpportunityGap = {
+      id: "activation_gap",
+      label: "Activation gap",
+      score: 0.1,
+      evidence: [],
+    };
+    const result = adjacent_possible({
+      hints: [weakMotif],
+      gaps: [throughputGap, weakGap],
+      minMotifSignal: 0.25,
+      minGapScore: 0.5,
+    });
+
+    expect(result.ideas).toHaveLength(0);
+    expect(
+      result.telemetry.some((event) => event.reason === "motif_signal_below_threshold"),
+    ).toBe(true);
+    expect(
+      result.telemetry.some((event) => event.reason === "gap_score_below_threshold"),
+    ).toBe(true);
+    expect(result.telemetry.filter((event) => event.step === "pair_attempt")).toHaveLength(0);
+  });
+
+  test("rejects malformed motif and gap metadata with telemetry reasons", () => {
+    const invalidMotifId: EngineCommitHistoryHint = {
+      ...queueMotif,
+      motif_id: " ",
+      label: queueMotif.label,
+    };
+    const invalidMotifLabel: EngineCommitHistoryHint = {
+      ...queueMotif,
+      motif_id: "queue_backpressure_label_missing",
+      label: " ",
+    };
+    const invalidGapId: EngineOpportunityGap = {
+      ...throughputGap,
+      id: " ",
+      label: "Throughput gap missing id",
+    };
+    const invalidGapLabel: EngineOpportunityGap = {
+      ...throughputGap,
+      id: "activation_gap_with_invalid_label",
+      label: " ",
+    };
+    const result = adjacent_possible({
+      hints: [invalidMotifId, invalidMotifLabel],
+      gaps: [invalidGapId, invalidGapLabel],
+    });
+
+    expect(result.ideas).toHaveLength(0);
+    const reasons = result.telemetry
+      .filter((event) => Boolean(event.reason))
+      .map((event) => event.reason);
+    expect(reasons).toContain("invalid_motif_id");
+    expect(reasons).toContain("invalid_motif_label");
+    expect(reasons).toContain("invalid_gap_id");
+    expect(reasons).toContain("invalid_gap_label");
+  });
+
+  test("deduplicates gap inputs before pairing and still emits truncation telemetry", () => {
+    const duplicateGap: EngineOpportunityGap = {
+      ...throughputGap,
+      label: throughputGap.label,
+      score: throughputGap.score - 0.1,
+    };
+    const result = adjacent_possible({
+      hints: [queueMotif],
+      gaps: [throughputGap, deliveryReliabilityGap, duplicateGap],
+      maxIdeas: 1,
+    });
+
+    expect(result.ideas).toHaveLength(1);
+    const pairAttempts = result.telemetry.filter((event) => event.step === "pair_attempt");
+    expect(pairAttempts).toHaveLength(2);
+    expect(result.telemetry.some((event) => event.reason === "duplicate_pair")).toBe(false);
+    const truncationTelemetry = result.telemetry.find(
+      (event) =>
+        event.step === "idea_truncated" &&
+        event.gap_id === deliveryReliabilityGap.id &&
+        event.reason === "max_ideas_limit",
+    );
+    expect(truncationTelemetry).toBeTruthy();
+  });
+
+  test("deduplicates motif hints and preserves strongest signal plus coverage", () => {
+    const weakHint: EngineCommitHistoryHint = {
+      ...queueMotif,
+      signal: 0.31,
+      count: 8,
+      gap_ids: ["workforce_throughput_gap"],
+    };
+    const strongHint: EngineCommitHistoryHint = {
+      ...queueMotif,
+      signal: 0.9,
+      count: 2,
+      gap_ids: ["delivery_reliability_gap"],
+    };
+    const result = adjacent_possible({
+      hints: [weakHint, strongHint],
+      gaps: [throughputGap],
+    });
+
+    expect(result.ideas).toHaveLength(1);
+    const motifEvents = result.telemetry.filter(
+      (event) => event.step === "motif_screen" && event.motif_id === queueMotif.motif_id,
+    );
+    expect(motifEvents).toHaveLength(1);
+    expect(motifEvents[0]?.metrics?.signal).toBeCloseTo(0.9, 2);
+    const idea = result.ideas[0];
+    expect(idea.evidence).toContain("coverage_boost=0.08");
+  });
+
+  test("scores omit motif objective boosts for unrelated gaps", () => {
+    const lowReliabilityGap: EngineOpportunityGap = {
+      ...deliveryReliabilityGap,
+      score: 0.31,
+      label: "Delivery reliability bottleneck",
+    };
+    const result = adjacent_possible({
+      hints: [queueMotif],
+      gaps: [lowReliabilityGap],
+    });
+
+    expect(result.ideas).toHaveLength(1);
+    const idea = result.ideas[0];
+    const evidenceMap = idea.evidence.reduce<Record<string, number>>((acc, entry) => {
+      const [key, raw] = entry.split("=");
+      if (!key || typeof raw === "undefined") return acc;
+      const parsed = Number(raw);
+      if (!Number.isNaN(parsed)) acc[key] = parsed;
+      return acc;
+    }, {});
+    expect(evidenceMap.objective_boost).toBeUndefined();
+    const expectedScore = clamp01(
+      0.5 * (evidenceMap.gap_score ?? 0) +
+        0.3 * (evidenceMap.motif_signal ?? 0) +
+        0.12 * (evidenceMap.motif_novelty ?? 0) +
+        (evidenceMap.coverage_boost ?? 0),
+    );
+    expect(idea.score).toBeCloseTo(expectedScore, 5);
+  });
+});
diff --git a/apps/remotebuddy/src/autonomous_engine.opportunity_graph.test.ts b/apps/remotebuddy/src/autonomous_engine.opportunity_graph.test.ts
new file mode 100644
index 0000000..27505e2
--- /dev/null
+++ b/apps/remotebuddy/src/autonomous_engine.opportunity_graph.test.ts
@@ -0,0 +1,186 @@
+import { describe, expect, test } from "bun:test";
+import {
+  extractQueueHealthMetrics,
+  parseJobFailureRateFromEvidence,
+  parseLatencyEvidenceInMs,
+  type QueueHealthMetric,
+} from "./autonomous_engine";
+
+describe("parseLatencyEvidenceInMs", () => {
+  test("parses queue latency expressed in milliseconds", () => {
+    expect(parseLatencyEvidenceInMs("queue_p95=250ms job_failure_rate=0.01")).toBe(250);
+  });
+
+  test("parses queue latency expressed in seconds", () => {
+    expect(parseLatencyEvidenceInMs("queue_p95=1.2s pending=4")).toBeCloseTo(1200);
+  });
+
+  test("parses queue latency expressed in microseconds (us)", () => {
+    const parsed = parseLatencyEvidenceInMs("queue_p95=500us");
+    expect(parsed).not.toBeNull();
+    expect(parsed).toBeCloseTo(0.5);
+  });
+
+  test("parses queue latency expressed in microseconds using µ", () => {
+    const parsed = parseLatencyEvidenceInMs("queue_p95=750µs job_failure_rate=0");
+    expect(parsed).not.toBeNull();
+    expect(parsed).toBeCloseTo(0.75);
+  });
+
+  test("parses queue latency expressed in microseconds using μ", () => {
+    const parsed = parseLatencyEvidenceInMs("queue_p95=100μs");
+    expect(parsed).not.toBeNull();
+    expect(parsed).toBeCloseTo(0.1);
+  });
+});
+
+describe("parseJobFailureRateFromEvidence", () => {
+  test("parses decimal ratios directly", () => {
+    expect(parseJobFailureRateFromEvidence("job_failure_rate=0.07")).toBeCloseTo(0.07);
+  });
+
+  test("normalizes percentages to ratios", () => {
+    expect(parseJobFailureRateFromEvidence("job_failure_rate=7%")).toBeCloseTo(0.07);
+  });
+
+  test("treats bare integers above one as percentages", () => {
+    expect(parseJobFailureRateFromEvidence("job_failure_rate=7")).toBeCloseTo(0.07);
+  });
+});
+
+describe("extractQueueHealthMetrics", () => {
+  test("returns no metrics when neither signals nor traits have queue evidence", () => {
+    expect(extractQueueHealthMetrics({ topSignals: [], stateTraits: [] })).toEqual([]);
+  });
+
+  test("prefers queue_health signals over supporting traits", () => {
+    const metrics = extractQueueHealthMetrics({
+      topSignals: [
+        {
+          signal_id: "sig_queue_high",
+          type: "queue_health",
+          value: 0.88,
+          evidence: "queue_p95=1500ms job_failure_rate=0.45",
+        },
+      ],
+      stateTraits: [
+        {
+          trait_id: "trait_queue_latency",
+          category: "weakness",
+          focus: "queue_latency",
+          score: 0.7,
+          evidence: "request queue p95=900ms job failure rate=0.12",
+        },
+      ],
+    });
+
+    expect(metrics).toHaveLength(2);
+    const latency = metrics.find((entry) => entry.metric === "queue_latency_ms") as QueueHealthMetric;
+    expect(latency.value).toBe(1500);
+    expect(latency.sourceType).toBe("signal");
+
+    const failure = metrics.find((entry) => entry.metric === "job_failure_rate") as QueueHealthMetric;
+    expect(failure.value).toBeCloseTo(0.45);
+    expect(failure.sourceType).toBe("signal");
+  });
+
+  test("selects the highest-severity metric when conflicting queue signals exist", () => {
+    const metrics = extractQueueHealthMetrics({
+      topSignals: [
+        {
+          signal_id: "sig_moderate",
+          type: "queue_health",
+          value: 0.4,
+          evidence: "queue_p95=800ms job_failure_rate=0.1",
+        },
+        {
+          signal_id: "sig_severe",
+          type: "queue_health",
+          value: 0.92,
+          evidence: "queue_p95=2200ms job_failure_rate=0.4",
+        },
+      ],
+      stateTraits: [],
+    });
+
+    const latency = metrics.find((entry) => entry.metric === "queue_latency_ms") as QueueHealthMetric;
+    expect(latency.value).toBe(2200);
+    expect(latency.sourceId).toBe("sig_severe");
+
+    const failure = metrics.find((entry) => entry.metric === "job_failure_rate") as QueueHealthMetric;
+    expect(failure.value).toBeCloseTo(0.4);
+    expect(failure.sourceId).toBe("sig_severe");
+  });
+
+  test("selection is deterministic regardless of signal ordering", () => {
+    const scenario = [
+      {
+        signal_id: "sig_b",
+        type: "queue_health",
+        value: 0.5,
+        evidence: "queue_p95=900ms job_failure_rate=0.2",
+      },
+      {
+        signal_id: "sig_a",
+        type: "queue_health",
+        value: 0.5,
+        evidence: "queue_p95=900ms job_failure_rate=0.2",
+      },
+    ];
+
+    const metricsA = extractQueueHealthMetrics({ topSignals: scenario, stateTraits: [] });
+    const metricsB = extractQueueHealthMetrics({ topSignals: [...scenario].reverse(), stateTraits: [] });
+
+    expect(metricsA).toEqual(metricsB);
+    const latency = metricsA.find((entry) => entry.metric === "queue_latency_ms") as QueueHealthMetric;
+    expect(latency.sourceId).toBe("sig_a");
+  });
+
+  test("ignores traits without queue context even if they contain latency strings", () => {
+    const metrics = extractQueueHealthMetrics({
+      topSignals: [],
+      stateTraits: [
+        {
+          trait_id: "trait_general_latency",
+          category: "weakness",
+          focus: "latency",
+          score: 0.4,
+          evidence: "latency=2000ms job_failure_rate=0.35",
+        },
+        {
+          trait_id: "trait_queue_latency",
+          category: "weakness",
+          focus: "queue_latency",
+          score: 0.7,
+          evidence: "queue_p95=900ms job_failure_rate=0.12",
+        },
+      ],
+    });
+
+    expect(metrics).toHaveLength(2);
+    expect(metrics.every((metric) => metric.sourceId === "trait_queue_latency")).toBe(true);
+  });
+
+  test("uses trait evidence that explicitly references queue semantics", () => {
+    const metrics = extractQueueHealthMetrics({
+      topSignals: [],
+      stateTraits: [
+        {
+          trait_id: "trait_alert_latency",
+          category: "weakness",
+          focus: "alert_latency",
+          score: 0.55,
+          evidence: "Queue p95=1800ms job_failure_rate=0.25",
+        },
+      ],
+    });
+
+    const latency = metrics.find((entry) => entry.metric === "queue_latency_ms") as QueueHealthMetric;
+    expect(latency.value).toBe(1800);
+    expect(latency.sourceType).toBe("trait");
+
+    const failure = metrics.find((entry) => entry.metric === "job_failure_rate") as QueueHealthMetric;
+    expect(failure.value).toBeCloseTo(0.25);
+    expect(failure.sourceType).toBe("trait");
+  });
+});
diff --git a/apps/remotebuddy/src/autonomous_engine.ts b/apps/remotebuddy/src/autonomous_engine.ts
index 17c3523..6c15515 100644
--- a/apps/remotebuddy/src/autonomous_engine.ts
+++ b/apps/remotebuddy/src/autonomous_engine.ts
@@ -239,6 +239,15 @@ const ENGINE_EXPLORE_RATE_MIN = 0.1;
 const ENGINE_EXPLORE_RATE_MAX = 0.6;
 const ENGINE_NOVELTY_SAMPLE_SATURATION = 12;
 const ENGINE_EXPLORE_POOL_MAX = 3;
+const ADJACENT_POSSIBLE_DEFAULT_MAX_IDEAS = 3;
+const ADJACENT_POSSIBLE_MAX_IDEAS = 5;
+const ADJACENT_POSSIBLE_MIN_SIGNAL = 0.2;
+const ADJACENT_POSSIBLE_MIN_GAP_SCORE = 0.25;
+const ADJACENT_POSSIBLE_NOVELTY_DIVISOR = ENGINE_NOVELTY_SAMPLE_SATURATION;
+const ADJACENT_POSSIBLE_GAP_WEIGHT = 0.5;
+const ADJACENT_POSSIBLE_SIGNAL_WEIGHT = 0.3;
+const ADJACENT_POSSIBLE_NOVELTY_WEIGHT = 0.12;
+const ADJACENT_POSSIBLE_COVERAGE_BOOST = 0.08;
 const AUTO_INGEST_SEED_PATTERNS: Array<{
   algorithm: string;
   whenToUse: string;
@@ -736,6 +745,35 @@ export interface EngineCommitHistoryHint {
   sample_subjects: string[];
 }
 
+export interface AdjacentPossibleIdea {
+  id: string;
+  motif_id: string;
+  gap_id: string;
+  motif_label: string;
+  gap_label: string;
+  score: number;
+  summary: string;
+  hypothesis: string;
+  evidence: string[];
+  candidate_shape: EngineCandidateShape;
+}
+
+export interface AdjacentPossibleTelemetryEvent {
+  step:
+    | "motif_screen"
+    | "gap_screen"
+    | "pair_attempt"
+    | "guardrail_drop"
+    | "idea_emitted"
+    | "idea_truncated";
+  motif_id?: string;
+  gap_id?: string;
+  attempt_id?: string;
+  accepted: boolean;
+  reason?: string;
+  metrics?: Record<string, number>;
+}
+
 export interface EngineInspirationSourcePattern {
   id: string;
   source_type: string;
@@ -777,6 +815,194 @@ type EngineIdeaBlueprint = {
   candidate_shape: EngineCandidateShape;
 };
 
+export type QueueHealthMetricKind = "queue_latency_ms" | "job_failure_rate";
+export type QueueHealthMetricSource = "signal" | "trait";
+
+export interface QueueHealthMetric {
+  metric: QueueHealthMetricKind;
+  value: number;
+  sourceType: QueueHealthMetricSource;
+  sourceId: string;
+  evidence: string;
+}
+
+const MICRO_SYMBOL_REGEX = /[\u00B5\u03BC]/gi;
+const LATENCY_UNIT_PATTERN = "(?:ms|milliseconds?|us|usec|microseconds?|microsecond|s|sec(?:ond)?s?)";
+const LATENCY_PATTERNS: RegExp[] = [
+  new RegExp(
+    `queue[_ ]?p95\\s*[:=]?\\s*([+-]?\\d+(?:[.,]\\d+)?)(?:\\s*(${LATENCY_UNIT_PATTERN}))?`,
+    "i",
+  ),
+  new RegExp(
+    `latency\\s*[:=]?\\s*([+-]?\\d+(?:[.,]\\d+)?)(?:\\s*(${LATENCY_UNIT_PATTERN}))?`,
+    "i",
+  ),
+  new RegExp(
+    `p95\\s*[:=]?\\s*([+-]?\\d+(?:[.,]\\d+)?)(?:\\s*(${LATENCY_UNIT_PATTERN}))?`,
+    "i",
+  ),
+];
+const LATENCY_UNIT_TO_MS: Record<string, number> = {
+  ms: 1,
+  millisecond: 1,
+  milliseconds: 1,
+  s: 1000,
+  sec: 1000,
+  secs: 1000,
+  second: 1000,
+  seconds: 1000,
+  us: 0.001,
+  usec: 0.001,
+  microsecond: 0.001,
+  microseconds: 0.001,
+};
+const SOURCE_RANK: Record<QueueHealthMetricSource, number> = { signal: 2, trait: 1 };
+const QUEUE_TRAIT_KEYWORDS = ["queue", "backpressure", "backlog", "dispatch", "pending"];
+const QUEUE_EVIDENCE_KEYWORDS = ["queue", "backpressure", "backlog", "dispatch", "pending"];
+
+export function parseLatencyEvidenceInMs(evidence: string | null | undefined): number | null {
+  const text = asString(evidence);
+  if (!text) return null;
+  const normalized = text.normalize("NFKC").replace(MICRO_SYMBOL_REGEX, "u");
+  for (const pattern of LATENCY_PATTERNS) {
+    const match = normalized.match(pattern);
+    if (!match) continue;
+    const rawValue = match[1]?.replace(/,/g, "");
+    const parsedValue = Number(rawValue);
+    if (!Number.isFinite(parsedValue)) continue;
+    const rawUnit = (match[2] || "ms").replace(MICRO_SYMBOL_REGEX, "u");
+    const unit = rawUnit.replace(/[^a-z]/gi, "").toLowerCase();
+    const multiplier = LATENCY_UNIT_TO_MS[unit] ?? 1;
+    return parsedValue * multiplier;
+  }
+  return null;
+}
+
+export function extractQueueHealthMetrics(params: {
+  topSignals?: Snapshot["top_signals"];
+  stateTraits?: Snapshot["state_traits"];
+}): QueueHealthMetric[] {
+  const topSignals = Array.isArray(params.topSignals) ? params.topSignals : [];
+  const stateTraits = Array.isArray(params.stateTraits) ? params.stateTraits : [];
+  const candidates: QueueHealthMetric[] = [];
+  const pushCandidate = (
+    metric: QueueHealthMetricKind,
+    value: number | null,
+    sourceType: QueueHealthMetricSource,
+    sourceId: string,
+    evidence: string,
+  ): void => {
+    if (value === null || !Number.isFinite(value)) return;
+    candidates.push({
+      metric,
+      value,
+      sourceType,
+      sourceId,
+      evidence,
+    });
+  };
+
+  for (const signal of topSignals) {
+    if (asString(signal.type).toLowerCase() !== "queue_health") continue;
+    const evidence = asString(signal.evidence);
+    if (!evidence) continue;
+    const sourceId = asString(signal.signal_id) || "queue_signal";
+    pushCandidate("queue_latency_ms", parseLatencyEvidenceInMs(evidence), "signal", sourceId, evidence);
+    pushCandidate(
+      "job_failure_rate",
+      parseJobFailureRateFromEvidence(evidence),
+      "signal",
+      sourceId,
+      evidence,
+    );
+  }
+
+  for (const trait of stateTraits) {
+    if (!isQueueRelevantTrait(trait)) continue;
+    const evidence = asString(trait.evidence);
+    if (!evidence) continue;
+    const sourceId = asString(trait.trait_id) || "queue_trait";
+    pushCandidate("queue_latency_ms", parseLatencyEvidenceInMs(evidence), "trait", sourceId, evidence);
+    pushCandidate(
+      "job_failure_rate",
+      parseJobFailureRateFromEvidence(evidence),
+      "trait",
+      sourceId,
+      evidence,
+    );
+  }
+
+  const bestByMetric = new Map<QueueHealthMetricKind, QueueHealthMetric>();
+  for (const candidate of candidates) {
+    const current = bestByMetric.get(candidate.metric);
+    if (!current) {
+      bestByMetric.set(candidate.metric, candidate);
+      continue;
+    }
+    const currentRank = SOURCE_RANK[current.sourceType];
+    const candidateRank = SOURCE_RANK[candidate.sourceType];
+    if (candidateRank > currentRank) {
+      bestByMetric.set(candidate.metric, candidate);
+      continue;
+    }
+    if (candidateRank < currentRank) continue;
+    if (candidate.value > current.value) {
+      bestByMetric.set(candidate.metric, candidate);
+      continue;
+    }
+    if (candidate.value === current.value && candidate.sourceId.localeCompare(current.sourceId) < 0) {
+      bestByMetric.set(candidate.metric, candidate);
+    }
+  }
+
+  return [...bestByMetric.values()].sort((a, b) => {
+    if (a.metric === b.metric) {
+      if (a.sourceType === b.sourceType) {
+        if (a.value === b.value) {
+          return a.sourceId.localeCompare(b.sourceId);
+        }
+        return b.value - a.value;
+      }
+      return SOURCE_RANK[b.sourceType] - SOURCE_RANK[a.sourceType];
+    }
+    return a.metric.localeCompare(b.metric);
+  });
+}
+
+function includesKeyword(text: string, keywords: string[]): boolean {
+  if (!text) return false;
+  const normalized = text.toLowerCase();
+  return keywords.some((keyword) => normalized.includes(keyword));
+}
+
+function isQueueRelevantTrait(trait: Snapshot["state_traits"][number]): boolean {
+  const traitId = asString(trait?.trait_id);
+  const focus = asString(trait?.focus);
+  if (includesKeyword(traitId, QUEUE_TRAIT_KEYWORDS) || includesKeyword(focus, QUEUE_TRAIT_KEYWORDS)) {
+    return true;
+  }
+  const evidence = asString(trait?.evidence);
+  if (!evidence) return false;
+  return includesKeyword(evidence, QUEUE_EVIDENCE_KEYWORDS);
+}
+
+export function parseJobFailureRateFromEvidence(evidence: string | null | undefined): number | null {
+  const text = asString(evidence);
+  if (!text) return null;
+  const match = text.match(
+    /job[_ ]?failure[_ ]?rate\s*[:=]?\s*([+-]?\d+(?:[.,]\d+)?)(?:\s*(%|percent|pct))?/i,
+  );
+  if (!match) return null;
+  const numericValue = Number(match[1].replace(/,/g, ""));
+  if (!Number.isFinite(numericValue)) return null;
+  const unit = match[2]?.toLowerCase() ?? "";
+  const isPercentUnit = unit.includes("%") || unit.includes("percent") || unit.includes("pct");
+  if (isPercentUnit || Math.abs(numericValue) > 1) {
+    return numericValue / 100;
+  }
+  return numericValue;
+}
+
 function asObject(value: unknown): Record<string, unknown> {
   if (!value || typeof value !== "object" || Array.isArray(value)) return {};
   return value as Record<string, unknown>;
 }
@@ -1786,6 +2012,387 @@ function buildCommitHistoryBlocks(params: {
     .sort((a, b) => b.score - a.score);
 }
 
+function cloneCandidateShape(shape: EngineCandidateShape): EngineCandidateShape {
+  return {
+    ...shape,
+    target_paths: [...shape.target_paths],
+    write_globs: [...shape.write_globs],
+    expected_validation: [...shape.expected_validation],
+  };
+}
+
+function isCandidateShapeComplete(shape: EngineCandidateShape): boolean {
+  return (
+    Array.isArray(shape.target_paths) &&
+    shape.target_paths.length > 0 &&
+    Array.isArray(shape.write_globs) &&
+    shape.write_globs.length > 0 &&
+    Array.isArray(shape.expected_validation) &&
+    shape.expected_validation.length > 0
+  );
+}
+
+export function adjacent_possible(params: {
+  hints?: EngineCommitHistoryHint[];
+  gaps?: EngineOpportunityGap[];
+  maxIdeas?: number;
+  minMotifSignal?: number;
+  minGapScore?: number;
+}): { ideas: AdjacentPossibleIdea[]; telemetry: AdjacentPossibleTelemetryEvent[] } {
+  const hints = Array.isArray(params.hints) ? params.hints : [];
+  const gaps = Array.isArray(params.gaps) ? params.gaps : [];
+  const configuredMax = Number.isFinite(params.maxIdeas)
+    ? Math.max(
+        0,
+        Math.min(ADJACENT_POSSIBLE_MAX_IDEAS, Math.floor(Number(params.maxIdeas))),
+      )
+    : ADJACENT_POSSIBLE_DEFAULT_MAX_IDEAS;
+  const maxIdeas = configuredMax;
+  const minSignal = clamp01(
+    Number.isFinite(params.minMotifSignal)
+      ? Number(params.minMotifSignal)
+      : ADJACENT_POSSIBLE_MIN_SIGNAL,
+  );
+  const minGapScore = clamp01(
+    Number.isFinite(params.minGapScore)
+      ? Number(params.minGapScore)
+      : ADJACENT_POSSIBLE_MIN_GAP_SCORE,
+  );
+  const telemetry: AdjacentPossibleTelemetryEvent[] = [];
+  const recordTelemetry = (event: AdjacentPossibleTelemetryEvent): void => {
+    telemetry.push(event);
+  };
+  const formatEvidenceValue = (value: number): string => {
+    if (!Number.isFinite(value)) return "0";
+    const normalized = value.toFixed(4);
+    return normalized.replace(/\.?0+$/, "") || "0";
+  };
+  const sanitizeIdList = (value: unknown): string[] => [...new Set(asStringArray(value))];
+  const mergeUniqueStrings = (existing: string[], additions: string[]): string[] => {
+    if (additions.length === 0) return existing;
+    const seen = new Set(existing);
+    for (const value of additions) {
+      if (!seen.has(value)) seen.add(value);
+    }
+    return [...seen];
+  };
+  const buildAttemptId = (pair: { motifId: string; gapId: string }): string =>
+    `${pair.motifId}::${pair.gapId}`;
+  const recordPairOutcome = (
+    pair: { motifId: string; gapId: string },
+    accepted: boolean,
+    details?: { reason?: string; metrics?: Record<string, number> },
+  ): string => {
+    const attemptId = buildAttemptId(pair);
+    recordTelemetry({
+      step: "pair_attempt",
+      motif_id: pair.motifId,
+      gap_id: pair.gapId,
+      attempt_id: attemptId,
+      accepted,
+      ...(details?.reason ? { reason: details.reason } : {}),
+      ...(details?.metrics ? { metrics: details.metrics } : {}),
+    });
+    return attemptId;
+  };
+  const rejectPair = (
+    pair: { motifId: string; gapId: string },
+    reason: string,
+    metrics?: Record<string, number>,
+  ): void => {
+    const attemptId = recordPairOutcome(pair, false, { reason, ...(metrics ? { metrics } : {}) });
+    recordTelemetry({
+      step: "guardrail_drop",
+      motif_id: pair.motifId,
+      gap_id: pair.gapId,
+      attempt_id: attemptId,
+      accepted: false,
+      reason,
+    });
+  };
+
+  const ruleByMotifId = new Map(COMMIT_MOTIF_RULES.map((rule) => [rule.motifId, rule]));
+  const ruleByMotifLabel = new Map(
+    COMMIT_MOTIF_RULES.map((rule) => [rule.label.toLowerCase(), rule]),
+  );
+  type AggregatedMotifInput = {
+    motifId: string;
+    motifLabel: string;
+    signal: number;
+    count: number;
+    observedGapIds: string[];
+    sourceIndex: number;
+  };
+  const aggregatedMotifMap = new Map<string, AggregatedMotifInput>();
+  hints.forEach((hint, index) => {
+    const motifId = asString(hint.motif_id);
+    if (!motifId) {
+      recordTelemetry({
+        step: "motif_screen",
+        accepted: false,
+        reason: "invalid_motif_id",
+      });
+      return;
+    }
+    const motifLabel = asString(hint.label);
+    if (!motifLabel) {
+      recordTelemetry({
+        step: "motif_screen",
+        motif_id: motifId,
+        accepted: false,
+        reason: "invalid_motif_label",
+      });
+      return;
+    }
+    const signal = clamp01(asNumber(hint.signal, 0));
+    const count = Math.max(0, Math.floor(asNumber(hint.count, 0)));
+    const observedGapIds = sanitizeIdList(hint.gap_ids);
+    const aggregated = aggregatedMotifMap.get(motifId);
+    if (!aggregated) {
+      aggregatedMotifMap.set(motifId, {
+        motifId,
+        motifLabel,
+        signal,
+        count,
+        observedGapIds,
+        sourceIndex: index,
+      });
+      return;
+    }
+    const mergedGapIds = mergeUniqueStrings(aggregated.observedGapIds, observedGapIds);
+    const shouldReplace =
+      signal > aggregated.signal ||
+      (signal === aggregated.signal && count > aggregated.count) ||
+      (signal === aggregated.signal && count === aggregated.count && index > aggregated.sourceIndex);
+    if (shouldReplace) {
+      aggregatedMotifMap.set(motifId, {
+        motifId,
+        motifLabel,
+        signal,
+        count,
+        observedGapIds: mergedGapIds,
+        sourceIndex: index,
+      });
+      return;
+    }
+    aggregatedMotifMap.set(motifId, {
+      ...aggregated,
+      observedGapIds: mergedGapIds,
+    });
+  });
+  const aggregatedMotifs = [...aggregatedMotifMap.values()].sort((a, b) =>
+    a.motifId.localeCompare(b.motifId),
+  );
+
+  type AggregatedGapInput = { gapId: string; gapLabel: string; score: number; sourceIndex: number };
+  const aggregatedGapMap = new Map<string, AggregatedGapInput>();
+  gaps.forEach((gap, index) => {
+    const gapId = asString(gap.id);
+    if (!gapId) {
+      recordTelemetry({
+        step: "gap_screen",
+        accepted: false,
+        reason: "invalid_gap_id",
+      });
+      return;
+    }
+    const gapLabel = asString(gap.label);
+    if (!gapLabel) {
+      recordTelemetry({
+        step: "gap_screen",
+        gap_id: gapId,
+        accepted: false,
+        reason: "invalid_gap_label",
+      });
+      return;
+    }
+    const score = clamp01(asNumber(gap.score, 0));
+    const existing = aggregatedGapMap.get(gapId);
+    if (
+      !existing ||
+      score > existing.score ||
+      (score === existing.score && index > existing.sourceIndex)
+    ) {
+      aggregatedGapMap.set(gapId, { gapId, gapLabel, score, sourceIndex: index });
+    }
+  });
+  const aggregatedGaps = [...aggregatedGapMap.values()].sort((a, b) =>
+    a.gapId.localeCompare(b.gapId),
+  );
+
+  type EligibleMotif = {
+    rule: CommitMotifRule;
+    motifId: string;
+    motifLabel: string;
+    observedGapIds: string[];
+    signal: number;
+    novelty: number;
+    candidateShape: EngineCandidateShape;
+  };
+  const eligibleMotifs: EligibleMotif[] = [];
+  for (const hint of aggregatedMotifs) {
+    const motifId = hint.motifId;
+    const motifLabel = hint.motifLabel;
+    const normalizedLabel = motifLabel.toLowerCase();
+    const rule =
+      ruleByMotifId.get(motifId) ||
+      (normalizedLabel ? ruleByMotifLabel.get(normalizedLabel) : undefined);
+    if (!rule) {
+      recordTelemetry({
+        step: "motif_screen",
+        motif_id: motifId,
+        accepted: false,
+        reason: "unknown_motif",
+      });
+      continue;
+    }
+    const signal = clamp01(hint.signal);
+    if (signal < minSignal) {
+      recordTelemetry({
+        step: "motif_screen",
+        motif_id: motifId,
+        accepted: false,
+        reason: "motif_signal_below_threshold",
+        metrics: { signal },
+      });
+      continue;
+    }
+    const candidateShape = cloneCandidateShape(rule.shape);
+    if (!isCandidateShapeComplete(candidateShape)) {
+      recordTelemetry({
+        step: "motif_screen",
+        motif_id: motifId,
+        accepted: false,
+        reason: "candidate_shape_incomplete",
+      });
+      continue;
+    }
+    const novelty = clamp01(
+      1 - clamp01(hint.count / ADJACENT_POSSIBLE_NOVELTY_DIVISOR),
+    );
+    eligibleMotifs.push({
+      rule,
+      motifId,
+      motifLabel,
+      observedGapIds: [...hint.observedGapIds],
+      signal,
+      novelty,
+      candidateShape,
+    });
+    recordTelemetry({
+      step: "motif_screen",
+      motif_id: motifId,
+      accepted: true,
+      metrics: { signal, novelty },
+    });
+  }
+
+  type EligibleGap = { gapId: string; gapLabel: string; score: number };
+  const eligibleGaps: EligibleGap[] = [];
+  for (const gap of aggregatedGaps) {
+    const gapId = gap.gapId;
+    const gapLabel = gap.gapLabel;
+    const score = clamp01(gap.score);
+    const accepted = score >= minGapScore;
+    recordTelemetry({
+      step: "gap_screen",
+      gap_id: gapId,
+      accepted,
+      metrics: { score },
+      ...(accepted ? {} : { reason: "gap_score_below_threshold" }),
+    });
+    if (!accepted) continue;
+    eligibleGaps.push({ gapId, gapLabel, score });
+  }
+
+  const seenPairs = new Set<string>();
+  const generatedIdeas: AdjacentPossibleIdea[] = [];
+  for (const motif of eligibleMotifs) {
+    for (const gap of eligibleGaps) {
+      const pairKey = `${motif.motifId}:${gap.gapId}`;
+      const pairContext = { motifId: motif.motifId, gapId: gap.gapId };
+      if (seenPairs.has(pairKey)) {
+        rejectPair(pairContext, "duplicate_pair");
+        continue;
+      }
+      seenPairs.add(pairKey);
+      const supportsGap =
+        motif.rule.gapIds.includes(gap.gapId) || motif.observedGapIds.includes(gap.gapId);
+      if (!supportsGap) {
+        rejectPair(pairContext, "gap_not_supported");
+        continue;
+      }
+      if (!isCandidateShapeComplete(motif.candidateShape)) {
+        rejectPair(pairContext, "candidate_shape_incomplete");
+        continue;
+      }
+      const coverageBoost = motif.observedGapIds.includes(gap.gapId)
+        ? ADJACENT_POSSIBLE_COVERAGE_BOOST
+        : 0;
+      const score = clamp01(
+        ADJACENT_POSSIBLE_GAP_WEIGHT * gap.score +
+          ADJACENT_POSSIBLE_SIGNAL_WEIGHT * motif.signal +
+          ADJACENT_POSSIBLE_NOVELTY_WEIGHT * motif.novelty +
+          coverageBoost,
+      );
+      const idea: AdjacentPossibleIdea = {
+        id: `adjacent_possible_${motif.motifId}_${gap.gapId}`,
+        motif_id: motif.motifId,
+        gap_id: gap.gapId,
+        motif_label: motif.motifLabel,
+        gap_label: gap.gapLabel,
+        score,
+        summary: `Adjacent possible: ${motif.motifLabel} targeting ${gap.gapLabel}`,
+        hypothesis:
+          `Blend the ${motif.motifLabel.toLowerCase()} motif with ${gap.gapLabel.toLowerCase()} telemetry ` +
+          "to relieve the active bottleneck without widening scope.",
+        evidence: [
+          `motif_signal=${formatEvidenceValue(motif.signal)}`,
+          `motif_novelty=${formatEvidenceValue(motif.novelty)}`,
+          `gap_score=${formatEvidenceValue(gap.score)}`,
+          `coverage_boost=${formatEvidenceValue(coverageBoost)}`,
+        ],
+        candidate_shape: cloneCandidateShape(motif.candidateShape),
+      };
+      recordPairOutcome(pairContext, true, { metrics: { score } });
+      generatedIdeas.push(idea);
+    }
+  }
+
+  generatedIdeas.sort((a, b) => {
+    if (b.score !== a.score) return b.score - a.score;
+    return a.id.localeCompare(b.id);
+  });
+  const allowedIdeaCount = maxIdeas > 0 ? Math.min(maxIdeas, generatedIdeas.length) : 0;
+  const selectedIdeas = generatedIdeas.slice(0, allowedIdeaCount);
+  const truncationReason = maxIdeas <= 0 ? "max_ideas_disabled" : "max_ideas_limit";
+  generatedIdeas.forEach((idea, index) => {
+    const attemptId = buildAttemptId({ motifId: idea.motif_id, gapId: idea.gap_id });
+    if (index < allowedIdeaCount) {
+      recordTelemetry({
+        step: "idea_emitted",
+        motif_id: idea.motif_id,
+        gap_id: idea.gap_id,
+        attempt_id: attemptId,
+        accepted: true,
+        metrics: { score: idea.score, rank: index + 1 },
+      });
+    } else {
+      recordTelemetry({
+        step: "idea_truncated",
+        motif_id: idea.motif_id,
+        gap_id: idea.gap_id,
+        attempt_id: attemptId,
+        accepted: false,
+        reason: truncationReason,
+        metrics: { score: idea.score, rank: index + 1 },
+      });
+    }
+  });
+
+  return { ideas: selectedIdeas, telemetry };
+}
+
 export function buildEngineInspirationContext(params: {
   vision: Pick;
   snapshot: EngineIdeaInputSnapshot;
@@ -2104,6 +2711,7 @@ function inferEngineTrialFromCandidate(
 export function buildEngineFallbackCandidates(params: {
   engineInspiration: EngineInspirationContext;
   snapshotTopSignals: EngineIdeaInputSnapshot["top_signals"];
+  snapshotStateTraits?: EngineIdeaInputSnapshot["state_traits"];
   visionSectionRefs: string[];
   maxCandidates?: number;
}): Array> {
@@ -2114,12 +2722,50 @@ export function buildEngineFallbackCandidates(params: {
     params.engineInspiration.compiled_objectives.map((objective) => [objective.id, objective.title]),
   );
   const sectionRefs = selectVisionSectionRefs(params.visionSectionRefs);
+  const topSignals = Array.isArray(params.snapshotTopSignals) ? params.snapshotTopSignals : [];
+  const stateTraits = Array.isArray(params.snapshotStateTraits) ? params.snapshotStateTraits : [];
+  const queueMetricByKind = new Map(
+    extractQueueHealthMetrics({
+      topSignals,
+      stateTraits,
+    }).map((metric) => [metric.metric, metric]),
+  );
+  const describeQueueMetricSource = (metric: QueueHealthMetric): string => {
+    const suffix = metric.sourceId ? ` ${metric.sourceId}` : "";
+    return `${metric.sourceType}${suffix}`.trim();
+  };
+  const formatLatencyValue = (value: number): string => {
+    if (!Number.isFinite(value)) return "unknown";
+    if (value >= 1) return `${Math.round(value)}ms`;
+    return `${Math.round(value * 1000)}us`;
+  };
+  const queueMetricContext =
+    queueMetricByKind.size > 0
+      ? (() => {
+          const lines: string[] = [];
+          const latencyMetric = queueMetricByKind.get("queue_latency_ms");
+          if (latencyMetric) {
+            lines.push(
+              `Queue latency p95 ≈ ${formatLatencyValue(latencyMetric.value)} (${describeQueueMetricSource(latencyMetric)}).`,
+            );
+          }
+          const failureMetric = queueMetricByKind.get("job_failure_rate");
+          if (failureMetric) {
+            const pct = clamp01(failureMetric.value) * 100;
+            const decimals = pct >= 10 ? 0 : 1;
+            lines.push(
+              `Worker job failure rate ≈ ${pct.toFixed(decimals)}% (${describeQueueMetricSource(failureMetric)}).`,
+            );
+          }
+          return lines;
+        })()
+      : [];
   return params.engineInspiration.building_blocks
     .filter((block) => block.score >= 0.42)
     .slice(0, maxCandidates)
     .map((block, idx) => {
-      const signalIds = pickSignalIdsForTrigger(params.snapshotTopSignals, block.candidate_shape.trigger_type);
+      const signalIds = pickSignalIdsForTrigger(topSignals, block.candidate_shape.trigger_type);
       const objectiveTitles = block.objective_ids
         .map((id) => objectiveTitleById.get(id))
         .filter((value): value is string => typeof value === "string" && value.length > 0)
@@ -2139,6 +2785,8 @@ export function buildEngineFallbackCandidates(params: {
         sourceLabel: block.source_label,
         sourceUrl: block.source_url,
       });
+      const queueHypotheses =
+        block.candidate_shape.trigger_type === "queue_health" ? queueMetricContext : [];
       return {
         id: `cand_engine_${block.id}_${randomUUID().slice(0, 8)}`,
         title: `Engine building block: ${block.algorithm}`,
@@ -2162,6 +2810,7 @@ export function buildEngineFallbackCandidates(params: {
           `Prioritize ${primaryObjectiveTitle} using ${block.algorithm}; score=${block.score.toFixed(2)}.`,
         vision_section_refs: sectionRefs,
         feature_hypotheses: [
+          ...queueHypotheses,
           block.summary,
           block.hypothesis,
           ...(sourceAttribution ? [sourceAttribution] : []),
@@ -2678,13 +3327,28 @@ export class RemoteBuddyAutonomousEngine {
       );
       return;
     }
-    const data = (await res.json().catch(() => ({}))) as {
+    let data: {
       ok?: boolean;
       inserted?: unknown;
       updated?: unknown;
      skipped?: unknown;
-    };
-    if (data.ok === false) {
+    } | null = null;
+    try {
+      data = (await res.json()) as {
+        ok?: boolean;
+        inserted?: unknown;
+        updated?: unknown;
+        skipped?: unknown;
+      };
+    } catch (error) {
+      console.warn(
+        `[RemoteBuddyAutonomousEngine] tick ${runId}: automatic inspiration ingest response JSON parse failed: ${String(
+          error,
+        )}`,
+      );
+      return;
+    }
+    if (!data || data.ok === false) {
       console.warn(
         `[RemoteBuddyAutonomousEngine] tick ${runId}: automatic inspiration ingest returned ok=false.`,
       );
@@ -2750,14 +3414,25 @@ export class RemoteBuddyAutonomousEngine {
   }
 
   private async releaseDispatchLock(runId: string): Promise<void> {
-    await fetch(`${this.server}/autonomy/lock/release`, {
-      method: "POST",
-      headers: this.headers(),
-      body: JSON.stringify({
-        sessionId: this.sessionId,
-        runId,
-      }),
-    }).catch(() => {});
+    try {
+      const res = await fetch(`${this.server}/autonomy/lock/release`, {
+        method: "POST",
+        headers: this.headers(),
+        body: JSON.stringify({
+          sessionId: this.sessionId,
+          runId,
+        }),
+      });
+      if (!res.ok) {
+        console.warn(
+          `[RemoteBuddyAutonomousEngine] releaseDispatchLock(${runId}) failed with HTTP ${res.status}`,
+        );
+      }
+    } catch (error) {
+      console.warn(
+        `[RemoteBuddyAutonomousEngine] releaseDispatchLock(${runId}) threw: ${String(error)}`,
+      );
+    }
   }
 
   private async llmPhase(
@@ -3275,6 +3950,7 @@ export class RemoteBuddyAutonomousEngine {
     const synthesized = buildEngineFallbackCandidates({
       engineInspiration,
       snapshotTopSignals: snapshot.top_signals,
+      snapshotStateTraits: snapshot.state_traits,
      visionSectionRefs: visionContext.section_numbers,
       maxCandidates: Math.max(1, Math.min(3, this.cfg.topK)),
     });
@@ -3398,6 +4074,7 @@ export class RemoteBuddyAutonomousEngine {
     const synthesizedFallback = buildEngineFallbackCandidates({
       engineInspiration,
       snapshotTopSignals: snapshot.top_signals,
+      snapshotStateTraits: snapshot.state_traits,
       visionSectionRefs: visionContext.section_numbers,
       maxCandidates: Math.max(1, Math.min(3, this.cfg.topK)),
     });
diff --git a/apps/remotebuddy/src/llm.ts b/apps/remotebuddy/src/llm.ts
index 248ed29..e4d5c58 100644
--- a/apps/remotebuddy/src/llm.ts
+++ b/apps/remotebuddy/src/llm.ts
@@ -43,6 +43,21 @@ export interface LLMClient {
 type LlmBackend = "lmstudio" | "ollama" | "openai" | "openai_codex";
 type LlmService = "localbuddy" | "remotebuddy" | "workerpals";
 
+export interface LLMUsageEvent {
+  service: LlmService;
+  sessionId?: string;
+  backend: LlmBackend;
+  modelId?: string | null;
+  promptTokens: number;
+  completionTokens: number;
+  totalTokens: number;
+  estimated?: boolean;
+}
+
+export interface LLMUsageReporter {
+  reportUsage(event: LLMUsageEvent): Promise<void>;
+}
+
 export interface LLMClientOptions {
   service?: LlmService;
   sessionId?: string;
@@ -50,6 +65,9 @@ export interface LLMClientOptions {
   apiKey?: string;
   model?: string;
   backend?: string;
+  serverUrl?: string;
+  authToken?: string | null;
+  usageReporter?: LLMUsageReporter;
 }
 
 interface ResolvedServiceLlmConfig {
@@ -521,6 +539,66 @@ function sumEstimatedTokens(messages: Array<{ role: string; content: string }>):
   return messages.reduce((acc, msg) => acc + estimateTokensFromText(msg.content), 0);
 }
 
+function tokenUsageFromEstimate(
+  messages: Array<{ role: string; content: string }>,
+  responseText: string,
+): { promptTokens: number; completionTokens: number } {
+  return {
+    promptTokens: Math.max(0, sumEstimatedTokens(messages)),
+    completionTokens: Math.max(0, estimateTokensFromText(responseText)),
+  };
+}
+
+function normalizeTokenUsage(
+  usage: { promptTokens: number; completionTokens: number } | undefined,
+  fallback: { promptTokens: number; completionTokens: number },
+): { promptTokens: number; completionTokens: number; estimated: boolean } {
+  if (
+    usage &&
+    Number.isFinite(usage.promptTokens) &&
+    usage.promptTokens >= 0 &&
+    Number.isFinite(usage.completionTokens) &&
+    usage.completionTokens >= 0
+  ) {
+    return {
+      promptTokens: Math.round(usage.promptTokens),
+      completionTokens: Math.round(usage.completionTokens),
+      estimated: false,
+    };
+  }
+  return {
+    promptTokens: Math.round(fallback.promptTokens),
+    completionTokens: Math.round(fallback.completionTokens),
+    estimated: true,
+  };
+}
+
+function createHttpUsageReporter(opts: {
+  serverUrl?: string;
+  authToken?: string | null;
+}): LLMUsageReporter | null {
+  const serverUrl = (opts.serverUrl ?? "").trim().replace(/\/+$/, "");
+  if (!serverUrl) return null;
+  return {
+    async reportUsage(event: LLMUsageEvent): Promise<void> {
+      const headers: Record<string, string> = { "Content-Type": "application/json" };
+      const authToken = (opts.authToken ?? "").trim();
+      if (authToken) headers.Authorization = `Bearer ${authToken}`;
+      const response = await fetch(`${serverUrl}/telemetry/llm-usage`, {
+        method: "POST",
+        headers,
+        body: JSON.stringify(event),
+      });
+      if (!response.ok) {
+        const detail = await response.text().catch(() => "");
+        throw new Error(
+          `usage telemetry rejected (${response.status})${detail ? `: ${detail.trim()}` : ""}`,
+        );
+      }
+    },
+  };
+}
+
 function providerlessModelName(raw: string): string {
   const normalized = raw.trim();
   if (!normalized.includes("/")) return normalized;
@@ -700,9 +778,11 @@ export class LmStudioClient implements LLMClient {
   private endpoint: string;
   private apiKey: string;
   private model: string;
+  private service: LlmService;
   private sessionTag: string;
   private providerKind: "lmstudio" | "openai";
   private providerLabel: string;
+  private usageReporter: LLMUsageReporter | null;
   private contextWindow: number;
   private minOutputTokens: number;
   private tokenSafetyMargin: number;
@@ -722,6 +802,7 @@ export class LmStudioClient implements LLMClient {
     service?: LlmService;
     sessionId?: string;
     lmStudio?: PushPalsLmStudioConfig;
+    usageReporter?: LLMUsageReporter | null;
   }) {
     this.providerKind = opts?.backend ?? "lmstudio";
     this.providerLabel = this.providerKind === "openai" ? "OpenAI" : "LM Studio";
@@ -731,7 +812,9 @@ export class LmStudioClient implements LLMClient {
     this.endpoint = normalizeLmStudioEndpoint(rawEndpoint);
     this.apiKey = opts?.apiKey ?? (this.providerKind === "lmstudio" ? "lmstudio" : "");
     this.model = opts?.model ?? DEFAULT_MODEL;
-    this.sessionTag = stableConversationTag(opts?.service ?? "remotebuddy", opts?.sessionId);
+    this.service = opts?.service ?? "remotebuddy";
+    this.sessionTag = stableConversationTag(this.service, opts?.sessionId);
+    this.usageReporter = opts?.usageReporter ?? null;
     const lmStudio = opts?.lmStudio;
     this.contextWindow = Math.max(512, lmStudio?.contextWindow ?? DEFAULT_LMSTUDIO_CONTEXT_WINDOW);
     this.minOutputTokens = Math.max(
@@ -750,6 +833,27 @@ export class LmStudioClient implements LLMClient {
     this.batchMemoryChars = Math.max(0, lmStudio?.batchMemoryChars ?? 0);
   }
 
+  private async maybeReportUsage(
+    modelId: string,
+    usage: { promptTokens: number; completionTokens: number; estimated: boolean },
+  ): Promise<void> {
+    if (!this.usageReporter) return;
+    try {
+      await this.usageReporter.reportUsage({
+        service: this.service,
+        sessionId: this.sessionTag || undefined,
+        backend: this.providerKind,
+        modelId,
+        promptTokens: usage.promptTokens,
+        completionTokens: usage.completionTokens,
+        totalTokens: usage.promptTokens + usage.completionTokens,
+        estimated: usage.estimated,
+      });
+    } catch (err) {
+      console.warn(`[LLM] Usage telemetry failed (${this.service}): ${String(err)}`);
+    }
+  }
+
   private modelProbeUrls(): string[] {
     const trimmed = this.endpoint.replace(/\/+$/, "");
     if (this.providerKind === "openai") {
@@ -986,21 +1090,30 @@ export class LmStudioClient implements LLMClient {
     const data = (await res.json()) as any;
     const choice = data.choices?.[0];
+    const text = choice?.message?.content ?? "";
     if ("session_id" in body || "conversation_id" in body) {
       this.lmStudioSupportsExtendedSessionFields = true;
     }
     if ("response_format" in body) {
       this.lmStudioSupportsResponseFormat = true;
     }
-
-    return {
-      text: choice?.message?.content ?? "",
-      usage: data.usage
+    const usage = normalizeTokenUsage(
+      data.usage
         ? {
-            promptTokens: data.usage.prompt_tokens,
-            completionTokens: data.usage.completion_tokens,
+            promptTokens: Number(data.usage.prompt_tokens ?? 0),
+            completionTokens: Number(data.usage.completion_tokens ?? 0),
           }
         : undefined,
+      tokenUsageFromEstimate(messages, text),
+    );
+    await this.maybeReportUsage(model, usage);
+
+    return {
+      text,
+      usage: {
+        promptTokens: usage.promptTokens,
+        completionTokens: usage.completionTokens,
+      },
     };
   }
 
@@ -1193,6 +1306,7 @@ export class OpenAiCodexCliClient implements LLMClient {
   private readonly service: LlmService;
   private readonly sessionTag: string;
   private readonly reasoningEffort: string;
+  private readonly usageReporter: LLMUsageReporter | null;
 
   constructor(opts?: {
     model?: string;
@@ -1204,6 +1318,7 @@ export class OpenAiCodexCliClient implements LLMClient {
     reasoningEffort?: string;
     service?: LlmService;
     sessionId?: string;
+    usageReporter?: LLMUsageReporter | null;
   }) {
     this.model = normalizeCodexModel(opts?.model ?? DEFAULT_CODEX_MODEL);
     this.apiKey = (opts?.apiKey ?? "").trim();
@@ -1214,6 +1329,27 @@ export class OpenAiCodexCliClient implements LLMClient {
     this.service = opts?.service ?? "remotebuddy";
     this.sessionTag = stableConversationTag(this.service, opts?.sessionId);
     this.reasoningEffort = (opts?.reasoningEffort ?? "").trim();
+    this.usageReporter = opts?.usageReporter ?? null;
+  }
+
+  private async maybeReportUsage(
+    usage: { promptTokens: number; completionTokens: number; estimated: boolean },
+  ): Promise<void> {
+    if (!this.usageReporter) return;
+    try {
+      await this.usageReporter.reportUsage({
+        service: this.service,
+        sessionId: this.sessionTag || undefined,
+        backend: "openai_codex",
+        modelId: this.model,
+        promptTokens: usage.promptTokens,
+        completionTokens: usage.completionTokens,
+        totalTokens: usage.promptTokens + usage.completionTokens,
+        estimated: usage.estimated,
+      });
+    } catch (err) {
+      console.warn(`[LLM] Usage telemetry failed (${this.service}): ${String(err)}`);
+    }
   }
 
   private effectiveAuthMode(): CodexAuthMode {
@@ -1332,18 +1468,61 @@ export class OpenAiCodexCliClient implements LLMClient {
         console.warn(`[LLM] Codex CLI stderr (${this.service}): ${firstLine.trim()}`);
       }
     }
-    return { text: result.text };
+    const usage = normalizeTokenUsage(undefined, {
+      promptTokens: estimateTokensFromText(prompt),
+      completionTokens: estimateTokensFromText(result.text),
+    });
+    await this.maybeReportUsage(usage);
+    return {
+      text: result.text,
+      usage: {
+        promptTokens: usage.promptTokens,
+        completionTokens: usage.completionTokens,
+      },
+    };
   }
 }
 
 export class OllamaClient implements LLMClient {
   private endpoint: string;
   private model: string;
+  private service: LlmService;
+  private sessionTag: string;
+  private usageReporter: LLMUsageReporter | null;
 
-  constructor(opts?: { endpoint?: string; model?: string }) {
+  constructor(opts?: {
+    endpoint?: string;
+    model?: string;
+    service?: LlmService;
+    sessionId?: string;
+    usageReporter?: LLMUsageReporter | null;
+  }) {
     const rawEndpoint = opts?.endpoint ?? DEFAULT_OLLAMA_ENDPOINT;
     this.endpoint = normalizeOllamaEndpoint(rawEndpoint);
     this.model = opts?.model ?? DEFAULT_MODEL;
+    this.service = opts?.service ?? "remotebuddy";
+    this.sessionTag = stableConversationTag(this.service, opts?.sessionId);
+    this.usageReporter = opts?.usageReporter ?? null;
+  }
+
+  private async maybeReportUsage(
+    usage: { promptTokens: number; completionTokens: number; estimated: boolean },
+  ): Promise<void> {
+    if (!this.usageReporter) return;
+    try {
+      await this.usageReporter.reportUsage({
+        service: this.service,
+        sessionId: this.sessionTag || undefined,
+        backend: "ollama",
+        modelId: this.model,
+        promptTokens: usage.promptTokens,
+        completionTokens: usage.completionTokens,
+        totalTokens: usage.promptTokens + usage.completionTokens,
+        estimated: usage.estimated,
+      });
+    } catch (err) {
+      console.warn(`[LLM] Usage telemetry failed (${this.service}): ${String(err)}`);
+    }
   }
 
   async generate(input: LLMGenerateInput): Promise {
@@ -1379,13 +1558,23 @@ export class OllamaClient implements LLMClient {
     }
 
     const data = (await res.json()) as any;
-    return { text: data.message?.content ?? "" };
+    const text = data.message?.content ?? "";
+    const usage = normalizeTokenUsage(undefined, tokenUsageFromEstimate(body.messages as Array<{ role: string; content: string }>, text));
+    await this.maybeReportUsage(usage);
+    return {
+      text,
+      usage: {
+        promptTokens: usage.promptTokens,
+        completionTokens: usage.completionTokens,
+      },
+    };
   }
 }
 
 export function createLLMClient(opts: LLMClientOptions = {}): LLMClient {
   const resolved = resolveServiceLlmConfig(opts);
   const service = opts.service ?? "remotebuddy";
+  const usageReporter = opts.usageReporter ?? createHttpUsageReporter(opts);
 
   if (resolved.backend === "openai_codex") {
     console.log(
@@ -1401,6 +1590,7 @@ export function createLLMClient(opts: LLMClientOptions = {}): LLMClient {
       reasoningEffort: resolved.reasoningEffort,
       service,
       sessionId: resolved.sessionId,
+      usageReporter,
     });
   }
 
@@ -1411,6 +1601,9 @@ export function createLLMClient(opts: LLMClientOptions = {}): LLMClient {
     return new OllamaClient({
       endpoint: resolved.endpoint,
       model: resolved.model,
+      service,
+      sessionId: resolved.sessionId,
+      usageReporter,
     });
   }
 
@@ -1426,6 +1619,7 @@ export function createLLMClient(opts: LLMClientOptions = {}): LLMClient {
       service,
       sessionId: resolved.sessionId,
      lmStudio: resolved.lmStudio,
+      usageReporter,
    });
  }
 
@@ -1440,5 +1634,6 @@ export function createLLMClient(opts: LLMClientOptions = {}): LLMClient {
     service,
     sessionId: resolved.sessionId,
     lmStudio: resolved.lmStudio,
+    usageReporter,
   });
 }
diff --git a/apps/remotebuddy/src/remotebuddy_main.ts b/apps/remotebuddy/src/remotebuddy_main.ts
index 0c5f75b..5639bfc 100644
--- a/apps/remotebuddy/src/remotebuddy_main.ts
+++ b/apps/remotebuddy/src/remotebuddy_main.ts
@@ -2363,6 +2363,8 @@ async function main() {
       endpoint: llmCfg.endpoint,
       model: llmCfg.model,
       apiKey: llmCfg.apiKey,
+      serverUrl: opts.server,
+      authToken: opts.authToken,
     });
 
     brain = new AgentBrain(llm);
diff --git a/apps/remotebuddy/src/startup/checklist.test.ts b/apps/remotebuddy/src/startup/checklist.test.ts
index 9332751..8e7b24c 100644
--- a/apps/remotebuddy/src/startup/checklist.test.ts
+++ b/apps/remotebuddy/src/startup/checklist.test.ts
@@ -43,6 +43,21 @@ const actionFor = (code: StartupFailureCode): string => {
   return entry.action;
 };
 
+describe("STARTUP_FAILURE_CODES", () => {
+  test("exports stable literal mappings", () => {
+    expect(STARTUP_FAILURE_CODES).toEqual({
+      BUN_VERSION_UNSUPPORTED: "startup.bun_version_unsupported",
+      DOCKER_VERSION_UNSUPPORTED: "startup.docker_version_unsupported",
+      MERGE_IN_PROGRESS: "startup.merge_in_progress",
+      REPO_DIRTY: "startup.repo_dirty",
+      ALERTS_ACTIVE: "startup.alerts_active",
+      SYNTHETIC_FAILED: "startup.synthetic_failed",
+      DISPATCH_FAILED: "startup.dispatch_failed",
+    });
+    expect(Object.isFrozen(STARTUP_FAILURE_CODES)).toBe(true);
+  });
+});
+
 describe("StartupChecklist", () => {
   test(
     "surfaces actionable failure codes for merge or dirty states",
diff --git a/apps/remotebuddy/src/startup/checklist.ts b/apps/remotebuddy/src/startup/checklist.ts
index 4af7805..c4fc291 100644
--- a/apps/remotebuddy/src/startup/checklist.ts
+++ b/apps/remotebuddy/src/startup/checklist.ts
@@ -3,20 +3,28 @@
  * The helper executes each check sequentially, surfaces actionable failure codes,
  * and optionally blocks job dispatch until a synthetic probe completes.
diff --git a/apps/remotebuddy/src/startup/checklist.ts b/apps/remotebuddy/src/startup/checklist.ts
index 4af7805..c4fc291 100644
--- a/apps/remotebuddy/src/startup/checklist.ts
+++ b/apps/remotebuddy/src/startup/checklist.ts
@@ -3,20 +3,28 @@
  * The helper executes each check sequentially, surfaces actionable failure codes,
  * and optionally blocks job dispatch until a synthetic probe completes.
  */
-export const STARTUP_FAILURE_CODES = {
+export const STARTUP_FAILURE_CODES = Object.freeze({
+  BUN_VERSION_UNSUPPORTED: "startup.bun_version_unsupported",
+  DOCKER_VERSION_UNSUPPORTED: "startup.docker_version_unsupported",
   MERGE_IN_PROGRESS: "startup.merge_in_progress",
   REPO_DIRTY: "startup.repo_dirty",
   ALERTS_ACTIVE: "startup.alerts_active",
   SYNTHETIC_FAILED: "startup.synthetic_failed",
   DISPATCH_FAILED: "startup.dispatch_failed",
-} as const;
+} as const);
 
 export type StartupFailureCode = (typeof STARTUP_FAILURE_CODES)[keyof typeof STARTUP_FAILURE_CODES];
 
 type StartupCheckStatus = "pass" | "fail";
 
-export type StartupCheckCategory = "repo" | "alerts" | "synthetic" | "dispatch";
+export type StartupCheckCategory =
+  | "runtime"
+  | "infrastructure"
+  | "repo"
+  | "alerts"
+  | "synthetic"
+  | "dispatch";
 
 export interface StartupChecklistOptions {
   syntheticMaxLatencyMs?: number;
@@ -62,6 +70,8 @@ export interface StartupCheckRecord {
   endedAtMs: number;
   error?: {
     message: string;
+    raw?: unknown;
+    stack?: string;
   };
 }
 
@@ -79,10 +89,47 @@ export interface StartupChecklistResult {
   history: StartupCheckRecord[];
 }
 
+export type StartupTelemetryPhaseEvent = {
+  type: "startup_phase";
+  code: StartupFailureCode;
+  category: StartupCheckCategory;
+  step: number;
+  status: StartupCheckStatus;
+  detail: string;
+  startedAtMs: number;
+  endedAtMs: number;
+  durationMs: number;
+  error?: {
+    message: string;
+    raw?: unknown;
+    stack?: string;
+  };
+};
+
+export type StartupTelemetryUnknownFailureEvent = {
+  type: "startup_unknown_failure";
+  code: StartupFailureCode;
+  phase: string;
+  step: number;
+  whenMs: number;
+  error: {
+    message: string;
+    raw?: unknown;
+    stack?: string;
+  };
+};
+
+export type StartupTelemetryEvent =
+  | StartupTelemetryPhaseEvent
+  | StartupTelemetryUnknownFailureEvent;
+
 export interface StartupChecklistContext {
   describeRepo(): Promise;
   listFiringAlerts(): Promise;
   syntheticTester: SyntheticStartupTester;
+  readBunVersion?: () => Promise<string>;
+  readDockerVersion?: () => Promise<string>;
+  telemetry?: (event: StartupTelemetryEvent) => void;
   now?: () => number;
   log?: (entry: StartupCheckRecord) => void;
 }
@@ -111,9 +158,85 @@ const DEFAULT_SYNTHETIC_PROBE = "probe.remote_startup";
 const DISPATCH_CHECK_LABEL = "Job dispatch must succeed.";
 const DISPATCH_CHECK_ACTION = "Inspect RemoteBuddy + WorkerPals logs, repair dependencies, then rerun dispatch.";
+const MIN_BUN_VERSION = "1.1.0";
+const MIN_DOCKER_VERSION = "24.0.0";
+const asVersionString = (value: unknown): string => String(value ?? "").trim();
+
+class StartupCheckError extends Error {
+  constructor(message: string, public readonly raw?: unknown) {
+    super(message);
+    this.name = "StartupCheckError";
+  }
+}
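Consumers of the new `StartupTelemetryEvent` union can narrow on the `type` discriminant. A hypothetical sink for `StartupChecklistContext.telemetry`; the log format is an assumption, the event fields come from the types above:

```ts
// Hypothetical telemetry sink; narrowing on `type` is exhaustive over the union.
const telemetrySink = (event: StartupTelemetryEvent): void => {
  if (event.type === "startup_phase") {
    console.log(
      `[startup] step ${event.step} ${event.category}/${event.code} ` +
        `${event.status} in ${event.durationMs}ms: ${event.detail}`,
    );
    return;
  }
  // startup_unknown_failure: a check threw instead of returning { ok: false }.
  console.error(
    `[startup] unknown failure in "${event.phase}" (step ${event.step}): ${event.error.message}`,
  );
};
```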
 
 const defaultChecks = Object.freeze(
   [
+    {
+      code: STARTUP_FAILURE_CODES.BUN_VERSION_UNSUPPORTED,
+      label: "Bun runtime must meet the supported version.",
+      action: "Install Bun 1.1.x or newer before starting RemoteBuddy dispatch.",
+      category: "runtime",
+      run: async (ctx) => {
+        const reader =
+          ctx.readBunVersion ??
+          (async () => {
+            const detected = typeof process !== "undefined" ? process.versions?.bun : undefined;
+            return asVersionString(detected);
+          });
+        let version: string;
+        try {
+          version = asVersionString(await reader());
+        } catch (error) {
+          throw new StartupCheckError("Bun version probe failed", error);
+        }
+        if (!version) {
+          throw new StartupCheckError("Bun version probe failed: version not detected.");
+        }
+        if (compareVersions(version, MIN_BUN_VERSION) < 0) {
+          return {
+            ok: false,
+            detail: `Bun ${version} is below the required ${MIN_BUN_VERSION}.`,
+          };
+        }
+        return {
+          ok: true,
+          detail: `Bun runtime OK (${version}).`,
+        };
+      },
+    },
+    {
+      code: STARTUP_FAILURE_CODES.DOCKER_VERSION_UNSUPPORTED,
+      label: "Docker runtime must meet the supported version.",
+      action: "Upgrade Docker to 24.x or newer before enabling RemoteBuddy dispatch.",
+      category: "infrastructure",
+      run: async (ctx) => {
+        const reader =
+          ctx.readDockerVersion ??
+          (async () => {
+            const detected = typeof process !== "undefined" ? process.env?.DOCKER_VERSION : undefined;
+            return asVersionString(detected);
+          });
+        let version: string;
+        try {
+          version = asVersionString(await reader());
+        } catch (error) {
+          throw new StartupCheckError("Docker version probe failed", error);
+        }
+        if (!version) {
+          throw new StartupCheckError("Docker version probe failed: version not detected.");
+        }
+        if (compareVersions(version, MIN_DOCKER_VERSION) < 0) {
+          return {
+            ok: false,
+            detail: `Docker ${version} is below the required ${MIN_DOCKER_VERSION}.`,
+          };
+        }
+        return {
+          ok: true,
+          detail: `Docker runtime OK (${version}).`,
+        };
+      },
+    },
     {
       code: STARTUP_FAILURE_CODES.MERGE_IN_PROGRESS,
       label: "Git merge or rebase must be resolved.",
       action:
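Both runtime checks lean on `compareVersions`, added near the bottom of this file; it splits on non-digit runs and pads missing segments with zeros. Expected semantics, assuming the module-private helper is in scope (e.g. pasted into `checklist.ts`):

```ts
console.log(compareVersions("1.1.0", "1.1.0"));   //  0 -> Bun check passes at the minimum
console.log(compareVersions("1.0.9", "1.1.0"));   // <0 -> fails BUN_VERSION_UNSUPPORTED
console.log(compareVersions("24.0.7", "24.0.0")); // >0 -> Docker check passes
console.log(compareVersions("24", "24.0.0"));     //  0 -> missing segments compare as 0
```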
@@ -257,30 +380,41 @@ export class StartupChecklist {
     options: StartupChecklistOptions = {},
   ): Promise<StartupChecklistResult> {
     const history: StartupCheckRecord[] = [];
+    const emitTelemetry = (event: StartupTelemetryEvent): void => {
+      ctx.telemetry?.(event);
+    };
     for (const [index, check] of this.checks.entries()) {
       const step = index + 1;
       const startedAtMs = nowMs(ctx);
       let status: StartupCheckStatus = "pass";
       let detail = check.label;
       let failureErrorMessage: string | undefined;
+      let failureRawError: unknown;
+      let failureErrorStack: string | undefined;
+      let threwError = false;
       try {
         const outcome = await check.run(ctx, options);
         status = outcome.ok ? "pass" : "fail";
         detail = outcome.detail ?? check.label;
       } catch (error) {
         status = "fail";
-        detail =
-          error instanceof Error
-            ? error.message
-            : "Unknown error running startup check.";
-        if (error instanceof Error) {
-          failureErrorMessage = error.message;
-        } else if (typeof error === "string" && error.trim()) {
-          failureErrorMessage = error.trim();
-        }
+        threwError = true;
+        const normalized = normalizeStartupError(error);
+        detail = normalized.message;
+        failureErrorMessage = normalized.message;
+        failureRawError = normalized.raw;
+        failureErrorStack = normalized.stack;
       }
       const endedAtMs = nowMs(ctx);
       const durationMs = Math.max(0, endedAtMs - startedAtMs);
+      const recordError =
+        status === "fail" && (failureErrorMessage || failureRawError)
+          ? {
+              message: failureErrorMessage ?? detail,
+              raw: failureRawError,
+              stack: failureErrorStack,
+            }
+          : undefined;
       const record: StartupCheckRecord = {
         code: check.code,
         label: check.label,
@@ -293,10 +427,36 @@ export class StartupChecklist {
         durationMs,
         startedAtMs,
         endedAtMs,
-        error: status === "fail" && failureErrorMessage ? { message: failureErrorMessage } : undefined,
+        error: recordError,
       };
       history.push(record);
       ctx.log?.(record);
+      emitTelemetry({
+        type: "startup_phase",
+        code: check.code,
+        category: check.category,
+        step,
+        status,
+        detail,
+        startedAtMs,
+        endedAtMs,
+        durationMs,
+        error: recordError,
+      });
+      if (status === "fail" && threwError) {
+        emitTelemetry({
+          type: "startup_unknown_failure",
+          code: check.code,
+          phase: check.label,
+          step,
+          whenMs: endedAtMs,
+          error: {
+            message: detail,
+            raw: failureRawError ?? failureErrorMessage ?? detail,
+            stack: failureErrorStack,
+          },
+        });
+      }
       if (status === "fail") {
         return {
           ok: false,
@@ -334,6 +494,9 @@ export const gateDispatchWithStartupPreflight = async (
   if (!result.ok) {
     return result;
   }
+  const emitTelemetry = (event: StartupTelemetryEvent): void => {
+    ctx.telemetry?.(event);
+  };
   const dispatchStep = result.history.length + 1;
   const dispatchLabel = DISPATCH_CHECK_LABEL;
   const dispatchAction = DISPATCH_CHECK_ACTION;
@@ -356,14 +519,21 @@ export const gateDispatchWithStartupPreflight = async (
     };
     result.history.push(successRecord);
     ctx.log?.(successRecord);
+    emitTelemetry({
+      type: "startup_phase",
+      code: STARTUP_FAILURE_CODES.DISPATCH_FAILED,
+      category: "dispatch",
+      step: dispatchStep,
+      status: "pass",
+      detail: successRecord.detail,
+      startedAtMs,
+      endedAtMs,
+      durationMs,
+    });
     return result;
   } catch (error) {
-    const errorMessage =
-      error instanceof Error
-        ? error.message
-        : typeof error === "string"
-          ? error
-          : "Unknown dispatch failure.";
+    const normalized = normalizeStartupError(error);
+    const errorMessage = normalized.message;
     const detail = `Dispatch job failed: ${errorMessage}`;
     const endedAtMs = nowMs(ctx);
     const durationMs = Math.max(0, endedAtMs - startedAtMs);
@@ -379,9 +549,41 @@ export const gateDispatchWithStartupPreflight = async (
       durationMs,
       startedAtMs,
       endedAtMs,
-      error: { message: errorMessage },
+      error: {
+        message: errorMessage,
+        raw: normalized.raw,
+        stack: normalized.stack,
+      },
     };
     ctx.log?.(failureRecord);
+    emitTelemetry({
+      type: "startup_phase",
+      code: STARTUP_FAILURE_CODES.DISPATCH_FAILED,
+      category: "dispatch",
+      step: dispatchStep,
+      status: "fail",
+      detail,
+      startedAtMs,
+      endedAtMs,
+      durationMs,
+      error: {
+        message: errorMessage,
+        raw: normalized.raw,
+        stack: normalized.stack,
+      },
+    });
+    emitTelemetry({
+      type: "startup_unknown_failure",
+      code: STARTUP_FAILURE_CODES.DISPATCH_FAILED,
+      phase: dispatchLabel,
+      step: dispatchStep,
+      whenMs: endedAtMs,
+      error: {
+        message: detail,
+        raw: normalized.raw ?? errorMessage,
+        stack: normalized.stack,
+      },
+    });
     const history = [...result.history, failureRecord];
     return {
       ok: false,
@@ -396,3 +598,61 @@ export const gateDispatchWithStartupPreflight = async (
     };
   }
 };
+
+type NormalizedStartupError = {
+  message: string;
+  raw?: unknown;
+  stack?: string;
+};
+
+function parseVersionSegments(version: string): number[] {
+  return version
+    .split(/[^0-9]+/)
+    .filter(Boolean)
+    .map((segment) => Number.parseInt(segment, 10) || 0);
+}
+
+function compareVersions(a: string, b: string): number {
+  const aSegments = parseVersionSegments(a);
+  const bSegments = parseVersionSegments(b);
+  const maxLen = Math.max(aSegments.length, bSegments.length, 3);
+  for (let i = 0; i < maxLen; i += 1) {
+    const diff = (aSegments[i] ?? 0) - (bSegments[i] ?? 0);
+    if (diff !== 0) return diff;
+  }
+  return 0;
+}
+
+function normalizeStartupError(error: unknown): NormalizedStartupError {
+  if (error instanceof StartupCheckError) {
+    const raw = error.raw ?? error;
+    const stack =
+      error.raw instanceof Error
+        ? error.raw.stack ?? error.stack
+        : error instanceof Error
+          ? error.stack
+          : undefined;
+    return {
+      message: error.message || "Unknown error running startup check.",
+      raw,
+      stack,
+    };
+  }
+  if (error instanceof Error) {
+    return {
+      message: error.message || "Unknown error running startup check.",
+      raw: error,
+      stack: error.stack,
+    };
+  }
+  if (typeof error === "string" && error.trim()) {
+    return {
+      message: error.trim(),
+      raw: error.trim(),
+    };
+  }
+  return {
+    message: "Unknown error running startup check.",
+    raw: error,
+  };
+}
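Putting the new context hooks together, a caller could inject real probes and a telemetry sink roughly like this; the probe commands are illustrative assumptions, while the field names come from `StartupChecklistContext`:

```ts
// Hypothetical wiring of the new context hooks; exact probe commands are
// assumptions, the field names match StartupChecklistContext above.
const ctx: Partial<StartupChecklistContext> = {
  // Override the default env-based probe with a real `docker --version` call.
  readDockerVersion: async () => {
    const proc = Bun.spawn(["docker", "--version"]);
    const out = await new Response(proc.stdout).text();
    return out; // compareVersions() only reads the numeric segments downstream
  },
  // Bun's own version is available in-process.
  readBunVersion: async () => Bun.version,
  telemetry: (event) => {
    if (event.type === "startup_unknown_failure") {
      console.error(`[preflight] ${event.code}: ${event.error.message}`);
    }
  },
};
```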
diff --git a/apps/server/src/autonomy.ts b/apps/server/src/autonomy.ts
index 8d0ddc2..00be18a 100644
--- a/apps/server/src/autonomy.ts
+++ b/apps/server/src/autonomy.ts
@@ -381,6 +381,28 @@ export interface AutonomyOpsSummary {
   lastStaleSweepAt: string | null;
 }
 
+export interface LlmUsageServiceSummary {
+  service: string;
+  promptTokens: number;
+  completionTokens: number;
+  totalTokens: number;
+  callCount: number;
+  avgTokensPerCall: number | null;
+  estimatedCallCount: number;
+  lastCallAt: string | null;
+}
+
+export interface LlmUsageSummary {
+  windowHours: number;
+  promptTokens: number;
+  completionTokens: number;
+  totalTokens: number;
+  callCount: number;
+  avgTokensPerCall: number | null;
+  estimatedCallCount: number;
+  services: LlmUsageServiceSummary[];
+}
+
 export interface AutonomyInsights {
   patternStats: AutonomyPatternStatsInsight[];
   recentPrFeedback: AutonomyPrFeedbackInsight[];
@@ -441,6 +463,10 @@ function asString(value: unknown): string {
   return String(value ?? "").trim();
 }
 
+function normalizeServiceName(value: unknown): string {
+  return asString(value).toLowerCase();
+}
+
 function truncateText(value: string, maxChars: number): string {
   if (maxChars <= 0) return "";
   if (value.length <= maxChars) return value;
@@ -1348,6 +1374,22 @@ export class AutonomyStore {
         token_usage_json TEXT,
         created_at TEXT NOT NULL
       );
+      CREATE TABLE IF NOT EXISTS llm_usage_events (
+        id TEXT PRIMARY KEY,
+        service TEXT NOT NULL,
+        session_id TEXT,
+        backend TEXT,
+        model_id TEXT,
+        prompt_tokens INTEGER NOT NULL DEFAULT 0,
+        completion_tokens INTEGER NOT NULL DEFAULT 0,
+        total_tokens INTEGER NOT NULL DEFAULT 0,
+        estimated INTEGER NOT NULL DEFAULT 0,
+        created_at TEXT NOT NULL
+      );
+      CREATE INDEX IF NOT EXISTS idx_llm_usage_events_created
+        ON llm_usage_events(created_at DESC);
+      CREATE INDEX IF NOT EXISTS idx_llm_usage_events_service_created
+        ON llm_usage_events(service, created_at DESC);
       CREATE TABLE IF NOT EXISTS autonomy_dispatch_lock (
         lock_id TEXT PRIMARY KEY,
         owner_session_id TEXT NOT NULL,
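Since `id` is the table's primary key and the write in the next hunk uses `INSERT OR REPLACE`, re-sending a usage event with the same id upserts rather than double-counts, so retried telemetry posts stay idempotent. A sketch of the expectation (store construction elided; `recordLlmUsage` and `getLlmUsageSummary` are defined in the hunk below):

```ts
// Re-reporting the same event id replaces the prior row rather than adding one.
store.recordLlmUsage({ id: "evt_1", service: "localbuddy", promptTokens: 10, completionTokens: 5 });
store.recordLlmUsage({ id: "evt_1", service: "localbuddy", promptTokens: 10, completionTokens: 5 });

const summary = store.getLlmUsageSummary({ windowHours: 24 });
console.log(summary.callCount);   // 1 -- the second write overwrote the first
console.log(summary.totalTokens); // 15
```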
@@ -2061,6 +2103,137 @@ export class AutonomyStore {
     };
   }
 
+  recordLlmUsage(body: Record<string, unknown>): { ok: boolean; reason?: string } {
+    const service = normalizeServiceName(body.service);
+    if (!service) return { ok: false, reason: "service is required" };
+
+    const promptTokens = asNonNegativeInt(body.promptTokens ?? body.prompt_tokens);
+    const completionTokens = asNonNegativeInt(body.completionTokens ?? body.completion_tokens);
+    const explicitTotal = asNonNegativeInt(body.totalTokens ?? body.total_tokens);
+    const totalTokens = explicitTotal > 0 ? explicitTotal : promptTokens + completionTokens;
+    if (totalTokens <= 0) return { ok: false, reason: "token usage is required" };
+
+    this.db
+      .prepare(
+        `INSERT OR REPLACE INTO llm_usage_events (
+          id, service, session_id, backend, model_id, prompt_tokens, completion_tokens,
+          total_tokens, estimated, created_at
+        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+      )
+      .run(
+        asString(body.id) || randomUUID(),
+        service,
+        asString(body.sessionId ?? body.session_id) || null,
+        asString(body.backend) || null,
+        asString(body.modelId ?? body.model_id) || null,
+        promptTokens,
+        completionTokens,
+        totalTokens,
+        asBoolean(body.estimated, false) ? 1 : 0,
+        asIsoNow(),
+      );
+
+    return { ok: true };
+  }
+
+  getLlmUsageSummary(params?: { windowHours?: number }): LlmUsageSummary {
+    const rawWindowHours = asNonNegativeInt(params?.windowHours ?? 24);
+    const windowHours = Math.max(1, rawWindowHours || 24);
+    const nowIso = asIsoNow();
+    const modifier = `-${windowHours} hours`;
+    const overall = this.db
+      .prepare(
+        `SELECT
+          SUM(prompt_tokens) AS prompt_tokens,
+          SUM(completion_tokens) AS completion_tokens,
+          SUM(total_tokens) AS total_tokens,
+          COUNT(*) AS call_count,
+          SUM(CASE WHEN estimated = 1 THEN 1 ELSE 0 END) AS estimated_call_count
+        FROM llm_usage_events
+        WHERE datetime(created_at) >= datetime(?, ?)`,
+      )
+      .get(nowIso, modifier) as
+      | {
+          prompt_tokens: number | null;
+          completion_tokens: number | null;
+          total_tokens: number | null;
+          call_count: number | null;
+          estimated_call_count: number | null;
+        }
+      | undefined;
+    const serviceRows = this.db
+      .prepare(
+        `SELECT
+          service,
+          SUM(prompt_tokens) AS prompt_tokens,
+          SUM(completion_tokens) AS completion_tokens,
+          SUM(total_tokens) AS total_tokens,
+          COUNT(*) AS call_count,
+          SUM(CASE WHEN estimated = 1 THEN 1 ELSE 0 END) AS estimated_call_count,
+          MAX(created_at) AS last_call_at
+        FROM llm_usage_events
+        WHERE datetime(created_at) >= datetime(?, ?)
+        GROUP BY service
+        ORDER BY total_tokens DESC, service ASC`,
+      )
+      .all(nowIso, modifier) as Array<{
+      service: string;
+      prompt_tokens: number | null;
+      completion_tokens: number | null;
+      total_tokens: number | null;
+      call_count: number | null;
+      estimated_call_count: number | null;
+      last_call_at: string | null;
+    }>;
+
+    const toServiceSummary = (row: {
+      service: string;
+      prompt_tokens: number | null;
+      completion_tokens: number | null;
+      total_tokens: number | null;
+      call_count: number | null;
+      estimated_call_count: number | null;
+      last_call_at: string | null;
+    }): LlmUsageServiceSummary => {
+      const promptTokens = Math.max(0, Math.floor(asNumber(row.prompt_tokens, 0)));
+      const completionTokens = Math.max(0, Math.floor(asNumber(row.completion_tokens, 0)));
+      const totalTokens = Math.max(
+        0,
+        Math.floor(asNumber(row.total_tokens, promptTokens + completionTokens)),
+      );
+      const callCount = Math.max(0, Math.floor(asNumber(row.call_count, 0)));
+      return {
+        service: row.service,
+        promptTokens,
+        completionTokens,
+        totalTokens,
+        callCount,
+        avgTokensPerCall: callCount > 0 ? totalTokens / callCount : null,
+        estimatedCallCount: Math.max(0, Math.floor(asNumber(row.estimated_call_count, 0))),
+        lastCallAt: row.last_call_at || null,
+      };
+    };
+
+    const promptTokens = Math.max(0, Math.floor(asNumber(overall?.prompt_tokens, 0)));
+    const completionTokens = Math.max(0, Math.floor(asNumber(overall?.completion_tokens, 0)));
+    const totalTokens = Math.max(
+      0,
+      Math.floor(asNumber(overall?.total_tokens, promptTokens + completionTokens)),
+    );
+    const callCount = Math.max(0, Math.floor(asNumber(overall?.call_count, 0)));
+
+    return {
+      windowHours,
+      promptTokens,
+      completionTokens,
+      totalTokens,
+      callCount,
+      avgTokensPerCall: callCount > 0 ? totalTokens / callCount : null,
+      estimatedCallCount: Math.max(0, Math.floor(asNumber(overall?.estimated_call_count, 0))),
+      services: serviceRows.map(toServiceSummary),
+    };
+  }
+
   private buildTopSignals(requestSlo?: QueueSloSummary, jobSlo?: QueueSloSummary): SignalValue[] {
     const topSignals: SignalValue[] = [];
     let failedRows: Array<{ kind: string | null; error: string | null; count: number }> = [];
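Note the defensive clamp in `getLlmUsageSummary`: zero, negative, or missing window values fall back to 24 hours, so the SQL window modifier is always at least `-1 hours`. For example:

```ts
// windowHours normalization (asNonNegativeInt coerces negatives/junk to 0):
store.getLlmUsageSummary({ windowHours: 6 });  // queries a 6h window
store.getLlmUsageSummary({ windowHours: 0 });  // 0 -> `0 || 24` -> 24h window
store.getLlmUsageSummary({ windowHours: -3 }); // coerced to 0 -> 24h window
store.getLlmUsageSummary();                    // default 24h window
```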
diff --git a/apps/server/src/server_main.ts b/apps/server/src/server_main.ts
index e606f75..d1c549e 100644
--- a/apps/server/src/server_main.ts
+++ b/apps/server/src/server_main.ts
@@ -335,11 +335,11 @@ export function createRequestHandler() {
     // Noisy poll endpoints: only log these at debug level.
     const isNoisyPoll =
       (method === "POST" &&
-        /^\/+((jobs|requests|completions)\/claim|workers\/heartbeat|sessions\/[^/]+\/command|jobs\/[^/]+\/log)\/?$/.test(
+        /^\/+((jobs|requests|completions)\/claim|workers\/heartbeat|sessions\/[^/]+\/command|jobs\/[^/]+\/log|telemetry\/llm-usage)\/?$/.test(
           pathname,
         )) ||
       (method === "GET" &&
-        /^\/+(workers|system\/status|requests|jobs|completions|requests\/[^/]+|jobs\/[^/]+|completions\/[^/]+|jobs\/[^/]+\/logs)(\/)?$/.test(
+        /^\/+(workers|system\/status|requests|jobs|completions|questions|autonomy\/insights|requests\/[^/]+|jobs\/[^/]+|completions\/[^/]+|jobs\/[^/]+\/logs)(\/)?$/.test(
           pathname,
         ));
     if (isNoisyPoll) {
@@ -727,6 +727,7 @@ export function createRequestHandler() {
         requestPending: Math.max(0, Number(requestCounts.pending ?? 0)),
         jobFailureRate,
       });
+      const llmUsage = autonomyStore.getLlmUsageSummary({ windowHours: 24 });
 
       return makeJson({
         ok: true,
@@ -763,11 +764,21 @@ export function createRequestHandler() {
           requests: requestSlo,
           jobs: jobSlo,
         },
+        llmUsage,
         autonomy: autonomyOps,
         repo,
       });
     }
 
+    // POST /telemetry/llm-usage
+    if (pathname === "/telemetry/llm-usage" && method === "POST") {
+      const denied = requireAuth();
+      if (denied) return denied;
+      const body = (await req.json().catch(() => ({}))) as Record<string, unknown>;
+      const result = autonomyStore.recordLlmUsage(body);
+      return makeJson(result, result.ok ? 200 : 400);
+    }
+
     // GET /requests
     if (pathname === "/requests" && method === "GET") {
       const denied = requireAuth();
@@ -1260,13 +1271,16 @@ export function createRequestHandler() {
         id: randomUUID(),
         ts: new Date().toISOString(),
         sessionId,
-        type: "question_answered",
+        type: "log",
         from: "server:autonomy",
         payload: {
-          questionId,
-          objectiveId: result.objectiveId || "unknown",
-          status: result.action || "closed",
-          ...(body.note ? { answerSummary: compactText(body.note, 240) } : {}),
+          level: "info",
+          message: compactText(
+            `question ${questionId} action=${result.action || "closed"} objective=${result.objectiveId || "unknown"}${
+              body.note ? ` note=${String(body.note)}` : ""
+            }`,
+            240,
+          ),
         },
       });
     }
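End to end, any process holding the shared auth token can post usage events and read the 24h rollup back from `/system/status`. A hedged sketch; host, port, and token are placeholders, while the routes and field names are the ones added above:

```ts
// Post one usage event (the reporter in llm.ts does this after each generate()).
await fetch("http://localhost:8787/telemetry/llm-usage", { // placeholder host/port
  method: "POST",
  headers: { "Content-Type": "application/json", Authorization: "Bearer <token>" },
  body: JSON.stringify({
    service: "remotebuddy",
    backend: "ollama",
    modelId: "tiny-model",
    promptTokens: 120,
    completionTokens: 30,
    estimated: true,
  }),
});

// Read the rolled-up 24h summary back from the status endpoint.
const status = await (
  await fetch("http://localhost:8787/system/status", {
    headers: { Authorization: "Bearer <token>" },
  })
).json();
console.log(status.llmUsage?.totalTokens, status.llmUsage?.services);
```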
diff --git a/scripts/start.ts b/scripts/start.ts
index 34e9373..cdf6195 100644
--- a/scripts/start.ts
+++ b/scripts/start.ts
@@ -664,6 +664,23 @@ type LocalConfigPaths = {
   localExampleTomlRel: string;
 };
 
+function quoteForPowerShell(value: string): string {
+  return `'${String(value ?? "").replace(/'/g, "''")}'`;
+}
+
+function quoteForBash(value: string): string {
+  return `'${String(value ?? "").replace(/'/g, "'\\''")}'`;
+}
+
+function printTemplateCopyCommands(templateRel: string, targetRel: string): void {
+  console.error(
+    `[start] Windows (PowerShell): Copy-Item -Force ${quoteForPowerShell(templateRel)} ${quoteForPowerShell(targetRel)}`,
+  );
+  console.error(
+    `[start] Linux/macOS (bash): cp -f ${quoteForBash(templateRel)} ${quoteForBash(targetRel)}`,
+  );
+}
+
 function resolveLocalConfigPaths(): {
   localTomlPath: string;
   localTomlRel: string;
@@ -802,6 +819,7 @@ function ensureLocalConfigTemplateKeyParity(localConfigPaths: LocalConfigPaths):
 
   console.error("[start] Template key-parity preflight failed:");
   for (const problem of problems) {
+    let shouldPrintCopyCommands = false;
     if (problem.missingKeys.length > 0) {
       console.error(
         `[start] - ${problem.label} is missing ${problem.missingKeys.length} key(s) from ${problem.templateLabel}:`,
       );
       for (const key of problem.missingKeys) {
         console.error(`[start] ${key}`);
       }
+      shouldPrintCopyCommands = true;
     }
     if (problem.extraKeys.length > 0) {
       console.error(
@@ -817,6 +836,13 @@ function ensureLocalConfigTemplateKeyParity(localConfigPaths: LocalConfigPaths):
       for (const key of problem.extraKeys) {
         console.error(`[start] ${key}`);
       }
+      shouldPrintCopyCommands = true;
+    }
+    if (shouldPrintCopyCommands) {
+      console.error(
+        `[start] Quick template reset (overwrites ${problem.label}):`,
+      );
+      printTemplateCopyCommands(problem.templateLabel, problem.label);
+    }
   }
   console.error(
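The two quoting helpers exist because single quotes escape differently per shell: PowerShell doubles them inside single-quoted strings, while bash must close, backslash-escape, and reopen the quote. For a hypothetical path containing an apostrophe:

```ts
quoteForPowerShell("configs/it's local.toml"); // => 'configs/it''s local.toml'
quoteForBash("configs/it's local.toml");       // => 'configs/it'\''s local.toml'
```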
diff --git a/tests/remotebuddy.llm-telemetry.test.ts b/tests/remotebuddy.llm-telemetry.test.ts
new file mode 100644
index 0000000..86fc5d3
--- /dev/null
+++ b/tests/remotebuddy.llm-telemetry.test.ts
@@ -0,0 +1,69 @@
+import { afterEach, describe, expect, test } from "bun:test";
+import { createLLMClient } from "../apps/remotebuddy/src/llm";
+
+const originalFetch = globalThis.fetch;
+
+afterEach(() => {
+  globalThis.fetch = originalFetch;
+});
+
+describe("remotebuddy llm telemetry", () => {
+  test("reports estimated ollama usage to the server telemetry endpoint", async () => {
+    const calls: Array<{ url: string; init?: RequestInit }> = [];
+    globalThis.fetch = (async (input: RequestInfo | URL, init?: RequestInit) => {
+      const url =
+        typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
+      calls.push({ url, init });
+
+      if (url === "http://ollama.test/api/chat") {
+        return new Response(JSON.stringify({ message: { content: "Queue depth is stable." } }), {
+          status: 200,
+          headers: { "Content-Type": "application/json" },
+        });
+      }
+
+      if (url === "http://server.test/telemetry/llm-usage") {
+        return new Response(JSON.stringify({ ok: true }), {
+          status: 200,
+          headers: { "Content-Type": "application/json" },
+        });
+      }
+
+      return new Response("not found", { status: 404 });
+    }) as typeof fetch;
+
+    const client = createLLMClient({
+      service: "localbuddy",
+      sessionId: "session-1",
+      backend: "ollama",
+      endpoint: "http://ollama.test/api/chat",
+      model: "tiny-model",
+      serverUrl: "http://server.test",
+      authToken: "secret-token",
+    });
+
+    const output = await client.generate({
+      system: "Answer in one sentence.",
+      messages: [{ role: "user", content: "How is the queue doing?" }],
+      temperature: 0.2,
+    });
+
+    expect(output.text).toContain("stable");
+    expect(output.usage?.promptTokens).toBeGreaterThan(0);
+    expect(output.usage?.completionTokens).toBeGreaterThan(0);
+
+    const telemetryCall = calls.find((entry) => entry.url === "http://server.test/telemetry/llm-usage");
+    expect(telemetryCall).toBeDefined();
+    const payload = JSON.parse(String(telemetryCall?.init?.body ?? "{}")) as Record<string, unknown>;
+    expect(payload.service).toBe("localbuddy");
+    expect(payload.backend).toBe("ollama");
+    expect(payload.modelId).toBe("tiny-model");
+    expect(payload.estimated).toBe(true);
+    expect(payload.totalTokens).toBe(
+      Number(payload.promptTokens ?? 0) + Number(payload.completionTokens ?? 0),
+    );
+    expect((telemetryCall?.init?.headers as Record<string, string>).Authorization).toBe(
+      "Bearer secret-token",
+    );
+  });
+});
diff --git a/tests/server.autonomy-store.test.ts b/tests/server.autonomy-store.test.ts
index 93ca367..a7169f6 100644
--- a/tests/server.autonomy-store.test.ts
+++ b/tests/server.autonomy-store.test.ts
@@ -648,6 +648,56 @@ describe("server AutonomyStore policy gates", () => {
     }
   });
 
+  test("aggregates llm usage by service with average tokens per call", () => {
+    const store = makeStore();
+
+    expect(
+      store.recordLlmUsage({
+        id: "usage_local_1",
+        service: "localbuddy",
+        promptTokens: 120,
+        completionTokens: 30,
+      }).ok,
+    ).toBe(true);
+    expect(
+      store.recordLlmUsage({
+        id: "usage_local_2",
+        service: "localbuddy",
+        promptTokens: 80,
+        completionTokens: 20,
+        estimated: true,
+      }).ok,
+    ).toBe(true);
+    expect(
+      store.recordLlmUsage({
+        id: "usage_remote_1",
+        service: "remotebuddy",
+        promptTokens: 200,
+        completionTokens: 50,
+      }).ok,
+    ).toBe(true);
+
+    const summary = store.getLlmUsageSummary({ windowHours: 24 });
+
+    expect(summary.callCount).toBe(3);
+    expect(summary.totalTokens).toBe(500);
+    expect(summary.avgTokensPerCall).toBeCloseTo(500 / 3, 5);
+    expect(summary.estimatedCallCount).toBe(1);
+
+    const localbuddy = summary.services.find((row) => row.service === "localbuddy");
+    expect(localbuddy).toBeDefined();
+    expect(localbuddy?.callCount).toBe(2);
+    expect(localbuddy?.totalTokens).toBe(250);
+    expect(localbuddy?.avgTokensPerCall).toBe(125);
+    expect(localbuddy?.estimatedCallCount).toBe(1);
+
+    const remotebuddy = summary.services.find((row) => row.service === "remotebuddy");
+    expect(remotebuddy).toBeDefined();
+    expect(remotebuddy?.callCount).toBe(1);
+    expect(remotebuddy?.totalTokens).toBe(250);
+    expect(remotebuddy?.avgTokensPerCall).toBe(250);
+  });
+
   test("persists candidates with run-scoped ids to prevent cross-run overwrites", () => {
     const store = makeStore();
     const snapshotA = store.createSnapshot({ sessionId: "s1", runId: "run_a" }).snapshot_id;