diff --git a/.travis.yml b/.travis.yml index 9f1051b2..e9e039d1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,5 @@ language: node_js node_js: -- "0.8" -- "0.10" - "0.12" - "iojs-v1" - "iojs-v2" diff --git a/channel_api.js b/channel_api.js index 6c8c4dd7..7f3fe8e4 100644 --- a/channel_api.js +++ b/channel_api.js @@ -1,15 +1,10 @@ -var raw_connect = require('./lib/connect').connect; +const { promisify } = require('util'); +const raw_connect = promisify(require('./lib/connect').connect); var ChannelModel = require('./lib/channel_model').ChannelModel; -var Promise = require('bluebird'); function connect(url, connOptions) { - return Promise.fromCallback(function(cb) { - return raw_connect(url, connOptions, cb); - }) - .then(function(conn) { - return new ChannelModel(conn); - }); -}; + return raw_connect(url, connOptions).then(conn => new ChannelModel(conn)); +} module.exports.connect = connect; module.exports.credentials = require('./lib/credentials'); diff --git a/examples/tutorials/receive_logs_direct.js b/examples/tutorials/receive_logs_direct.js index 17fc22de..f1e77f95 100755 --- a/examples/tutorials/receive_logs_direct.js +++ b/examples/tutorials/receive_logs_direct.js @@ -1,7 +1,6 @@ #!/usr/bin/env node var amqp = require('amqplib'); -var all = require('bluebird').all; var basename = require('path').basename; var severities = process.argv.slice(2); @@ -24,7 +23,7 @@ amqp.connect('amqp://localhost').then(function(conn) { ok = ok.then(function(qok) { var queue = qok.queue; - return all(severities.map(function(sev) { + return Promise.all(severities.map(function(sev) { ch.bindQueue(queue, ex, sev); })).then(function() { return queue; }); }); diff --git a/examples/tutorials/receive_logs_topic.js b/examples/tutorials/receive_logs_topic.js index 3e8eb6f0..cc8e47f1 100755 --- a/examples/tutorials/receive_logs_topic.js +++ b/examples/tutorials/receive_logs_topic.js @@ -2,7 +2,6 @@ var amqp = require('amqplib'); var basename = require('path').basename; -var all = require('bluebird').all; var keys = process.argv.slice(2); if (keys.length < 1) { @@ -23,7 +22,7 @@ amqp.connect('amqp://localhost').then(function(conn) { ok = ok.then(function(qok) { var queue = qok.queue; - return all(keys.map(function(rk) { + return Promise.all(keys.map(function(rk) { ch.bindQueue(queue, ex, rk); })).then(function() { return queue; }); }); diff --git a/examples/tutorials/rpc_client.js b/examples/tutorials/rpc_client.js index 47566b34..88cc72a4 100755 --- a/examples/tutorials/rpc_client.js +++ b/examples/tutorials/rpc_client.js @@ -2,7 +2,6 @@ var amqp = require('amqplib'); var basename = require('path').basename; -var Promise = require('bluebird'); var uuid = require('node-uuid'); // I've departed from the form of the original RPC tutorial, which diff --git a/lib/callback_model.js b/lib/callback_model.js index dca383f8..3a0b3b3e 100644 --- a/lib/callback_model.js +++ b/lib/callback_model.js @@ -5,7 +5,6 @@ 'use strict'; var defs = require('./defs'); -var Promise = require('bluebird'); var inherits = require('util').inherits; var EventEmitter = require('events').EventEmitter; var BaseChannel = require('./channel').BaseChannel; diff --git a/lib/channel_model.js b/lib/channel_model.js index 2097fd14..d7020262 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -5,7 +5,8 @@ 'use strict'; var defs = require('./defs'); -var Promise = require('bluebird'); +var Bluebird = require('bluebird'); +const { promisify } = require('util'); var inherits = require('util').inherits; var EventEmitter = require('events').EventEmitter; var BaseChannel = require('./channel').BaseChannel; @@ -29,7 +30,7 @@ module.exports.ChannelModel = ChannelModel; var CM = ChannelModel.prototype; CM.close = function() { - return Promise.fromCallback(this.connection.close.bind(this.connection)); + return promisify(this.connection.close.bind(this.connection)); }; // Channels @@ -54,30 +55,28 @@ var C = Channel.prototype; // response's fields; this is intended to be suitable for implementing // API procedures. C.rpc = function(method, fields, expect) { - var self = this; - return Promise.fromCallback(function(cb) { - return self._rpc(method, fields, expect, cb); - }) - .then(function(f) { - return f.fields; - }); + const rpc = promisify(this._rpc.bind(this)); + + return rpc(method, fields, expect).then(f => f.fields); }; // Do the remarkably simple channel open handshake -C.open = function() { - return Promise.try(this.allocate.bind(this)).then( - function(ch) { - return ch.rpc(defs.ChannelOpen, {outOfBand: ""}, - defs.ChannelOpenOk); - }); +C.open = async function () { + const ch = this.allocate.call(this); + + return ch.rpc(defs.ChannelOpen, {outOfBand: ""}, defs.ChannelOpenOk); }; C.close = function() { var self = this; - return Promise.fromCallback(function(cb) { + + return Bluebird.fromCallback(function(cb) { return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, cb); }); + // const closeBecause = promisify(this.closeBecause.bind(this)); + // + // return closeBecause("Goodbye", defs.constants.REPLY_SUCCESS); }; // === Public API, declaring queues and stuff === @@ -167,9 +166,18 @@ C.consume = function(queue, callback, options) { // NB we want the callback to be run synchronously, so that we've // registered the consumerTag before any messages can arrive. var fields = Args.consume(queue, options); - return Promise.fromCallback(function(cb) { + return Bluebird.fromCallback(function(cb) { self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb); }) + // return new Promise(function(resolve, reject) { + // self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, function (err, result) { + // if (err) { + // reject(err); + // } else { + // resolve(result); + // } + // }); + // }) .then(function(ok) { self.registerConsumer(ok.fields.consumerTag, callback); return ok.fields; @@ -177,13 +185,9 @@ C.consume = function(queue, callback, options) { }; C.cancel = function(consumerTag) { - var self = this; - return Promise.fromCallback(function(cb) { - self._rpc(defs.BasicCancel, Args.cancel(consumerTag), - defs.BasicCancelOk, - cb); - }) - .then(function(ok) { + const rpc = promisify(this._rpc.bind(this)); + + return rpc(defs.BasicCancel, Args.cancel(consumerTag), defs.BasicCancelOk).then(ok => { self.unregisterConsumer(consumerTag); return ok.fields; }); @@ -192,9 +196,18 @@ C.cancel = function(consumerTag) { C.get = function(queue, options) { var self = this; var fields = Args.get(queue, options); - return Promise.fromCallback(function(cb) { + return Bluebird.fromCallback(function(cb) { return self.sendOrEnqueue(defs.BasicGet, fields, cb); }) + // return new Promise(function(resolve, reject) { + // return self.sendOrEnqueue(defs.BasicGet, fields, function (err, result) { + // if (err) { + // reject(err); + // } else { + // resolve(result); + // } + // }); + // }) .then(function(f) { if (f.id === defs.BasicGetEmpty) { return false; diff --git a/test/channel.js b/test/channel.js index 072c3354..c1cf2b30 100644 --- a/test/channel.js +++ b/test/channel.js @@ -3,7 +3,6 @@ 'use strict'; var assert = require('assert'); -var Promise = require('bluebird'); var Channel = require('../lib/channel').Channel; var Connection = require('../lib/connection').Connection; var util = require('./util'); @@ -77,11 +76,19 @@ var DELIVER_FIELDS = { }; function open(ch) { - return Promise.try(function() { - ch.allocate(); - return Promise.fromCallback(function(cb) { - ch._rpc(defs.ChannelOpen, {outOfBand: ''}, defs.ChannelOpenOk, cb); - }); + return new Promise(function (resolve, reject) { + try { + ch.allocate(); + ch._rpc(defs.ChannelOpen, {outOfBand: ''}, defs.ChannelOpenOk, function (err, done) { + if (err) { + reject(err); + } else { + resolve(done); + } + }); + } catch (e) { + reject(e); + } }); } @@ -286,7 +293,7 @@ test("RPC on closed channel", channelTest( failureCb(resolve, reject)); }); - Promise.join(close, fail1, fail2) + Promise.all([close, fail1, fail2]) .then(succeed(done)) .catch(fail(done)); }, diff --git a/test/channel_api.js b/test/channel_api.js index bd5473f0..75b42734 100644 --- a/test/channel_api.js +++ b/test/channel_api.js @@ -6,7 +6,6 @@ var util = require('./util'); var succeed = util.succeed, fail = util.fail; var schedule = util.schedule; var randomString = util.randomString; -var Promise = require('bluebird'); var Buffer = require('safe-buffer').Buffer; var URL = process.env.URL || 'amqp://localhost'; @@ -41,7 +40,7 @@ function channel_test(chmethod, name, chfun) { c[chmethod]().then(ignoreErrors).then(chfun) .then(succeed(done), fail(done)) // close the connection regardless of what happens with the test - .finally(function() {c.close();}); + .finally(() => c.close()); }); }); } @@ -119,9 +118,9 @@ chtest("channel break on publishing to non-exchange", function(ch) { chtest("delete queue", function(ch) { var q = 'test.delete-queue'; - return Promise.join( + return Promise.all([ ch.assertQueue(q, QUEUE_OPTS), - ch.checkQueue(q)) + ch.checkQueue(q)]) .then(function() { return ch.deleteQueue(q);}) .then(function() { @@ -130,9 +129,9 @@ chtest("delete queue", function(ch) { chtest("delete exchange", function(ch) { var ex = 'test.delete-exchange'; - return Promise.join( + return Promise.all([ ch.assertExchange(ex, 'fanout', EX_OPTS), - ch.checkExchange(ex)) + ch.checkExchange(ex)]) .then(function() { return ch.deleteExchange(ex);}) .then(function() { @@ -178,7 +177,7 @@ suite("sendMessage", function() { chtest("send to queue and get from queue", function(ch) { var q = 'test.send-to-q'; var msg = randomString(); - return Promise.join(ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)) + return Promise.all([ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) .then(function() { ch.sendToQueue(q, Buffer.from(msg)); return waitForMessages(q); @@ -195,9 +194,9 @@ chtest("send to queue and get from queue", function(ch) { chtest("send (and get) zero content to queue", function(ch) { var q = 'test.send-to-q'; var msg = Buffer.alloc(0); - return Promise.join( + return Promise.all([ ch.assertQueue(q, QUEUE_OPTS), - ch.purgeQueue(q)) + ch.purgeQueue(q)]) .then(function() { ch.sendToQueue(q, msg); return waitForMessages(q);}) @@ -219,11 +218,11 @@ chtest("route message", function(ch) { var q = 'test.route-message-q'; var msg = randomString(); - return Promise.join( + return Promise.all([ ch.assertExchange(ex, 'fanout', EX_OPTS), ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q), - ch.bindQueue(q, ex, '', {})) + ch.bindQueue(q, ex, '', {})]) .then(function() { ch.publish(ex, '', Buffer.from(msg)); return waitForMessages(q);}) @@ -257,11 +256,11 @@ chtest("unbind queue", function(ch) { var viabinding = randomString(); var direct = randomString(); - return Promise.join( + return Promise.all([ ch.assertExchange(ex, 'fanout', EX_OPTS), ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q), - ch.bindQueue(q, ex, '', {})) + ch.bindQueue(q, ex, '', {})]) .then(function() { ch.publish(ex, '', Buffer.from('foobar')); return waitForMessages(q);}) @@ -290,14 +289,14 @@ chtest("consume via exchange-exchange binding", function(ch) { var ex1 = 'test.ex-ex-binding1', ex2 = 'test.ex-ex-binding2'; var q = 'test.ex-ex-binding-q'; var rk = 'test.routing.key', msg = randomString(); - return Promise.join( + return Promise.all([ ch.assertExchange(ex1, 'direct', EX_OPTS), ch.assertExchange(ex2, 'fanout', {durable: false, internal: true}), ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q), ch.bindExchange(ex2, ex1, rk, {}), - ch.bindQueue(q, ex2, '', {})) + ch.bindQueue(q, ex2, '', {})]) .then(function() { return new Promise(function(resolve, reject) { function delivery(m) { @@ -320,13 +319,13 @@ chtest("unbind exchange", function(ch) { var viabinding = randomString(); var direct = randomString(); - return Promise.join( + return Promise.all([ ch.assertExchange(source, 'fanout', EX_OPTS), ch.assertExchange(dest, 'fanout', EX_OPTS), ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q), ch.bindExchange(dest, source, '', {}), - ch.bindQueue(q, dest, '', {})) + ch.bindQueue(q, dest, '', {})]) .then(function() { ch.publish(source, '', Buffer.from('foobar')); return waitForMessages(q);}) @@ -354,7 +353,7 @@ chtest("cancel consumer", function(ch) { var q = 'test.consumer-cancel'; var ctag; var recv1 = new Promise(function (resolve, reject) { - Promise.join( + Promise.all([ ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q), // My callback is 'resolve the promise in `arrived`' @@ -362,17 +361,17 @@ chtest("cancel consumer", function(ch) { .then(function(ok) { ctag = ok.consumerTag; ch.sendToQueue(q, Buffer.from('foo')); - })); + })]); }); // A message should arrive because of the consume return recv1.then(function() { - var recv2 = Promise.join( + var recv2 = Promise.all([ ch.cancel(ctag).then(function() { return ch.sendToQueue(q, Buffer.from('bar')); }), // but check a message did arrive in the queue - waitForMessages(q)) + waitForMessages(q)]) .then(function() { return ch.get(q, {noAck:true}); }) @@ -391,13 +390,15 @@ chtest("cancel consumer", function(ch) { chtest("cancelled consumer", function(ch) { var q = 'test.cancelled-consumer'; return new Promise(function(resolve, reject) { - return Promise.join( - ch.assertQueue(q), - ch.purgeQueue(q), - ch.consume(q, function(msg) { - if (msg === null) resolve(); - else reject(new Error('Message not expected')); - })) + return Promise.all([ + ch.assertQueue(q), + ch.purgeQueue(q)]) + .then(function () { + ch.consume(q, function(msg) { + if (msg === null) resolve(); + else reject(new Error('Message not expected')); + }) + }) .then(function() { return ch.deleteQueue(q); }); @@ -409,9 +410,9 @@ chtest("ack", function(ch) { var q = 'test.ack'; var msg1 = randomString(), msg2 = randomString(); - return Promise.join( + return Promise.all([ ch.assertQueue(q, QUEUE_OPTS), - ch.purgeQueue(q)) + ch.purgeQueue(q)]) .then(function() { ch.sendToQueue(q, Buffer.from(msg1)); ch.sendToQueue(q, Buffer.from(msg2)); @@ -439,8 +440,8 @@ chtest("nack", function(ch) { var q = 'test.nack'; var msg1 = randomString(); - return Promise.join( - ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)) + return Promise.all([ + ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) .then(function() { ch.sendToQueue(q, Buffer.from(msg1)); return waitForMessages(q);}) @@ -464,8 +465,8 @@ chtest("reject", function(ch) { var q = 'test.reject'; var msg1 = randomString(); - return Promise.join( - ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)) + return Promise.all([ + ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) .then(function() { ch.sendToQueue(q, Buffer.from(msg1)); return waitForMessages(q);}) @@ -485,9 +486,9 @@ chtest("reject", function(ch) { chtest("prefetch", function(ch) { var q = 'test.prefetch'; - return Promise.join( + return Promise.all([ ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q), - ch.prefetch(1)) + ch.prefetch(1)]) .then(function() { ch.sendToQueue(q, Buffer.from('foobar')); ch.sendToQueue(q, Buffer.from('foobar')); @@ -524,8 +525,8 @@ suite("confirms", function() { confirmtest('message is confirmed', function(ch) { var q = 'test.confirm-message'; - return Promise.join( - ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)) + return Promise.all([ + ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) .then(function() { return ch.sendToQueue(q, Buffer.from('bleep')); }); @@ -538,8 +539,8 @@ confirmtest('message is confirmed', function(ch) { // multi-ack. confirmtest('multiple confirms', function(ch) { var q = 'test.multiple-confirms'; - return Promise.join( - ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)) + return Promise.all([ + ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) .then(function() { var multipleRainbows = false; ch.on('ack', function(a) { @@ -550,8 +551,14 @@ confirmtest('multiple confirms', function(ch) { var cs = []; function sendAndPushPromise() { - var conf = Promise.fromCallback(function(cb) { - return ch.sendToQueue(q, Buffer.from('bleep'), {}, cb); + var conf = new Promise(function(resolve, reject) { + ch.sendToQueue(q, Buffer.from('bleep'), {}, function (err, result) { + if (err) { + reject(err); + } else { + resolve(result); + } + }); }); cs.push(conf); } diff --git a/test/util.js b/test/util.js index a41a8eb3..0cc8122d 100644 --- a/test/util.js +++ b/test/util.js @@ -1,6 +1,5 @@ 'use strict'; -var Promise = require('bluebird'); var crypto = require('crypto'); var Connection = require('../lib/connection').Connection; var PassThrough =