Skip to content

Commit 0a8bb8f

Browse files
committed
update messageBuffer and fix test
1 parent 8616c7a commit 0a8bb8f

File tree

7 files changed

+160
-34
lines changed

7 files changed

+160
-34
lines changed

Sources/SignalRClient/HubConnection.swift

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public actor HubConnection {
1616
private let logger: Logger
1717
private let hubProtocol: HubProtocol
1818
private let connection: ConnectionProtocol
19-
private let reconnectPolicy: RetryPolicy
19+
private let retryPolicy: RetryPolicy
2020
private let keepAliveScheduler: TimeScheduler
2121
private let serverTimeoutScheduler: TimeScheduler
2222
private let statefulReconnectBufferSize: Int
@@ -40,7 +40,7 @@ public actor HubConnection {
4040
internal init(connection: ConnectionProtocol,
4141
logger: Logger,
4242
hubProtocol: HubProtocol,
43-
reconnectPolicy: RetryPolicy,
43+
retryPolicy: RetryPolicy,
4444
serverTimeout: TimeInterval?,
4545
keepAliveInterval: TimeInterval?,
4646
statefulReconnectBufferSize: Int?) {
@@ -50,7 +50,7 @@ public actor HubConnection {
5050
statefulReconnectBufferSize ?? HubConnection.defaultStatefulReconnectBufferSize
5151

5252
self.logger = logger
53-
self.reconnectPolicy = reconnectPolicy
53+
self.retryPolicy = retryPolicy
5454

5555
self.connection = connection
5656
self.hubProtocol = hubProtocol
@@ -361,7 +361,7 @@ public actor HubConnection {
361361
var lastError: Error? = error
362362

363363
// reconnect
364-
while let interval = reconnectPolicy.nextRetryInterval(
364+
while let interval = retryPolicy.nextRetryInterval(
365365
retryContext: RetryContext(
366366
retryCount: retryCount,
367367
elapsed: elapsed,
@@ -438,16 +438,15 @@ public actor HubConnection {
438438
for message in hubMessage {
439439
do {
440440
if let messageBuffer = self.messageBuffer {
441-
let shouldProcess = try await messageBuffer.shouldProcessMessage(message)
442-
if !shouldProcess {
441+
if !(try await messageBuffer.shouldProcessMessage(message)) {
443442
// Don't process the message, we are either waiting for a SequenceMessage or received a duplicate message
444443
continue
445444
}
446445
}
446+
await dispatchMessage(message)
447447
} catch {
448448
logger.log(level: .error, message: "Error parsing messages: \(error)")
449449
}
450-
await dispatchMessage(message)
451450
}
452451
} catch {
453452
logger.log(level: .error, message: "Error parsing messages: \(error)")
@@ -482,11 +481,23 @@ public actor HubConnection {
482481
case _ as CloseMessage:
483482
// Close
484483
break
485-
case _ as AckMessage:
486-
// TODO: In stateful reconnect
484+
case let message as AckMessage:
485+
do {
486+
let result = try await self.messageBuffer?.ack(sequenceId: Int(message.sequenceId));
487+
if (result == false) {
488+
logger.log(level: .warning, message: "Ack message received for sequenceId: \(message.sequenceId), but failed.")
489+
}
490+
}
491+
catch {
492+
logger.log(level: .error, message: "Ack message received for sequenceId: \(message.sequenceId), but failed with error: \(error)")
493+
}
487494
break
488-
case _ as SequenceMessage:
489-
// TODO: In stateful reconnect
495+
case let message as SequenceMessage:
496+
if let messageBuffer = self.messageBuffer {
497+
await messageBuffer.resetSequenceMessage(message: message)
498+
} else {
499+
logger.log(level: .warning, message: "Sequence message received but no message buffer is available.")
500+
}
490501
break
491502
default:
492503
logger.log(level: .warning, message: "Unknown message type: \(message)")
@@ -593,8 +604,7 @@ public actor HubConnection {
593604
let useStatefulReconnect = await (self.connection.features[ConnectionFeature.Reconnect] as? Bool) == true
594605
if useStatefulReconnect {
595606
self.messageBuffer = MessageBuffer(
596-
hubProtocol: self.hubProtocol, connection: self.connection,
597-
bufferSize: self.statefulReconnectBufferSize)
607+
bufferSize: self.statefulReconnectBufferSize, hubProtocol: self.hubProtocol, connection: self.connection)
598608
await self.connection.setFeature(
599609
feature: ConnectionFeature.Disconnected,
600610
value: { [weak self] () async -> Void in

Sources/SignalRClient/HubConnectionBuilder.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public class HubConnectionBuilder {
103103
return HubConnection(connection: connection,
104104
logger: logger,
105105
hubProtocol: hubProtocol,
106-
reconnectPolicy: retryPolicy,
106+
retryPolicy: retryPolicy,
107107
serverTimeout: serverTimeout,
108108
keepAliveInterval: keepAliveInterval,
109109
statefulReconnectBufferSize: statefulReconnectBufferSize)

Sources/SignalRClient/MessageBuffer.swift

Lines changed: 112 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,29 +13,123 @@ actor MessageBuffer {
1313
private var totalMessageCount: Int = 0
1414
private var lastSendSequenceId: Int = 0
1515
private var nextSendIdx = 0
16+
private var nextReceivingIdx: Int64 = 1
17+
private var latestReceivedSequenceId: Int64 = 0
1618
private var dequeueContinuations: [CheckedContinuation<Bool, Never>] = []
1719
private var closed: Bool = false
20+
private var reconnectInprogress: Bool = false;
21+
private var waitForSequenceMessage: Bool = false;
1822

19-
init(hubProtocol: HubProtocol, connection: ConnectionProtocol, bufferSize: Int) {
23+
private var ackTimerHandle: DispatchWorkItem?
24+
25+
26+
init(bufferSize: Int, hubProtocol: HubProtocol, connection: ConnectionProtocol) {
27+
self.maxBufferSize = bufferSize
2028
self.hubProtocol = hubProtocol
2129
self.connection = connection
22-
self.maxBufferSize = bufferSize
2330
}
2431

2532
public func send(message: HubMessage) async throws -> Void {
26-
throw SignalRError.invalidOperation("Send is not implemented")
33+
let serializedMessage = try self.hubProtocol.writeMessage(message: message);
34+
35+
try await self.enqueue(content: serializedMessage);
36+
37+
if (!self.reconnectInprogress) {
38+
do {
39+
try await self.connection.send(serializedMessage);
40+
}
41+
catch {
42+
self.disconnected();
43+
}
44+
}
2745
}
2846

2947
public func resend() async throws -> Void {
30-
throw SignalRError.invalidOperation("Resend is not implemented")
48+
let sequenceId = Int64(self.messages.count > 0 ? self.messages[0].id : self.totalMessageCount + 1);
49+
let serializedMessage = try self.hubProtocol.writeMessage(message: SequenceMessage(sequenceId: sequenceId));
50+
try await self.connection.send(serializedMessage);
51+
52+
let messages = self.messages;
53+
for element in messages {
54+
try await self.connection.send(element.content);
55+
}
56+
57+
self.reconnectInprogress = false;
3158
}
3259

33-
public func disconnected() async throws -> Void {
34-
throw SignalRError.invalidOperation("Disconnected is not implemented")
60+
public func disconnected() -> Void {
61+
self.reconnectInprogress = true;
62+
self.waitForSequenceMessage = true;
63+
}
64+
65+
private func ackTimer() {
66+
guard ackTimerHandle == nil else {
67+
return
68+
}
69+
70+
let workItem = DispatchWorkItem { [weak self] in
71+
guard let self = self else { return }
72+
73+
Task {
74+
await self.performScheduledAck()
75+
}
76+
}
77+
78+
ackTimerHandle = workItem
79+
DispatchQueue.main.asyncAfter(deadline: .now() + 1.0, execute: workItem)
80+
}
81+
82+
private func performScheduledAck() async {
83+
defer {
84+
// 在方法结束时清理定时器
85+
ackTimerHandle = nil
86+
}
87+
88+
do {
89+
if !reconnectInprogress {
90+
let ackMessage = AckMessage(
91+
sequenceId: latestReceivedSequenceId
92+
)
93+
94+
let serializedMessage = try hubProtocol.writeMessage(message: ackMessage)
95+
try await connection.send(serializedMessage)
96+
}
97+
} catch {
98+
// 忽略错误,连接关闭时不需要发送ACK
99+
}
35100
}
36101

37102
public func shouldProcessMessage(_ message: HubMessage) throws -> Bool {
38-
throw SignalRError.invalidOperation("ShouldProcessMessage is not implemented")
103+
if (self.waitForSequenceMessage) {
104+
if (message.type != .sequence) {
105+
return false;
106+
} else {
107+
self.waitForSequenceMessage = false;
108+
return true;
109+
}
110+
}
111+
112+
if !self.isInvocationMessage(message: message) {
113+
return true
114+
}
115+
116+
let currentId = self.nextReceivingIdx;
117+
self.nextReceivingIdx += 1;
118+
if currentId <= self.latestReceivedSequenceId{
119+
if currentId == self.latestReceivedSequenceId{
120+
// Should only hit this if we just reconnected and the server is sending
121+
// Messages it has buffered, which would mean it hasn't seen an Ack for these messages
122+
self.ackTimer();
123+
}
124+
// Ignore, this is a duplicate message
125+
return false;
126+
}
127+
self.latestReceivedSequenceId = currentId;
128+
129+
// Only start the timer for sending an Ack message when we have a message to ack. This also conveniently solves
130+
// timer throttling by not having a recursive timer, and by starting the timer via a network call (recv)
131+
self.ackTimer();
132+
return true;
39133
}
40134

41135
public func enqueue(content: StringOrData) async throws -> Void {
@@ -137,6 +231,17 @@ actor MessageBuffer {
137231
}
138232
}
139233

234+
public func resetSequenceMessage(message: SequenceMessage) async {
235+
if message.sequenceId > self.nextReceivingIdx {
236+
// do not await stop
237+
Task {
238+
await self.connection.stop(error: SignalRError.invalidOperation("Received sequence message with sequenceId \(message.sequenceId) greater than nextReceivingIdx \(self.nextReceivingIdx)"))
239+
}
240+
return
241+
}
242+
self.nextReceivingIdx = message.sequenceId;
243+
}
244+
140245
private func isInvocationMessage(message: HubMessage) -> Bool {
141246
switch (message.type) {
142247
case .invocation, .streamItem, .completion, .streamInvocation, .cancelInvocation:

Tests/SignalRClientTests/HubConnection+OnResultTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ final class HubConnectionOnResultTests: XCTestCase {
2929
connection: mockConnection,
3030
logger: Logger(logLevel: .debug, logHandler: logHandler),
3131
hubProtocol: hubProtocol,
32-
reconnectPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
32+
retryPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
3333
serverTimeout: nil,
3434
keepAliveInterval: nil,
3535
statefulReconnectBufferSize: nil

Tests/SignalRClientTests/HubConnection+OnTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ final class HubConnectionOnTests: XCTestCase {
2525
connection: mockConnection,
2626
logger: Logger(logLevel: .debug, logHandler: logHandler),
2727
hubProtocol: hubProtocol,
28-
reconnectPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
28+
retryPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
2929
serverTimeout: nil,
3030
keepAliveInterval: nil,
3131
statefulReconnectBufferSize: nil

Tests/SignalRClientTests/HubConnectionTests.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class MockConnection: ConnectionProtocol, @unchecked Sendable {
1313
var onSend: ((StringOrData) -> Void)?
1414
var onStart: (() -> Void)?
1515
var onStop: ((Error?) -> Void)?
16+
var features: [ConnectionFeature : Any] = [:]
1617

1718
private(set) var startCalled = false
1819
private(set) var sendCalled = false
@@ -42,6 +43,10 @@ class MockConnection: ConnectionProtocol, @unchecked Sendable {
4243
func onClose(_ handler: @escaping @Sendable ((any Error)?) async -> Void) async {
4344
onClose = handler
4445
}
46+
47+
func setFeature(feature: SignalRClient.ConnectionFeature, value: Any) async {
48+
features[feature] = value
49+
}
4550
}
4651

4752
final class HubConnectionTests: XCTestCase {

Tests/SignalRClientTests/MessageBufferTests.swift

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,14 @@ import XCTest
33
@testable import SignalRClient
44

55
class MessageBufferTest: XCTestCase {
6+
func getTestMessageBuffer(bufferSize: Int) -> MessageBuffer {
7+
return MessageBuffer(bufferSize: bufferSize,
8+
hubProtocol: JsonHubProtocol(),
9+
connection: MockConnection())
10+
}
11+
612
func testSendWithinBufferSize() async throws {
7-
let buffer = MessageBuffer(bufferSize: 100)
13+
let buffer = getTestMessageBuffer(bufferSize: 100)
814
let expectation = XCTestExpectation(description: "Should enqueue")
915
Task {
1016
try await buffer.enqueue(content: .string("data"))
@@ -14,7 +20,7 @@ class MessageBufferTest: XCTestCase {
1420
}
1521

1622
func testSendTriggersBackpressure() async throws {
17-
let buffer = MessageBuffer(bufferSize: 5)
23+
let buffer = getTestMessageBuffer(bufferSize: 5)
1824
let expectation1 = XCTestExpectation(description: "Should not enqueue")
1925
expectation1.isInverted = true
2026
let expectation2 = XCTestExpectation(description: "Should enqueue")
@@ -33,7 +39,7 @@ class MessageBufferTest: XCTestCase {
3339
}
3440

3541
func testBackPressureAndRelease() async throws {
36-
let buffer = MessageBuffer(bufferSize: 10)
42+
let buffer = getTestMessageBuffer(bufferSize: 10)
3743
try await buffer.enqueue(content: .string("1234567890"))
3844
async let eq1 = buffer.enqueue(content: .string("1"))
3945
async let eq2 = buffer.enqueue(content: .string("2"))
@@ -51,7 +57,7 @@ class MessageBufferTest: XCTestCase {
5157
}
5258

5359
func testBackPressureAndRelease2() async throws {
54-
let buffer = MessageBuffer(bufferSize: 10)
60+
let buffer = getTestMessageBuffer(bufferSize: 10)
5561
let expect1 = XCTestExpectation(description: "Should not release 1")
5662
expect1.isInverted = true
5763
let expect2 = XCTestExpectation(description: "Should not release 2")
@@ -93,7 +99,7 @@ class MessageBufferTest: XCTestCase {
9399
}
94100

95101
func testAckInvalidSequenceIdIgnored() async throws {
96-
let buffer = MessageBuffer(bufferSize: 100)
102+
let buffer = getTestMessageBuffer(bufferSize: 100)
97103
let rst = try await buffer.ack(sequenceId: 1) // without any send
98104
XCTAssertEqual(false, rst)
99105

@@ -104,7 +110,7 @@ class MessageBufferTest: XCTestCase {
104110
}
105111

106112
func testWaitToDequeueReturnsImmediatelyIfAvailable() async throws {
107-
let buffer = MessageBuffer(bufferSize: 100)
113+
let buffer = getTestMessageBuffer(bufferSize: 100)
108114
_ = try await buffer.enqueue(content: .string("msg"))
109115
let result = try await buffer.WaitToDequeue()
110116
XCTAssertTrue(result)
@@ -113,7 +119,7 @@ class MessageBufferTest: XCTestCase {
113119
}
114120

115121
func testWaitToDequeueFirst() async throws {
116-
let buffer = MessageBuffer(bufferSize: 100)
122+
let buffer = getTestMessageBuffer(bufferSize: 100)
117123
async let dqueue: Bool = try await buffer.WaitToDequeue()
118124
try await Task.sleep(for: .milliseconds(10))
119125

@@ -127,7 +133,7 @@ class MessageBufferTest: XCTestCase {
127133
}
128134

129135
func testMultipleDequeueWait() async throws {
130-
let buffer = MessageBuffer(bufferSize: 100)
136+
let buffer = getTestMessageBuffer(bufferSize: 100)
131137
async let dqueue1: Bool = try await buffer.WaitToDequeue()
132138
async let dqueue2: Bool = try await buffer.WaitToDequeue()
133139
try await Task.sleep(for: .milliseconds(10))
@@ -143,13 +149,13 @@ class MessageBufferTest: XCTestCase {
143149
}
144150

145151
func testTryDequeueReturnsNilIfEmpty() async throws {
146-
let buffer = MessageBuffer(bufferSize: 100)
152+
let buffer = getTestMessageBuffer(bufferSize: 100)
147153
let result = try await buffer.TryDequeue()
148154
XCTAssertNil(result)
149155
}
150156

151157
func testResetDequeueResetsCorrectly() async throws {
152-
let buffer = MessageBuffer(bufferSize: 100)
158+
let buffer = getTestMessageBuffer(bufferSize: 100)
153159
try await buffer.enqueue(content: .string("test1"))
154160
try await buffer.enqueue(content: .string("test2"))
155161
let t1 = try await buffer.TryDequeue()
@@ -172,7 +178,7 @@ class MessageBufferTest: XCTestCase {
172178
}
173179

174180
func testContinuousBackPressure() async throws {
175-
let buffer = MessageBuffer(bufferSize: 5)
181+
let buffer = getTestMessageBuffer(bufferSize: 5)
176182
var tasks: [Task<Void, any Error>] = []
177183
for i in 0..<100 {
178184
let task = Task {

0 commit comments

Comments
 (0)