Skip to content

Commit e584cfe

Browse files
authored
fix(sse-client): Skip SSE in StreamableHttpClientTransport when data is empty (#433)
Refactor StreamableHttpClientTransport to skip SSE when the data field is empty. ## Motivation and Context TypeScript SSE Server started returning Server-Sent Events with only id and empty data field (Heartbeat/Checkpoint), followed by an event with data, e.g.: ``` id: d85c6bb0-0fa2-4828-81ad-3e951131aea5_1764226144689_7qb3iojc data: event: message id: d85c6bb0-0fa2-4828-81ad-3e951131aea5_1764226144690_sodh20lo data: {"result":{"protocolVersion":"2025-06-18","capabilities":{"logging":{},"tools":{"listChanged":true},"prompts":{"listChanged":true},"completions":{},"resources":{"listChanged":true}},"serverInfo":{"name":"simple-streamable-http-server","version":"1.0.0"}},"jsonrpc":"2.0","id":"7ce065b0678f49e5b04ce5a0fcc7d518"} ``` Such empty events might be used by the server as **Heartbeat** or **Checkpoint** events. The client should retain the updated Last-Event-ID and disregard the payload. It is essential to handle such responses to ensure compatibility with updated TypeScript SDK. ## How Has This Been Tested? - Added integration test - Regression tests are green now ## Breaking Changes No ## Types of changes <!-- What types of changes does your code introduce? Put an `x` in all the boxes that apply: --> - [x] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update ## Checklist <!-- Go over all the following points, and put an `x` in all the boxes that apply. --> - [x] I have read the [MCP Documentation](https://modelcontextprotocol.io) - [x] My code follows the repository's style guidelines - [x] New and existing tests pass locally - [ ] I have added appropriate error handling - [ ] I have added or updated documentation as needed ## Additional context <!-- Add any other context, implementation notes, or design decisions -->
1 parent fbbb430 commit e584cfe

File tree

4 files changed

+71
-245
lines changed

4 files changed

+71
-245
lines changed

kotlin-sdk-client/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport.kt

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ import kotlin.concurrent.atomics.AtomicBoolean
4242
import kotlin.concurrent.atomics.ExperimentalAtomicApi
4343
import kotlin.time.Duration
4444

45-
private val logger = KotlinLogging.logger {}
46-
4745
private const val MCP_SESSION_ID_HEADER = "mcp-session-id"
4846
private const val MCP_PROTOCOL_VERSION_HEADER = "mcp-protocol-version"
4947
private const val MCP_RESUMPTION_TOKEN_HEADER = "Last-Event-ID"
@@ -67,6 +65,10 @@ public class StreamableHttpClientTransport(
6765
private val requestBuilder: HttpRequestBuilder.() -> Unit = {},
6866
) : AbstractTransport() {
6967

68+
private companion object {
69+
private val logger = KotlinLogging.logger {}
70+
}
71+
7072
public var sessionId: String? = null
7173
private set
7274
public var protocolVersion: String? = null
@@ -316,11 +318,14 @@ public class StreamableHttpClientTransport(
316318
var id: String? = null
317319
var eventName: String? = null
318320

319-
suspend fun dispatch(data: String) {
321+
suspend fun dispatch(id: String?, eventName: String?, data: String) {
320322
id?.let {
321323
lastEventId = it
322324
onResumptionToken?.invoke(it)
323325
}
326+
if (data.isBlank()) {
327+
return
328+
}
324329
if (eventName == null || eventName == "message") {
325330
runCatching { McpJson.decodeFromString<JSONRPCMessage>(data) }
326331
.onSuccess { msg ->
@@ -335,16 +340,16 @@ public class StreamableHttpClientTransport(
335340
throw it
336341
}
337342
}
338-
// reset
339-
id = null
340-
eventName = null
341-
sb.clear()
342343
}
343344

344345
while (!channel.isClosedForRead) {
345346
val line = channel.readUTF8Line() ?: break
346347
if (line.isEmpty()) {
347-
dispatch(sb.toString())
348+
dispatch(id = id, eventName = eventName, data = sb.toString())
349+
// reset
350+
id = null
351+
eventName = null
352+
sb.clear()
348353
continue
349354
}
350355
when {

kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/AbstractStreamableHttpClientTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import org.junit.jupiter.api.TestInstance
1212
internal abstract class AbstractStreamableHttpClientTest {
1313

1414
// start mokksy on random port
15-
protected val mockMcp: OldSchemaMockMcp = OldSchemaMockMcp(verbose = true)
15+
protected val mockMcp: MockMcp = MockMcp(verbose = true)
1616

1717
@AfterEach
1818
fun afterEach() {

kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/OldSchemaMockMcp.kt

Lines changed: 0 additions & 236 deletions
This file was deleted.

kotlin-sdk-client/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTest.kt

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ import io.modelcontextprotocol.kotlin.sdk.types.Implementation
1111
import io.modelcontextprotocol.kotlin.sdk.types.Tool
1212
import io.modelcontextprotocol.kotlin.sdk.types.ToolSchema
1313
import kotlinx.coroutines.delay
14+
import kotlinx.coroutines.flow.emptyFlow
1415
import kotlinx.coroutines.flow.flow
16+
import kotlinx.coroutines.flow.flowOf
1517
import kotlinx.coroutines.runBlocking
1618
import kotlinx.serialization.json.buildJsonObject
1719
import kotlinx.serialization.json.put
@@ -28,6 +30,61 @@ import kotlin.uuid.Uuid
2830
@Suppress("LongMethod")
2931
internal class StreamableHttpClientTest : AbstractStreamableHttpClientTest() {
3032

33+
@Test
34+
fun `Should skip empty SSE`(): Unit = runBlocking {
35+
val client = Client(
36+
clientInfo = Implementation(
37+
name = "client1",
38+
version = "1.0.0",
39+
),
40+
options = ClientOptions(
41+
capabilities = ClientCapabilities(),
42+
),
43+
)
44+
val sessionId = Uuid.random().toString()
45+
46+
mockMcp.onJSONRPCRequest(
47+
httpMethod = HttpMethod.Post,
48+
jsonRpcMethod = "initialize",
49+
).respondsWithStream {
50+
headers += MCP_SESSION_ID_HEADER to sessionId
51+
flow = flowOf(
52+
"id: ${Uuid.random()}\n",
53+
"data:\n", // empty data
54+
"\n",
55+
"id: ${Uuid.random()}\n",
56+
"data: \t \n", // tabs and spaces
57+
"\n",
58+
"id: ${Uuid.random()}\n",
59+
"event: message\n",
60+
// multiline data
61+
"data: {\n",
62+
"data: \"result\":{\n" +
63+
"data: \"protocolVersion\":\"2025-06-18\",\n" +
64+
"data: \"capabilities\":{},\n" +
65+
"data: \"serverInfo\":{\"name\":\"simple-streamable-http-server\",\"version\":\"1.0.0\"}\n" +
66+
"data: },\n" +
67+
"data: \"jsonrpc\":\"2.0\",\n" +
68+
"data: \"id\":\"7ce065b0678f49e5b04ce5a0fcc7d518\"\n" +
69+
"data: }\n",
70+
"\n",
71+
)
72+
}
73+
74+
mockMcp.handleJSONRPCRequest(
75+
jsonRpcMethod = "notifications/initialized",
76+
expectedSessionId = sessionId,
77+
sessionId = sessionId,
78+
statusCode = HttpStatusCode.Accepted,
79+
)
80+
81+
mockMcp.handleSubscribeWithGet(sessionId) {
82+
emptyFlow()
83+
}
84+
85+
connect(client)
86+
}
87+
3188
@Test
3289
fun `test streamableHttpClient`() = runBlocking {
3390
val client = Client(

0 commit comments

Comments
 (0)