diff --git a/Sources/MCP/Base/Transports/HTTPClientTransport.swift b/Sources/MCP/Base/Transports/HTTPClientTransport.swift index 9cf93a0..adf3c93 100644 --- a/Sources/MCP/Base/Transports/HTTPClientTransport.swift +++ b/Sources/MCP/Base/Transports/HTTPClientTransport.swift @@ -2,22 +2,22 @@ import Foundation import Logging #if canImport(FoundationNetworking) - import FoundationNetworking +import FoundationNetworking #endif public actor HTTPClientTransport: Actor, Transport { - public let endpoint: URL + private(set) public var endpoint: URL private let session: URLSession public private(set) var sessionID: String? private let streaming: Bool private var streamingTask: Task? private var lastEventID: String? public nonisolated let logger: Logger - + private var isConnected = false private let messageStream: AsyncThrowingStream private let messageContinuation: AsyncThrowingStream.Continuation - + public init( endpoint: URL, configuration: URLSessionConfiguration = .default, @@ -31,7 +31,7 @@ public actor HTTPClientTransport: Actor, Transport { logger: logger ) } - + internal init( endpoint: URL, session: URLSession, @@ -41,83 +41,98 @@ public actor HTTPClientTransport: Actor, Transport { self.endpoint = endpoint self.session = session self.streaming = streaming - + // Create message stream var continuation: AsyncThrowingStream.Continuation! self.messageStream = AsyncThrowingStream { continuation = $0 } self.messageContinuation = continuation - + self.logger = - logger - ?? Logger( - label: "mcp.transport.http.client", - factory: { _ in SwiftLogNoOpLogHandler() } - ) + logger + ?? Logger( + label: "mcp.transport.http.client", + factory: { _ in SwiftLogNoOpLogHandler() } + ) } - + /// Establishes connection with the transport public func connect() async throws { - guard !isConnected else { return } - isConnected = true - - if streaming { - // Start listening to server events - streamingTask = Task { await startListeningForServerEvents() } + try await withUnsafeThrowingContinuation { (cont: UnsafeContinuation) in + guard !isConnected else { + cont.resume(throwing: MCPError.internalError("Transport already connected")) + return + } + + isConnected = true + + addListener("endpoint") { endpointPath in + guard let resolvedURL = URL(string: endpointPath, relativeTo: self.endpoint)?.absoluteURL else { + cont.resume(throwing: MCPError.internalError("Invalid endpoint URL")) + return + } + self.endpoint = resolvedURL + cont.resume() + } + + if streaming { + // Start listening to server events + streamingTask = Task { await startListeningForServerEvents() } + } + + logger.info("HTTP transport connected") } - - logger.info("HTTP transport connected") } - + /// Disconnects from the transport public func disconnect() async { guard isConnected else { return } isConnected = false - + // Cancel streaming task if active streamingTask?.cancel() streamingTask = nil - + // Cancel any in-progress requests session.invalidateAndCancel() - + // Clean up message stream messageContinuation.finish() - + logger.info("HTTP clienttransport disconnected") } - + /// Sends data through an HTTP POST request public func send(_ data: Data) async throws { guard isConnected else { throw MCPError.internalError("Transport not connected") } - + var request = URLRequest(url: endpoint) request.httpMethod = "POST" request.addValue("application/json, text/event-stream", forHTTPHeaderField: "Accept") request.addValue("application/json", forHTTPHeaderField: "Content-Type") request.httpBody = data - + // Add session ID if available if let sessionID = sessionID { request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id") } - + let (responseData, response) = try await session.data(for: request) - + guard let httpResponse = response as? HTTPURLResponse else { throw MCPError.internalError("Invalid HTTP response") } - + // Process the response based on content type and status code let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type") ?? "" - + // Extract session ID if present if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") { self.sessionID = newSessionID logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"]) } - + // Handle different response types switch httpResponse.statusCode { case 200, 201, 202: @@ -127,7 +142,7 @@ public actor HTTPClientTransport: Actor, Transport { // The streaming is handled by the SSE task if active return } - + // For JSON responses, deliver the data directly if contentType.contains("application/json") && !responseData.isEmpty { logger.debug("Received JSON response", metadata: ["size": "\(responseData.count)"]) @@ -145,18 +160,18 @@ public actor HTTPClientTransport: Actor, Transport { throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)") } } - + /// Receives data in an async sequence public func receive() -> AsyncThrowingStream { return messageStream } - + // MARK: - SSE - + /// Starts listening for server events using SSE private func startListeningForServerEvents() async { guard isConnected else { return } - + // Retry loop for connection drops while isConnected && !Task.isCancelled { do { @@ -170,143 +185,157 @@ public actor HTTPClientTransport: Actor, Transport { } } } - - #if canImport(FoundationNetworking) - private func connectToEventStream() async throws { - logger.warning("SSE is not supported on this platform") + +#if canImport(FoundationNetworking) + private func connectToEventStream() async throws { + logger.warning("SSE is not supported on this platform") + } +#else + /// Establishes an SSE connection to the server + private func connectToEventStream() async throws { + guard isConnected else { return } + + var request = URLRequest(url: endpoint) + request.httpMethod = "GET" + request.addValue("text/event-stream", forHTTPHeaderField: "Accept") + + // Add session ID if available + if let sessionID = sessionID { + request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id") } - #else - /// Establishes an SSE connection to the server - private func connectToEventStream() async throws { - guard isConnected else { return } - - var request = URLRequest(url: endpoint) - request.httpMethod = "GET" - request.addValue("text/event-stream", forHTTPHeaderField: "Accept") - - // Add session ID if available - if let sessionID = sessionID { - request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id") - } - - // Add Last-Event-ID header for resumability if available - if let lastEventID = lastEventID { - request.addValue(lastEventID, forHTTPHeaderField: "Last-Event-ID") - } - - logger.debug("Starting SSE connection") - - // Create URLSession task for SSE - let (stream, response) = try await session.bytes(for: request) - - guard let httpResponse = response as? HTTPURLResponse else { - throw MCPError.internalError("Invalid HTTP response") + + // Add Last-Event-ID header for resumability if available + if let lastEventID = lastEventID { + request.addValue(lastEventID, forHTTPHeaderField: "Last-Event-ID") + } + + logger.debug("Starting SSE connection") + + // Create URLSession task for SSE + let (stream, response) = try await session.bytes(for: request) + + guard let httpResponse = response as? HTTPURLResponse else { + throw MCPError.internalError("Invalid HTTP response") + } + + // Check response status + guard httpResponse.statusCode == 200 else { + // If the server returns 405 Method Not Allowed, + // it indicates that the server doesn't support SSE streaming. + // We should cancel the task instead of retrying the connection. + if httpResponse.statusCode == 405 { + self.streamingTask?.cancel() } - - // Check response status - guard httpResponse.statusCode == 200 else { - // If the server returns 405 Method Not Allowed, - // it indicates that the server doesn't support SSE streaming. - // We should cancel the task instead of retrying the connection. - if httpResponse.statusCode == 405 { - self.streamingTask?.cancel() + throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)") + } + + // Extract session ID if present + if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") { + self.sessionID = newSessionID + } + + // Process the SSE stream + var buffer = "" + var eventType = "" + var eventID: String? + var eventData = "" + + for try await byte in stream { + if Task.isCancelled { break } + + guard let char = String(bytes: [byte], encoding: .utf8) else { continue } + buffer.append(char) + + // Process complete lines + while let newlineIndex = buffer.utf8.firstIndex(where: { $0 == 10 }) { + var line = buffer[.. Void]() + + private func addListener(_ named: EventName, handler: @escaping @isolated(any) (EventData) -> Void) { + listeners[named] = handler + } + }