Skip to content

Commit 16ea277

Browse files
authored
fix(731): StreamableHTTPClientTransport Fails to Reconnect on Non-Resumable Streams (#732)
1 parent af61a08 commit 16ea277

File tree

2 files changed

+127
-16
lines changed

2 files changed

+127
-16
lines changed

src/client/streamableHttp.test.ts

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { StreamableHTTPClientTransport, StreamableHTTPReconnectionOptions, StartSSEOptions } from "./streamableHttp.js";
22
import { OAuthClientProvider, UnauthorizedError } from "./auth.js";
3-
import { JSONRPCMessage } from "../types.js";
3+
import { JSONRPCMessage, JSONRPCRequest } from "../types.js";
44
import { InvalidClientError, InvalidGrantError, UnauthorizedClientError } from "../server/auth/errors.js";
55

66

@@ -594,6 +594,111 @@ describe("StreamableHTTPClientTransport", () => {
594594
await expect(transport.send(message)).rejects.toThrow(UnauthorizedError);
595595
expect(mockAuthProvider.redirectToAuthorization.mock.calls).toHaveLength(1);
596596
});
597+
598+
describe('Reconnection Logic', () => {
599+
let transport: StreamableHTTPClientTransport;
600+
601+
// Use fake timers to control setTimeout and make the test instant.
602+
beforeEach(() => jest.useFakeTimers());
603+
afterEach(() => jest.useRealTimers());
604+
605+
it('should reconnect a GET-initiated notification stream that fails', async () => {
606+
// ARRANGE
607+
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"), {
608+
reconnectionOptions: {
609+
initialReconnectionDelay: 10,
610+
maxRetries: 1,
611+
maxReconnectionDelay: 1000, // Ensure it doesn't retry indefinitely
612+
reconnectionDelayGrowFactor: 1 // No exponential backoff for simplicity
613+
}
614+
});
615+
616+
const errorSpy = jest.fn();
617+
transport.onerror = errorSpy;
618+
619+
const failingStream = new ReadableStream({
620+
start(controller) { controller.error(new Error("Network failure")); }
621+
});
622+
623+
const fetchMock = global.fetch as jest.Mock;
624+
// Mock the initial GET request, which will fail.
625+
fetchMock.mockResolvedValueOnce({
626+
ok: true, status: 200,
627+
headers: new Headers({ "content-type": "text/event-stream" }),
628+
body: failingStream,
629+
});
630+
// Mock the reconnection GET request, which will succeed.
631+
fetchMock.mockResolvedValueOnce({
632+
ok: true, status: 200,
633+
headers: new Headers({ "content-type": "text/event-stream" }),
634+
body: new ReadableStream(),
635+
});
636+
637+
// ACT
638+
await transport.start();
639+
// Trigger the GET stream directly using the internal method for a clean test.
640+
await transport["_startOrAuthSse"]({});
641+
await jest.advanceTimersByTimeAsync(20); // Trigger reconnection timeout
642+
643+
// ASSERT
644+
expect(errorSpy).toHaveBeenCalledWith(expect.objectContaining({
645+
message: expect.stringContaining('SSE stream disconnected: Error: Network failure'),
646+
}));
647+
// THE KEY ASSERTION: A second fetch call proves reconnection was attempted.
648+
expect(fetchMock).toHaveBeenCalledTimes(2);
649+
expect(fetchMock.mock.calls[0][1]?.method).toBe('GET');
650+
expect(fetchMock.mock.calls[1][1]?.method).toBe('GET');
651+
});
652+
653+
it('should NOT reconnect a POST-initiated stream that fails', async () => {
654+
// ARRANGE
655+
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"), {
656+
reconnectionOptions: {
657+
initialReconnectionDelay: 10,
658+
maxRetries: 1,
659+
maxReconnectionDelay: 1000, // Ensure it doesn't retry indefinitely
660+
reconnectionDelayGrowFactor: 1 // No exponential backoff for simplicity
661+
}
662+
});
663+
664+
const errorSpy = jest.fn();
665+
transport.onerror = errorSpy;
666+
667+
const failingStream = new ReadableStream({
668+
start(controller) { controller.error(new Error("Network failure")); }
669+
});
670+
671+
const fetchMock = global.fetch as jest.Mock;
672+
// Mock the POST request. It returns a streaming content-type but a failing body.
673+
fetchMock.mockResolvedValueOnce({
674+
ok: true, status: 200,
675+
headers: new Headers({ "content-type": "text/event-stream" }),
676+
body: failingStream,
677+
});
678+
679+
// A dummy request message to trigger the `send` logic.
680+
const requestMessage: JSONRPCRequest = {
681+
jsonrpc: '2.0',
682+
method: 'long_running_tool',
683+
id: 'request-1',
684+
params: {},
685+
};
686+
687+
// ACT
688+
await transport.start();
689+
// Use the public `send` method to initiate a POST that gets a stream response.
690+
await transport.send(requestMessage);
691+
await jest.advanceTimersByTimeAsync(20); // Advance time to check for reconnections
692+
693+
// ASSERT
694+
expect(errorSpy).toHaveBeenCalledWith(expect.objectContaining({
695+
message: expect.stringContaining('SSE stream disconnected: Error: Network failure'),
696+
}));
697+
// THE KEY ASSERTION: Fetch was only called ONCE. No reconnection was attempted.
698+
expect(fetchMock).toHaveBeenCalledTimes(1);
699+
expect(fetchMock.mock.calls[0][1]?.method).toBe('POST');
700+
});
701+
});
597702

