From 510bbe0d40453056301b2c4247a4fc6efc83ee5f Mon Sep 17 00:00:00 2001 From: Abhash Kumar Singh Date: Mon, 9 Jun 2025 13:19:05 -0700 Subject: [PATCH 1/7] fix: serialize web socket client APIs --- .../Websocket/AppSyncWebSocketClient.swift | 48 +++++++++++-------- .../AppSyncWebSocketClientTests.swift | 25 ++++++++++ .../IntegrationTestAppTests/APIKeyTests.swift | 17 +++++-- .../AuthTokenTests.swift | 4 +- 4 files changed, 69 insertions(+), 25 deletions(-) diff --git a/Sources/AWSAppSyncApolloExtensions/Websocket/AppSyncWebSocketClient.swift b/Sources/AWSAppSyncApolloExtensions/Websocket/AppSyncWebSocketClient.swift index f727159..fc6aebb 100644 --- a/Sources/AWSAppSyncApolloExtensions/Websocket/AppSyncWebSocketClient.swift +++ b/Sources/AWSAppSyncApolloExtensions/Websocket/AppSyncWebSocketClient.swift @@ -79,36 +79,46 @@ public class AppSyncWebSocketClient: NSObject, ApolloWebSocket.WebSocketClient, } public func connect() { - AppSyncApolloLogger.debug("Calling Connect") - guard connection?.state != .running else { - AppSyncApolloLogger.debug("[AppSyncWebSocketClient] WebSocket is already in connecting state") - return - } - - subscribeToAppSyncResponse() - - Task { + taskQueue.async { [weak self] in + guard let self else { return } + AppSyncApolloLogger.debug("Calling Connect") + guard connection?.state != .running else { + AppSyncApolloLogger.debug("[AppSyncWebSocketClient] WebSocket is already in connecting state") + return + } + + subscribeToAppSyncResponse() + AppSyncApolloLogger.debug("[AppSyncWebSocketClient] Creating new connection and starting read") - self.connection = try await createWebSocketConnection() + self.connection = try await self.createWebSocketConnection() + // Perform reading from a WebSocket in a separate task recursively to avoid blocking the execution. - Task { await self.startReadMessage() } + Task { + await self.startReadMessage() + } + self.connection?.resume() } } public func disconnect(forceTimeout: TimeInterval?) { - AppSyncApolloLogger.debug("Calling Disconnect") - heartBeatMonitorCancellable?.cancel() - guard connection?.state == .running else { - AppSyncApolloLogger.debug("[AppSyncWebSocketClient] client should be in connected state to trigger disconnect") - return + taskQueue.async { [weak self] in + guard let self else { return } + AppSyncApolloLogger.debug("Calling Disconnect") + heartBeatMonitorCancellable?.cancel() + guard connection?.state == .running else { + AppSyncApolloLogger.debug("[AppSyncWebSocketClient] client should be in connected state to trigger disconnect") + return + } + + connection?.cancel(with: .goingAway, reason: nil) } - - connection?.cancel(with: .goingAway, reason: nil) } public func write(ping: Data, completion: (() -> Void)?) { - AppSyncApolloLogger.debug("Not called, not implemented.") + taskQueue.async { + AppSyncApolloLogger.debug("Not called, not implemented.") + } } public func write(string: String) { diff --git a/Tests/AWSAppSyncApolloExtensionsTests/Websocket/AppSyncWebSocketClientTests.swift b/Tests/AWSAppSyncApolloExtensionsTests/Websocket/AppSyncWebSocketClientTests.swift index 57d7790..75cc9a5 100644 --- a/Tests/AWSAppSyncApolloExtensionsTests/Websocket/AppSyncWebSocketClientTests.swift +++ b/Tests/AWSAppSyncApolloExtensionsTests/Websocket/AppSyncWebSocketClientTests.swift @@ -39,6 +39,31 @@ final class AppSyncWebSocketClientTests: XCTestCase { let webSocketClient = AppSyncWebSocketClient(endpointURL: endpoint, authorizer: MockAppSyncAuthorizer()) await verifyConnected(webSocketClient) } + + func testConnect_ConcurrentInvoke() async throws { + guard let endpoint = try localWebSocketServer?.start() else { + XCTFail("Local WebSocket server failed to start") + return + } + let webSocketClient = AppSyncWebSocketClient(endpointURL: endpoint, authorizer: MockAppSyncAuthorizer()) + let connectedExpectation = expectation(description: "WebSocket should received connected event only once") + connectedExpectation.expectedFulfillmentCount = 1 + let sink = webSocketClient.publisher.sink { event in + switch event { + case .connected: + connectedExpectation.fulfill() + default: + XCTFail("No other type of event should be received") + } + } + + for _ in 1...100 { + let task = Task { + webSocketClient.connect() + } + } + await fulfillment(of: [connectedExpectation], timeout: 5) + } func testDisconnect_didDisconnectFromRemote() async throws { var cancellables = Set() diff --git a/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift b/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift index 5449809..f49fdaa 100644 --- a/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift +++ b/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift @@ -148,9 +148,13 @@ final class APIKeyTests: IntegrationTestBase { let websocket = AppSyncWebSocketClient(endpointURL: configuration.endpoint, authorizer: authorizer) let receivedConnection = expectation(description: "received connection") + receivedConnection.expectedFulfillmentCount = 200 + let receivedMaxSubscriptionsReachedError = expectation(description: "received MaxSubscriptionsReachedError") - receivedConnection.expectedFulfillmentCount = 100 + receivedMaxSubscriptionsReachedError.expectedFulfillmentCount = 5 + let sink = websocket.publisher.sink { event in + print("Received event: \(event)") if case .string(let message) = event { if message.contains("start_ack") { receivedConnection.fulfill() @@ -168,12 +172,15 @@ final class APIKeyTests: IntegrationTestBase { ) let client = ApolloClient(networkTransport: splitTransport, store: store) - for _ in 1...101 { - _ = client.subscribe(subscription: OnCreateSubscription()) { _ in - } + var cancellables = [Cancellable]() + for _ in 1...205 { + cancellables.append(client.subscribe(subscription: OnCreateSubscription()) { _ in }) } - await fulfillment(of: [receivedConnection, receivedMaxSubscriptionsReachedError], timeout: 10) + await fulfillment(of: [receivedConnection, receivedMaxSubscriptionsReachedError], timeout: 15) + for cancellable in cancellables { + cancellable.cancel() + } } } diff --git a/Tests/IntegrationTestApp/IntegrationTestAppTests/AuthTokenTests.swift b/Tests/IntegrationTestApp/IntegrationTestAppTests/AuthTokenTests.swift index 986b278..f65d78d 100644 --- a/Tests/IntegrationTestApp/IntegrationTestAppTests/AuthTokenTests.swift +++ b/Tests/IntegrationTestApp/IntegrationTestAppTests/AuthTokenTests.swift @@ -90,7 +90,7 @@ final class AuthTokenTests: IntegrationTestBase { let receivedDisconnectError = expectation(description: "received disconnect") receivedDisconnectError.assertForOverFulfill = false let sink = websocket.publisher.sink { event in - if case .error(let error) = event, error.localizedDescription.contains("Socket is not connected") { + if case .disconnected(_, _) = event { receivedDisconnectError.fulfill() } } @@ -99,8 +99,10 @@ final class AuthTokenTests: IntegrationTestBase { uploadingNetworkTransport: transport, webSocketNetworkTransport: webSocketTransport ) + let apolloCUPInvalidToken = ApolloClient(networkTransport: splitTransport, store: store) + try await Task.sleep(nanoseconds: 5 * 1_000_000_000) // 5 seconds await fulfillment(of: [receivedDisconnectError], timeout: 10) } From 07cef6235b7708686e57dcf00e5345d53be16a0b Mon Sep 17 00:00:00 2001 From: Abhash Kumar Singh Date: Mon, 9 Jun 2025 13:38:16 -0700 Subject: [PATCH 2/7] Update tests --- .../IntegrationTestAppTests/APIKeyTests.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift b/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift index f49fdaa..45849ad 100644 --- a/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift +++ b/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift @@ -148,7 +148,7 @@ final class APIKeyTests: IntegrationTestBase { let websocket = AppSyncWebSocketClient(endpointURL: configuration.endpoint, authorizer: authorizer) let receivedConnection = expectation(description: "received connection") - receivedConnection.expectedFulfillmentCount = 200 + receivedConnection.expectedFulfillmentCount = 100 let receivedMaxSubscriptionsReachedError = expectation(description: "received MaxSubscriptionsReachedError") receivedMaxSubscriptionsReachedError.expectedFulfillmentCount = 5 @@ -173,7 +173,7 @@ final class APIKeyTests: IntegrationTestBase { let client = ApolloClient(networkTransport: splitTransport, store: store) var cancellables = [Cancellable]() - for _ in 1...205 { + for _ in 1...105 { cancellables.append(client.subscribe(subscription: OnCreateSubscription()) { _ in }) } From cbcd386b5aa8751f125def07b5c902e18f733f1d Mon Sep 17 00:00:00 2001 From: Abhash Kumar Singh Date: Mon, 9 Jun 2025 13:39:04 -0700 Subject: [PATCH 3/7] Remove debug statements --- .../IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift b/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift index 45849ad..63b78c4 100644 --- a/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift +++ b/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift @@ -154,7 +154,6 @@ final class APIKeyTests: IntegrationTestBase { receivedMaxSubscriptionsReachedError.expectedFulfillmentCount = 5 let sink = websocket.publisher.sink { event in - print("Received event: \(event)") if case .string(let message) = event { if message.contains("start_ack") { receivedConnection.fulfill() From 6ffa8e6b71fa266259200fc6b117b8c69600a6e2 Mon Sep 17 00:00:00 2001 From: Abhash Kumar Singh Date: Mon, 9 Jun 2025 14:16:55 -0700 Subject: [PATCH 4/7] Update appsync subscription limit --- .../IntegrationTestAppTests/APIKeyTests.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift b/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift index 63b78c4..99603cf 100644 --- a/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift +++ b/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift @@ -148,7 +148,7 @@ final class APIKeyTests: IntegrationTestBase { let websocket = AppSyncWebSocketClient(endpointURL: configuration.endpoint, authorizer: authorizer) let receivedConnection = expectation(description: "received connection") - receivedConnection.expectedFulfillmentCount = 100 + receivedConnection.expectedFulfillmentCount = 200 let receivedMaxSubscriptionsReachedError = expectation(description: "received MaxSubscriptionsReachedError") receivedMaxSubscriptionsReachedError.expectedFulfillmentCount = 5 @@ -172,7 +172,7 @@ final class APIKeyTests: IntegrationTestBase { let client = ApolloClient(networkTransport: splitTransport, store: store) var cancellables = [Cancellable]() - for _ in 1...105 { + for _ in 1...205 { cancellables.append(client.subscribe(subscription: OnCreateSubscription()) { _ in }) } From d25cb13103d8c24b6d0984de3d8553a68053073f Mon Sep 17 00:00:00 2001 From: Abhash Kumar Singh Date: Mon, 9 Jun 2025 14:41:29 -0700 Subject: [PATCH 5/7] Increase timeout limit --- .../IntegrationTestAppTests/APIKeyTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift b/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift index 99603cf..249ca1f 100644 --- a/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift +++ b/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift @@ -176,7 +176,7 @@ final class APIKeyTests: IntegrationTestBase { cancellables.append(client.subscribe(subscription: OnCreateSubscription()) { _ in }) } - await fulfillment(of: [receivedConnection, receivedMaxSubscriptionsReachedError], timeout: 15) + await fulfillment(of: [receivedConnection, receivedMaxSubscriptionsReachedError], timeout: 30) for cancellable in cancellables { cancellable.cancel() } From 60de40d539b32f8409a7536d665eef893383abfd Mon Sep 17 00:00:00 2001 From: Abhash Kumar Singh Date: Mon, 9 Jun 2025 15:03:29 -0700 Subject: [PATCH 6/7] add debug statements --- .../IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift b/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift index 249ca1f..3069346 100644 --- a/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift +++ b/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift @@ -154,6 +154,7 @@ final class APIKeyTests: IntegrationTestBase { receivedMaxSubscriptionsReachedError.expectedFulfillmentCount = 5 let sink = websocket.publisher.sink { event in + print("Received event: \(event)") if case .string(let message) = event { if message.contains("start_ack") { receivedConnection.fulfill() From 2a94a41b9330b5e1f6aa63a3221bb628ef370a71 Mon Sep 17 00:00:00 2001 From: Abhash Kumar Singh Date: Mon, 9 Jun 2025 16:49:58 -0700 Subject: [PATCH 7/7] Fix failing integration test --- .../IntegrationTestAppTests/APIKeyTests.swift | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift b/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift index 3069346..61756fb 100644 --- a/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift +++ b/Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift @@ -138,6 +138,9 @@ final class APIKeyTests: IntegrationTestBase { } func testMaxSubscriptionReached() async throws { + let subscriptionLimit = 200 + let failedSubscriptionCount = 5 + let configuration = try AWSAppSyncConfiguration(with: .amplifyOutputs) let store = ApolloStore(cache: InMemoryNormalizedCache()) let authorizer = APIKeyAuthorizer(apiKey: configuration.apiKey ?? "") @@ -148,13 +151,12 @@ final class APIKeyTests: IntegrationTestBase { let websocket = AppSyncWebSocketClient(endpointURL: configuration.endpoint, authorizer: authorizer) let receivedConnection = expectation(description: "received connection") - receivedConnection.expectedFulfillmentCount = 200 + receivedConnection.expectedFulfillmentCount = subscriptionLimit let receivedMaxSubscriptionsReachedError = expectation(description: "received MaxSubscriptionsReachedError") - receivedMaxSubscriptionsReachedError.expectedFulfillmentCount = 5 + receivedMaxSubscriptionsReachedError.expectedFulfillmentCount = failedSubscriptionCount let sink = websocket.publisher.sink { event in - print("Received event: \(event)") if case .string(let message) = event { if message.contains("start_ack") { receivedConnection.fulfill() @@ -171,16 +173,18 @@ final class APIKeyTests: IntegrationTestBase { webSocketNetworkTransport: webSocketTransport ) let client = ApolloClient(networkTransport: splitTransport, store: store) - + + try await Task.sleep(nanoseconds: 5 * 1_000_000_000) // 5 seconds + var cancellables = [Cancellable]() - for _ in 1...205 { + for _ in 1...subscriptionLimit + failedSubscriptionCount { cancellables.append(client.subscribe(subscription: OnCreateSubscription()) { _ in }) } - - await fulfillment(of: [receivedConnection, receivedMaxSubscriptionsReachedError], timeout: 30) + + await fulfillment(of: [receivedConnection, receivedMaxSubscriptionsReachedError], timeout: 10) + for cancellable in cancellables { cancellable.cancel() } } - }