From 9e7312d28af7a7029152f6748921428057bc4ea8 Mon Sep 17 00:00:00 2001 From: Aleks Petrov Date: Mon, 27 Apr 2026 13:17:53 +0200 Subject: [PATCH] fix(executor,memory): close token telemetry gap on epic + alt-backend runs (GH-2428) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 65% of executions on Apr 17–27 wrote tokens_total=0 and model_name='' to ~/.pilot/data/pilot.db, including completed runs with real commits. Recorded $104.85 vs. ccusage $574 over the same window — Pilot looked like 18% of spend when actual share is much higher. Cost reporting, model-outcome scoring, and per-task budget enforcement all degraded. Three concrete sources: 1. Epic parent path returned epicResult with no ModelName/TokensTotal (runner.go ~1243). The orchestrator never calls a backend, but the row is indistinguishable from "telemetry missing" downstream. 2. Hardcoded "claude-opus-4-6" fallback in runner.go (lines 1647, 2007) was stale (real CC runs report 4-7) and silently labelled OpenCode/ GLM runs as Claude Opus, biasing model-outcome cost-per-PR. 3. OpenCode SSE path (parseSSEStream) accumulated TokensInput/Output but dropped CacheCreationInputTokens / CacheReadInputTokens — parity gap vs. parseAssistantResponse for v1.4.x usage events. Changes: - Add Runner.fallbackModelName() — config.DefaultModel → OpenCode.Model → backend type. Used in epic result and the two stale hardcoded sites. - Set ModelName on epic orchestrator result so audit queries can split "epic, no backend call" from "telemetry missing". - Accumulate cache tokens in OpenCode SSE; openCodeUsage now accepts both flat (cache_creation_input_tokens) and nested (cache.{read,write}) layouts that different OpenCode builds emit. - Add Store.RecentCompletedTelemetryStats + Dispatcher.checkTelemetryGap: on startup, sample the last 50 completed runs (with commit_sha) and log a warning when ≥50% report tokens_total=0 — early signal that the backend's usage events aren't being parsed. - Tests: fallbackModelName resolution, SSE cache-token capture (flat + nested), telemetry-gap query (commit-only filter excludes epics). Verification: go build ./... + go test ./... clean. --- internal/executor/backend_opencode.go | 39 +++++++++++++-- internal/executor/backend_opencode_test.go | 53 +++++++++++++++++++++ internal/executor/dispatcher.go | 35 ++++++++++++++ internal/executor/runner.go | 45 ++++++++++++++---- internal/executor/runner_test.go | 46 ++++++++++++++++++ internal/memory/metrics.go | 38 +++++++++++++++ internal/memory/store_test.go | 55 ++++++++++++++++++++++ 7 files changed, 299 insertions(+), 12 deletions(-) diff --git a/internal/executor/backend_opencode.go b/internal/executor/backend_opencode.go index 7db8caa5..8077378f 100644 --- a/internal/executor/backend_opencode.go +++ b/internal/executor/backend_opencode.go @@ -440,9 +440,12 @@ func (b *OpenCodeBackend) parseSSEStream(reader io.Reader, opts ExecuteOptions, } } - // Accumulate token usage + // Accumulate token usage. GH-2428: also accumulate cache tokens so + // the SSE path matches the synchronous parseAssistantResponse path. result.TokensInput += event.TokensInput result.TokensOutput += event.TokensOutput + result.CacheCreationInputTokens += event.CacheCreationInputTokens + result.CacheReadInputTokens += event.CacheReadInputTokens if event.Model != "" { result.Model = event.Model } @@ -507,6 +510,8 @@ func (b *OpenCodeBackend) parseOpenCodeEvent(data string) BackendEvent { if ocEvent.Usage != nil { event.TokensInput = ocEvent.Usage.InputTokens event.TokensOutput = ocEvent.Usage.OutputTokens + event.CacheCreationInputTokens = ocEvent.Usage.cacheCreate() + event.CacheReadInputTokens = ocEvent.Usage.cacheRead() } default: @@ -515,10 +520,13 @@ func (b *OpenCodeBackend) parseOpenCodeEvent(data string) BackendEvent { event.Message = data } - // Extract usage if present + // Extract usage if present (covers events where the usage block lives at + // the top level rather than under a dedicated "usage" event type). if ocEvent.Usage != nil { event.TokensInput = ocEvent.Usage.InputTokens event.TokensOutput = ocEvent.Usage.OutputTokens + event.CacheCreationInputTokens = ocEvent.Usage.cacheCreate() + event.CacheReadInputTokens = ocEvent.Usage.cacheRead() } if ocEvent.Model != "" { event.Model = ocEvent.Model @@ -544,9 +552,32 @@ type openCodeDelta struct { Text string `json:"text,omitempty"` } +// openCodeUsage matches the usage shape OpenCode v1.4.x emits in SSE events. +// Both flat (cache_*) and nested (cache.{read,write}) shapes are accepted — +// different OpenCode builds and provider passthroughs use different layouts. +// GH-2428. type openCodeUsage struct { - InputTokens int64 `json:"input_tokens"` - OutputTokens int64 `json:"output_tokens"` + InputTokens int64 `json:"input_tokens"` + OutputTokens int64 `json:"output_tokens"` + CacheCreationInputTokens int64 `json:"cache_creation_input_tokens,omitempty"` + CacheReadInputTokens int64 `json:"cache_read_input_tokens,omitempty"` + Cache ocCacheToken `json:"cache,omitempty"` +} + +// cacheCreate returns cache creation tokens from either layout. +func (u openCodeUsage) cacheCreate() int64 { + if u.CacheCreationInputTokens > 0 { + return u.CacheCreationInputTokens + } + return u.Cache.Write +} + +// cacheRead returns cache read tokens from either layout. +func (u openCodeUsage) cacheRead() int64 { + if u.CacheReadInputTokens > 0 { + return u.CacheReadInputTokens + } + return u.Cache.Read } // ocAssistantResponse mirrors the response body of diff --git a/internal/executor/backend_opencode_test.go b/internal/executor/backend_opencode_test.go index 224e294c..e01ad703 100644 --- a/internal/executor/backend_opencode_test.go +++ b/internal/executor/backend_opencode_test.go @@ -582,3 +582,56 @@ func TestOpenCodeEventStructs(t *testing.T) { t.Errorf("Usage.InputTokens = %d, want 100", event.Usage.InputTokens) } } + +// TestOpenCodeBackendParseSSEStreamCacheTokens verifies that the SSE path +// captures cache_creation/cache_read fields from a usage event, matching the +// synchronous parseAssistantResponse path. GH-2428. +func TestOpenCodeBackendParseSSEStreamCacheTokens(t *testing.T) { + backend := NewOpenCodeBackend(nil) + + // Two SSE events: a usage event with cache fields, then a result event. + // SSE format: each event ends with a blank line; we also need a trailing + // blank line on the last event to trigger dispatch. + sse := "data: {\"type\":\"usage\",\"usage\":{\"input_tokens\":10,\"output_tokens\":20,\"cache_creation_input_tokens\":3,\"cache_read_input_tokens\":4},\"model\":\"glm-5.1\"}\n\n" + + "data: {\"type\":\"done\",\"output\":\"ok\"}\n\n" + + result := &BackendResult{} + opts := ExecuteOptions{} + if err := backend.parseSSEStream(strings.NewReader(sse), opts, result); err != nil { + t.Fatalf("parseSSEStream error = %v", err) + } + + if result.TokensInput != 10 || result.TokensOutput != 20 { + t.Errorf("tokens = %d/%d, want 10/20", result.TokensInput, result.TokensOutput) + } + if result.CacheCreationInputTokens != 3 { + t.Errorf("CacheCreationInputTokens = %d, want 3", result.CacheCreationInputTokens) + } + if result.CacheReadInputTokens != 4 { + t.Errorf("CacheReadInputTokens = %d, want 4", result.CacheReadInputTokens) + } + if result.Model != "glm-5.1" { + t.Errorf("Model = %q, want glm-5.1", result.Model) + } +} + +// TestOpenCodeBackendParseSSENestedCache verifies that the nested +// {cache:{read,write}} usage layout is also accepted by SSE parsing. GH-2428. +func TestOpenCodeBackendParseSSENestedCache(t *testing.T) { + backend := NewOpenCodeBackend(nil) + + // Trailing empty line is required so bufio.Scanner yields the blank line + // that marks the end of the SSE event. + sse := "data: {\"type\":\"usage\",\"usage\":{\"input_tokens\":1,\"output_tokens\":2,\"cache\":{\"read\":5,\"write\":6}}}\n\n" + + result := &BackendResult{} + if err := backend.parseSSEStream(strings.NewReader(sse), ExecuteOptions{}, result); err != nil { + t.Fatalf("parseSSEStream error = %v", err) + } + if result.CacheCreationInputTokens != 6 { + t.Errorf("CacheCreationInputTokens = %d, want 6 (from cache.write)", result.CacheCreationInputTokens) + } + if result.CacheReadInputTokens != 5 { + t.Errorf("CacheReadInputTokens = %d, want 5 (from cache.read)", result.CacheReadInputTokens) + } +} diff --git a/internal/executor/dispatcher.go b/internal/executor/dispatcher.go index ad16e168..d323c4bc 100644 --- a/internal/executor/dispatcher.go +++ b/internal/executor/dispatcher.go @@ -108,6 +108,11 @@ func (d *Dispatcher) Start(ctx context.Context) error { // Initial recovery pass on startup. d.recoverStaleTasks() + // GH-2428: warn when the last batch of completed runs has no token + // telemetry. A persistent gap means the backend's usage events aren't + // being parsed — cost reporting and per-task budgets silently degrade. + d.checkTelemetryGap() + // Launch periodic recovery loop. d.wg.Add(1) go d.runStaleRecoveryLoop(ctx) @@ -115,6 +120,36 @@ func (d *Dispatcher) Start(ctx context.Context) error { return nil } +// checkTelemetryGap inspects recent completed executions and logs a warning +// when token telemetry is mostly missing. Threshold: ≥50% of the last 50 +// completed runs (with a real commit) reporting tokens_total=0. GH-2428. +func (d *Dispatcher) checkTelemetryGap() { + const sampleSize = 50 + const threshold = 0.5 + stats, err := d.store.RecentCompletedTelemetryStats(sampleSize) + if err != nil { + d.log.Debug("Skipping telemetry gap check", slog.Any("error", err)) + return + } + if stats.CompletedRuns < 10 { + return // Not enough data + } + ratio := float64(stats.ZeroTokenRuns) / float64(stats.CompletedRuns) + if ratio >= threshold { + backend := "claude-code" + if d.runner != nil { + backend = d.runner.backendType() + } + d.log.Warn("Token telemetry gap detected — recent completed runs report 0 tokens", + slog.String("backend", backend), + slog.Int("completed_runs", stats.CompletedRuns), + slog.Int("zero_token_runs", stats.ZeroTokenRuns), + slog.Float64("zero_token_ratio", ratio), + slog.String("hint", "verify backend usage events are being parsed (GH-2428)"), + ) + } +} + // runStaleRecoveryLoop ticks every StaleRecoveryInterval and calls // recoverStaleTasks. It stops when ctx is cancelled or the dispatcher stops. func (d *Dispatcher) runStaleRecoveryLoop(ctx context.Context) { diff --git a/internal/executor/runner.go b/internal/executor/runner.go index 813cc20c..d070519c 100644 --- a/internal/executor/runner.go +++ b/internal/executor/runner.go @@ -562,6 +562,28 @@ func (r *Runner) backendType() string { return "claude-code" } +// fallbackModelName returns the best-known model name for telemetry rows when +// the backend stream did not surface a model field. Used to distinguish +// "telemetry-missing" from "true-zero" runs in execution_metrics. Resolution: +// 1. config.DefaultModel (set when running via OpenCode/GLM/etc.) +// 2. OpenCode config.Model (e.g. "anthropic/claude-sonnet-4-6") +// 3. Backend type prefix (e.g. "claude-code", "opencode") — never empty. +// +// GH-2428: previously runner.go hardcoded "claude-opus-4-6" as the fallback, +// which (a) was stale (real Claude Code runs report 4-7) and (b) silently +// labelled OpenCode/GLM runs as Claude Opus, biasing cost/model metrics. +func (r *Runner) fallbackModelName() string { + if r.config != nil { + if r.config.DefaultModel != "" { + return r.config.DefaultModel + } + if r.config.OpenCode != nil && r.config.Type == BackendTypeOpenCode && r.config.OpenCode.Model != "" { + return r.config.OpenCode.Model + } + } + return r.backendType() +} + // SetBackend changes the execution backend. func (r *Runner) SetBackend(backend Backend) { r.backend = backend @@ -1240,13 +1262,16 @@ func (r *Runner) executeWithOptions(ctx context.Context, task *Task, allowWorktr // GH-539: Epic sub-executions may have created commits on the branch. // Push branch and create PR to propagate deliverables. + // GH-2428: Set ModelName so the saved row distinguishes "epic + // orchestrator (no backend call)" from "telemetry-missing". epicResult := &ExecutionResult{ - TaskID: task.ID, - Success: true, - Output: fmt.Sprintf("Epic completed: %d sub-issues executed", len(issues)), - Duration: time.Since(start), - IsEpic: true, - EpicPlan: plan, + TaskID: task.ID, + Success: true, + Output: fmt.Sprintf("Epic completed: %d sub-issues executed", len(issues)), + Duration: time.Since(start), + IsEpic: true, + EpicPlan: plan, + ModelName: r.fallbackModelName(), } if task.CreatePR && task.Branch != "" { @@ -1644,7 +1669,7 @@ func (r *Runner) executeWithOptions(ctx context.Context, task *Task, allowWorktr result.CacheReadInputTokens = state.cacheReadInputTokens result.ModelName = state.modelName if result.ModelName == "" { - result.ModelName = "claude-opus-4-6" + result.ModelName = r.fallbackModelName() } result.EstimatedCostUSD = estimateCostWithCache(result.TokensInput, result.TokensOutput, result.CacheCreationInputTokens, result.CacheReadInputTokens, result.ModelName) log.Warn("Task cancelled due to per-task budget limit", @@ -2004,7 +2029,11 @@ retrySucceeded: result.ModelName = state.modelName } if result.ModelName == "" { - result.ModelName = "claude-opus-4-6" // Default model + // GH-2428: derive from config (DefaultModel/OpenCode.Model/backend type) + // instead of hardcoding "claude-opus-4-6". The hardcoded value was stale + // (Claude Code reports 4-7) and silently labelled OpenCode/GLM runs as + // Claude Opus, biasing model-outcome metrics. + result.ModelName = r.fallbackModelName() } // Estimate cost based on token usage (including research tokens) with cache-aware pricing (GH-2164) result.EstimatedCostUSD = estimateCostWithCache(result.TokensInput+result.ResearchTokens, result.TokensOutput, result.CacheCreationInputTokens, result.CacheReadInputTokens, result.ModelName) diff --git a/internal/executor/runner_test.go b/internal/executor/runner_test.go index ddd673ec..32b01eeb 100644 --- a/internal/executor/runner_test.go +++ b/internal/executor/runner_test.go @@ -3358,3 +3358,49 @@ func TestIsPermanentFailure(t *testing.T) { }) } } + +// TestRunnerFallbackModelName verifies that the telemetry fallback model name +// reflects the configured backend, not a hardcoded default. GH-2428. +func TestRunnerFallbackModelName(t *testing.T) { + tests := []struct { + name string + cfg *BackendConfig + want string + }{ + { + name: "default model overrides everything", + cfg: &BackendConfig{ + Type: BackendTypeOpenCode, + DefaultModel: "glm-5.1", + OpenCode: &OpenCodeConfig{Model: "anthropic/claude-sonnet-4-6"}, + }, + want: "glm-5.1", + }, + { + name: "opencode falls back to OpenCode.Model", + cfg: &BackendConfig{ + Type: BackendTypeOpenCode, + OpenCode: &OpenCodeConfig{Model: "anthropic/claude-sonnet-4-6"}, + }, + want: "anthropic/claude-sonnet-4-6", + }, + { + name: "claude-code with no DefaultModel falls back to backend type", + cfg: &BackendConfig{Type: BackendTypeClaudeCode}, + want: BackendTypeClaudeCode, + }, + { + name: "nil config returns claude-code default", + cfg: nil, + want: "claude-code", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &Runner{config: tt.cfg} + if got := r.fallbackModelName(); got != tt.want { + t.Errorf("fallbackModelName() = %q, want %q", got, tt.want) + } + }) + } +} diff --git a/internal/memory/metrics.go b/internal/memory/metrics.go index 0c6660a4..37da9414 100644 --- a/internal/memory/metrics.go +++ b/internal/memory/metrics.go @@ -159,6 +159,44 @@ func EstimateCost(inputTokens, outputTokens int64, model string) float64 { return inputCost + outputCost } +// TelemetryGapStats summarises how many recent completed executions reported +// zero token usage, used by the startup health check (GH-2428). A high ratio +// signals the configured backend's usage events aren't being parsed — cost +// reporting and per-task budgets silently misbehave when this happens. +type TelemetryGapStats struct { + CompletedRuns int // Completed runs inspected (with non-empty commit_sha) + ZeroTokenRuns int // Subset where tokens_total = 0 +} + +// RecentCompletedTelemetryStats counts how many of the last `limit` completed +// executions with a real commit reported tokens_total = 0. Excludes epic +// orchestrator rows (no commit_sha) so we measure backend telemetry, not +// the parent-task path that legitimately has no tokens. GH-2428. +func (s *Store) RecentCompletedTelemetryStats(limit int) (*TelemetryGapStats, error) { + if limit <= 0 { + limit = 50 + } + row := s.db.QueryRow(` + SELECT + COUNT(*) as total, + COALESCE(SUM(CASE WHEN tokens_total = 0 THEN 1 ELSE 0 END), 0) as zero_tokens + FROM ( + SELECT tokens_total + FROM executions + WHERE status = 'completed' + AND commit_sha != '' + AND commit_sha IS NOT NULL + ORDER BY created_at DESC + LIMIT ? + ) + `, limit) + stats := &TelemetryGapStats{} + if err := row.Scan(&stats.CompletedRuns, &stats.ZeroTokenRuns); err != nil { + return nil, fmt.Errorf("failed to scan telemetry gap stats: %w", err) + } + return stats, nil +} + // SaveExecutionMetrics saves metrics for an execution func (s *Store) SaveExecutionMetrics(metrics *ExecutionMetrics) error { return s.withRetry("SaveExecutionMetrics", func() error { diff --git a/internal/memory/store_test.go b/internal/memory/store_test.go index b4b2a9aa..1b6b0949 100644 --- a/internal/memory/store_test.go +++ b/internal/memory/store_test.go @@ -1750,3 +1750,58 @@ func TestUpdateExecutionStatusByTaskID_NoMatchingTask(t *testing.T) { t.Fatalf("expected no error for non-existent task, got: %v", err) } } + +// TestRecentCompletedTelemetryStats verifies the zero-token telemetry gap +// query: rows are filtered to completed runs with a real commit, and rows +// without commit_sha (e.g. epic orchestrators) are excluded so they don't +// inflate the gap ratio. GH-2428. +func TestRecentCompletedTelemetryStats(t *testing.T) { + tmpDir := t.TempDir() + store, err := NewStore(tmpDir) + if err != nil { + t.Fatalf("NewStore: %v", err) + } + defer func() { _ = store.Close() }() + + type rec struct { + id string + status string + commit string + tokens int64 + } + rows := []rec{ + {"a", "completed", "deadbeef", 100}, // counts, not zero + {"b", "completed", "cafe1234", 0}, // counts, zero + {"c", "completed", "ba5eba11", 0}, // counts, zero + {"d", "completed", "", 0}, // SKIPPED (no commit — epic orchestrator) + {"e", "failed", "feedface", 0}, // SKIPPED (not completed) + } + for _, r := range rows { + if err := store.SaveExecution(&Execution{ + ID: r.id, + TaskID: "T-" + r.id, + ProjectPath: "/x", + Status: r.status, + CommitSHA: r.commit, + }); err != nil { + t.Fatalf("SaveExecution %s: %v", r.id, err) + } + if err := store.SaveExecutionMetrics(&ExecutionMetrics{ + ExecutionID: r.id, + TokensTotal: r.tokens, + }); err != nil { + t.Fatalf("SaveExecutionMetrics %s: %v", r.id, err) + } + } + + stats, err := store.RecentCompletedTelemetryStats(50) + if err != nil { + t.Fatalf("RecentCompletedTelemetryStats: %v", err) + } + if stats.CompletedRuns != 3 { + t.Errorf("CompletedRuns = %d, want 3 (only completed+commit rows)", stats.CompletedRuns) + } + if stats.ZeroTokenRuns != 2 { + t.Errorf("ZeroTokenRuns = %d, want 2", stats.ZeroTokenRuns) + } +}