Skip to content
20 changes: 11 additions & 9 deletions internal/runtime/executor/claude_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,12 +285,12 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
}
helps.AppendAPIResponseChunk(ctx, e.cfg, data)
if stream {
var acc helps.ClaudeStreamUsageAccumulator
lines := bytes.Split(data, []byte("\n"))
for _, line := range lines {
if detail, ok := helps.ParseClaudeStreamUsage(line); ok {
reporter.Publish(ctx, detail)
}
acc.ProcessLine(line)
}
acc.Publish(ctx, reporter)
} else {
reporter.Publish(ctx, helps.ParseClaudeUsage(data))
}
Expand Down Expand Up @@ -465,14 +465,13 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A

// If from == to (Claude → Claude), directly forward the SSE stream without translation
if from == to {
var acc helps.ClaudeStreamUsageAccumulator
scanner := bufio.NewScanner(decodedBody)
scanner.Buffer(nil, 52_428_800) // 50MB
for scanner.Scan() {
line := scanner.Bytes()
helps.AppendAPIResponseChunk(ctx, e.cfg, line)
if detail, ok := helps.ParseClaudeStreamUsage(line); ok {
reporter.Publish(ctx, detail)
}
acc.ProcessLine(line)
if isClaudeOAuthToken(apiKey) && !auth.ToolPrefixDisabled() {
line = stripClaudeToolPrefixFromStreamLine(line, claudeToolPrefix)
}
Expand All @@ -489,20 +488,21 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
} else {
acc.Publish(ctx, reporter)
}
return
}

// For other formats, use translation
var acc helps.ClaudeStreamUsageAccumulator
scanner := bufio.NewScanner(decodedBody)
scanner.Buffer(nil, 52_428_800) // 50MB
var param any
for scanner.Scan() {
line := scanner.Bytes()
helps.AppendAPIResponseChunk(ctx, e.cfg, line)
if detail, ok := helps.ParseClaudeStreamUsage(line); ok {
reporter.Publish(ctx, detail)
}
acc.ProcessLine(line)
if isClaudeOAuthToken(apiKey) && !auth.ToolPrefixDisabled() {
line = stripClaudeToolPrefixFromStreamLine(line, claudeToolPrefix)
}
Expand All @@ -527,6 +527,8 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
} else {
acc.Publish(ctx, reporter)
}
}()
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
Expand Down
99 changes: 88 additions & 11 deletions internal/runtime/executor/helps/usage_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,29 +272,106 @@ func ParseClaudeUsage(data []byte) usage.Detail {
// fall back to creation tokens when read tokens are absent
detail.CachedTokens = usageNode.Get("cache_creation_input_tokens").Int()
}
// Estimate reasoning tokens from thinking content blocks.
// Claude API does not expose thinking tokens in the usage object,
// so we approximate by summing thinking block text lengths / 4.
detail.ReasoningTokens = estimateClaudeThinkingTokens(data)
detail.TotalTokens = detail.InputTokens + detail.OutputTokens
return detail
}

// ParseClaudeStreamUsage is intentionally removed.
// Use ClaudeStreamUsageAccumulator.ProcessLine instead, which merges
// usage fields across all SSE events and publishes once at stream end.

// claudeThinkingTokenFactor is the approximate characters-per-token ratio
// used to estimate thinking token counts from content block text length.
// Claude's usage object does not report thinking tokens, so callers divide
// accumulated thinking-text length by this factor as a rough estimate.
const claudeThinkingTokenFactor = 4

// estimateClaudeThinkingTokens approximates the reasoning token count of a
// non-streaming Claude response. It walks the response's "content" array,
// sums the character length of every block whose type is "thinking", and
// converts characters to tokens via claudeThinkingTokenFactor.
func estimateClaudeThinkingTokens(data []byte) int64 {
	var chars int64
	blocks := gjson.ParseBytes(data).Get("content")
	blocks.ForEach(func(_, b gjson.Result) bool {
		if b.Get("type").String() == "thinking" {
			chars += int64(len(b.Get("thinking").String()))
		}
		return true // keep iterating over all content blocks
	})
	return chars / claudeThinkingTokenFactor
}

// claudeStreamThinkingLen returns the byte length of thinking text in a
// streaming content_block_delta line. Returns 0 for non-thinking lines.
func claudeStreamThinkingLen(line []byte) int {
	payload := jsonPayload(line)
	if len(payload) == 0 {
		return 0
	}
	event := gjson.ParseBytes(payload)
	delta := event.Get("delta")
	// Only content_block_delta events carrying a thinking_delta contribute.
	if event.Get("type").String() == "content_block_delta" &&
		delta.Get("type").String() == "thinking_delta" {
		return len(delta.Get("thinking").String())
	}
	return 0
}