598703
it("invalidates all credentials on InvalidClientError during auth", async () => {
599704
const message: JSONRPCMessage = {

src/client/streamableHttp.ts

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ const response = await (this._fetch ?? fetch)(this._url, {
231231
);
232232
}
233233

234-
this._handleSseStream(response.body, options);
234+
this._handleSseStream(response.body, options, true);
235235
} catch (error) {
236236
this.onerror?.(error as Error);
237237
throw error;
@@ -300,7 +300,11 @@ const response = await (this._fetch ?? fetch)(this._url, {
300300
}, delay);
301301
}
302302

303-
private _handleSseStream(stream: ReadableStream<Uint8Array> | null, options: StartSSEOptions): void {
303+
private _handleSseStream(
304+
stream: ReadableStream<Uint8Array> | null,
305+
options: StartSSEOptions,
306+
isReconnectable: boolean,
307+
): void {
304308
if (!stream) {
305309
return;
306310
}
@@ -347,20 +351,22 @@ const response = await (this._fetch ?? fetch)(this._url, {
347351
this.onerror?.(new Error(`SSE stream disconnected: ${error}`));
348352

349353
// Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
350-
if (this._abortController && !this._abortController.signal.aborted) {
354+
if (
355+
isReconnectable &&
356+
this._abortController &&
357+
!this._abortController.signal.aborted
358+
) {
351359
// Use the exponential backoff reconnection strategy
352-
if (lastEventId !== undefined) {
353-
try {
354-
this._scheduleReconnection({
355-
resumptionToken: lastEventId,
356-
onresumptiontoken,
357-
replayMessageId
358-
}, 0);
359-
}
360-
catch (error) {
361-
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
360+
try {
361+
this._scheduleReconnection({
362+
resumptionToken: lastEventId,
363+
onresumptiontoken,
364+
replayMessageId
365+
}, 0);
366+
}
367+
catch (error) {
368+
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
362369

363-
}
364370
}
365371
}
366372
}
@@ -473,7 +479,7 @@ const response = await (this._fetch ?? fetch)(this._url, init);
473479
// Handle SSE stream responses for requests
474480
// We use the same handler as standalone streams, which now supports
475481
// reconnection with the last event ID
476-
this._handleSseStream(response.body, { onresumptiontoken });
482+
this._handleSseStream(response.body, { onresumptiontoken }, false);
477483
} else if (contentType?.includes("application/json")) {
478484
// For non-streaming servers, we might get direct JSON responses
479485
const data = await response.json();

0 commit comments

Comments
 (0)