Skip to content

Commit 8616c7a

Browse files
committed
init
1 parent 1e002ee commit 8616c7a

File tree

7 files changed

+133
-30
lines changed

7 files changed

+133
-30
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4+
enum ConnectionFeature: String, CaseIterable {
5+
case Reconnect = "reconnect"
6+
case Resend = "resend"
7+
case Disconnected = "disconnected"
8+
// Add more feature keys as needed
9+
}
10+
411
protocol ConnectionProtocol: AnyObject, Sendable {
512
func onReceive(_ handler: @escaping Transport.OnReceiveHandler) async
613
func onClose(_ handler: @escaping Transport.OnCloseHander) async
714
func start(transferFormat: TransferFormat) async throws
815
func send(_ data: StringOrData) async throws
916
func stop(error: Error?) async
1017
var inherentKeepAlive: Bool { get async }
18+
var features: [ConnectionFeature: Any] { get async }
19+
func setFeature(feature: ConnectionFeature, value: Any) async
1120
}

Sources/SignalRClient/HttpConnection.swift

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ actor HttpConnection: ConnectionProtocol {
8484
private var accessTokenFactory: (@Sendable () async throws -> String?)?
8585
private var inherentKeepAlivePrivate: Bool = false
8686

87-
public var features: [String: Any] = [:]
87+
public var features: [ConnectionFeature: Any] = [:]
8888
public var baseUrl: String
8989
public var connectionId: String?
9090
public var inherentKeepAlive: Bool {
@@ -359,9 +359,31 @@ actor HttpConnection: ConnectionProtocol {
359359

360360
private func startTransport(url: String, transferFormat: TransferFormat) async throws {
361361
await transport!.onReceive(self.onReceive)
362-
await transport!.onClose { [weak self] error in
363-
guard let self = self else { return }
364-
await self.handleConnectionClose(error: error)
362+
if (self.features[ConnectionFeature.Reconnect] as? Bool) == true {
363+
await transport!.onClose { [weak self] error in
364+
var callStop = false;
365+
guard let self = self else { return }
366+
if await (self.features[ConnectionFeature.Reconnect] as? Bool) == true {
367+
do {
368+
if let disconnectedHandler = await self.features[ConnectionFeature.Disconnected] as? (Error?) -> Void {
369+
disconnectedHandler(error);
370+
}
371+
try await self.transport?.connect(url: url, transferFormat: transferFormat);
372+
if let resendHandler = await self.features[ConnectionFeature.Resend] as? () -> Void {
373+
resendHandler();
374+
}
375+
} catch {
376+
callStop = true;
377+
}
378+
}
379+
else {
380+
await self.handleConnectionClose(error: error);
381+
return;
382+
}
383+
if callStop {
384+
await self.handleConnectionClose(error: error);
385+
}
386+
}
365387
}
366388

367389
do {
@@ -435,6 +457,12 @@ actor HttpConnection: ConnectionProtocol {
435457
if let useStatefulReconnect = options.useStatefulReconnect, useStatefulReconnect {
436458
queryItems.append(URLQueryItem(name: "useStatefulReconnect", value: "true"))
437459
}
460+
else {
461+
if (queryItems.first(where: { $0.name == "useStatefulReconnect" })?.value ?? "false") == "true" {
462+
options.useStatefulReconnect = true;
463+
}
464+
}
465+
438466
negotiateUrlComponents.queryItems = queryItems
439467
return negotiateUrlComponents.url!.absoluteString
440468
}
@@ -476,7 +504,7 @@ actor HttpConnection: ConnectionProtocol {
476504
let transferFormats = endpoint.transferFormats.compactMap { TransferFormat($0) }
477505
if transferFormats.contains(requestedTransferFormat) {
478506
do {
479-
features["reconnect"] = (transportType == .webSockets && useStatefulReconnect) ? true : nil
507+
self.features[ConnectionFeature.Reconnect] = (transportType == .webSockets && useStatefulReconnect) ? true : false;
480508
let constructedTransport = try await constructTransport(transport: transportType)
481509
return constructedTransport
482510
} catch {
@@ -511,4 +539,8 @@ actor HttpConnection: ConnectionProtocol {
511539
}
512540
return urlRequest
513541
}
542+
543+
public func setFeature(feature: ConnectionFeature, value: Any) async {
544+
features[feature] = value
545+
}
514546
}

Sources/SignalRClient/HubConnection.swift

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import Foundation
66
public actor HubConnection {
77
private static let defaultTimeout: TimeInterval = 30
88
private static let defaultPingInterval: TimeInterval = 15
9-
private static let defaultStatefulReconnectBufferSize: Int = 100_000_000 // bytes of messages
9+
private static let defaultStatefulReconnectBufferSize: Int = 100_000_000 // bytes of messages
1010

1111
private var invocationBinder: DefaultInvocationBinder
1212
private var invocationHandler: InvocationHandler
@@ -16,16 +16,16 @@ public actor HubConnection {
1616
private let logger: Logger
1717
private let hubProtocol: HubProtocol
1818
private let connection: ConnectionProtocol
19-
private let retryPolicy: RetryPolicy
19+
private let reconnectPolicy: RetryPolicy
2020
private let keepAliveScheduler: TimeScheduler
2121
private let serverTimeoutScheduler: TimeScheduler
2222
private let statefulReconnectBufferSize: Int
2323

2424
private var connectionStarted: Bool = false
2525
private var receivedHandshakeResponse: Bool = false
2626
private var invocationId: Int = 0
27+
private var messageBuffer: MessageBuffer? = nil
2728
private var connectionStatus: HubConnectionState = .Stopped
28-
private var stopping: Bool = false
2929
private var stopDuringStartError: Error?
3030
private nonisolated(unsafe) var handshakeResolver: ((HandshakeResponseMessage) -> Void)?
3131
private nonisolated(unsafe) var handshakeRejector: ((Error) -> Void)?
@@ -40,16 +40,17 @@ public actor HubConnection {
4040
internal init(connection: ConnectionProtocol,
4141
logger: Logger,
4242
hubProtocol: HubProtocol,
43-
retryPolicy: RetryPolicy,
43+
reconnectPolicy: RetryPolicy,
4444
serverTimeout: TimeInterval?,
4545
keepAliveInterval: TimeInterval?,
4646
statefulReconnectBufferSize: Int?) {
4747
self.serverTimeout = serverTimeout ?? HubConnection.defaultTimeout
4848
self.keepAliveInterval = keepAliveInterval ?? HubConnection.defaultPingInterval
49-
self.statefulReconnectBufferSize = statefulReconnectBufferSize ?? HubConnection.defaultStatefulReconnectBufferSize
49+
self.statefulReconnectBufferSize =
50+
statefulReconnectBufferSize ?? HubConnection.defaultStatefulReconnectBufferSize
5051

5152
self.logger = logger
52-
self.retryPolicy = retryPolicy
53+
self.reconnectPolicy = reconnectPolicy
5354

5455
self.connection = connection
5556
self.hubProtocol = hubProtocol
@@ -58,10 +59,12 @@ public actor HubConnection {
5859
self.invocationHandler = InvocationHandler()
5960
self.keepAliveScheduler = TimeScheduler(initialInterval: self.keepAliveInterval)
6061
self.serverTimeoutScheduler = TimeScheduler(initialInterval: self.serverTimeout)
62+
self.reconnectedHandlers = []
63+
self.reconnectingHandlers = []
6164
}
6265

6366
public func start() async throws {
64-
if (connectionStatus != .Stopped) {
67+
if connectionStatus != .Stopped {
6568
throw SignalRError.invalidOperation("Start client while not in a stopped state.")
6669
}
6770

@@ -77,7 +80,6 @@ public actor HubConnection {
7780
startSuccessfully = true
7881
} catch {
7982
connectionStatus = .Stopped
80-
stopping = false
8183
await keepAliveScheduler.stop()
8284
await serverTimeoutScheduler.stop()
8385
logger.log(level: .debug, message: "HubConnection start failed \(error)")
@@ -96,13 +98,14 @@ public actor HubConnection {
9698
}
9799

98100
// 2. Another stop is running, just wait for it
99-
if (stopping) {
101+
if connectionStatus == .Stopping {
100102
logger.log(level: .debug, message: "Connection is already stopping")
101103
await stopTask?.value
102104
return
103105
}
104106

105-
stopping = true
107+
connectionStatus = .Stopping
108+
await self.connection.setFeature(feature: ConnectionFeature.Reconnect, value: false)
106109

107110
// In this step, there's no other start running
108111
stopTask = Task {
@@ -291,6 +294,13 @@ public actor HubConnection {
291294

292295
private func stopInternal() async {
293296
if (connectionStatus == .Stopped) {
297+
logger.log(level: .debug,message:"Call to HubConnection.stop ignored because it is already in the disconnected state.")
298+
return
299+
}
300+
301+
if connectionStatus == .Stopping {
302+
logger.log(level: .debug,message:"Call to HubConnection.stop ignored because it is already in the stopping state.")
303+
await stopTask?.value
294304
return
295305
}
296306

@@ -325,7 +335,7 @@ public actor HubConnection {
325335
handshakeRejector!(SignalRError.connectionAborted)
326336
}
327337

328-
if (stopping) {
338+
if connectionStatus == .Connecting {
329339
await completeClose(error: error)
330340
return
331341
}
@@ -335,7 +345,7 @@ public actor HubConnection {
335345
// 2. Connected: In this case, we should reconnect
336346
// 3. Reconnecting: In this case, we're in the control of previous reconnect(), let that function handle the reconnection
337347

338-
if (connectionStatus == .Connected) {
348+
if connectionStatus == .Connected {
339349
do {
340350
try await reconnect(error: error)
341351
} catch {
@@ -351,13 +361,15 @@ public actor HubConnection {
351361
var lastError: Error? = error
352362

353363
// reconnect
354-
while let interval = retryPolicy.nextRetryInterval(retryContext: RetryContext(
355-
retryCount: retryCount,
356-
elapsed: elapsed,
357-
retryReason: lastError
358-
)) {
364+
while let interval = reconnectPolicy.nextRetryInterval(
365+
retryContext: RetryContext(
366+
retryCount: retryCount,
367+
elapsed: elapsed,
368+
retryReason: lastError
369+
))
370+
{
359371
try Task.checkCancellation()
360-
if (stopping) {
372+
if connectionStatus == .Stopping {
361373
break
362374
}
363375

@@ -380,7 +392,7 @@ public actor HubConnection {
380392
logger.log(level: .warning, message: "Connection reconnect failed: \(error)")
381393
}
382394

383-
if (stopping) {
395+
if connectionStatus == .Stopping {
384396
break
385397
}
386398

@@ -424,6 +436,17 @@ public actor HubConnection {
424436
do {
425437
let hubMessage = try hubProtocol.parseMessages(input: data!, binder: invocationBinder)
426438
for message in hubMessage {
439+
do {
440+
if let messageBuffer = self.messageBuffer {
441+
let shouldProcess = try await messageBuffer.shouldProcessMessage(message)
442+
if !shouldProcess {
443+
// Don't process the message, we are either waiting for a SequenceMessage or received a duplicate message
444+
continue
445+
}
446+
}
447+
} catch {
448+
logger.log(level: .error, message: "Error parsing messages: \(error)")
449+
}
427450
await dispatchMessage(message)
428451
}
429452
} catch {
@@ -499,7 +522,6 @@ public actor HubConnection {
499522

500523
private func completeClose(error: Error?) async {
501524
connectionStatus = .Stopped
502-
stopping = false
503525
await keepAliveScheduler.stop()
504526
await serverTimeoutScheduler.stop()
505527

@@ -513,7 +535,7 @@ public actor HubConnection {
513535
private func startInternal() async throws {
514536
try Task.checkCancellation()
515537

516-
guard stopping == false else {
538+
guard connectionStatus != .Stopping else {
517539
throw SignalRError.invalidOperation("Stopping is called")
518540
}
519541

@@ -532,6 +554,7 @@ public actor HubConnection {
532554
logger.log(level: .error, message: "Unsupported handshake version: \(version)")
533555
throw SignalRError.unsupportedHandshakeVersion
534556
}
557+
// TODO: enable version 2 when stateful reconnect is done
535558

536559
receivedHandshakeResponse = false
537560
let handshakeRequest = HandshakeRequestMessage(protocol: hubProtocol.name, version: version)
@@ -567,6 +590,23 @@ public actor HubConnection {
567590
}
568591
}
569592

593+
let useStatefulReconnect = await (self.connection.features[ConnectionFeature.Reconnect] as? Bool) == true
594+
if useStatefulReconnect {
595+
self.messageBuffer = MessageBuffer(
596+
hubProtocol: self.hubProtocol, connection: self.connection,
597+
bufferSize: self.statefulReconnectBufferSize)
598+
await self.connection.setFeature(
599+
feature: ConnectionFeature.Disconnected,
600+
value: { [weak self] () async -> Void in
601+
_ = try? await self?.messageBuffer?.disconnected()
602+
})
603+
await self.connection.setFeature(
604+
feature: ConnectionFeature.Resend,
605+
value: { [weak self] () async -> Any? in
606+
return try? await self?.messageBuffer?.resend()
607+
})
608+
}
609+
570610
let inherentKeepAlive = await connection.inherentKeepAlive
571611
if (!inherentKeepAlive) {
572612
await keepAliveScheduler.start {
@@ -808,6 +848,7 @@ public actor HubConnection {
808848
public enum HubConnectionState {
809849
// The connection is stopped. Start can only be called if the connection is in this state.
810850
case Stopped
851+
case Stopping
811852
case Connecting
812853
case Connected
813854
case Reconnecting

Sources/SignalRClient/HubConnectionBuilder.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public class HubConnectionBuilder {
103103
return HubConnection(connection: connection,
104104
logger: logger,
105105
hubProtocol: hubProtocol,
106-
retryPolicy: retryPolicy,
106+
reconnectPolicy: retryPolicy,
107107
serverTimeout: serverTimeout,
108108
keepAliveInterval: keepAliveInterval,
109109
statefulReconnectBufferSize: statefulReconnectBufferSize)

Sources/SignalRClient/MessageBuffer.swift

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
import Foundation
55

66
actor MessageBuffer {
7+
private var hubProtocol: HubProtocol;
8+
private var connection: ConnectionProtocol;
9+
710
private var maxBufferSize: Int
811
private var messages: [BufferedItem] = []
912
private var bufferedByteCount: Int = 0
@@ -13,10 +16,28 @@ actor MessageBuffer {
1316
private var dequeueContinuations: [CheckedContinuation<Bool, Never>] = []
1417
private var closed: Bool = false
1518

16-
init(bufferSize: Int) {
19+
init(hubProtocol: HubProtocol, connection: ConnectionProtocol, bufferSize: Int) {
20+
self.hubProtocol = hubProtocol
21+
self.connection = connection
1722
self.maxBufferSize = bufferSize
1823
}
1924

25+
public func send(message: HubMessage) async throws -> Void {
26+
throw SignalRError.invalidOperation("Send is not implemented")
27+
}
28+
29+
public func resend() async throws -> Void {
30+
throw SignalRError.invalidOperation("Resend is not implemented")
31+
}
32+
33+
public func disconnected() async throws -> Void {
34+
throw SignalRError.invalidOperation("Disconnected is not implemented")
35+
}
36+
37+
public func shouldProcessMessage(_ message: HubMessage) throws -> Bool {
38+
throw SignalRError.invalidOperation("ShouldProcessMessage is not implemented")
39+
}
40+
2041
public func enqueue(content: StringOrData) async throws -> Void {
2142
if closed {
2243
throw SignalRError.invalidOperation("Message buffer has closed")

Tests/SignalRClientTests/HubConnection+OnResultTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ final class HubConnectionOnResultTests: XCTestCase {
2929
connection: mockConnection,
3030
logger: Logger(logLevel: .debug, logHandler: logHandler),
3131
hubProtocol: hubProtocol,
32-
retryPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
32+
reconnectPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
3333
serverTimeout: nil,
3434
keepAliveInterval: nil,
3535
statefulReconnectBufferSize: nil

Tests/SignalRClientTests/HubConnection+OnTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ final class HubConnectionOnTests: XCTestCase {
2525
connection: mockConnection,
2626
logger: Logger(logLevel: .debug, logHandler: logHandler),
2727
hubProtocol: hubProtocol,
28-
retryPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
28+
reconnectPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
2929
serverTimeout: nil,
3030
keepAliveInterval: nil,
3131
statefulReconnectBufferSize: nil

0 commit comments

Comments
 (0)