Skip to content

Commit 6a2e634

Browse files
authored
Merge encode, decode and pingreq MQTT channel handlers into one (#91)
1 parent 28dd96c commit 6a2e634

File tree

5 files changed

+176
-143
lines changed

5 files changed

+176
-143
lines changed

Sources/MQTTNIO/ChannelHandlers/MQTTEncoderHandler.swift

Lines changed: 0 additions & 26 deletions
This file was deleted.

Sources/MQTTNIO/ChannelHandlers/MQTTMessageDecoder.swift

Lines changed: 22 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -2,114 +2,48 @@ import Logging
22
import NIO
33

44
/// Decode ByteBuffers into MQTT Messages
5-
struct ByteToMQTTMessageDecoder: ByteToMessageDecoder {
5+
struct ByteToMQTTMessageDecoder: NIOSingleStepByteToMessageDecoder {
6+
mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> MQTTPacket? {
7+
try self.decode(buffer: &buffer)
8+
}
9+
610
typealias InboundOut = MQTTPacket
711

8-
let client: MQTTClient
12+
let version: MQTTClient.Version
913

10-
init(client: MQTTClient) {
11-
self.client = client
14+
init(version: MQTTClient.Version) {
15+
self.version = version
1216
}
1317

14-
mutating func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
15-
var readBuffer = buffer
18+
mutating func decode(buffer: inout ByteBuffer) throws -> MQTTPacket? {
19+
let origBuffer = buffer
1620
do {
17-
let packet = try MQTTIncomingPacket.read(from: &readBuffer)
21+
let packet = try MQTTIncomingPacket.read(from: &buffer)
1822
let message: MQTTPacket
1923
switch packet.type {
2024
case .PUBLISH:
21-
do {
22-
let publishMessage = try MQTTPublishPacket.read(version: self.client.configuration.version, from: packet)
23-
// let publish = try MQTTSerializer.readPublish(from: packet)
24-
// let publishMessage = MQTTPublishPacket(publish: publish.publishInfo, packetId: publish.packetId)
25-
self.client.logger.trace("MQTT In", metadata: [
26-
"mqtt_message": .string("\(publishMessage)"),
27-
"mqtt_packet_id": .string("\(publishMessage.packetId)"),
28-
"mqtt_topicName": .string("\(publishMessage.publish.topicName)"),
29-
])
30-
self.respondToPublish(publishMessage)
31-
} catch InternalError.incompletePacket {
32-
return .needMoreData
33-
} catch {
34-
self.client.publishListeners.notify(.failure(error))
35-
}
36-
buffer = readBuffer
37-
return .continue
25+
message = try MQTTPublishPacket.read(version: self.version, from: packet)
3826
case .CONNACK:
39-
message = try MQTTConnAckPacket.read(version: self.client.configuration.version, from: packet)
27+
message = try MQTTConnAckPacket.read(version: self.version, from: packet)
4028
case .PUBACK, .PUBREC, .PUBREL, .PUBCOMP:
41-
message = try MQTTPubAckPacket.read(version: self.client.configuration.version, from: packet)
42-
if packet.type == .PUBREL {
43-
self.respondToPubrel(message)
44-
}
29+
message = try MQTTPubAckPacket.read(version: self.version, from: packet)
4530
case .SUBACK, .UNSUBACK:
46-
message = try MQTTSubAckPacket.read(version: self.client.configuration.version, from: packet)
31+
message = try MQTTSubAckPacket.read(version: self.version, from: packet)
4732
case .PINGRESP:
48-
message = try MQTTPingrespPacket.read(version: self.client.configuration.version, from: packet)
33+
message = try MQTTPingrespPacket.read(version: self.version, from: packet)
4934
case .DISCONNECT:
50-
let disconnectMessage = try MQTTDisconnectPacket.read(version: self.client.configuration.version, from: packet)
51-
let ack = MQTTAckV5(reason: disconnectMessage.reason, properties: disconnectMessage.properties)
52-
context.fireErrorCaught(MQTTError.serverDisconnection(ack))
53-
context.close(promise: nil)
54-
buffer = readBuffer
55-
return .continue
35+
message = try MQTTDisconnectPacket.read(version: self.version, from: packet)
5636
case .AUTH:
57-
message = try MQTTAuthPacket.read(version: self.client.configuration.version, from: packet)
37+
message = try MQTTAuthPacket.read(version: self.version, from: packet)
5838
default:
5939
throw MQTTError.decodeError
6040
}
61-
self.client.logger.trace("MQTT In", metadata: ["mqtt_message": .string("\(message)"), "mqtt_packet_id": .string("\(message.packetId)")])
62-
context.fireChannelRead(wrapInboundOut(message))
41+
return message
6342
} catch InternalError.incompletePacket {
64-
return .needMoreData
43+
buffer = origBuffer
44+
return nil
6545
} catch {
66-
context.fireErrorCaught(error)
46+
throw error
6747
}
68-
buffer = readBuffer
69-
return .continue
70-
}
71-
72-
/// Respond to PUBLISH message
73-
func respondToPublish(_ message: MQTTPublishPacket) {
74-
guard let connection = client.connection else { return }
75-
switch message.publish.qos {
76-
case .atMostOnce:
77-
self.client.publishListeners.notify(.success(message.publish))
78-
79-
case .atLeastOnce:
80-
connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBACK, packetId: message.packetId))
81-
.map { _ in return message.publish }
82-
.whenComplete { self.client.publishListeners.notify($0) }
83-
84-
case .exactlyOnce:
85-
var publish = message.publish
86-
connection.sendMessage(MQTTPubAckPacket(type: .PUBREC, packetId: message.packetId)) { newMessage in
87-
guard newMessage.packetId == message.packetId else { return false }
88-
// if we receive a publish message while waiting for a PUBREL from broker then replace data to be published and retry PUBREC
89-
if newMessage.type == .PUBLISH, let publishMessage = newMessage as? MQTTPublishPacket {
90-
publish = publishMessage.publish
91-
throw MQTTError.retrySend
92-
}
93-
// if we receive anything but a PUBREL then throw unexpected message
94-
guard newMessage.type == .PUBREL else { throw MQTTError.unexpectedMessage }
95-
// now we have received the PUBREL we can process the published message. PUBCOMP is sent by `respondToPubrel`
96-
return true
97-
}
98-
.map { _ in return publish }
99-
.whenComplete { result in
100-
// do not report retrySend error
101-
if case .failure(let error) = result, case MQTTError.retrySend = error {
102-
return
103-
}
104-
self.client.publishListeners.notify(result)
105-
}
106-
}
107-
}
108-
109-
/// Respond to PUBREL message by sending PUBCOMP. Do this separate from `responeToPublish` as the broker might send
110-
/// multiple PUBREL messages, if the client is slow to respond
111-
func respondToPubrel(_ message: MQTTPacket) {
112-
guard let connection = client.connection else { return }
113-
_ = connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBCOMP, packetId: message.packetId))
11448
}
11549
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import Logging
2+
import NIO
3+
4+
class MQTTMessageHandler: ChannelDuplexHandler {
5+
typealias InboundIn = ByteBuffer
6+
typealias InboundOut = MQTTPacket
7+
typealias OutboundIn = MQTTPacket
8+
typealias OutboundOut = ByteBuffer
9+
10+
let client: MQTTClient
11+
let pingreqHandler: PingreqHandler?
12+
var decoder: NIOSingleStepByteToMessageProcessor<ByteToMQTTMessageDecoder>
13+
14+
init(_ client: MQTTClient, pingInterval: TimeAmount) {
15+
self.client = client
16+
if client.configuration.disablePing {
17+
self.pingreqHandler = nil
18+
} else {
19+
self.pingreqHandler = .init(client: client, timeout: pingInterval)
20+
}
21+
self.decoder = .init(.init(version: client.configuration.version))
22+
}
23+
24+
func channelActive(context: ChannelHandlerContext) {
25+
self.pingreqHandler?.start(context: context)
26+
context.fireChannelActive()
27+
}
28+
29+
func channelInactive(context: ChannelHandlerContext) {
30+
self.pingreqHandler?.stop()
31+
context.fireChannelInactive()
32+
}
33+
34+
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
35+
let message = unwrapOutboundIn(data)
36+
self.client.logger.trace("MQTT Out", metadata: ["mqtt_message": .string("\(message)"), "mqtt_packet_id": .string("\(message.packetId)")])
37+
var bb = context.channel.allocator.buffer(capacity: 0)
38+
do {
39+
try message.write(version: self.client.configuration.version, to: &bb)
40+
context.write(wrapOutboundOut(bb), promise: promise)
41+
} catch {
42+
promise?.fail(error)
43+
}
44+
self.pingreqHandler?.write()
45+
}
46+
47+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
48+
let buffer = self.unwrapInboundIn(data)
49+
50+
do {
51+
try self.decoder.process(buffer: buffer) { message in
52+
switch message.type {
53+
case .PUBLISH:
54+
let publishMessage = message as! MQTTPublishPacket
55+
// publish logging includes topic name
56+
self.client.logger.trace("MQTT In", metadata: [
57+
"mqtt_message": .stringConvertible(publishMessage),
58+
"mqtt_packet_id": .stringConvertible(publishMessage.packetId),
59+
"mqtt_topicName": .string(publishMessage.publish.topicName),
60+
])
61+
self.respondToPublish(publishMessage)
62+
return
63+
64+
case .CONNACK, .PUBACK, .PUBREC, .PUBCOMP, .SUBACK, .UNSUBACK, .PINGRESP, .AUTH:
65+
context.fireChannelRead(wrapInboundOut(message))
66+
67+
case .PUBREL:
68+
self.respondToPubrel(message)
69+
context.fireChannelRead(wrapInboundOut(message))
70+
71+
case .DISCONNECT:
72+
let disconnectMessage = message as! MQTTDisconnectPacket
73+
let ack = MQTTAckV5(reason: disconnectMessage.reason, properties: disconnectMessage.properties)
74+
context.fireErrorCaught(MQTTError.serverDisconnection(ack))
75+
context.close(promise: nil)
76+
77+
case .CONNECT, .SUBSCRIBE, .UNSUBSCRIBE, .PINGREQ:
78+
context.fireErrorCaught(MQTTError.unexpectedMessage)
79+
context.close(promise: nil)
80+
self.client.logger.error("Unexpected MQTT Message", metadata: ["mqtt_message": .string("\(message)")])
81+
return
82+
}
83+
self.client.logger.trace("MQTT In", metadata: ["mqtt_message": .stringConvertible(message), "mqtt_packet_id": .stringConvertible(message.packetId)])
84+
}
85+
} catch {
86+
context.fireErrorCaught(error)
87+
context.close(promise: nil)
88+
self.client.logger.error("Error processing MQTT message", metadata: ["mqtt_error": .string("\(error)")])
89+
}
90+
}
91+
92+
func updatePingreqTimeout(_ timeout: TimeAmount) {
93+
self.pingreqHandler?.updateTimeout(timeout)
94+
}
95+
96+
/// Respond to PUBLISH message
97+
/// If QoS is `.atMostOnce` then no response is required
98+
/// If QoS is `.atLeastOnce` then send PUBACK
99+
/// If QoS is `.exactlyOnce` then send PUBREC, wait for PUBREL and then respond with PUBCOMP (in `respondToPubrel`)
100+
private func respondToPublish(_ message: MQTTPublishPacket) {
101+
guard let connection = client.connection else { return }
102+
switch message.publish.qos {
103+
case .atMostOnce:
104+
self.client.publishListeners.notify(.success(message.publish))
105+
106+
case .atLeastOnce:
107+
connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBACK, packetId: message.packetId))
108+
.map { _ in return message.publish }
109+
.whenComplete { self.client.publishListeners.notify($0) }
110+
111+
case .exactlyOnce:
112+
var publish = message.publish
113+
connection.sendMessage(MQTTPubAckPacket(type: .PUBREC, packetId: message.packetId)) { newMessage in
114+
guard newMessage.packetId == message.packetId else { return false }
115+
// if we receive a publish message while waiting for a PUBREL from broker then replace data to be published and retry PUBREC
116+
if newMessage.type == .PUBLISH, let publishMessage = newMessage as? MQTTPublishPacket {
117+
publish = publishMessage.publish
118+
throw MQTTError.retrySend
119+
}
120+
// if we receive anything but a PUBREL then throw unexpected message
121+
guard newMessage.type == .PUBREL else { throw MQTTError.unexpectedMessage }
122+
// now we have received the PUBREL we can process the published message. PUBCOMP is sent by `respondToPubrel`
123+
return true
124+
}
125+
.map { _ in return publish }
126+
.whenComplete { result in
127+
// do not report retrySend error
128+
if case .failure(let error) = result, case MQTTError.retrySend = error {
129+
return
130+
}
131+
self.client.publishListeners.notify(result)
132+
}
133+
}
134+
}
135+
136+
/// Respond to PUBREL message by sending PUBCOMP. Do this separate from `responeToPublish` as the broker might send
137+
/// multiple PUBREL messages, if the client is slow to respond
138+
private func respondToPubrel(_ message: MQTTPacket) {
139+
guard let connection = client.connection else { return }
140+
_ = connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBCOMP, packetId: message.packetId))
141+
}
142+
}