// ClaudeStreamUsageAccumulator merges usage fields across Claude SSE events
// and accumulates thinking content block lengths for reasoning token estimation.
// message_start carries input_tokens + cache_read_input_tokens (under message.usage),
// message_delta carries output_tokens (under usage), and content_block_delta carries
// thinking text. Publishing happens once at the end of the stream.
type ClaudeStreamUsageAccumulator struct {
	detail usage.Detail // merged usage fields built up by ProcessLine
	thinkingLen int64 // total byte length of thinking_delta text seen so far
	sawUsage bool // set once any usage-bearing event is observed; gates Publish
}

// ProcessLine extracts usage and thinking data from a single SSE line.
func (a *ClaudeStreamUsageAccumulator) ProcessLine(line []byte) {
a.thinkingLen += int64(claudeStreamThinkingLen(line))

payload := jsonPayload(line)
if len(payload) == 0 || !gjson.ValidBytes(payload) {
return usage.Detail{}, false
return
}
// message_start nests usage under "message.usage"; message_delta uses top-level "usage".
usageNode := gjson.GetBytes(payload, "usage")
if !usageNode.Exists() {
return usage.Detail{}, false
usageNode = gjson.GetBytes(payload, "message.usage")
}
detail := usage.Detail{
InputTokens: usageNode.Get("input_tokens").Int(),
OutputTokens: usageNode.Get("output_tokens").Int(),
CachedTokens: usageNode.Get("cache_read_input_tokens").Int(),
if !usageNode.Exists() {
return
}
if detail.CachedTokens == 0 {
detail.CachedTokens = usageNode.Get("cache_creation_input_tokens").Int()
a.sawUsage = true
// Merge non-zero fields across events.
if v := usageNode.Get("input_tokens").Int(); v > 0 {
a.detail.InputTokens = v
}
detail.TotalTokens = detail.InputTokens + detail.OutputTokens
return detail, true
if v := usageNode.Get("output_tokens").Int(); v > 0 {
a.detail.OutputTokens = v
}
if v := usageNode.Get("cache_read_input_tokens").Int(); v > 0 {
a.detail.CachedTokens = v
} else if a.detail.CachedTokens == 0 {
if v := usageNode.Get("cache_creation_input_tokens").Int(); v > 0 {
a.detail.CachedTokens = v
}
}
}

// Publish emits the accumulated usage with estimated reasoning tokens.
// It skips publishing when no usage event was observed during the stream
// (e.g. the connection was interrupted before a usage-bearing event arrived).
func (a *ClaudeStreamUsageAccumulator) Publish(ctx context.Context, reporter *UsageReporter) {
	if !a.sawUsage {
		return
	}
	// Reasoning tokens are an estimate: accumulated thinking-text length
	// divided by the characters-per-token factor.
	a.detail.ReasoningTokens = a.thinkingLen / claudeThinkingTokenFactor
	a.detail.TotalTokens = a.detail.InputTokens + a.detail.OutputTokens
	reporter.Publish(ctx, a.detail)
}

func parseGeminiFamilyUsageDetail(node gjson.Result) usage.Detail {
Expand Down
88 changes: 88 additions & 0 deletions internal/runtime/executor/helps/usage_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,91 @@ func TestUsageReporterBuildRecordIncludesLatency(t *testing.T) {
t.Fatalf("latency = %v, want <= 3s", record.Latency)
}
}

// TestClaudeStreamAccumulator_FullSequence drives the accumulator through a
// typical Claude SSE sequence and verifies every merged field.
func TestClaudeStreamAccumulator_FullSequence(t *testing.T) {
	// Simulate: message_start (input+cache) → thinking_delta → message_delta (output)
	var acc ClaudeStreamUsageAccumulator
	acc.ProcessLine([]byte(`data: {"type":"message_start","message":{"usage":{"input_tokens":10,"cache_read_input_tokens":50000}}}`))
	acc.ProcessLine([]byte(`data: {"type":"content_block_delta","delta":{"type":"thinking_delta","thinking":"Let me think about this carefully step by step"}}`))
	acc.ProcessLine([]byte(`data: {"type":"message_delta","usage":{"output_tokens":200}}`))

	if !acc.sawUsage {
		t.Fatal("sawUsage should be true after processing usage events")
	}
	if acc.detail.InputTokens != 10 {
		t.Fatalf("input tokens = %d, want 10", acc.detail.InputTokens)
	}
	if acc.detail.OutputTokens != 200 {
		t.Fatalf("output tokens = %d, want 200", acc.detail.OutputTokens)
	}
	if acc.detail.CachedTokens != 50000 {
		t.Fatalf("cached tokens = %d, want 50000", acc.detail.CachedTokens)
	}
	// Compare raw accumulated byte length rather than post-division values:
	// dividing both sides by claudeThinkingTokenFactor would silently mask
	// discrepancies of up to factor-1 bytes.
	wantLen := int64(len("Let me think about this carefully step by step"))
	if acc.thinkingLen != wantLen {
		t.Fatalf("thinkingLen = %d, want %d", acc.thinkingLen, wantLen)
	}
}

// Verifies that a lone message_start event marks usage as seen and records
// input and cache-read tokens while output tokens stay zero.
func TestClaudeStreamAccumulator_MessageStartOnly(t *testing.T) {
	var a ClaudeStreamUsageAccumulator
	a.ProcessLine([]byte(`data: {"type":"message_start","message":{"usage":{"input_tokens":5,"cache_read_input_tokens":100000}}}`))

	if !a.sawUsage {
		t.Fatal("sawUsage should be true after message_start with usage")
	}
	if got := a.detail.InputTokens; got != 5 {
		t.Fatalf("input tokens = %d, want 5", got)
	}
	if got := a.detail.OutputTokens; got != 0 {
		t.Fatalf("output tokens = %d, want 0", got)
	}
	if got := a.detail.CachedTokens; got != 100000 {
		t.Fatalf("cached tokens = %d, want 100000", got)
	}
}

// Verifies that a lone message_delta event records output tokens and marks
// usage as seen while input tokens stay zero.
func TestClaudeStreamAccumulator_MessageDeltaOnly(t *testing.T) {
	var a ClaudeStreamUsageAccumulator
	a.ProcessLine([]byte(`data: {"type":"message_delta","usage":{"output_tokens":150}}`))

	if !a.sawUsage {
		t.Fatal("sawUsage should be true after message_delta with usage")
	}
	if got := a.detail.OutputTokens; got != 150 {
		t.Fatalf("output tokens = %d, want 150", got)
	}
	if got := a.detail.InputTokens; got != 0 {
		t.Fatalf("input tokens = %d, want 0", got)
	}
}

// A stream containing only thinking deltas and no usage event must still
// accumulate thinking length but never flag usage (so Publish is a no-op).
func TestClaudeStreamAccumulator_ThinkingDeltaOnly_NoUsage(t *testing.T) {
	var a ClaudeStreamUsageAccumulator
	a.ProcessLine([]byte(`data: {"type":"content_block_delta","delta":{"type":"thinking_delta","thinking":"some thinking text"}}`))

	if a.thinkingLen == 0 {
		t.Fatal("thinkingLen should be non-zero after processing thinking_delta")
	}
	if a.sawUsage {
		t.Fatal("sawUsage should be false when no usage event was received")
	}
}

// An empty/interrupted stream yields a zero-value accumulator that must not
// report usage as seen.
func TestClaudeStreamAccumulator_NoEvents(t *testing.T) {
	a := ClaudeStreamUsageAccumulator{}
	if a.sawUsage {
		t.Fatal("sawUsage should be false for empty accumulator")
	}
}

// When cache_read_input_tokens is absent, cache_creation_input_tokens is
// used as the cached-token fallback.
func TestClaudeStreamAccumulator_CacheCreationFallback(t *testing.T) {
	var a ClaudeStreamUsageAccumulator
	a.ProcessLine([]byte(`data: {"type":"message_start","message":{"usage":{"input_tokens":3,"cache_creation_input_tokens":80000}}}`))

	if got := a.detail.CachedTokens; got != 80000 {
		t.Fatalf("cached tokens = %d, want 80000 (from cache_creation fallback)", got)
	}
}
12 changes: 11 additions & 1 deletion sdk/cliproxy/auth/conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,17 @@ func (m *Manager) ReconcileRegistryModelStates(ctx context.Context, authID strin
}
if changed {
updateAggregatedAvailability(auth, now)
if !hasModelError(auth, now) {
// Only clear auth-level status when the auth is otherwise healthy.
// hasModelError only inspects per-model state, so without these
// extra guards a disabled auth or an auth with a non-model
// auth-level failure (e.g. OAuth-level error set by
// applyAuthFailureState) would be silently flipped back to
// StatusActive, hiding the real problem from management output.
if !hasModelError(auth, now) &&
!auth.Disabled &&
auth.Status != StatusDisabled &&
auth.Status != StatusError &&
auth.LastError == nil {
auth.LastError = nil
auth.StatusMessage = ""
auth.Status = StatusActive
Expand Down
Loading