Skip to content

Commit 8840d01

Browse files
committed
Remove AsyncHelpers usage
We don't want to use a type-erased sequence wrapper for many reasons. Performance, non-throwing-ness,…
1 parent f7dfa7a commit 8840d01

File tree

5 files changed

+35
-28
lines changed

5 files changed

+35
-28
lines changed

Package.swift

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,12 @@ let package = Package(
3030
.package(url: "https://github.com/apple/swift-log.git", .upToNextMajor(from: "1.5.2")),
3131
// AsyncChannel with backpressure
3232
.package(url: "https://github.com/apple/swift-async-algorithms", branch: "main"),
33-
// Helpers for async/await
34-
.package(url: "https://github.com/stairtree/async-helpers.git", branch: "main"),
3533
],
3634
targets: [
3735
.target(
3836
name: "StructuredWebSocketClient",
3937
dependencies: [
4038
.product(name: "Logging", package: "swift-log"),
41-
.product(name: "AsyncHelpers", package: "async-helpers"),
4239
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
4340
]),
4441
.target(

Sources/StructuredWebSocketClient/MessageTransport.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
import Foundation
1515
import Logging
16-
import AsyncHelpers
1716
import AsyncAlgorithms
1817
#if canImport(FoundationNetworking)
1918
import FoundationNetworking

Sources/StructuredWebSocketClient/URLSession+MessageTransport.swift

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
import Foundation
1515
import Logging
16-
import AsyncHelpers
1716
import AsyncAlgorithms
1817
#if canImport(FoundationNetworking)
1918
import FoundationNetworking
@@ -52,6 +51,12 @@ public final class URLSessionWebSocketTransport: MessageTransport {
5251
}
5352

5453
public func close(with closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) {
54+
// If the task is already closed, we need to call onClose, as that is
55+
// the only way the events channel is finished.
56+
guard wsTask.closeCode == .invalid else {
57+
Task { await self.onClose(closeCode: closeCode, reason: reason) }
58+
return
59+
}
5560
wsTask.cancel(with: closeCode, reason: reason)
5661
}
5762

@@ -89,6 +94,11 @@ public final class URLSessionWebSocketTransport: MessageTransport {
8994
WebSocketClient did complete with error (code: \(nsError.code), reason: \(reason))
9095
""")
9196
await self.events.send(.failure(nsError))
97+
// If the task is already closed, we need to call onClose, as that is
98+
// the only way the events channel is finished.
99+
if wsTask.closeCode != .invalid {
100+
await self.onClose(closeCode: .abnormalClosure, reason: Data(reason.utf8))
101+
}
92102
}
93103

94104
private func readNextMessage() {
@@ -110,6 +120,7 @@ public final class URLSessionWebSocketTransport: MessageTransport {
110120
self?.readNextMessage()
111121
} catch {
112122
await self?.events.send(.failure(error))
123+
await self?.onClose(closeCode: .abnormalClosure, reason: Data(error.localizedDescription.utf8))
113124
}
114125
}
115126
}

Sources/StructuredWebSocketClient/WebSocketClient.swift

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
import Foundation
1515
import Logging
16-
import AsyncHelpers
16+
import AsyncAlgorithms
1717
#if canImport(FoundationNetworking)
1818
import FoundationNetworking
1919
#endif
@@ -50,28 +50,34 @@ public final class WebSocketClient {
5050
self.logger.trace("♻️ Deinit of WebSocketClient")
5151
}
5252

53-
public func connect() -> AnyAsyncSequence<WebSocketEvent> {
53+
public func connect() -> AsyncCompactMapSequence<AsyncChannel<WebSocketEvent>, WebSocketEvent> {
5454
self.transport.connect().compactMap { e -> WebSocketEvent? in
5555
switch e {
5656
case let .state(s):
5757
return .state(s)
5858
case let .message(m, metadata: meta):
5959
// pipe message through middleware
6060
if let middleware = self.inboundMiddleware {
61-
let handled = try await middleware.handle(m, metadata: meta)
62-
switch handled {
63-
case .handled:
64-
return nil
65-
case let .unhandled(unhandled):
66-
return.message(unhandled, metadata: meta)
61+
do {
62+
let handled = try await middleware.handle(m, metadata: meta)
63+
switch handled {
64+
case .handled:
65+
return nil
66+
case let .unhandled(unhandled):
67+
return.message(unhandled, metadata: meta)
68+
}
69+
} catch {
70+
// In case the middleware throws when handling the message
71+
// We could also just pretend it was unhandled.
72+
return .failure(error)
6773
}
6874
} else {
6975
return .message(m, metadata: meta)
7076
}
7177
case let .failure(error):
7278
return .failure(error)
7379
}
74-
}.eraseToAnyAsyncSequence()
80+
}
7581
}
7682

7783
public func disconnect(reason: String?) async {

Sources/StructuredWebSocketClientTestSupport/TestMessageTransport.swift

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
//===----------------------------------------------------------------------===//
1313

1414
import Foundation
15-
import AsyncHelpers
1615
import AsyncAlgorithms
1716
import StructuredWebSocketClient
1817
#if canImport(FoundationNetworking)
@@ -21,8 +20,8 @@ import FoundationNetworking
2120

2221
public final class TestMessageTransport: MessageTransport {
2322
var messageNumber: Int = 0
23+
private var _events: AsyncChannel<WebSocketEvent> = .init()
2424
private var events: AsyncChannel<WebSocketEvent> = .init()
25-
private var awaiter: Awaiter = .init()
2625
private var _initialMessages: [WebSocketEvent]
2726
private var _onMessage: (URLSessionWebSocketTask.Message, TestMessageTransport) async throws -> Void
2827

@@ -31,32 +30,27 @@ public final class TestMessageTransport: MessageTransport {
3130
_onMessage = onMessage
3231
}
3332

34-
// will wait until state is connected
3533
public func push(_ event: WebSocketEvent) async {
36-
await awaiter.awaitUntilTriggered {
37-
await self.events.send(event)
38-
}
34+
await self._events.send(event)
3935
}
4036

4137
public func send(_ message: URLSessionWebSocketTask.Message) async throws {
4238
try await _onMessage(message, self)
43-
// print("Sending: \(String(decoding: try message.data(), as: UTF8.self))")
4439
}
4540

4641
public func close(with closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) {
4742
Task {
48-
await events.send(.state(.disconnected(closeCode: closeCode, reason: reason)))
49-
self.events.finish()
43+
await _events.send(.state(.disconnected(closeCode: closeCode, reason: reason)))
44+
self._events.finish()
5045
}
5146
}
5247

5348
public func connect() -> AsyncChannel<WebSocketEvent> {
54-
// if push is called before connect this will only send the messages after the return
55-
Task { await awaiter.trigger() }
56-
Task {
57-
for await event in chain([.state(.connected)].async, _initialMessages.async, events) {
58-
await events.send(event)
49+
Task { [unowned self] in
50+
for await event in chain([.state(.connected)].async, self._initialMessages.async, self._events) {
51+
await self.events.send(event)
5952
}
53+
self.events.finish()
6054
}
6155
return events
6256
}

0 commit comments

Comments
 (0)