Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions packages/cli/src/format-translation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,66 @@ describe("Anthropic SSE Passthrough (createAnthropicPassthroughStream)", () => {
expect(tokenInput).toBe(50);
expect(tokenOutput).toBe(5);
});

// never-hang: Z.AI and similar providers close the TCP socket mid-stream
// under load. Before the fix, the reader.read() rejection escaped to a catch
// that did a bare controller.close() with NO terminal message_stop, so the
// client saw "socket connection was closed unexpectedly" and froze the turn.
test("never-hang: upstream socket close mid-stream still emits terminal message_stop", async () => {
const createAnthropicPassthroughStream = await getParser();
const encoder = new TextEncoder();
// Fixture: a few valid events, then the upstream body ERRORS (socket close).
const upstream = new ReadableStream({
start(controller) {
controller.enqueue(
encoder.encode(
"event: message_start\n" +
`data: {"type":"message_start","message":{"id":"msg_test","type":"message","role":"assistant","model":"glm-5.1","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":10,"output_tokens":0}}}\n\n`
)
);
controller.enqueue(
encoder.encode(
"event: content_block_start\n" +
`data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}\n\n`
)
);
controller.enqueue(
encoder.encode(
"event: content_block_delta\n" +
`data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Partial answer before cutoff"}}\n\n`
)
);
// Simulate the upstream TCP socket dying mid-stream — deferred so the
// enqueued chunks are read first (a real close arrives after a network
// round-trip, not synchronously in the same tick as the writes).
setTimeout(
() => controller.error(new Error("The socket connection was closed unexpectedly.")),
5
);
},
});
const fixture = new Response(upstream, {
status: 200,
headers: { "Content-Type": "text/event-stream" },
});
const ctx = createMockContext();

const response = createAnthropicPassthroughStream(ctx, fixture, {
modelName: "glm-5.1",
});

const events = await parseClaudeSseStream(response);

// Partial content must survive (not lost to the cutoff).
expect(extractText(events)).toContain("Partial answer before cutoff");

// CRITICAL: a terminal message_stop MUST be present so the client ends the
// turn cleanly instead of freezing. This assertion fails before the fix.
expect(events.some((e) => e.data?.type === "message_stop")).toBe(true);

// Synthesized stop_reason so the client treats it as a completed turn.
expect(extractStopReason(events)).toBe("end_turn");
});
});

// ─── Adapter Message Conversion Tests ───────────────────────────────────────
Expand Down
88 changes: 88 additions & 0 deletions packages/cli/src/handlers/shared/stream-parsers/anthropic-sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,79 @@ export function createAnthropicPassthroughStream(
let insideThinkingBlock = false;
/** How many thinking blocks have been suppressed so far. */
let thinkingBlocksSuppressed = 0;
/** Whether a message_start has been emitted downstream yet. */
let sawMessageStart = false;

// ── Graceful finalization on upstream failure ────────────────────
// Z.AI and other Anthropic-compatible providers sometimes close the
// TCP socket mid-stream (under load / burst limits). When that happens
// reader.read() rejects, and without this net the rejection escaped to
// the outer catch — which did a bare controller.close() with NO
// terminal message_stop event. Claude Code then reported "API Error:
// The socket connection was closed unexpectedly" and froze the turn.
// This emits a valid terminal Claude message so the client ends the
// turn cleanly (preserving whatever streamed before the cutoff).
const finalizeWithError = (errMsg: string, path: string) => {
if (isClosed) return;
if (!sawMessageStart) {
// No content reached the client yet — emit a minimal synthetic
// message so message_stop below is well-formed.
const synthId = `msg_${Date.now()}`;
controller.enqueue(
encoder.encode(
"event: message_start\n" +
`data: {"type":"message_start","message":{"id":"${synthId}","type":"message","role":"assistant","model":"${opts.modelName}","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":${inputTokens},"output_tokens":${outputTokens}}}}\n\n`
)
);
controller.enqueue(
encoder.encode(
"event: content_block_start\n" +
`data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}\n\n`
)
);
controller.enqueue(
encoder.encode(
"event: content_block_delta\n" +
`data: ${JSON.stringify({ type: "content_block_delta", index: 0, delta: { type: "text_delta", text: `[Upstream provider error: ${errMsg}]` } })}\n\n`
)
);
controller.enqueue(
encoder.encode(
"event: content_block_stop\n" +
`data: {"type":"content_block_stop","index":0}\n\n`
)
);
}
// If sawMessageStart, whatever blocks the provider opened may be
// left unterminated — the client tolerates a missing content_block_stop
// on early termination better than a missing message_stop, which is
// what hangs the turn.
controller.enqueue(
encoder.encode(
"event: message_delta\n" +
`data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":${outputTokens}}}\n\n`
)
);
controller.enqueue(
encoder.encode("event: message_stop\n" + `data: {"type":"message_stop"}\n\n`)
);
isClosed = true;
if (pingInterval) {
clearInterval(pingInterval);
pingInterval = null;
}
log(`[AnthropicSSE] Finalized stream gracefully (${path}): ${errMsg.slice(0, 120)}`);
try {
controller.close();
} catch {
// already closed
}
};

// Wrap the read loop so a mid-stream reader rejection (socket close)
// is caught here — where finalizeWithError is in scope — instead of
// escaping to the outer catch which can only bare-close.
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
Expand Down Expand Up @@ -220,6 +292,9 @@ export function createAnthropicPassthroughStream(
if (data.type === "message_delta" && data.delta?.stop_reason) {
stopReason = data.delta.stop_reason;
}
if (data.type === "message_start") {
sawMessageStart = true;
}
} catch {
// Unparseable data line — pass through
if (!isClosed) {
Expand Down Expand Up @@ -261,10 +336,23 @@ export function createAnthropicPassthroughStream(
if (data.type === "message_delta" && data.delta?.stop_reason) {
stopReason = data.delta.stop_reason;
}
if (data.type === "message_start") {
sawMessageStart = true;
}
} catch {}
}
}
}
} catch (readErr) {
// Upstream socket closed mid-stream (Z.AI / similar under load).
// finalizeWithError() emits the terminal message_stop so the client
// ends the turn cleanly instead of freezing.
log(
`[AnthropicSSE] Upstream read error for ${opts.modelName}: ${String(readErr).slice(0, 200)} — finalizing gracefully`
);
finalizeWithError(`upstream read error: ${String(readErr)}`, "reader-exception");
return; // skip normal finalization — already terminated
}

log(
`[AnthropicSSE] Stream complete for ${opts.modelName}: ${totalLines} lines, ${textChunks} text chunks, ${toolUseBlocks} tool_use blocks, stop_reason=${stopReason}` +
Expand Down