Skip to content

Commit dab86c6

Browse files
committed
udpate message buffer and ut
1 parent 8616c7a commit dab86c6

9 files changed

+178
-65
lines changed

Sources/SignalRClient/ConnectionProtocol.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ enum ConnectionFeature: String, CaseIterable {
55
case Reconnect = "reconnect"
66
case Resend = "resend"
77
case Disconnected = "disconnected"
8-
// Add more feature keys as needed
98
}
109

1110
protocol ConnectionProtocol: AnyObject, Sendable {

Sources/SignalRClient/HttpConnection.swift

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -385,13 +385,11 @@ actor HttpConnection: ConnectionProtocol {
385385
}
386386
}
387387
}
388-
389-
do {
390-
try await transport!.connect(url: url, transferFormat: transferFormat)
391-
} catch {
392-
await transport!.onReceive(nil)
393-
await transport!.onClose(nil)
394-
throw error
388+
else {
389+
await transport!.onClose { [weak self] error in
390+
guard let self = self else { return }
391+
await self.handleConnectionClose(error: error)
392+
}
395393
}
396394
}
397395

Sources/SignalRClient/HubConnection.swift

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public actor HubConnection {
1616
private let logger: Logger
1717
private let hubProtocol: HubProtocol
1818
private let connection: ConnectionProtocol
19-
private let reconnectPolicy: RetryPolicy
19+
private let retryPolicy: RetryPolicy
2020
private let keepAliveScheduler: TimeScheduler
2121
private let serverTimeoutScheduler: TimeScheduler
2222
private let statefulReconnectBufferSize: Int
@@ -40,17 +40,16 @@ public actor HubConnection {
4040
internal init(connection: ConnectionProtocol,
4141
logger: Logger,
4242
hubProtocol: HubProtocol,
43-
reconnectPolicy: RetryPolicy,
43+
retryPolicy: RetryPolicy,
4444
serverTimeout: TimeInterval?,
4545
keepAliveInterval: TimeInterval?,
4646
statefulReconnectBufferSize: Int?) {
4747
self.serverTimeout = serverTimeout ?? HubConnection.defaultTimeout
4848
self.keepAliveInterval = keepAliveInterval ?? HubConnection.defaultPingInterval
49-
self.statefulReconnectBufferSize =
50-
statefulReconnectBufferSize ?? HubConnection.defaultStatefulReconnectBufferSize
49+
self.statefulReconnectBufferSize = statefulReconnectBufferSize ?? HubConnection.defaultStatefulReconnectBufferSize
5150

5251
self.logger = logger
53-
self.reconnectPolicy = reconnectPolicy
52+
self.retryPolicy = retryPolicy
5453

5554
self.connection = connection
5655
self.hubProtocol = hubProtocol
@@ -64,7 +63,7 @@ public actor HubConnection {
6463
}
6564

6665
public func start() async throws {
67-
if connectionStatus != .Stopped {
66+
if (connectionStatus != .Stopped) {
6867
throw SignalRError.invalidOperation("Start client while not in a stopped state.")
6968
}
7069

@@ -345,7 +344,7 @@ public actor HubConnection {
345344
// 2. Connected: In this case, we should reconnect
346345
// 3. Reconnecting: In this case, we're in the control of previous reconnect(), let that function handle the reconnection
347346

348-
if connectionStatus == .Connected {
347+
if (connectionStatus == .Connected) {
349348
do {
350349
try await reconnect(error: error)
351350
} catch {
@@ -361,13 +360,11 @@ public actor HubConnection {
361360
var lastError: Error? = error
362361

363362
// reconnect
364-
while let interval = reconnectPolicy.nextRetryInterval(
365-
retryContext: RetryContext(
366-
retryCount: retryCount,
367-
elapsed: elapsed,
368-
retryReason: lastError
369-
))
370-
{
363+
while let interval = retryPolicy.nextRetryInterval(retryContext: RetryContext(
364+
retryCount: retryCount,
365+
elapsed: elapsed,
366+
retryReason: lastError
367+
)) {
371368
try Task.checkCancellation()
372369
if connectionStatus == .Stopping {
373370
break
@@ -436,16 +433,11 @@ public actor HubConnection {
436433
do {
437434
let hubMessage = try hubProtocol.parseMessages(input: data!, binder: invocationBinder)
438435
for message in hubMessage {
439-
do {
440-
if let messageBuffer = self.messageBuffer {
441-
let shouldProcess = try await messageBuffer.shouldProcessMessage(message)
442-
if !shouldProcess {
443-
// Don't process the message, we are either waiting for a SequenceMessage or received a duplicate message
444-
continue
445-
}
436+
if let messageBuffer = self.messageBuffer {
437+
if !(try await messageBuffer.shouldProcessMessage(message)) {
438+
// Don't process the message, we are either waiting for a SequenceMessage or received a duplicate message
439+
continue
446440
}
447-
} catch {
448-
logger.log(level: .error, message: "Error parsing messages: \(error)")
449441
}
450442
await dispatchMessage(message)
451443
}
@@ -482,11 +474,18 @@ public actor HubConnection {
482474
case _ as CloseMessage:
483475
// Close
484476
break
485-
case _ as AckMessage:
486-
// TODO: In stateful reconnect
477+
case let message as AckMessage:
478+
let result = await self.messageBuffer?.ack(sequenceId: message.sequenceId);
479+
if (result == false) {
480+
logger.log(level: .warning, message: "Ack message received for sequenceId: \(message.sequenceId), but failed.")
481+
}
487482
break
488-
case _ as SequenceMessage:
489-
// TODO: In stateful reconnect
483+
case let message as SequenceMessage:
484+
if let messageBuffer = self.messageBuffer {
485+
await messageBuffer.resetSequenceMessage(message: message)
486+
} else {
487+
logger.log(level: .warning, message: "Sequence message received but no message buffer is available.")
488+
}
490489
break
491490
default:
492491
logger.log(level: .warning, message: "Unknown message type: \(message)")
@@ -554,7 +553,7 @@ public actor HubConnection {
554553
logger.log(level: .error, message: "Unsupported handshake version: \(version)")
555554
throw SignalRError.unsupportedHandshakeVersion
556555
}
557-
// TODO: enable version 2 when stateful reconnect is done
556+
// TODO: enable version 2 when stateful reconnect is ready
558557

559558
receivedHandshakeResponse = false
560559
let handshakeRequest = HandshakeRequestMessage(protocol: hubProtocol.name, version: version)
@@ -593,17 +592,19 @@ public actor HubConnection {
593592
let useStatefulReconnect = await (self.connection.features[ConnectionFeature.Reconnect] as? Bool) == true
594593
if useStatefulReconnect {
595594
self.messageBuffer = MessageBuffer(
596-
hubProtocol: self.hubProtocol, connection: self.connection,
597-
bufferSize: self.statefulReconnectBufferSize)
595+
bufferSize: self.statefulReconnectBufferSize,
596+
hubProtocol: self.hubProtocol,
597+
connection: self.connection
598+
)
598599
await self.connection.setFeature(
599600
feature: ConnectionFeature.Disconnected,
600601
value: { [weak self] () async -> Void in
601-
_ = try? await self?.messageBuffer?.disconnected()
602+
_ = await self?.messageBuffer?.disconnected()
602603
})
603604
await self.connection.setFeature(
604605
feature: ConnectionFeature.Resend,
605606
value: { [weak self] () async -> Any? in
606-
return try? await self?.messageBuffer?.resend()
607+
return try? await self?.messageBuffer?.resend()
607608
})
608609
}
609610

Sources/SignalRClient/HubConnectionBuilder.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public class HubConnectionBuilder {
103103
return HubConnection(connection: connection,
104104
logger: logger,
105105
hubProtocol: hubProtocol,
106-
reconnectPolicy: retryPolicy,
106+
retryPolicy: retryPolicy,
107107
serverTimeout: serverTimeout,
108108
keepAliveInterval: keepAliveInterval,
109109
statefulReconnectBufferSize: statefulReconnectBufferSize)

Sources/SignalRClient/MessageBuffer.swift

Lines changed: 115 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,38 +4,131 @@
44
import Foundation
55

66
actor MessageBuffer {
7-
private var hubProtocol: HubProtocol;
8-
private var connection: ConnectionProtocol;
9-
107
private var maxBufferSize: Int
118
private var messages: [BufferedItem] = []
129
private var bufferedByteCount: Int = 0
1310
private var totalMessageCount: Int = 0
1411
private var lastSendSequenceId: Int = 0
1512
private var nextSendIdx = 0
13+
private var nextReceivingIdx: Int64 = 1
14+
private var lastReceivedSequenceId: Int64 = 0
1615
private var dequeueContinuations: [CheckedContinuation<Bool, Never>] = []
1716
private var closed: Bool = false
17+
private var reconnectInprogress: Bool = false;
18+
private var waitForSequenceMessage: Bool = false;
19+
20+
private var ackTimerHandle: DispatchWorkItem?
21+
22+
private var hubProtocol: HubProtocol;
23+
private var connection: ConnectionProtocol;
1824

19-
init(hubProtocol: HubProtocol, connection: ConnectionProtocol, bufferSize: Int) {
25+
init(bufferSize: Int, hubProtocol: HubProtocol, connection: ConnectionProtocol) {
26+
self.maxBufferSize = bufferSize
2027
self.hubProtocol = hubProtocol
2128
self.connection = connection
22-
self.maxBufferSize = bufferSize
2329
}
2430

2531
public func send(message: HubMessage) async throws -> Void {
26-
throw SignalRError.invalidOperation("Send is not implemented")
32+
let serializedMessage = try self.hubProtocol.writeMessage(message: message);
33+
34+
try await self.enqueue(content: serializedMessage);
35+
36+
if (!self.reconnectInprogress) {
37+
do {
38+
try await self.connection.send(serializedMessage);
39+
}
40+
catch {
41+
self.disconnected();
42+
}
43+
}
2744
}
2845

2946
public func resend() async throws -> Void {
30-
throw SignalRError.invalidOperation("Resend is not implemented")
47+
let sequenceId = Int64(self.messages.count > 0 ? self.messages[0].id : self.totalMessageCount + 1);
48+
let serializedMessage = try self.hubProtocol.writeMessage(message: SequenceMessage(sequenceId: sequenceId));
49+
try await self.connection.send(serializedMessage);
50+
51+
let messages = self.messages;
52+
for element in messages {
53+
try await self.connection.send(element.content);
54+
}
55+
56+
self.reconnectInprogress = false;
57+
}
58+
59+
public func disconnected() -> Void {
60+
self.reconnectInprogress = true;
61+
self.waitForSequenceMessage = true;
62+
}
63+
64+
private func ackTimer() {
65+
guard ackTimerHandle == nil else {
66+
return
67+
}
68+
69+
let workItem = DispatchWorkItem { [weak self] in
70+
guard let self = self else { return }
71+
72+
Task {
73+
await self.performScheduledAck()
74+
}
75+
}
76+
77+
ackTimerHandle = workItem
78+
DispatchQueue.main.asyncAfter(deadline: .now() + 1.0, execute: workItem)
3179
}
3280

33-
public func disconnected() async throws -> Void {
34-
throw SignalRError.invalidOperation("Disconnected is not implemented")
81+
private func performScheduledAck() async {
82+
defer {
83+
// 在方法结束时清理定时器
84+
ackTimerHandle = nil
85+
}
86+
87+
do {
88+
if !reconnectInprogress {
89+
let ackMessage = AckMessage(
90+
sequenceId: lastReceivedSequenceId
91+
)
92+
93+
let serializedMessage = try hubProtocol.writeMessage(message: ackMessage)
94+
try await connection.send(serializedMessage)
95+
}
96+
} catch {
97+
// 忽略错误,连接关闭时不需要发送ACK
98+
}
3599
}
36100

37101
public func shouldProcessMessage(_ message: HubMessage) throws -> Bool {
38-
throw SignalRError.invalidOperation("ShouldProcessMessage is not implemented")
102+
if (self.waitForSequenceMessage) {
103+
if (message.type != .sequence) {
104+
return false;
105+
} else {
106+
self.waitForSequenceMessage = false;
107+
return true;
108+
}
109+
}
110+
111+
if !self.isInvocationMessage(message: message) {
112+
return true
113+
}
114+
115+
let currentId = self.nextReceivingIdx;
116+
self.nextReceivingIdx += 1;
117+
if currentId <= self.lastReceivedSequenceId{
118+
if currentId == self.lastReceivedSequenceId{
119+
// Should only hit this if we just reconnected and the server is sending
120+
// Messages it has buffered, which would mean it hasn't seen an Ack for these messages
121+
self.ackTimer();
122+
}
123+
// Ignore, this is a duplicate message
124+
return false;
125+
}
126+
self.lastReceivedSequenceId = currentId;
127+
128+
// Only start the timer for sending an Ack message when we have a message to ack. This also conveniently solves
129+
// timer throttling by not having a recursive timer, and by starting the timer via a network call (recv)
130+
self.ackTimer();
131+
return true;
39132
}
40133

41134
public func enqueue(content: StringOrData) async throws -> Void {
@@ -71,7 +164,7 @@ actor MessageBuffer {
71164
}
72165
}
73166

74-
public func ack(sequenceId: Int) throws -> Bool {
167+
public func ack(sequenceId: Int64) -> Bool {
75168
// It might be wrong ack or the ack of previous connection
76169
if (sequenceId <= 0 || sequenceId > lastSendSequenceId) {
77170
return false
@@ -137,6 +230,17 @@ actor MessageBuffer {
137230
}
138231
}
139232

233+
public func resetSequenceMessage(message: SequenceMessage) async {
234+
if message.sequenceId > self.nextReceivingIdx {
235+
// do not await stop
236+
Task {
237+
await self.connection.stop(error: SignalRError.invalidOperation("Received sequence message with sequenceId \(message.sequenceId) greater than nextReceivingIdx \(self.nextReceivingIdx)"))
238+
}
239+
return
240+
}
241+
self.nextReceivingIdx = message.sequenceId;
242+
}
243+
140244
private func isInvocationMessage(message: HubMessage) -> Bool {
141245
switch (message.type) {
142246
case .invocation, .streamItem, .completion, .streamInvocation, .cancelInvocation:

Tests/SignalRClientTests/HubConnection+OnResultTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ final class HubConnectionOnResultTests: XCTestCase {
2929
connection: mockConnection,
3030
logger: Logger(logLevel: .debug, logHandler: logHandler),
3131
hubProtocol: hubProtocol,
32-
reconnectPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
32+
retryPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
3333
serverTimeout: nil,
3434
keepAliveInterval: nil,
3535
statefulReconnectBufferSize: nil

Tests/SignalRClientTests/HubConnection+OnTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ final class HubConnectionOnTests: XCTestCase {
2525
connection: mockConnection,
2626
logger: Logger(logLevel: .debug, logHandler: logHandler),
2727
hubProtocol: hubProtocol,
28-
reconnectPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
28+
retryPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
2929
serverTimeout: nil,
3030
keepAliveInterval: nil,
3131
statefulReconnectBufferSize: nil

Tests/SignalRClientTests/HubConnectionTests.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class MockConnection: ConnectionProtocol, @unchecked Sendable {
1313
var onSend: ((StringOrData) -> Void)?
1414
var onStart: (() -> Void)?
1515
var onStop: ((Error?) -> Void)?
16+
var features: [ConnectionFeature : Any] = [:]
1617

1718
private(set) var startCalled = false
1819
private(set) var sendCalled = false
@@ -42,6 +43,10 @@ class MockConnection: ConnectionProtocol, @unchecked Sendable {
4243
func onClose(_ handler: @escaping @Sendable ((any Error)?) async -> Void) async {
4344
onClose = handler
4445
}
46+
47+
func setFeature(feature: SignalRClient.ConnectionFeature, value: Any) async {
48+
features[feature] = value
49+
}
4550
}
4651

4752
final class HubConnectionTests: XCTestCase {

0 commit comments

Comments
 (0)