diff --git a/celery.js b/celery.js index fe215c9..95fd635 100644 --- a/celery.js +++ b/celery.js @@ -10,10 +10,23 @@ var createMessage = require('./protocol').createMessage; var debug = process.env.NODE_CELERY_DEBUG === '1' ? console.info : function() {}; var supportedProtocols = ['amqp', 'amqps', 'redis']; -function checkProtocol(kind, protocol) { +function getProtocol(kind, options) { + const protocol = url.parse(options.url).protocol.slice(0, -1); + if (protocol === 'amqps') { + protocol = 'amqp'; + } if (supportedProtocols.indexOf(protocol) === -1) { throw new Error(util.format('Unsupported %s type: %s', kind, protocol)); } + debug(kind + ' type: ' + protocol); + + return protocol; +} + +function addProtocolDefaults(protocol, options) { + if (protocol === 'amqp') { + options.heartbeat = options.heartbeat || 580; + } } function Configuration(options) { @@ -29,8 +42,20 @@ function Configuration(options) { self.TASK_RESULT_EXPIRES = self.TASK_RESULT_EXPIRES * 1000 || 86400000; // Default 1 day // broker - self.BROKER_URL = self.BROKER_URL || 'amqp://'; - self.BROKER_OPTIONS = self.BROKER_OPTIONS || { url: self.BROKER_URL, heartbeat: 580 }; + self.BROKER_OPTIONS = self.BROKER_OPTIONS || {}; + self.BROKER_OPTIONS.url = self.BROKER_URL || 'amqp://'; + self.broker_type = getProtocol('broker', self.BROKER_OPTIONS); + addProtocolDefaults(self.broker_type, self.BROKER_OPTIONS); + + // backend + self.RESULT_BACKEND_OPTIONS = self.RESULT_BACKEND_OPTIONS || {}; + if (self.RESULT_BACKEND === self.broker_type) { + self.RESULT_BACKEND = self.BROKER_URL; + } + self.RESULT_BACKEND_OPTIONS.url = self.RESULT_BACKEND || self.BROKER_URL; + self.backend_type = getProtocol('backend', self.RESULT_BACKEND_OPTIONS); + addProtocolDefaults(self.backend_type, self.RESULT_BACKEND_OPTIONS); + self.DEFAULT_QUEUE = self.DEFAULT_QUEUE || 'celery'; self.DEFAULT_EXCHANGE = self.DEFAULT_EXCHANGE || ''; self.DEFAULT_EXCHANGE_TYPE = self.DEFAULT_EXCHANGE_TYPE || 'direct'; @@ -39,46 +64,11 @@ function Configuration(options) { self.IGNORE_RESULT = self.IGNORE_RESULT || false; self.TASK_RESULT_DURABLE = undefined !== self.TASK_RESULT_DURABLE ? self.TASK_RESULT_DURABLE : true; // Set Durable true by default (Celery 3.1.7) self.ROUTES = self.ROUTES || {}; - - self.broker_type = url.parse(self.BROKER_URL).protocol.slice(0, -1); - if (self.broker_type === 'amqps') - self.broker_type = 'amqp'; - debug('Broker type: ' + self.broker_type); - checkProtocol('broker', self.broker_type); - - // backend - if (!self.RESULT_BACKEND || (self.RESULT_BACKEND === self.broker_type)) { - self.RESULT_BACKEND = self.BROKER_URL; - } - - self.backend_type = url.parse(self.RESULT_BACKEND).protocol.slice(0, -1); - if (self.backend_type === 'amqps') - self.backend_type = 'amqp'; - debug('Backend type: ' + self.backend_type); - checkProtocol('backend', self.backend_type); } -function RedisBroker(broker_url) { +function RedisBroker(conf) { var self = this; - var purl = url.parse(broker_url); - var database; - - if (purl.pathname) { - database = purl.pathname.slice(1); - } - - self.redis = redis.createClient(purl.port || 6379, - purl.hostname || 'localhost'); - - if (purl.auth) { - debug('Authenticating broker...'); - self.redis.auth(purl.auth.split(':')[1]); - debug('Broker authenticated...'); - } - - if (database) { - self.redis.select(database); - } + self.redis = redis.createClient(conf.BROKER_OPTIONS); self.end = function() { self.redis.end(true); @@ -128,26 +118,10 @@ util.inherits(RedisBroker, events.EventEmitter); function RedisBackend(conf) { var self = this; - var purl = url.parse(conf.RESULT_BACKEND); - var database; + self.redis = redis.createClient(conf.RESULT_BACKEND_OPTIONS); - if (purl.pathname) { - database = purl.pathname.slice(1); - } - - debug('Connecting to backend...'); - if (purl.auth) { - self.redis = redis.createClient(purl.port, purl.hostname, {'auth_pass': purl.auth.split(':')[1]}); - } else { - self.redis = redis.createClient(purl.port, purl.hostname); - } - // needed because we'll use `psubscribe` var backend_ex = self.redis.duplicate(); - if (database) { - self.redis.select(database); - } - self.redis.on('error', function(err) { self.emit('error', err); }); @@ -156,7 +130,7 @@ function RedisBackend(conf) { self.emit('end'); }); - self.quit = function() { + self.disconnect = function() { backend_ex.quit(); self.redis.quit(); }; @@ -211,24 +185,21 @@ function Client(conf) { self.emit('message', msg); }); } else if (self.conf.backend_type === 'amqp') { - self.backend = amqp.createConnection({ - url: self.conf.BROKER_URL, - heartbeat: 580 - }, { + self.backend = amqp.createConnection(self.conf.RESULT_BACKEND_OPTIONS, { defaultExchangeName: self.conf.DEFAULT_EXCHANGE }); - } else if (self.conf.backend_type === self.conf.broker_type) { - if (self.conf.backend_type === 'amqp') { - self.backend = self.broker; - } } + self.backend.on('error', function(err) { + self.emit('error', err); + }); + // backend ready... self.backend.on('ready', function() { debug('Connecting to broker...'); if (self.conf.broker_type === 'redis') { - self.broker = new RedisBroker(self.conf.BROKER_URL); + self.broker = new RedisBroker(self.conf); } else if (self.conf.broker_type === 'amqp') { self.broker = amqp.createConnection(self.conf.BROKER_OPTIONS, { defaultExchangeName: self.conf.DEFAULT_EXCHANGE @@ -260,11 +231,7 @@ Client.prototype.createTask = function(name, options, exchange) { Client.prototype.end = function() { this.broker.disconnect(); - if (this.conf.backend_type === 'redis') { - this.backend.quit(); - } else if (this.conf.broker_type !== this.conf.backend_type) { - this.backend.quit(); - } + this.backend.disconnect(); }; Client.prototype.call = function(name /*[args], [kwargs], [options], [callback]*/ ) { diff --git a/tests/test_celery.js b/tests/test_celery.js index 78b6a63..9ab908e 100644 --- a/tests/test_celery.js +++ b/tests/test_celery.js @@ -20,34 +20,63 @@ var conf_redis = { describe('celery functional tests', function() { describe('initialization', function() { - it('should create a client without error', function(done) { - var client1 = celery.createClient(conf_amqp), - client2 = celery.createClient(conf_invalid); + it('should create a valid amqp client without error', function(done) { + var client = celery.createClient(conf_amqp); - client1.on('connect', function() { - client1.end(); + assert.equal(client.conf.BROKER_OPTIONS.url, 'amqp://'); + assert.equal(client.conf.BROKER_OPTIONS.heartbeat, 580); + assert.equal(client.conf.broker_type, 'amqp'); + + assert.equal(client.conf.RESULT_BACKEND_OPTIONS.url, 'amqp://'); + assert.equal(client.conf.RESULT_BACKEND_OPTIONS.heartbeat, 580); + assert.equal(client.conf.backend_type, 'amqp'); + + client.on('connect', function() { + client.end(); }); - client1.on('error', function(exception) { - console.log(exception); + client.on('error', function(exception) { assert.ok(false); }); - client1.once('end', function() { + client.once('end', function() { done(); }); + }); + + it('should create a valid redis client without error', function(done) { + var client = celery.createClient(conf_redis); + + assert.equal(client.conf.BROKER_OPTIONS.url, 'redis://'); + assert.equal(client.conf.broker_type, 'redis'); + + assert.equal(client.conf.RESULT_BACKEND_OPTIONS.url, 'redis://'); + assert.equal(client.conf.backend_type, 'redis'); + + client.on('connect', function() { + client.end(); + }); - client2.on('ready', function() { + client.on('error', function(exception) { assert.ok(false); }); - client2.on('error', function(exception) { - assert.ok(exception); + client.once('end', function() { + done(); }); + }); - client2.once('end', function() { + it('should throw error on invalid amqp client', function(done) { + var client = celery.createClient(conf_invalid); + + client.on('ready', function() { assert.ok(false); }); + + client.on('error', function(exception) { + assert.ok(exception); + done(); + }); }); });