diff --git a/packages/cli/src/format-translation.test.ts b/packages/cli/src/format-translation.test.ts index 2fa23dd4..29139047 100644 --- a/packages/cli/src/format-translation.test.ts +++ b/packages/cli/src/format-translation.test.ts @@ -259,6 +259,66 @@ describe("Anthropic SSE Passthrough (createAnthropicPassthroughStream)", () => { expect(tokenInput).toBe(50); expect(tokenOutput).toBe(5); }); + + // never-hang: Z.AI and similar providers close the TCP socket mid-stream + // under load. Before the fix, the reader.read() rejection escaped to a catch + // that did a bare controller.close() with NO terminal message_stop, so the + // client saw "socket connection was closed unexpectedly" and froze the turn. + test("never-hang: upstream socket close mid-stream still emits terminal message_stop", async () => { + const createAnthropicPassthroughStream = await getParser(); + const encoder = new TextEncoder(); + // Fixture: a few valid events, then the upstream body ERRORS (socket close). + const upstream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode( + "event: message_start\n" + + `data: {"type":"message_start","message":{"id":"msg_test","type":"message","role":"assistant","model":"glm-5.1","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":10,"output_tokens":0}}}\n\n` + ) + ); + controller.enqueue( + encoder.encode( + "event: content_block_start\n" + + `data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}\n\n` + ) + ); + controller.enqueue( + encoder.encode( + "event: content_block_delta\n" + + `data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Partial answer before cutoff"}}\n\n` + ) + ); + // Simulate the upstream TCP socket dying mid-stream — deferred so the + // enqueued chunks are read first (a real close arrives after a network + // round-trip, not synchronously in the same tick as the writes). + setTimeout( + () => controller.error(new Error("The socket connection was closed unexpectedly.")), + 5 + ); + }, + }); + const fixture = new Response(upstream, { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }); + const ctx = createMockContext(); + + const response = createAnthropicPassthroughStream(ctx, fixture, { + modelName: "glm-5.1", + }); + + const events = await parseClaudeSseStream(response); + + // Partial content must survive (not lost to the cutoff). + expect(extractText(events)).toContain("Partial answer before cutoff"); + + // CRITICAL: a terminal message_stop MUST be present so the client ends the + // turn cleanly instead of freezing. This assertion fails before the fix. + expect(events.some((e) => e.data?.type === "message_stop")).toBe(true); + + // Synthesized stop_reason so the client treats it as a completed turn. + expect(extractStopReason(events)).toBe("end_turn"); + }); }); // ─── Adapter Message Conversion Tests ─────────────────────────────────────── diff --git a/packages/cli/src/handlers/shared/stream-parsers/anthropic-sse.ts b/packages/cli/src/handlers/shared/stream-parsers/anthropic-sse.ts index eb3aa09b..03f49ac8 100644 --- a/packages/cli/src/handlers/shared/stream-parsers/anthropic-sse.ts +++ b/packages/cli/src/handlers/shared/stream-parsers/anthropic-sse.ts @@ -73,7 +73,79 @@ export function createAnthropicPassthroughStream( let insideThinkingBlock = false; /** How many thinking blocks have been suppressed so far. */ let thinkingBlocksSuppressed = 0; + /** Whether a message_start has been emitted downstream yet. */ + let sawMessageStart = false; + // ── Graceful finalization on upstream failure ──────────────────── + // Z.AI and other Anthropic-compatible providers sometimes close the + // TCP socket mid-stream (under load / burst limits). When that happens + // reader.read() rejects, and without this net the rejection escaped to + // the outer catch — which did a bare controller.close() with NO + // terminal message_stop event. Claude Code then reported "API Error: + // The socket connection was closed unexpectedly" and froze the turn. + // This emits a valid terminal Claude message so the client ends the + // turn cleanly (preserving whatever streamed before the cutoff). + const finalizeWithError = (errMsg: string, path: string) => { + if (isClosed) return; + if (!sawMessageStart) { + // No content reached the client yet — emit a minimal synthetic + // message so message_stop below is well-formed. + const synthId = `msg_${Date.now()}`; + controller.enqueue( + encoder.encode( + "event: message_start\n" + + `data: {"type":"message_start","message":{"id":"${synthId}","type":"message","role":"assistant","model":"${opts.modelName}","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":${inputTokens},"output_tokens":${outputTokens}}}}\n\n` + ) + ); + controller.enqueue( + encoder.encode( + "event: content_block_start\n" + + `data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}\n\n` + ) + ); + controller.enqueue( + encoder.encode( + "event: content_block_delta\n" + + `data: ${JSON.stringify({ type: "content_block_delta", index: 0, delta: { type: "text_delta", text: `[Upstream provider error: ${errMsg}]` } })}\n\n` + ) + ); + controller.enqueue( + encoder.encode( + "event: content_block_stop\n" + + `data: {"type":"content_block_stop","index":0}\n\n` + ) + ); + } + // If sawMessageStart, whatever blocks the provider opened may be + // left unterminated — the client tolerates a missing content_block_stop + // on early termination better than a missing message_stop, which is + // what hangs the turn. + controller.enqueue( + encoder.encode( + "event: message_delta\n" + + `data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":${outputTokens}}}\n\n` + ) + ); + controller.enqueue( + encoder.encode("event: message_stop\n" + `data: {"type":"message_stop"}\n\n`) + ); + isClosed = true; + if (pingInterval) { + clearInterval(pingInterval); + pingInterval = null; + } + log(`[AnthropicSSE] Finalized stream gracefully (${path}): ${errMsg.slice(0, 120)}`); + try { + controller.close(); + } catch { + // already closed + } + }; + + // Wrap the read loop so a mid-stream reader rejection (socket close) + // is caught here — where finalizeWithError is in scope — instead of + // escaping to the outer catch which can only bare-close. + try { while (true) { const { done, value } = await reader.read(); if (done) break; @@ -220,6 +292,9 @@ export function createAnthropicPassthroughStream( if (data.type === "message_delta" && data.delta?.stop_reason) { stopReason = data.delta.stop_reason; } + if (data.type === "message_start") { + sawMessageStart = true; + } } catch { // Unparseable data line — pass through if (!isClosed) { @@ -261,10 +336,23 @@ export function createAnthropicPassthroughStream( if (data.type === "message_delta" && data.delta?.stop_reason) { stopReason = data.delta.stop_reason; } + if (data.type === "message_start") { + sawMessageStart = true; + } } catch {} } } } + } catch (readErr) { + // Upstream socket closed mid-stream (Z.AI / similar under load). + // finalizeWithError() emits the terminal message_stop so the client + // ends the turn cleanly instead of freezing. + log( + `[AnthropicSSE] Upstream read error for ${opts.modelName}: ${String(readErr).slice(0, 200)} — finalizing gracefully` + ); + finalizeWithError(`upstream read error: ${String(readErr)}`, "reader-exception"); + return; // skip normal finalization — already terminated + } log( `[AnthropicSSE] Stream complete for ${opts.modelName}: ${totalLines} lines, ${textChunks} text chunks, ${toolUseBlocks} tool_use blocks, stop_reason=${stopReason}` +