Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions internal/executor/backend_opencode.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,12 @@ func (b *OpenCodeBackend) parseSSEStream(reader io.Reader, opts ExecuteOptions,
}
}

// Accumulate token usage
// Accumulate token usage. GH-2428: also accumulate cache tokens so
// the SSE path matches the synchronous parseAssistantResponse path.
result.TokensInput += event.TokensInput
result.TokensOutput += event.TokensOutput
result.CacheCreationInputTokens += event.CacheCreationInputTokens
result.CacheReadInputTokens += event.CacheReadInputTokens
if event.Model != "" {
result.Model = event.Model
}
Expand Down Expand Up @@ -507,6 +510,8 @@ func (b *OpenCodeBackend) parseOpenCodeEvent(data string) BackendEvent {
if ocEvent.Usage != nil {
event.TokensInput = ocEvent.Usage.InputTokens
event.TokensOutput = ocEvent.Usage.OutputTokens
event.CacheCreationInputTokens = ocEvent.Usage.cacheCreate()
event.CacheReadInputTokens = ocEvent.Usage.cacheRead()
}

default:
Expand All @@ -515,10 +520,13 @@ func (b *OpenCodeBackend) parseOpenCodeEvent(data string) BackendEvent {
event.Message = data
}

// Extract usage if present
// Extract usage if present (covers events where the usage block lives at
// the top level rather than under a dedicated "usage" event type).
if ocEvent.Usage != nil {
event.TokensInput = ocEvent.Usage.InputTokens
event.TokensOutput = ocEvent.Usage.OutputTokens
event.CacheCreationInputTokens = ocEvent.Usage.cacheCreate()
event.CacheReadInputTokens = ocEvent.Usage.cacheRead()
}
if ocEvent.Model != "" {
event.Model = ocEvent.Model
Expand All @@ -544,9 +552,32 @@ type openCodeDelta struct {
Text string `json:"text,omitempty"`
}

// openCodeUsage matches the usage shape OpenCode v1.4.x emits in SSE events.
// Both flat (cache_*) and nested (cache.{read,write}) shapes are accepted —
// different OpenCode builds and provider passthroughs use different layouts.
// GH-2428.
type openCodeUsage struct {
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheCreationInputTokens int64 `json:"cache_creation_input_tokens,omitempty"`
CacheReadInputTokens int64 `json:"cache_read_input_tokens,omitempty"`
Cache ocCacheToken `json:"cache,omitempty"`
}

// cacheCreate reports the cache-creation token count for this usage block,
// preferring the flat cache_creation_input_tokens field and falling back to
// the nested cache.write value when the flat field is absent or zero.
func (u openCodeUsage) cacheCreate() int64 {
	if n := u.CacheCreationInputTokens; n > 0 {
		return n
	}
	return u.Cache.Write
}

// cacheRead reports the cache-read token count for this usage block,
// preferring the flat cache_read_input_tokens field and falling back to
// the nested cache.read value when the flat field is absent or zero.
func (u openCodeUsage) cacheRead() int64 {
	if n := u.CacheReadInputTokens; n > 0 {
		return n
	}
	return u.Cache.Read
}

// ocAssistantResponse mirrors the response body of
Expand Down
53 changes: 53 additions & 0 deletions internal/executor/backend_opencode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,3 +582,56 @@ func TestOpenCodeEventStructs(t *testing.T) {
t.Errorf("Usage.InputTokens = %d, want 100", event.Usage.InputTokens)
}
}

// TestOpenCodeBackendParseSSEStreamCacheTokens verifies that the SSE path
// captures cache_creation/cache_read fields from a usage event, matching the
// synchronous parseAssistantResponse path. GH-2428.
func TestOpenCodeBackendParseSSEStreamCacheTokens(t *testing.T) {
	b := NewOpenCodeBackend(nil)

	// Two SSE events: a usage event carrying cache fields, then a result
	// event. Each event is terminated by a blank line; the final blank line
	// is required so the last event is dispatched.
	stream := "data: {\"type\":\"usage\",\"usage\":{\"input_tokens\":10,\"output_tokens\":20,\"cache_creation_input_tokens\":3,\"cache_read_input_tokens\":4},\"model\":\"glm-5.1\"}\n\n" +
		"data: {\"type\":\"done\",\"output\":\"ok\"}\n\n"

	got := &BackendResult{}
	if err := b.parseSSEStream(strings.NewReader(stream), ExecuteOptions{}, got); err != nil {
		t.Fatalf("parseSSEStream error = %v", err)
	}

	if got.TokensInput != 10 || got.TokensOutput != 20 {
		t.Errorf("tokens = %d/%d, want 10/20", got.TokensInput, got.TokensOutput)
	}
	if got.CacheCreationInputTokens != 3 {
		t.Errorf("CacheCreationInputTokens = %d, want 3", got.CacheCreationInputTokens)
	}
	if got.CacheReadInputTokens != 4 {
		t.Errorf("CacheReadInputTokens = %d, want 4", got.CacheReadInputTokens)
	}
	if got.Model != "glm-5.1" {
		t.Errorf("Model = %q, want glm-5.1", got.Model)
	}
}

// TestOpenCodeBackendParseSSENestedCache verifies that the nested
// {cache:{read,write}} usage layout is also accepted by SSE parsing. GH-2428.
func TestOpenCodeBackendParseSSENestedCache(t *testing.T) {
	b := NewOpenCodeBackend(nil)

	// The trailing blank line marks the end of the single SSE event so the
	// scanner dispatches it.
	stream := "data: {\"type\":\"usage\",\"usage\":{\"input_tokens\":1,\"output_tokens\":2,\"cache\":{\"read\":5,\"write\":6}}}\n\n"

	got := &BackendResult{}
	if err := b.parseSSEStream(strings.NewReader(stream), ExecuteOptions{}, got); err != nil {
		t.Fatalf("parseSSEStream error = %v", err)
	}
	if got.CacheCreationInputTokens != 6 {
		t.Errorf("CacheCreationInputTokens = %d, want 6 (from cache.write)", got.CacheCreationInputTokens)
	}
	if got.CacheReadInputTokens != 5 {
		t.Errorf("CacheReadInputTokens = %d, want 5 (from cache.read)", got.CacheReadInputTokens)
	}
}
35 changes: 35 additions & 0 deletions internal/executor/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,48 @@ func (d *Dispatcher) Start(ctx context.Context) error {
// Initial recovery pass on startup.
d.recoverStaleTasks()

// GH-2428: warn when the last batch of completed runs has no token
// telemetry. A persistent gap means the backend's usage events aren't
// being parsed — cost reporting and per-task budgets silently degrade.
d.checkTelemetryGap()

// Launch periodic recovery loop.
d.wg.Add(1)
go d.runStaleRecoveryLoop(ctx)

return nil
}

// checkTelemetryGap inspects recent completed executions and logs a warning
// when token telemetry is mostly missing. Threshold: at least 50% of the last
// 50 completed runs (with a real commit) reporting tokens_total=0. GH-2428.
func (d *Dispatcher) checkTelemetryGap() {
	const (
		sampleSize = 50
		threshold  = 0.5
	)
	stats, err := d.store.RecentCompletedTelemetryStats(sampleSize)
	if err != nil {
		// Telemetry health is advisory — never fail startup over it.
		d.log.Debug("Skipping telemetry gap check", slog.Any("error", err))
		return
	}
	if stats.CompletedRuns < 10 {
		// Too few samples to draw a meaningful conclusion.
		return
	}
	ratio := float64(stats.ZeroTokenRuns) / float64(stats.CompletedRuns)
	if ratio < threshold {
		return
	}
	backend := "claude-code"
	if d.runner != nil {
		backend = d.runner.backendType()
	}
	d.log.Warn("Token telemetry gap detected — recent completed runs report 0 tokens",
		slog.String("backend", backend),
		slog.Int("completed_runs", stats.CompletedRuns),
		slog.Int("zero_token_runs", stats.ZeroTokenRuns),
		slog.Float64("zero_token_ratio", ratio),
		slog.String("hint", "verify backend usage events are being parsed (GH-2428)"),
	)
}

// runStaleRecoveryLoop ticks every StaleRecoveryInterval and calls
// recoverStaleTasks. It stops when ctx is cancelled or the dispatcher stops.
func (d *Dispatcher) runStaleRecoveryLoop(ctx context.Context) {
Expand Down
45 changes: 37 additions & 8 deletions internal/executor/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,28 @@ func (r *Runner) backendType() string {
return "claude-code"
}

// fallbackModelName returns the best-known model name for telemetry rows when
// the backend stream did not surface a model field, so execution_metrics can
// distinguish "telemetry-missing" from "true-zero" runs. Resolution order:
//
//  1. config.DefaultModel (set when running via OpenCode/GLM/etc.)
//  2. OpenCode config.Model (e.g. "anthropic/claude-sonnet-4-6")
//  3. Backend type (e.g. "claude-code", "opencode") — never empty.
//
// GH-2428: replaces the stale hardcoded "claude-opus-4-6" fallback, which
// mislabelled OpenCode/GLM runs as Claude Opus and biased cost/model metrics.
func (r *Runner) fallbackModelName() string {
	cfg := r.config
	if cfg == nil {
		return r.backendType()
	}
	if cfg.DefaultModel != "" {
		return cfg.DefaultModel
	}
	if cfg.Type == BackendTypeOpenCode && cfg.OpenCode != nil && cfg.OpenCode.Model != "" {
		return cfg.OpenCode.Model
	}
	return r.backendType()
}

// SetBackend changes the execution backend.
func (r *Runner) SetBackend(backend Backend) {
r.backend = backend
Expand Down Expand Up @@ -1240,13 +1262,16 @@ func (r *Runner) executeWithOptions(ctx context.Context, task *Task, allowWorktr

// GH-539: Epic sub-executions may have created commits on the branch.
// Push branch and create PR to propagate deliverables.
// GH-2428: Set ModelName so the saved row distinguishes "epic
// orchestrator (no backend call)" from "telemetry-missing".
epicResult := &ExecutionResult{
TaskID: task.ID,
Success: true,
Output: fmt.Sprintf("Epic completed: %d sub-issues executed", len(issues)),
Duration: time.Since(start),
IsEpic: true,
EpicPlan: plan,
TaskID: task.ID,
Success: true,
Output: fmt.Sprintf("Epic completed: %d sub-issues executed", len(issues)),
Duration: time.Since(start),
IsEpic: true,
EpicPlan: plan,
ModelName: r.fallbackModelName(),
}

if task.CreatePR && task.Branch != "" {
Expand Down Expand Up @@ -1644,7 +1669,7 @@ func (r *Runner) executeWithOptions(ctx context.Context, task *Task, allowWorktr
result.CacheReadInputTokens = state.cacheReadInputTokens
result.ModelName = state.modelName
if result.ModelName == "" {
result.ModelName = "claude-opus-4-6"
result.ModelName = r.fallbackModelName()
}
result.EstimatedCostUSD = estimateCostWithCache(result.TokensInput, result.TokensOutput, result.CacheCreationInputTokens, result.CacheReadInputTokens, result.ModelName)
log.Warn("Task cancelled due to per-task budget limit",
Expand Down Expand Up @@ -2004,7 +2029,11 @@ retrySucceeded:
result.ModelName = state.modelName
}
if result.ModelName == "" {
result.ModelName = "claude-opus-4-6" // Default model
// GH-2428: derive from config (DefaultModel/OpenCode.Model/backend type)
// instead of hardcoding "claude-opus-4-6". The hardcoded value was stale
// (Claude Code reports 4-7) and silently labelled OpenCode/GLM runs as
// Claude Opus, biasing model-outcome metrics.
result.ModelName = r.fallbackModelName()
}
// Estimate cost based on token usage (including research tokens) with cache-aware pricing (GH-2164)
result.EstimatedCostUSD = estimateCostWithCache(result.TokensInput+result.ResearchTokens, result.TokensOutput, result.CacheCreationInputTokens, result.CacheReadInputTokens, result.ModelName)
Expand Down
46 changes: 46 additions & 0 deletions internal/executor/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3358,3 +3358,49 @@ func TestIsPermanentFailure(t *testing.T) {
})
}
}

// TestRunnerFallbackModelName verifies that the telemetry fallback model name
// reflects the configured backend, not a hardcoded default. GH-2428.
func TestRunnerFallbackModelName(t *testing.T) {
	cases := []struct {
		name string
		cfg  *BackendConfig
		want string
	}{
		{
			name: "default model overrides everything",
			cfg: &BackendConfig{
				Type:         BackendTypeOpenCode,
				DefaultModel: "glm-5.1",
				OpenCode:     &OpenCodeConfig{Model: "anthropic/claude-sonnet-4-6"},
			},
			want: "glm-5.1",
		},
		{
			name: "opencode falls back to OpenCode.Model",
			cfg: &BackendConfig{
				Type:     BackendTypeOpenCode,
				OpenCode: &OpenCodeConfig{Model: "anthropic/claude-sonnet-4-6"},
			},
			want: "anthropic/claude-sonnet-4-6",
		},
		{
			name: "claude-code with no DefaultModel falls back to backend type",
			cfg:  &BackendConfig{Type: BackendTypeClaudeCode},
			want: BackendTypeClaudeCode,
		},
		{
			name: "nil config returns claude-code default",
			cfg:  nil,
			want: "claude-code",
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			runner := &Runner{config: tc.cfg}
			if got := runner.fallbackModelName(); got != tc.want {
				t.Errorf("fallbackModelName() = %q, want %q", got, tc.want)
			}
		})
	}
}
38 changes: 38 additions & 0 deletions internal/memory/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,44 @@ func EstimateCost(inputTokens, outputTokens int64, model string) float64 {
return inputCost + outputCost
}

// TelemetryGapStats summarises how many recent completed executions reported
// zero token usage, used by the startup health check (GH-2428). A high ratio
// signals the configured backend's usage events aren't being parsed — cost
// reporting and per-task budgets silently misbehave when this happens.
type TelemetryGapStats struct {
	CompletedRuns int // Completed runs inspected (only rows with a non-empty commit_sha)
	ZeroTokenRuns int // Subset of CompletedRuns where tokens_total = 0
}

// RecentCompletedTelemetryStats counts how many of the last `limit` completed
// executions with a real commit reported tokens_total = 0. Epic orchestrator
// rows (no commit_sha) are excluded so the ratio measures backend telemetry,
// not the parent-task path that legitimately has no tokens. GH-2428.
//
// A non-positive limit defaults to 50.
func (s *Store) RecentCompletedTelemetryStats(limit int) (*TelemetryGapStats, error) {
	if limit <= 0 {
		limit = 50
	}
	var stats TelemetryGapStats
	row := s.db.QueryRow(`
		SELECT
			COUNT(*) as total,
			COALESCE(SUM(CASE WHEN tokens_total = 0 THEN 1 ELSE 0 END), 0) as zero_tokens
		FROM (
			SELECT tokens_total
			FROM executions
			WHERE status = 'completed'
			AND commit_sha != ''
			AND commit_sha IS NOT NULL
			ORDER BY created_at DESC
			LIMIT ?
		)
	`, limit)
	if err := row.Scan(&stats.CompletedRuns, &stats.ZeroTokenRuns); err != nil {
		return nil, fmt.Errorf("failed to scan telemetry gap stats: %w", err)
	}
	return &stats, nil
}

// SaveExecutionMetrics saves metrics for an execution
func (s *Store) SaveExecutionMetrics(metrics *ExecutionMetrics) error {
return s.withRetry("SaveExecutionMetrics", func() error {
Expand Down
Loading
Loading