Skip to content

Commit f227fc2

Browse files
committed
Added publish and close listeners
1 parent ae886f0 commit f227fc2

File tree

4 files changed

+69
-20
lines changed

4 files changed

+69
-20
lines changed

Sources/MQTTNIO/MQTTChannelHandlers.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ struct ByteToMQTTMessageDecoder: ByteToMessageDecoder {
4545
} catch MQTTSerializer.Error.incompletePacket {
4646
return .needMoreData
4747
} catch {
48-
self.client.publishCallback(.failure(error))
48+
self.client.publishListeners.notify(.failure(error))
4949
}
5050
return .continue
5151
case .CONNACK:
@@ -72,11 +72,11 @@ struct ByteToMQTTMessageDecoder: ByteToMessageDecoder {
7272
func publish(_ message: MQTTPublishMessage) {
7373
switch message.publish.qos {
7474
case .atMostOnce:
75-
client.publishCallback(.success(message.publish))
75+
client.publishListeners.notify(.success(message.publish))
7676
case .atLeastOnce:
7777
client.connection!.sendMessageNoWait(MQTTAckMessage(type: .PUBACK, packetId: message.packetId))
7878
.map { _ in return message.publish }
79-
.whenComplete { self.client.publishCallback($0) }
79+
.whenComplete { self.client.publishListeners.notify($0) }
8080
case .exactlyOnce:
8181
client.connection!.sendMessage(MQTTAckMessage(type: .PUBREC, packetId: message.packetId)) { newMessage in
8282
guard newMessage.packetId == message.packetId else { return false }
@@ -87,7 +87,7 @@ struct ByteToMQTTMessageDecoder: ByteToMessageDecoder {
8787
self.client.connection!.sendMessageNoWait(MQTTAckMessage(type: .PUBCOMP, packetId: message.packetId))
8888
}
8989
.map { _ in return message.publish }
90-
.whenComplete { self.client.publishCallback($0) }
90+
.whenComplete { self.client.publishListeners.notify($0) }
9191
}
9292

9393
}

Sources/MQTTNIO/MQTTClient.swift

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ final public class MQTTClient {
3232
var logger: Logger
3333
/// Client configuration
3434
let configuration: Configuration
35-
/// Called whenever a publish event occurs
36-
let publishCallback: (Result<MQTTPublishInfo, Swift.Error>) -> ()
3735

3836
/// Connection client is using
3937
var connection: MQTTConnection?
@@ -90,8 +88,7 @@ final public class MQTTClient {
9088
port: Int? = nil,
9189
eventLoopGroupProvider: NIOEventLoopGroupProvider,
9290
logger: Logger? = nil,
93-
configuration: Configuration = Configuration(),
94-
publishCallback: @escaping (Result<MQTTPublishInfo, Swift.Error>) -> () = { _ in }
91+
configuration: Configuration = Configuration()
9592
) {
9693
self.host = host
9794
if let port = port {
@@ -109,7 +106,6 @@ final public class MQTTClient {
109106
}
110107
}
111108
self.configuration = configuration
112-
self.publishCallback = publishCallback
113109
self.connection = nil
114110
self.logger = logger ?? Self.loggingDisabled
115111
self.eventLoopGroupProvider = eventLoopGroupProvider
@@ -153,7 +149,8 @@ final public class MQTTClient {
153149
return MQTTConnection.create(client: self, pingInterval: pingInterval)
154150
.flatMap { connection -> EventLoopFuture<MQTTInboundMessage> in
155151
self.connection = connection
156-
connection.closeFuture.whenComplete { _ in
152+
connection.closeFuture.whenComplete { result in
153+
self.closeListeners.notify(result)
157154
self.connection = nil
158155
}
159156
// attach client identifier to logger
@@ -259,6 +256,30 @@ final public class MQTTClient {
259256
return future ?? self.eventLoopGroup.next().makeSucceededFuture(())
260257
}
261258
}
259+
260+
/// Add named publish listener. Called whenever a PUBLISH message is received from the server
261+
public func addPublishListener(named name: String, _ listener: @escaping (Result<MQTTPublishInfo, Swift.Error>) -> ()) {
262+
publishListeners.addListener(named: name, listener: listener)
263+
}
264+
265+
/// Remove named publish listener
266+
public func removePublishListener(named name: String) {
267+
publishListeners.removeListener(named: name)
268+
}
269+
270+
/// Add close listener. Called whenever the connection is closed
271+
public func addCloseListener(named name: String, _ listener: @escaping (Result<Void, Swift.Error>) -> ()) {
272+
closeListeners.addListener(named: name, listener: listener)
273+
}
274+
275+
/// Remove named close listener
276+
public func removeCloseListener(named name: String) {
277+
closeListeners.removeListener(named: name)
278+
}
279+
280+
281+
var publishListeners = MQTTListeners<MQTTPublishInfo>()
282+
var closeListeners = MQTTListeners<Void>()
262283
}
263284

264285
extension Logger {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import NIO
2+
import NIOConcurrencyHelpers
3+
4+
struct MQTTListeners<ReturnType> {
5+
typealias Listener = (Result<ReturnType, Error>) -> ()
6+
7+
8+
func notify(_ result: Result<ReturnType, Error>) {
9+
lock.withLock {
10+
listeners.values.forEach { listener in
11+
listener(result)
12+
}
13+
}
14+
}
15+
16+
mutating func addListener(named name: String, listener: @escaping Listener) {
17+
lock.withLock {
18+
listeners[name] = listener
19+
}
20+
}
21+
22+
mutating func removeListener(named name: String) {
23+
lock.withLock {
24+
listeners[name] = nil
25+
}
26+
}
27+
28+
private let lock = Lock()
29+
private var listeners: [String: Listener] = [:]
30+
}

Tests/MQTTNIOTests/MQTTNIOTests.swift

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,26 +41,24 @@ final class MQTTNIOTests: XCTestCase {
4141
-----END CERTIFICATE-----
4242
"""
4343

44-
func createClient(cb: @escaping (Result<MQTTPublishInfo, Swift.Error>) -> () = { _ in }) -> MQTTClient {
44+
func createClient() -> MQTTClient {
4545
MQTTClient(
4646
host: "test.mosquitto.org",
4747
port: 1883,
4848
eventLoopGroupProvider: .createNew,
49-
logger: self.logger,
50-
publishCallback: cb
49+
logger: self.logger
5150
)
5251
}
53-
func createWebSocketClient(cb: @escaping (Result<MQTTPublishInfo, Swift.Error>) -> () = { _ in }) -> MQTTClient {
52+
func createWebSocketClient() -> MQTTClient {
5453
MQTTClient(
5554
host: "test.mosquitto.org",
5655
port: 8080,
5756
eventLoopGroupProvider: .createNew,
5857
logger: self.logger,
59-
configuration: .init(useWebSockets: true, webSocketURLPath: "/mqtt"),
60-
publishCallback: cb
58+
configuration: .init(useWebSockets: true, webSocketURLPath: "/mqtt")
6159
)
6260
}
63-
func createSSLClient(cb: @escaping (Result<MQTTPublishInfo, Swift.Error>) -> () = { _ in }) throws -> MQTTClient {
61+
func createSSLClient() throws -> MQTTClient {
6462
let rootCertificate = try NIOSSLCertificate.fromPEMBytes([UInt8](mosquittoCertificate.utf8))
6563
let tlsConfiguration: TLSConfiguration? = TLSConfiguration.forClient(
6664
trustRoots: .certificates(rootCertificate)
@@ -70,8 +68,7 @@ final class MQTTNIOTests: XCTestCase {
7068
port: 8883,
7169
eventLoopGroupProvider: .createNew,
7270
logger: self.logger,
73-
configuration: .init(useSSL: true, tlsConfiguration: tlsConfiguration),
74-
publishCallback: cb
71+
configuration: .init(useSSL: true, tlsConfiguration: tlsConfiguration)
7572
)
7673
}
7774

@@ -178,7 +175,8 @@ final class MQTTNIOTests: XCTestCase {
178175
)
179176
let client = self.createWebSocketClient()
180177
try connect(to: client, identifier: "soto_publisher")
181-
let client2 = self.createWebSocketClient() { result in
178+
let client2 = self.createWebSocketClient()
179+
client2.addPublishListener(named: "test") { result in
182180
switch result {
183181
case .success(let publish):
184182
var buffer = publish.payload

0 commit comments

Comments
 (0)