@@ -238,30 +238,50 @@ def _create_session_message( # pragma: no cover
238238 message : JSONRPCMessage ,
239239 request : Request ,
240240 request_id : RequestId ,
241+ protocol_version : str ,
241242 ) -> SessionMessage :
242- """Create a session message with metadata including close_sse_stream callback."""
243+ """Create a session message with metadata including close_sse_stream callback.
243244
244- async def close_stream_callback () -> None :
245- self .close_sse_stream (request_id )
245+ The close_sse_stream callbacks are only provided when the client supports
246+ resumability (protocol version >= 2025-11-25). Old clients can't resume if
247+ the stream is closed early because they didn't receive a priming event.
248+ """
249+ # Only provide close callbacks when client supports resumability
250+ if self ._event_store and protocol_version >= "2025-11-25" :
246251
247- async def close_standalone_stream_callback () -> None :
248- self .close_standalone_sse_stream ()
252+ async def close_stream_callback () -> None :
253+ self .close_sse_stream (request_id )
254+
255+ async def close_standalone_stream_callback () -> None :
256+ self .close_standalone_sse_stream ()
257+
258+ metadata = ServerMessageMetadata (
259+ request_context = request ,
260+ close_sse_stream = close_stream_callback ,
261+ close_standalone_sse_stream = close_standalone_stream_callback ,
262+ )
263+ else :
264+ metadata = ServerMessageMetadata (request_context = request )
249265
250- metadata = ServerMessageMetadata (
251- request_context = request ,
252- close_sse_stream = close_stream_callback ,
253- close_standalone_sse_stream = close_standalone_stream_callback ,
254- )
255266 return SessionMessage (message , metadata = metadata )
256267
257- async def _send_priming_event ( # pragma: no cover
268+ async def _maybe_send_priming_event (
258269 self ,
259270 request_id : RequestId ,
260271 sse_stream_writer : MemoryObjectSendStream [dict [str , Any ]],
272+ protocol_version : str ,
261273 ) -> None :
262- """Send priming event for SSE resumability if event_store is configured."""
274+ """Send priming event for SSE resumability if event_store is configured.
275+
276+ Only sends priming events to clients with protocol version >= 2025-11-25,
277+ which includes the fix for handling empty SSE data. Older clients would
278+ crash trying to parse empty data as JSON.
279+ """
263280 if not self ._event_store :
264281 return
282+ # Priming events have empty data which older clients cannot handle.
283+ if protocol_version < "2025-11-25" :
284+ return
265285 priming_event_id = await self ._event_store .store_event (
266286 str (request_id ), # Convert RequestId to StreamId (str)
267287 None , # Priming event has no payload
@@ -499,6 +519,15 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
499519
500520 return
501521
522+ # Extract protocol version for priming event decision.
523+ # For initialize requests, get from request params.
524+ # For other requests, get from header (already validated).
525+ protocol_version = (
526+ str (message .root .params .get ("protocolVersion" , DEFAULT_NEGOTIATED_VERSION ))
527+ if is_initialization_request and message .root .params
528+ else request .headers .get (MCP_PROTOCOL_VERSION_HEADER , DEFAULT_NEGOTIATED_VERSION )
529+ )
530+
502531 # Extract the request ID outside the try block for proper scope
503532 request_id = str (message .root .id ) # pragma: no cover
504533 # Register this stream for the request ID
@@ -560,7 +589,7 @@ async def sse_writer():
560589 try :
561590 async with sse_stream_writer , request_stream_reader :
562591 # Send priming event for SSE resumability
563- await self ._send_priming_event (request_id , sse_stream_writer )
592+ await self ._maybe_send_priming_event (request_id , sse_stream_writer , protocol_version )
564593
565594 # Process messages from the request-specific stream
566595 async for event_message in request_stream_reader :
@@ -605,7 +634,7 @@ async def sse_writer():
605634 async with anyio .create_task_group () as tg :
606635 tg .start_soon (response , scope , receive , send )
607636 # Then send the message to be processed by the server
608- session_message = self ._create_session_message (message , request , request_id )
637+ session_message = self ._create_session_message (message , request , request_id , protocol_version )
609638 await writer .send (session_message )
610639 except Exception :
611640 logger .exception ("SSE response error" )
@@ -864,6 +893,9 @@ async def _replay_events(self, last_event_id: str, request: Request, send: Send)
864893 if self .mcp_session_id :
865894 headers [MCP_SESSION_ID_HEADER ] = self .mcp_session_id
866895
896+ # Get protocol version from header (already validated in _validate_protocol_version)
897+ replay_protocol_version = request .headers .get (MCP_PROTOCOL_VERSION_HEADER , DEFAULT_NEGOTIATED_VERSION )
898+
867899 # Create SSE stream for replay
868900 sse_stream_writer , sse_stream_reader = anyio .create_memory_object_stream [dict [str , str ]](0 )
869901
@@ -884,7 +916,7 @@ async def send_event(event_message: EventMessage) -> None:
884916 self ._sse_stream_writers [stream_id ] = sse_stream_writer
885917
886918 # Send priming event for this new connection
887- await self ._send_priming_event (stream_id , sse_stream_writer )
919+ await self ._maybe_send_priming_event (stream_id , sse_stream_writer , replay_protocol_version )
888920
889921 # Create new request streams for this connection
890922 self ._request_streams [stream_id ] = anyio .create_memory_object_stream [EventMessage ](0 )
0 commit comments