From 6dc4a9fdb3bbe20ce2fcbb01c3a16e500f966b27 Mon Sep 17 00:00:00 2001 From: Christian Holm Date: Fri, 3 Feb 2017 19:24:36 +0100 Subject: [PATCH 1/4] Use same BROKER_OPTIONS for backend connection --- celery.js | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/celery.js b/celery.js index fe215c9..2daa6fd 100644 --- a/celery.js +++ b/celery.js @@ -211,10 +211,7 @@ function Client(conf) { self.emit('message', msg); }); } else if (self.conf.backend_type === 'amqp') { - self.backend = amqp.createConnection({ - url: self.conf.BROKER_URL, - heartbeat: 580 - }, { + self.backend = amqp.createConnection(self.conf.BROKER_OPTIONS, { defaultExchangeName: self.conf.DEFAULT_EXCHANGE }); } else if (self.conf.backend_type === self.conf.broker_type) { From b93f0751233a70afe76c409e88aca3ae1b115470 Mon Sep 17 00:00:00 2001 From: Christian Holm Date: Fri, 3 Feb 2017 19:25:00 +0100 Subject: [PATCH 2/4] Remove block that should never happen MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit And wouldn’t work if it did --- celery.js | 4 ---- 1 file changed, 4 deletions(-) diff --git a/celery.js b/celery.js index 2daa6fd..2cd8a98 100644 --- a/celery.js +++ b/celery.js @@ -214,10 +214,6 @@ function Client(conf) { self.backend = amqp.createConnection(self.conf.BROKER_OPTIONS, { defaultExchangeName: self.conf.DEFAULT_EXCHANGE }); - } else if (self.conf.backend_type === self.conf.broker_type) { - if (self.conf.backend_type === 'amqp') { - self.backend = self.broker; - } } // backend ready... From 5d1d2e9f5612f8c9c5c9f03172655f3c48f3387f Mon Sep 17 00:00:00 2001 From: Christian Holm Date: Fri, 3 Feb 2017 19:25:08 +0100 Subject: [PATCH 3/4] Add error handling to backend connection --- celery.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/celery.js b/celery.js index 2cd8a98..1632e28 100644 --- a/celery.js +++ b/celery.js @@ -216,6 +216,10 @@ function Client(conf) { }); } + self.backend.on('error', function(err) { + self.emit('error', err); + }); + // backend ready... self.backend.on('ready', function() { debug('Connecting to broker...'); From dc220595daeb2edff789fdc8dbac94b9196da5d7 Mon Sep 17 00:00:00 2001 From: Christian Holm Date: Mon, 13 Mar 2017 17:43:39 +0100 Subject: [PATCH 4/4] Make broker and backend options more flexible --- celery.js | 100 +++++++++++++++---------------------------- tests/test_celery.js | 53 +++++++++++++++++------ 2 files changed, 76 insertions(+), 77 deletions(-) diff --git a/celery.js b/celery.js index 1632e28..95fd635 100644 --- a/celery.js +++ b/celery.js @@ -10,10 +10,23 @@ var createMessage = require('./protocol').createMessage; var debug = process.env.NODE_CELERY_DEBUG === '1' ? console.info : function() {}; var supportedProtocols = ['amqp', 'amqps', 'redis']; -function checkProtocol(kind, protocol) { +function getProtocol(kind, options) { + const protocol = url.parse(options.url).protocol.slice(0, -1); + if (protocol === 'amqps') { + protocol = 'amqp'; + } if (supportedProtocols.indexOf(protocol) === -1) { throw new Error(util.format('Unsupported %s type: %s', kind, protocol)); } + debug(kind + ' type: ' + protocol); + + return protocol; +} + +function addProtocolDefaults(protocol, options) { + if (protocol === 'amqp') { + options.heartbeat = options.heartbeat || 580; + } } function Configuration(options) { @@ -29,8 +42,20 @@ function Configuration(options) { self.TASK_RESULT_EXPIRES = self.TASK_RESULT_EXPIRES * 1000 || 86400000; // Default 1 day // broker - self.BROKER_URL = self.BROKER_URL || 'amqp://'; - self.BROKER_OPTIONS = self.BROKER_OPTIONS || { url: self.BROKER_URL, heartbeat: 580 }; + self.BROKER_OPTIONS = self.BROKER_OPTIONS || {}; + self.BROKER_OPTIONS.url = self.BROKER_URL || 'amqp://'; + self.broker_type = getProtocol('broker', self.BROKER_OPTIONS); + addProtocolDefaults(self.broker_type, self.BROKER_OPTIONS); + + // backend + self.RESULT_BACKEND_OPTIONS = self.RESULT_BACKEND_OPTIONS || {}; + if (self.RESULT_BACKEND === self.broker_type) { + self.RESULT_BACKEND = self.BROKER_URL; + } + self.RESULT_BACKEND_OPTIONS.url = self.RESULT_BACKEND || self.BROKER_URL; + self.backend_type = getProtocol('backend', self.RESULT_BACKEND_OPTIONS); + addProtocolDefaults(self.backend_type, self.RESULT_BACKEND_OPTIONS); + self.DEFAULT_QUEUE = self.DEFAULT_QUEUE || 'celery'; self.DEFAULT_EXCHANGE = self.DEFAULT_EXCHANGE || ''; self.DEFAULT_EXCHANGE_TYPE = self.DEFAULT_EXCHANGE_TYPE || 'direct'; @@ -39,46 +64,11 @@ function Configuration(options) { self.IGNORE_RESULT = self.IGNORE_RESULT || false; self.TASK_RESULT_DURABLE = undefined !== self.TASK_RESULT_DURABLE ? self.TASK_RESULT_DURABLE : true; // Set Durable true by default (Celery 3.1.7) self.ROUTES = self.ROUTES || {}; - - self.broker_type = url.parse(self.BROKER_URL).protocol.slice(0, -1); - if (self.broker_type === 'amqps') - self.broker_type = 'amqp'; - debug('Broker type: ' + self.broker_type); - checkProtocol('broker', self.broker_type); - - // backend - if (!self.RESULT_BACKEND || (self.RESULT_BACKEND === self.broker_type)) { - self.RESULT_BACKEND = self.BROKER_URL; - } - - self.backend_type = url.parse(self.RESULT_BACKEND).protocol.slice(0, -1); - if (self.backend_type === 'amqps') - self.backend_type = 'amqp'; - debug('Backend type: ' + self.backend_type); - checkProtocol('backend', self.backend_type); } -function RedisBroker(broker_url) { +function RedisBroker(conf) { var self = this; - var purl = url.parse(broker_url); - var database; - - if (purl.pathname) { - database = purl.pathname.slice(1); - } - - self.redis = redis.createClient(purl.port || 6379, - purl.hostname || 'localhost'); - - if (purl.auth) { - debug('Authenticating broker...'); - self.redis.auth(purl.auth.split(':')[1]); - debug('Broker authenticated...'); - } - - if (database) { - self.redis.select(database); - } + self.redis = redis.createClient(conf.BROKER_OPTIONS); self.end = function() { self.redis.end(true); @@ -128,26 +118,10 @@ util.inherits(RedisBroker, events.EventEmitter); function RedisBackend(conf) { var self = this; - var purl = url.parse(conf.RESULT_BACKEND); - var database; - - if (purl.pathname) { - database = purl.pathname.slice(1); - } + self.redis = redis.createClient(conf.RESULT_BACKEND_OPTIONS); - debug('Connecting to backend...'); - if (purl.auth) { - self.redis = redis.createClient(purl.port, purl.hostname, {'auth_pass': purl.auth.split(':')[1]}); - } else { - self.redis = redis.createClient(purl.port, purl.hostname); - } - // needed because we'll use `psubscribe` var backend_ex = self.redis.duplicate(); - if (database) { - self.redis.select(database); - } - self.redis.on('error', function(err) { self.emit('error', err); }); @@ -156,7 +130,7 @@ function RedisBackend(conf) { self.emit('end'); }); - self.quit = function() { + self.disconnect = function() { backend_ex.quit(); self.redis.quit(); }; @@ -211,7 +185,7 @@ function Client(conf) { self.emit('message', msg); }); } else if (self.conf.backend_type === 'amqp') { - self.backend = amqp.createConnection(self.conf.BROKER_OPTIONS, { + self.backend = amqp.createConnection(self.conf.RESULT_BACKEND_OPTIONS, { defaultExchangeName: self.conf.DEFAULT_EXCHANGE }); } @@ -225,7 +199,7 @@ function Client(conf) { debug('Connecting to broker...'); if (self.conf.broker_type === 'redis') { - self.broker = new RedisBroker(self.conf.BROKER_URL); + self.broker = new RedisBroker(self.conf); } else if (self.conf.broker_type === 'amqp') { self.broker = amqp.createConnection(self.conf.BROKER_OPTIONS, { defaultExchangeName: self.conf.DEFAULT_EXCHANGE @@ -257,11 +231,7 @@ Client.prototype.createTask = function(name, options, exchange) { Client.prototype.end = function() { this.broker.disconnect(); - if (this.conf.backend_type === 'redis') { - this.backend.quit(); - } else if (this.conf.broker_type !== this.conf.backend_type) { - this.backend.quit(); - } + this.backend.disconnect(); }; Client.prototype.call = function(name /*[args], [kwargs], [options], [callback]*/ ) { diff --git a/tests/test_celery.js b/tests/test_celery.js index 78b6a63..9ab908e 100644 --- a/tests/test_celery.js +++ b/tests/test_celery.js @@ -20,34 +20,63 @@ var conf_redis = { describe('celery functional tests', function() { describe('initialization', function() { - it('should create a client without error', function(done) { - var client1 = celery.createClient(conf_amqp), - client2 = celery.createClient(conf_invalid); + it('should create a valid amqp client without error', function(done) { + var client = celery.createClient(conf_amqp); - client1.on('connect', function() { - client1.end(); + assert.equal(client.conf.BROKER_OPTIONS.url, 'amqp://'); + assert.equal(client.conf.BROKER_OPTIONS.heartbeat, 580); + assert.equal(client.conf.broker_type, 'amqp'); + + assert.equal(client.conf.RESULT_BACKEND_OPTIONS.url, 'amqp://'); + assert.equal(client.conf.RESULT_BACKEND_OPTIONS.heartbeat, 580); + assert.equal(client.conf.backend_type, 'amqp'); + + client.on('connect', function() { + client.end(); }); - client1.on('error', function(exception) { - console.log(exception); + client.on('error', function(exception) { assert.ok(false); }); - client1.once('end', function() { + client.once('end', function() { done(); }); + }); + + it('should create a valid redis client without error', function(done) { + var client = celery.createClient(conf_redis); + + assert.equal(client.conf.BROKER_OPTIONS.url, 'redis://'); + assert.equal(client.conf.broker_type, 'redis'); + + assert.equal(client.conf.RESULT_BACKEND_OPTIONS.url, 'redis://'); + assert.equal(client.conf.backend_type, 'redis'); + + client.on('connect', function() { + client.end(); + }); - client2.on('ready', function() { + client.on('error', function(exception) { assert.ok(false); }); - client2.on('error', function(exception) { - assert.ok(exception); + client.once('end', function() { + done(); }); + }); - client2.once('end', function() { + it('should throw error on invalid amqp client', function(done) { + var client = celery.createClient(conf_invalid); + + client.on('ready', function() { assert.ok(false); }); + + client.on('error', function(exception) { + assert.ok(exception); + done(); + }); }); });