Skip to content

Commit ab96ebe

Browse files
committed
feat: expose AmqpConnectionManagerClass
The naming is strange because I don't want to introduce any breaking change. Note that jest complains about leaking resources due to: amqp-node/amqplib#584
1 parent 36f234b commit ab96ebe

6 files changed

+77
-15
lines changed

src/AmqpConnectionManager.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ export interface IAmqpConnectionManager {
8282
addListener(event: 'unblocked', listener: () => void): this;
8383
addListener(event: 'disconnect', listener: (arg: { err: Error }) => void): this;
8484

85+
listeners(eventName: string | symbol): any;
86+
8587
on(event: string, listener: (...args: any[]) => void): this;
8688
on(event: 'connect', listener: ConnectListener): this;
8789
on(event: 'blocked', listener: (arg: { reason: string }) => void): this;
@@ -108,6 +110,8 @@ export interface IAmqpConnectionManager {
108110

109111
removeListener(event: string, listener: (...args: any[]) => void): this;
110112

113+
connect(): Promise<void>;
114+
reconnect(): void;
111115
createChannel(options?: CreateChannelOpts): ChannelWrapper;
112116
close(): Promise<void>;
113117
isConnected(): boolean;
@@ -196,8 +200,22 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
196200
this.setMaxListeners(0);
197201

198202
this._findServers = options.findServers || (() => Promise.resolve(urls));
203+
}
199204

205+
connect(): Promise<void> {
200206
this._connect();
207+
return new Promise((resolve, reject) => {
208+
const onConnect = () => {
209+
this.removeListener('disconnect', onDisconnect)
210+
resolve()
211+
}
212+
const onDisconnect = ({ err }: { err: Error }) => {
213+
this.removeListener('connect', onConnect)
214+
reject(err)
215+
}
216+
this.once('connect', onConnect)
217+
this.once('disconnect', onDisconnect)
218+
})
201219
}
202220

203221
// `options` here are any options that can be passed to ChannelWrapper.

src/index.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,15 @@ export function connect(
1515
urls: ConnectionUrl | ConnectionUrl[] | undefined | null,
1616
options?: AmqpConnectionManagerOptions
1717
): IAmqpConnectionManager {
18-
return new AmqpConnectionManager(urls, options);
18+
const conn = new AmqpConnectionManager(urls, options);
19+
conn.connect().catch(() => {
20+
/* noop */
21+
});
22+
return conn;
1923
}
2024

25+
export { AmqpConnectionManager as AmqpConnectionManagerClass };
26+
2127
const amqp = { connect };
2228

2329
export default amqp;

test/AmqpConnectionManagerTest.ts

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ describe('AmqpConnectionManager', function () {
2727

2828
it('should establish a connection to a broker', async () => {
2929
amqp = new AmqpConnectionManager('amqp://localhost');
30+
amqp.connect();
3031
const [{ connection, url }] = await once(amqp, 'connect');
3132
expect(url, 'url').to.equal('amqp://localhost');
3233
expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
@@ -37,6 +38,7 @@ describe('AmqpConnectionManager', function () {
3738
protocol: 'amqp',
3839
hostname: 'localhost',
3940
});
41+
amqp.connect();
4042
const [{ connection, url }] = await once(amqp, 'connect');
4143
expect(url, 'url').to.eql({
4244
protocol: 'amqp',
@@ -51,14 +53,15 @@ describe('AmqpConnectionManager', function () {
5153

5254
it('should establish a url object based connection to a broker', async () => {
5355
amqp = new AmqpConnectionManager({ url: 'amqp://localhost' });
54-
56+
amqp.connect();
5557
const [{ connection, url }] = await once(amqp, 'connect');
5658
expect(url, 'url').to.equal('amqp://localhost');
5759
expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
5860
});
5961

6062
it('should close connection to a broker', async () => {
6163
amqp = new AmqpConnectionManager('amqp://localhost');
64+
amqp.connect();
6265
const [{ connection, url }] = await once(amqp, 'connect');
6366
expect(url, 'url').to.equal('amqp://localhost');
6467
expect((connection as any).url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
@@ -77,6 +80,7 @@ describe('AmqpConnectionManager', function () {
7780
let connected = false;
7881

7982
amqp = new AmqpConnectionManager('amqp://localhost');
83+
amqp.connect();
8084
// Connection should not yet be established
8185
expect(amqp.connection, 'current connection').to.equal(undefined);
8286
// Connection should be pending though
@@ -123,6 +127,7 @@ describe('AmqpConnectionManager', function () {
123127
return Promise.resolve('amqp://localhost');
124128
},
125129
});
130+
amqp.connect();
126131
const [{ connection, url }] = await once(amqp, 'connect');
127132
expect(url, 'url').to.equal('amqp://localhost');
128133
expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
@@ -134,6 +139,7 @@ describe('AmqpConnectionManager', function () {
134139
return Promise.resolve({ url: 'amqp://localhost' });
135140
},
136141
});
142+
amqp.connect();
137143
const [{ connection, url }] = await once(amqp, 'connect');
138144
expect(url, 'url').to.equal('amqp://localhost');
139145
expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
@@ -145,13 +151,15 @@ describe('AmqpConnectionManager', function () {
145151
return Promise.resolve(null);
146152
},
147153
});
154+
amqp.connect();
148155
const [{ err }] = await once(amqp, 'disconnect');
149156
expect(err.message).to.contain('No servers found');
150157
return amqp?.close();
151158
});
152159

153160
it('should work with a URL with a query', async () => {
154161
amqp = new AmqpConnectionManager('amqp://localhost?frameMax=0x1000');
162+
amqp.connect();
155163
const [{ connection }] = await once(amqp, 'connect');
156164
expect(connection.url, 'connection.url').to.equal(
157165
'amqp://localhost?frameMax=0x1000&heartbeat=5'
@@ -171,6 +179,7 @@ describe('AmqpConnectionManager', function () {
171179
amqp = new AmqpConnectionManager(['amqp://rabbit1', 'amqp://rabbit2'], {
172180
heartbeatIntervalInSeconds: 0.01,
173181
});
182+
amqp.connect();
174183

175184
let disconnectEventsSeen = 0;
176185
amqp.on('disconnect', function () {
@@ -196,10 +205,10 @@ describe('AmqpConnectionManager', function () {
196205
let disconnectsSeen = 0;
197206
amqp.on('disconnect', () => disconnectsSeen++);
198207

199-
await once(amqp, 'connect');
208+
await amqp.connect();
200209
amqplib.kill();
201210

202-
await once(amqp, 'connect');
211+
await amqp.connect();
203212
expect(disconnectsSeen).to.equal(1);
204213
});
205214

@@ -211,7 +220,7 @@ describe('AmqpConnectionManager', function () {
211220
let disconnectsSeen = 0;
212221
amqp.on('disconnect', () => disconnectsSeen++);
213222

214-
await once(amqp, 'connect');
223+
await amqp.connect();
215224

216225
// Close the connection nicely
217226
amqplib.simulateRemoteClose();
@@ -222,6 +231,7 @@ describe('AmqpConnectionManager', function () {
222231

223232
it('should know if it is connected or not', async () => {
224233
amqp = new AmqpConnectionManager('amqp://localhost');
234+
amqp.connect();
225235

226236
expect(amqp.isConnected()).to.be.false;
227237

@@ -231,7 +241,7 @@ describe('AmqpConnectionManager', function () {
231241

232242
it('should be able to manually reconnect', async () => {
233243
amqp = new AmqpConnectionManager('amqp://localhost');
234-
await once(amqp, 'connect');
244+
await amqp.connect();
235245

236246
amqp.reconnect();
237247
await once(amqp, 'disconnect');
@@ -240,13 +250,14 @@ describe('AmqpConnectionManager', function () {
240250

241251
it('should throw on manual reconnect after close', async () => {
242252
amqp = new AmqpConnectionManager('amqp://localhost');
243-
await once(amqp, 'connect');
244-
await amqp.close()
245-
expect(amqp.reconnect).to.throw()
246-
})
253+
await amqp.connect();
254+
await amqp.close();
255+
expect(amqp.reconnect).to.throw();
256+
});
247257

248258
it('should create and clean up channel wrappers', async function () {
249259
amqp = new AmqpConnectionManager('amqp://localhost');
260+
await amqp.connect();
250261
const channel = amqp.createChannel({ name: 'test-chan' });
251262

252263
// Channel should register with connection manager
@@ -264,6 +275,7 @@ describe('AmqpConnectionManager', function () {
264275

265276
it('should clean up channels on close', async function () {
266277
amqp = new AmqpConnectionManager('amqp://localhost');
278+
await amqp.connect();
267279
amqp.createChannel({ name: 'test-chan' });
268280

269281
// Channel should register with connection manager
@@ -286,7 +298,7 @@ describe('AmqpConnectionManager', function () {
286298

287299
let connectsSeen = 0;
288300
amqp.on('connect', () => connectsSeen++);
289-
await once(amqp, 'connect');
301+
await amqp.connect();
290302

291303
// Close the manager
292304
await amqp?.close();
@@ -308,7 +320,7 @@ describe('AmqpConnectionManager', function () {
308320

309321
amqp.on('unblocked', () => unblockSeen++);
310322

311-
await once(amqp, 'connect');
323+
await amqp.connect();
312324
// Close the connection nicely
313325
amqplib.simulateRemoteBlock();
314326
amqplib.simulateRemoteUnblock();

test/fixtures.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
33

44
import { Connection, Message, Options, Replies } from 'amqplib';
5-
import { EventEmitter } from 'events';
5+
import { EventEmitter, once } from 'events';
66
import { IAmqpConnectionManager } from '../src/AmqpConnectionManager';
77
import ChannelWrapper, { CreateChannelOpts } from '../src/ChannelWrapper';
88

@@ -194,6 +194,15 @@ export class FakeAmqpConnectionManager extends EventEmitter implements IAmqpConn
194194
return 0;
195195
}
196196

197+
async connect(): Promise<void> {
198+
await Promise.all([once(this, 'connect'), this.simulateConnect()]);
199+
}
200+
201+
reconnect(): void {
202+
this.simulateDisconnect();
203+
this.simulateConnect();
204+
}
205+
197206
isConnected() {
198207
return this.connected;
199208
}

test/importTest.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
import { expect } from 'chai';
2-
import amqp from '../src';
2+
import amqp, { AmqpConnectionManagerClass as AmqpConnectionManager } from '../src';
33

44
describe('import test', function () {
55
it('should let you import as default (#51)', function () {
66
expect(amqp).to.exist;
77
expect(amqp.connect).to.exist;
88
});
9+
10+
it('should let you import class', function () {
11+
new AmqpConnectionManager('url');
12+
});
913
});

test/integrationTest.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import chai from 'chai';
33
import chaiJest from 'chai-jest';
44
import pEvent from 'p-event';
55
import { defer, timeout } from 'promise-tools';
6-
import amqp from '../src';
6+
import amqp, { AmqpConnectionManagerClass as AmqpConnectionManager } from '../src';
77
import { IAmqpConnectionManager } from '../src/AmqpConnectionManager';
88

99
chai.use(chaiJest);
@@ -69,6 +69,19 @@ describe('Integration tests', () => {
6969
await timeout(pEvent(connection, 'connect'), 3000);
7070
});
7171

72+
// This test causes jest to complain about leaked resources due to the bug described and fixed by:
73+
// https://github.com/squaremo/amqp.node/pull/584
74+
it('should throw on awaited connect with wrong password', async () => {
75+
connection = new AmqpConnectionManager('amqp://guest:wrong@localhost');
76+
let err
77+
try {
78+
await connection.connect();
79+
} catch (error) {
80+
err = error
81+
}
82+
expect(err.message).to.contain('ACCESS-REFUSED')
83+
});
84+
7285
it('send and receive messages', async () => {
7386
const queueName = 'testQueue1';
7487
const content = `hello world - ${Date.now()}`;

0 commit comments

Comments
 (0)