diff --git a/internal/runtime/executor/claude_executor.go b/internal/runtime/executor/claude_executor.go index 0da3293504..432bd81495 100644 --- a/internal/runtime/executor/claude_executor.go +++ b/internal/runtime/executor/claude_executor.go @@ -285,12 +285,12 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r } helps.AppendAPIResponseChunk(ctx, e.cfg, data) if stream { + var acc helps.ClaudeStreamUsageAccumulator lines := bytes.Split(data, []byte("\n")) for _, line := range lines { - if detail, ok := helps.ParseClaudeStreamUsage(line); ok { - reporter.Publish(ctx, detail) - } + acc.ProcessLine(line) } + acc.Publish(ctx, reporter) } else { reporter.Publish(ctx, helps.ParseClaudeUsage(data)) } @@ -465,14 +465,13 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A // If from == to (Claude → Claude), directly forward the SSE stream without translation if from == to { + var acc helps.ClaudeStreamUsageAccumulator scanner := bufio.NewScanner(decodedBody) scanner.Buffer(nil, 52_428_800) // 50MB for scanner.Scan() { line := scanner.Bytes() helps.AppendAPIResponseChunk(ctx, e.cfg, line) - if detail, ok := helps.ParseClaudeStreamUsage(line); ok { - reporter.Publish(ctx, detail) - } + acc.ProcessLine(line) if isClaudeOAuthToken(apiKey) && !auth.ToolPrefixDisabled() { line = stripClaudeToolPrefixFromStreamLine(line, claudeToolPrefix) } @@ -489,20 +488,21 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) out <- cliproxyexecutor.StreamChunk{Err: errScan} + } else { + acc.Publish(ctx, reporter) } return } // For other formats, use translation + var acc helps.ClaudeStreamUsageAccumulator scanner := bufio.NewScanner(decodedBody) scanner.Buffer(nil, 52_428_800) // 50MB var param any for scanner.Scan() { line := scanner.Bytes() helps.AppendAPIResponseChunk(ctx, e.cfg, line) - if detail, ok := 
helps.ParseClaudeStreamUsage(line); ok { - reporter.Publish(ctx, detail) - } + acc.ProcessLine(line) if isClaudeOAuthToken(apiKey) && !auth.ToolPrefixDisabled() { line = stripClaudeToolPrefixFromStreamLine(line, claudeToolPrefix) } @@ -527,6 +527,8 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) out <- cliproxyexecutor.StreamChunk{Err: errScan} + } else { + acc.Publish(ctx, reporter) } }() return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil diff --git a/internal/runtime/executor/helps/usage_helpers.go b/internal/runtime/executor/helps/usage_helpers.go index 8da8fd1e7a..949186afc4 100644 --- a/internal/runtime/executor/helps/usage_helpers.go +++ b/internal/runtime/executor/helps/usage_helpers.go @@ -272,29 +272,106 @@ func ParseClaudeUsage(data []byte) usage.Detail { // fall back to creation tokens when read tokens are absent detail.CachedTokens = usageNode.Get("cache_creation_input_tokens").Int() } + // Estimate reasoning tokens from thinking content blocks. + // Claude API does not expose thinking tokens in the usage object, + // so we approximate by summing thinking block text lengths / 4. + detail.ReasoningTokens = estimateClaudeThinkingTokens(data) detail.TotalTokens = detail.InputTokens + detail.OutputTokens return detail } -func ParseClaudeStreamUsage(line []byte) (usage.Detail, bool) { +// ParseClaudeStreamUsage is intentionally removed. +// Use ClaudeStreamUsageAccumulator.ProcessLine instead, which merges +// usage fields across all SSE events and publishes once at stream end. + +// claudeThinkingTokenFactor is the approximate characters-per-token ratio +// used to estimate thinking token counts from content block text length. 
+const claudeThinkingTokenFactor = 4 + +// estimateClaudeThinkingTokens scans Claude response content blocks for +// type="thinking" entries and estimates token count from text length. +func estimateClaudeThinkingTokens(data []byte) int64 { + var total int64 + gjson.ParseBytes(data).Get("content").ForEach(func(_, block gjson.Result) bool { + if block.Get("type").String() == "thinking" { + total += int64(len(block.Get("thinking").String())) + } + return true + }) + return total / claudeThinkingTokenFactor +} + +// claudeStreamThinkingLen returns the byte length of thinking text in a +// streaming content_block_delta line. Returns 0 for non-thinking lines. +func claudeStreamThinkingLen(line []byte) int { + payload := jsonPayload(line) + if len(payload) == 0 { + return 0 + } + p := gjson.ParseBytes(payload) + if p.Get("type").String() != "content_block_delta" { + return 0 + } + delta := p.Get("delta") + if delta.Get("type").String() != "thinking_delta" { + return 0 + } + return len(delta.Get("thinking").String()) +} + +// ClaudeStreamUsageAccumulator merges usage fields across Claude SSE events +// and accumulates thinking content block lengths for reasoning token estimation. +// message_start carries input_tokens + cache_read_input_tokens (under message.usage), +// message_delta carries output_tokens (under usage), and content_block_delta carries +// thinking text. Publishing happens once at the end of the stream. +type ClaudeStreamUsageAccumulator struct { + detail usage.Detail + thinkingLen int64 + sawUsage bool +} + +// ProcessLine extracts usage and thinking data from a single SSE line. +func (a *ClaudeStreamUsageAccumulator) ProcessLine(line []byte) { + a.thinkingLen += int64(claudeStreamThinkingLen(line)) + payload := jsonPayload(line) if len(payload) == 0 || !gjson.ValidBytes(payload) { - return usage.Detail{}, false + return } + // message_start nests usage under "message.usage"; message_delta uses top-level "usage". 
usageNode := gjson.GetBytes(payload, "usage") if !usageNode.Exists() { - return usage.Detail{}, false + usageNode = gjson.GetBytes(payload, "message.usage") } - detail := usage.Detail{ - InputTokens: usageNode.Get("input_tokens").Int(), - OutputTokens: usageNode.Get("output_tokens").Int(), - CachedTokens: usageNode.Get("cache_read_input_tokens").Int(), + if !usageNode.Exists() { + return } - if detail.CachedTokens == 0 { - detail.CachedTokens = usageNode.Get("cache_creation_input_tokens").Int() + a.sawUsage = true + // Merge non-zero fields across events. + if v := usageNode.Get("input_tokens").Int(); v > 0 { + a.detail.InputTokens = v } - detail.TotalTokens = detail.InputTokens + detail.OutputTokens - return detail, true + if v := usageNode.Get("output_tokens").Int(); v > 0 { + a.detail.OutputTokens = v + } + if v := usageNode.Get("cache_read_input_tokens").Int(); v > 0 { + a.detail.CachedTokens = v + } else if a.detail.CachedTokens == 0 { + if v := usageNode.Get("cache_creation_input_tokens").Int(); v > 0 { + a.detail.CachedTokens = v + } + } +} + +// Publish emits the accumulated usage with estimated reasoning tokens. +// Skips publishing if no usage event was observed during the stream. 
+func (a *ClaudeStreamUsageAccumulator) Publish(ctx context.Context, reporter *UsageReporter) { + if !a.sawUsage { + return + } + a.detail.ReasoningTokens = a.thinkingLen / claudeThinkingTokenFactor + a.detail.TotalTokens = a.detail.InputTokens + a.detail.OutputTokens + reporter.Publish(ctx, a.detail) } func parseGeminiFamilyUsageDetail(node gjson.Result) usage.Detail { diff --git a/internal/runtime/executor/helps/usage_helpers_test.go b/internal/runtime/executor/helps/usage_helpers_test.go index 1a5648e89b..2e32f6977d 100644 --- a/internal/runtime/executor/helps/usage_helpers_test.go +++ b/internal/runtime/executor/helps/usage_helpers_test.go @@ -62,3 +62,91 @@ func TestUsageReporterBuildRecordIncludesLatency(t *testing.T) { t.Fatalf("latency = %v, want <= 3s", record.Latency) } } + + func TestClaudeStreamAccumulator_FullSequence(t *testing.T) { + // Simulate: message_start (input+cache) → thinking_delta → message_delta (output) + var acc ClaudeStreamUsageAccumulator + acc.ProcessLine([]byte(`data: {"type":"message_start","message":{"usage":{"input_tokens":10,"cache_read_input_tokens":50000}}}`)) + acc.ProcessLine([]byte(`data: {"type":"content_block_delta","delta":{"type":"thinking_delta","thinking":"Let me think about this carefully step by step"}}`)) + acc.ProcessLine([]byte(`data: {"type":"message_delta","usage":{"output_tokens":200}}`)) + + if !acc.sawUsage { + t.Fatal("sawUsage should be true after processing usage events") + } + if acc.detail.InputTokens != 10 { + t.Fatalf("input tokens = %d, want 10", acc.detail.InputTokens) + } + if acc.detail.OutputTokens != 200 { + t.Fatalf("output tokens = %d, want 200", acc.detail.OutputTokens) + } + if acc.detail.CachedTokens != 50000 { + t.Fatalf("cached tokens = %d, want 50000", acc.detail.CachedTokens) + } + expectedThinking := int64(len("Let me think about this carefully step by step")) / claudeThinkingTokenFactor + if acc.thinkingLen/claudeThinkingTokenFactor != expectedThinking { + t.Fatalf("thinking tokens = 
%d, want %d", acc.thinkingLen/claudeThinkingTokenFactor, expectedThinking) + } +} + + func TestClaudeStreamAccumulator_MessageStartOnly(t *testing.T) { + var acc ClaudeStreamUsageAccumulator + acc.ProcessLine([]byte(`data: {"type":"message_start","message":{"usage":{"input_tokens":5,"cache_read_input_tokens":100000}}}`)) + + if !acc.sawUsage { + t.Fatal("sawUsage should be true after message_start with usage") + } + if acc.detail.InputTokens != 5 { + t.Fatalf("input tokens = %d, want 5", acc.detail.InputTokens) + } + if acc.detail.OutputTokens != 0 { + t.Fatalf("output tokens = %d, want 0", acc.detail.OutputTokens) + } + if acc.detail.CachedTokens != 100000 { + t.Fatalf("cached tokens = %d, want 100000", acc.detail.CachedTokens) + } +} + + func TestClaudeStreamAccumulator_MessageDeltaOnly(t *testing.T) { + var acc ClaudeStreamUsageAccumulator + acc.ProcessLine([]byte(`data: {"type":"message_delta","usage":{"output_tokens":150}}`)) + + if !acc.sawUsage { + t.Fatal("sawUsage should be true after message_delta with usage") + } + if acc.detail.InputTokens != 0 { + t.Fatalf("input tokens = %d, want 0", acc.detail.InputTokens) + } + if acc.detail.OutputTokens != 150 { + t.Fatalf("output tokens = %d, want 150", acc.detail.OutputTokens) + } +} + + func TestClaudeStreamAccumulator_ThinkingDeltaOnly_NoUsage(t *testing.T) { + // Stream with only thinking deltas but no usage event — should not publish + var acc ClaudeStreamUsageAccumulator + acc.ProcessLine([]byte(`data: {"type":"content_block_delta","delta":{"type":"thinking_delta","thinking":"some thinking text"}}`)) + + if acc.sawUsage { + t.Fatal("sawUsage should be false when no usage event was received") + } + if acc.thinkingLen == 0 { + t.Fatal("thinkingLen should be non-zero after processing thinking_delta") + } +} + + func TestClaudeStreamAccumulator_NoEvents(t *testing.T) { + // Empty/interrupted stream — should not publish + var acc ClaudeStreamUsageAccumulator + if acc.sawUsage { + t.Fatal("sawUsage should be false 
for empty accumulator") + } +} + + func TestClaudeStreamAccumulator_CacheCreationFallback(t *testing.T) { + var acc ClaudeStreamUsageAccumulator + acc.ProcessLine([]byte(`data: {"type":"message_start","message":{"usage":{"input_tokens":3,"cache_creation_input_tokens":80000}}}`)) + + if acc.detail.CachedTokens != 80000 { + t.Fatalf("cached tokens = %d, want 80000 (from cache_creation fallback)", acc.detail.CachedTokens) + } +} diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 25cc7221a9..37711a92df 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -293,7 +293,17 @@ func (m *Manager) ReconcileRegistryModelStates(ctx context.Context, authID strin } if changed { updateAggregatedAvailability(auth, now) - if !hasModelError(auth, now) { + // Only clear auth-level status when the auth is otherwise healthy. + // hasModelError only inspects per-model state, so without these + // extra guards a disabled auth or an auth with a non-model + // auth-level failure (e.g. OAuth-level error set by + // applyAuthFailureState) would be silently flipped back to + // StatusActive, hiding the real problem from management output. + if !hasModelError(auth, now) && + !auth.Disabled && + auth.Status != StatusDisabled && + auth.Status != StatusError && + auth.LastError == nil { auth.LastError = nil auth.StatusMessage = "" auth.Status = StatusActive