Sources/MQTTNIO/ChannelHandlers/PingreqHandler.swift

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import NIO
22

33
/// Channel handler for sending PINGREQ messages to keep connect alive
4-
final class PingreqHandler: ChannelDuplexHandler {
4+
final class PingreqHandler {
55
typealias OutboundIn = MQTTPacket
66
typealias OutboundOut = MQTTPacket
77
typealias InboundIn = MQTTPacket
@@ -23,26 +23,18 @@ final class PingreqHandler: ChannelDuplexHandler {
2323
self.timeout = timeout
2424
}
2525

26-
public func handlerAdded(context: ChannelHandlerContext) {
27-
if context.channel.isActive {
28-
self.scheduleTask(context)
29-
}
30-
}
31-
32-
public func handlerRemoved(context: ChannelHandlerContext) {
33-
self.cancelTask()
26+
func start(context: ChannelHandlerContext) {
27+
guard self.task == nil else { return }
28+
self.scheduleTask(context)
3429
}
3530

36-
public func channelActive(context: ChannelHandlerContext) {
37-
if self.task == nil {
38-
self.scheduleTask(context)
39-
}
40-
context.fireChannelActive()
31+
func stop() {
32+
self.task?.cancel()
33+
self.task = nil
4134
}
4235

43-
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
36+
func write() {
4437
self.lastEventTime = .now()
45-
context.write(data, promise: promise)
4638
}
4739

4840
func scheduleTask(_ context: ChannelHandlerContext) {
@@ -69,9 +61,4 @@ final class PingreqHandler: ChannelDuplexHandler {
6961
}
7062
}
7163
}
72-
73-
func cancelTask() {
74-
self.task?.cancel()
75-
self.task = nil
76-
}
7764
}

Sources/MQTTNIO/MQTTConnection.swift

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,10 @@ final class MQTTConnection {
3939
.connectTimeout(client.configuration.connectTimeout)
4040
.channelInitializer { channel in
4141
// Work out what handlers to add
42-
var handlers: [ChannelHandler] = [
43-
ByteToMessageHandler(ByteToMQTTMessageDecoder(client: client)),
42+
let handlers: [ChannelHandler] = [
43+
MQTTMessageHandler(client, pingInterval: pingInterval),
4444
taskHandler,
45-
MQTTEncodeHandler(client: client),
4645
]
47-
if !client.configuration.disablePing {
48-
handlers = [PingreqHandler(client: client, timeout: pingInterval)] + handlers
49-
}
5046
// are we using websockets
5147
if client.configuration.useWebSockets {
5248
// prepare for websockets and on upgrade add handlers
@@ -191,8 +187,8 @@ final class MQTTConnection {
191187
}
192188

193189
func updatePingreqTimeout(_ timeout: TimeAmount) {
194-
self.channel.pipeline.handler(type: PingreqHandler.self).whenSuccess { pingreq in
195-
pingreq.updateTimeout(timeout)
190+
self.channel.pipeline.handler(type: MQTTMessageHandler.self).whenSuccess { handler in
191+
handler.updatePingreqTimeout(timeout)
196192
}
197193
}
198194

0 commit comments

Comments
 (0)