Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: more code cleanup surrounding client.end callbacks #1629

Merged
merged 5 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 30 additions & 38 deletions test/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ describe('MqttClient', function () {
})

describe('message ids', function () {
it('should increment the message id', function () {
it('should increment the message id', function (done) {
client = mqtt.connect(config)
const currentId = client._nextId()

assert.equal(client._nextId(), currentId + 1)
client.end()
client.end((err) => done(err))
})

it('should not throw an error if packet\'s messageId is not found when receiving a pubrel packet', function (done) {
Expand All @@ -83,9 +83,11 @@ describe('MqttClient', function () {

client.on('packetsend', function (packet) {
if (packet.cmd === 'pubcomp') {
client.end()
server2.close()
done()
client.end((err1) => {
server2.close((err2) => {
done(err1 || err2)
})
})
}
})
})
Expand All @@ -108,6 +110,9 @@ describe('MqttClient', function () {

client.on('message', function (t, p, packet) {
if (++count === max) {
// BUGBUG: the client.end callback never gets called here
// client.end((err) => done(err))
Comment on lines +113 to +114
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use mqttjs in lot of my projects and this happend to me too, I always create a MqttClient wrapper library that handles all client stuff, this is how I handle the close usually: https://github.com/zwave-js/zwave-js-ui/blob/master/lib/MqttClient.ts#L154. I think this could happen when for some reason underlig socket is already closed/destroyed or when store cannot be closed for some reason

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No clue if this could help: #713

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I merged #713 into a local branch and it didn't resolve this issue. It is an interesting change though.

One of my goals with these test changes is to get the behavior of client.end well tested before we make any changes to it. Right now, it's just too hard to reason about how client.end might behave and I'm afraid to make any changes to it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree! Tests are essentials if we want to make improvements to the code

client.end()
done()
}
})
Expand Down Expand Up @@ -143,9 +148,9 @@ describe('MqttClient', function () {
describe('flushing', function () {
it('should attempt to complete pending unsub and send on ping timeout', function (done) {
this.timeout(10000)
const server3 = new MqttServer(function (client) {
client.on('connect', function (packet) {
client.connack({ returnCode: 0 })
const server3 = new MqttServer(function (serverClient) {
serverClient.on('connect', function (packet) {
serverClient.connack({ returnCode: 0 })
})
}).listen(ports.PORTAND72)

Expand All @@ -168,10 +173,10 @@ describe('MqttClient', function () {
unsubscribeCallbackCalled = true
})
setTimeout(() => {
client.end(() => {
client.end((err) => {
assert.strictEqual(pubCallbackCalled && unsubscribeCallbackCalled, true, 'callbacks not invoked')
server3.close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does server3.close expects a callback too?

done()
done(err)
})
}, 5000)
})
Expand Down Expand Up @@ -200,7 +205,7 @@ describe('MqttClient', function () {
innerServer.kill('SIGINT') // mocks server shutdown
client.once('close', function () {
assert.exists(client.reconnectTimer)
client.end(true, done)
client.end(true, (err) => done(err))
})
})
})
Expand Down Expand Up @@ -261,7 +266,7 @@ describe('MqttClient', function () {
client.on('reconnect', function () {
reconnects++
if (reconnects >= expectedReconnects) {
client.end(true, done)
client.end(true, (err) => done(err))
}
})
})
Expand Down Expand Up @@ -294,6 +299,7 @@ describe('MqttClient', function () {
client.end(true, (err) => done(err))
} else {
debug('calling client.end()')
// Do not call done. We want to trigger a reconnect here.
client.end(true)
}
}, 2000)
Expand All @@ -313,26 +319,14 @@ describe('MqttClient', function () {
})

const server2 = new MqttServer(function (serverClient) {
serverClient.on('error', function () { })
debug('setting serverClient connect callback')
serverClient.on('connect', function (packet) {
if (packet.clientId === 'invalid') {
debug('connack with returnCode 2')
serverClient.connack({ returnCode: 2 })
} else {
debug('connack with returnCode 0')
serverClient.connack({ returnCode: 0 })
}
})
}).listen(ports.PORTAND46)

server2.on('client', function (serverClient) {
debug('client received on server2.')
debug('subscribing to topic `topic`')
client.subscribe('topic', function () {
debug('once subscribed to topic, end client, destroy serverClient, and close server.')
serverClient.destroy()
server2.close(() => { client.end(true, done) })
server2.close(() => {
client.end(true, (err) => done(err))
})
})

serverClient.on('subscribe', function (packet) {
Expand Down Expand Up @@ -361,7 +355,7 @@ describe('MqttClient', function () {
})
}
})
})
}).listen(ports.PORTAND46)
})

it('should not fill the queue of subscribes if it cannot connect', function (done) {
Expand All @@ -388,9 +382,7 @@ describe('MqttClient', function () {

setTimeout(function () {
assert.equal(client.queue.length, 1)
client.end(true, () => {
done()
})
client.end(true, (err) => done(err))
}, 1000)
})
})
Expand Down Expand Up @@ -424,9 +416,11 @@ describe('MqttClient', function () {

server2.on('client', function (serverClient) {
client.publish('topic', 'data', { qos: 1 }, function () {
serverClient.destroy()
server2.close()
client.end(true, done)
client.end(true, (err1) => {
server2.close((err2) => {
done(err1 || err2)
})
})
})

serverClient.on('publish', function onPublish (packet) {
Expand Down Expand Up @@ -462,7 +456,7 @@ describe('MqttClient', function () {
it('check emit error on checkDisconnection w/o callback', function (done) {
this.timeout(15000)

const server118 = new MqttServer(function (client) {
const server2 = new MqttServer(function (client) {
client.on('connect', function (packet) {
client.connack({
reasonCode: 0
Expand All @@ -486,14 +480,12 @@ describe('MqttClient', function () {
// wait for the client to receive an error...
client.on('error', function (error) {
assert.equal(error.message, 'client disconnecting')
server118.close()
done()
server2.close((err) => done(err))
})
client.on('connect', function () {
client.end(function () {
client._checkDisconnecting()
})
server118.close()
})
})
})
Loading