From 57c8f37dc24cd9abab259c577912c736465502b2 Mon Sep 17 00:00:00 2001 From: "d.poellath" Date: Thu, 5 Feb 2015 15:08:40 +0100 Subject: [PATCH 1/8] + add BaseBackend for save new backends (make sure all functions have been overwritten), with MakeEmitter (no need on Backend impl.) ! renamed Backend -> MemoryBackend (no impact, only internal refactoring) ! refactored (save) MemoryBackend method calling --- example.js | 2 +- lib/base-backend.js | 327 ++++++++++++++++++++++++++++++ lib/index.js | 10 +- lib/workflow-in-memory-backend.js | 325 ++++++++++++++--------------- 4 files changed, 481 insertions(+), 183 deletions(-) create mode 100644 lib/base-backend.js diff --git a/example.js b/example.js index d3e05c5..185d062 100644 --- a/example.js +++ b/example.js @@ -8,7 +8,7 @@ var util = require('util'); var wf = require('./lib/index'); // With modules, it would be require('workflow'); var Factory = wf.Factory; -var Backend = wf.Backend; +var Backend = wf.MemoryBackend; var backend, factory; diff --git a/lib/base-backend.js b/lib/base-backend.js new file mode 100644 index 0000000..2be62de --- /dev/null +++ b/lib/base-backend.js @@ -0,0 +1,327 @@ +var e = require('./errors'); +var util = require("util"); +var makeEmitter = require("./make-emitter"); + +var baseBackend = { + init: function (callback) { + throw new e.BackendInternal('Backend.init() not implemented yet'); + }, + // usage: - (test only) + quit: function (callback) { + throw new e.BackendInternal('Backend.quit() not implemented yet'); + }, + // usage: Factory + // workflow - Workflow object + // meta - Any additional information to pass to the backend which is not + // workflow properties + // callback - f(err, workflow) + createWorkflow: function (workflow, meta, callback) { + throw new e.BackendInternal('Backend.createWorkflow() not implemented yet'); + }, + // usage: API & Factory + // uuid - Workflow.uuid + // meta - Any additional information to pass to the backend which is not + // workflow properties + // callback - f(err, workflow) + getWorkflow: function (uuid, meta, callback) { + throw new e.BackendInternal('Backend.getWorkflow() not implemented yet'); + }, + // usage: API + // workflow - the workflow object + // meta - Any additional information to pass to the backend which is not + // workflow properties + // callback - f(err, boolean) + deleteWorkflow: function (workflow, meta, callback) { + throw new e.BackendInternal('Backend.deleteWorkflow() not implemented yet'); + }, + // usage: API + // workflow - update workflow object. + // meta - Any additional information to pass to the backend which is not + // workflow properties + // callback - f(err, workflow) + updateWorkflow: function (workflow, meta, callback) { + throw new e.BackendInternal('Backend.updateWorkflow() not implemented yet'); + }, + + // usage: Factory + // job - Job object + // meta - Any additional information to pass to the backend which is not + // job properties + // callback - f(err, job) + createJob: function (job, meta, callback) { + throw new e.BackendInternal('Backend.createJob() not implemented yet'); + }, + + // usage: Runner + // uuid - Job.uuid + // meta - Any additional information to pass to the backend which is not + // job properties + // callback - f(err, job) + getJob: function (uuid, meta, callback) { + throw new e.BackendInternal('Backend.getJob() not implemented yet'); + }, + + // usage: Internal & Test + // Get a single job property + // uuid - Job uuid. + // prop - (String) property name + // cb - callback f(err, value) + getJobProperty: function (uuid, prop, cb) { + throw new e.BackendInternal('Backend.getJobProperty() not implemented yet'); + }, + + // usage: Factory + // job - the job object + // callback - f(err) called with error in case there is a duplicated + // job with the same target and same params + validateJobTarget: function (job, callback) { + throw new e.BackendInternal('Backend.validateJobTarget() not implemented yet'); + }, + + // usage: - (test-only) + // Get the next queued job. + // index - Integer, optional. When given, it'll get the job at index + // position (when not given, it'll return the job at position + // zero). + // callback - f(err, job) + nextJob: function (index, callback) { + throw new e.BackendInternal('Backend.nextJob() not implemented yet'); + }, + + // usage: Runner + // Lock a job, mark it as running by the given runner, update job + // status. + // uuid - the job uuid (String) + // runner_id - the runner identifier (String) + // callback - f(err, job) callback will be called with error if + // something fails, otherwise it'll return the updated job + // using getJob. + runJob: function (uuid, runner_id, callback) { + throw new e.BackendInternal('Backend.runJob() not implemented yet'); + }, + + // usage: Runner + // Unlock the job, mark it as finished, update the status, add the + // results for every job's task. + // job - the job object. It'll be saved to the backend with the provided + // properties. + // callback - f(err, job) callback will be called with error if + // something fails, otherwise it'll return the updated job + // using getJob. + finishJob: function (job, callback) { + throw new e.BackendInternal('Backend.finishJob() not implemented yet'); + }, + + // usage: API + // Update the job while it is running with information regarding + // progress + // job - the job object. It'll be saved to the backend with the + // provided properties. + // meta - Any additional information to pass to the backend which is + // not job properties + // callback - f(err, job) callback will be called with error if + // something fails, otherwise it'll return the updated job + // using getJob. + updateJob: function (job, meta, callback) { + throw new e.BackendInternal('Backend.updateJob() not implemented yet'); + }, + + // usage: Runner + // Unlock the job, mark it as canceled, and remove the runner_id + // uuid - string, the job uuid. + // cb - f(err, job) callback will be called with error if something + // fails, otherwise it'll return the updated job using getJob. + cancelJob: function (uuid, cb) { + throw new e.BackendInternal('Backend.cancelJob() not implemented yet'); + }, + + // usage: API & Runner + // Update only the given Job property. Intendeed to prevent conflicts + // with two sources updating the same job at the same time, but + // different properties: + // - uuid - the job's uuid + // - prop - the name of the property to update + // - val - value to assign to such property + // - meta - Any additional information to pass to the backend which is + // not job properties + // - callback - f(err) called with error if something fails, otherwise + // with null. + updateJobProperty: function (uuid, prop, val, meta, callback) { + throw new e.BackendInternal('Backend.updateJobProperty() not implemented yet'); + }, + + // usage: Runner + // Queue a job which has been running; i.e, due to whatever the reason, + // re-queue the job. It'll unlock the job, update the status, add the + // results for every finished task so far ... + // job - the job Object. It'll be saved to the backend with the provided + // properties to ensure job status persistence. + // callback - f(err, job) callback will be called with error if + // something fails, otherwise it'll return the updated job + // using getJob. + queueJob: function (job, callback) { + throw new e.BackendInternal('Backend.queueJob() not implemented yet'); + }, + + // usage: Runner + // Pause a job which has been running; i.e, tell the job to wait for + // something external to happen. It'll unlock the job, update the + // status, add the results for every finished task so far ... + // job - the job Object. It'll be saved to the backend with the provided + // properties to ensure job status persistence. + // callback - f(err, job) callback will be called with error if + // something fails, otherwise it'll return the updated job + // using getJob. + pauseJob: function (job, callback) { + throw new e.BackendInternal('Backend.pauseJob() not implemented yet'); + }, + + // usage: - (test only) + // Tell a waiting job that whatever it has been waiting for has + // happened and it can run again. + // job - the job Object. It'll be saved to the backend with the provided + // properties to ensure job status persistence. + // callback - f(err, job) callback will be called with error if + // something fails, otherwise it'll return the updated job + // using getJob. + resumeJob: function (job, callback) { + throw new e.BackendInternal('Backend.resumeJob() not implemented yet'); + }, + + // usage: Runner + // Get the given number of queued jobs uuids. + // - start - Integer - Position of the first job to retrieve + // - stop - Integer - Position of the last job to retrieve, _included_ + // - callback - f(err, jobs) + nextJobs: function (start, stop, callback) { + throw new e.BackendInternal('Backend.nextJobs() not implemented yet'); + }, + + // usage: Runner + // Register a runner on the backend and report it's active: + // - runner_id - String, unique identifier for runner. + // - active_at - ISO String timestamp. Optional. If none is given, + // current time + // - callback - f(err) + registerRunner: function (runner_id, active_at, callback) { + throw new e.BackendInternal('Backend.registerRunner() not implemented yet'); + }, + + // usage: Runner + // Report a runner remains active: + // - runner_id - String, unique identifier for runner. Required. + // - active_at - ISO String timestamp. Optional. If none is given, + // current time + // - callback - f(err) + runnerActive: function (runner_id, active_at, callback) { + throw new e.BackendInternal('Backend.runnerActive() not implemented yet'); + }, + + // usage: - (test only) + // Get the given runner id details + // - runner_id - String, unique identifier for runner. Required. + // - callback - f(err, runner) + getRunner: function (runner_id, callback) { + throw new e.BackendInternal('Backend.getRunner() not implemented yet'); + }, + + // usage: API & Runner + // Get all the registered runners: + // - callback - f(err, runners) + getRunners: function (callback) { + throw new e.BackendInternal('Backend.getRunners() not implemented yet'); + }, + + // usage: - (test only) + // Set a runner as idle: + // - runner_id - String, unique identifier for runner + // - callback - f(err) + idleRunner: function (runner_id, callback) { + throw new e.BackendInternal('Backend.idleRunner() not implemented yet'); + }, + + // usage: Runner + // Check if the given runner is idle + // - runner_id - String, unique identifier for runner + // - callback - f(boolean) + isRunnerIdle: function (runner_id, callback) { + throw new e.BackendInternal('Backend.isRunnerIdle() not implemented yet'); + }, + + // usage: - (test only) + // Remove idleness of the given runner + // - runner_id - String, unique identifier for runner + // - callback - f(err) + wakeUpRunner: function (runner_id, callback) { + throw new e.BackendInternal('Backend.wakeUpRunner() not implemented yet'); + }, + + // usage: Runner + // Get all jobs associated with the given runner_id + // - runner_id - String, unique identifier for runner + // - callback - f(err, jobs). `jobs` is an array of job's UUIDs. + // Note `jobs` will be an array, even when empty. + getRunnerJobs: function (runner_id, callback) { + throw new e.BackendInternal('Backend.getRunnerJobs() not implemented yet'); + }, + + // usage: API + // Get all the workflows: + // - params - JSON Object (Optional). Can include the value of the + // workflow's "name", and any other key/value pair to search for + // into workflow's definition. + // - callback - f(err, workflows) + getWorkflows: function (params, callback) { + throw new e.BackendInternal('Backend.getWorkflows() not implemented yet'); + }, + + // usage: API + // Get all the jobs: + // - params - JSON Object. Can include the value of the job's + // "execution" status, and any other key/value pair to search for + // into job'sparams. + // - execution - String, the execution status for the jobs to return. + // Return all jobs if no execution status is given. + // - callback - f(err, jobs) + getJobs: function (params, callback) { + throw new e.BackendInternal('Backend.getJobs() not implemented yet'); + }, + + // usage: API + countJobs: function (callback) { + throw new e.BackendInternal('Backend.countJobs() not implemented yet'); + }, + + // usage: API & Runner + // Add progress information to an existing job: + // - uuid - String, the Job's UUID. + // - info - Object, {'key' => 'Value'} + // - meta - Any additional information to pass to the backend which is + // not job info + // - callback - f(err) + addInfo: function (uuid, info, meta, callback) { + throw new e.BackendInternal('Backend.addInfo() not implemented yet'); + }, + + // usage: API + // Get progress information from an existing job: + // - uuid - String, the Job's UUID. + // - callback - f(err, info) + getInfo: function (uuid, meta, callback) { + throw new e.BackendInternal('Backend.getInfo() not implemented yet'); + } +}; + +makeEmitter(baseBackend); + +module.exports = function (pBackend) { + var newBackend = {}; + var a; + for (a in baseBackend) { + newBackend[a] = baseBackend[a]; + } + for (a in pBackend) { + newBackend[a] = pBackend[a]; + } + return newBackend; +}; \ No newline at end of file diff --git a/lib/index.js b/lib/index.js index fe365c4..6b9715a 100644 --- a/lib/index.js +++ b/lib/index.js @@ -31,15 +31,17 @@ module.exports = { return WorkflowFactory(backend); }, - Backend: function (config) { - var Backend = require('./workflow-in-memory-backend'); - return Backend(config); + MemoryBackend: function (config) { + var MemoryBackend = require('./workflow-in-memory-backend'); + return MemoryBackend(config); + }, + BaseBackend: function () { + return require('./base-backend'); }, API: function (config) { if (typeof (config) !== 'object') { throw new Error('config must be an object'); } - var API = require('./api'); return API(config); }, diff --git a/lib/workflow-in-memory-backend.js b/lib/workflow-in-memory-backend.js index 9419769..cf60b04 100644 --- a/lib/workflow-in-memory-backend.js +++ b/lib/workflow-in-memory-backend.js @@ -1,15 +1,15 @@ // Copyright 2012 Pedro P. Candel . All rights reserved. -var util = require('util'); -var makeEmitter = require('./make-emitter'); -var Logger = require('bunyan'); -var e = require('./errors'); -var clone = require('clone'); -var sprintf = util.format; +var util = require('util') + , Logger = require('bunyan') + , e = require('./errors') + , clone = require('clone') + , sprintf = util.format + , baseBackend = require('./base-backend'); // Returns true when "obj" (Object) has all the properties "kv" (Object) has, // and with exactly the same values, otherwise, false -function hasPropsAndVals(obj, kv) { +function _hasPropsAndVals(obj, kv) { if (typeof (obj) !== 'object' || typeof (kv) !== 'object') { return (false); } @@ -23,7 +23,7 @@ function hasPropsAndVals(obj, kv) { })); } -var Backend = module.exports = function (config) { +function MemoryBackend(config) { var log; @@ -33,17 +33,14 @@ var Backend = module.exports = function (config) { if (!config.logger) { config.logger = {}; } - config.logger.name = 'wf-in-memory-backend'; config.logger.serializers = { err: Logger.stdSerializers.err }; - - config.logger.streams = config.logger.streams || [ { + config.logger.streams = config.logger.streams || [{ level: 'info', stream: process.stdout }]; - log = new Logger(config.logger); } @@ -75,44 +72,10 @@ var Backend = module.exports = function (config) { return wf_job_targets; } - function getJob(uuid, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - - if (jobs[uuid]) { - return callback(null, clone(jobs[uuid])); - } else { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', uuid))); - } - } - - // Register a runner on the backend and report it's active: - // - runner_id - String, unique identifier for runner. - // - active_at - ISO String timestamp. Optional. If none is given, - // current time - // - callback - f(err) - function registerRunner(runner_id, active_at, callback) { - if (typeof (active_at) === 'function') { - callback = active_at; - active_at = new Date(); - } - if (typeof (active_at) === 'string') { - active_at = new Date(active_at); - } - runners[runner_id] = { - runner_id: runner_id, - active_at: active_at, - idle: false - }; - return callback(null); - } - - var backend = { + var backend = require('./base-backend'); + backend = { log: log, - init: function init(callback) { + init: function (callback) { workflows = {}; jobs = {}; runners = {}; @@ -121,99 +84,98 @@ var Backend = module.exports = function (config) { return callback(); }, - quit: function quit(callback) { + // Never get called, except in test + quit: function (callback) { return callback(); }, + + // usage: Factory // workflow - Workflow object // meta - Any additional information to pass to the backend which is not // workflow properties // callback - f(err, workflow) - createWorkflow: function createWorkflow(workflow, meta, callback) { + createWorkflow: function (workflow, meta, callback) { if (typeof (meta) === 'function') { callback = meta; meta = {}; } if (_wfNames().indexOf(workflow.name) !== -1) { return callback(new e.BackendInvalidArgumentError( - 'Workflow.name must be unique. A workflow with name "' + - workflow.name + '" already exists')); + 'Workflow.name must be unique. A workflow with name "' + + workflow.name + '" already exists')); } else { workflows[workflow.uuid] = clone(workflow); return callback(null, workflow); } }, + // usage: API & Factory // uuid - Workflow.uuid // meta - Any additional information to pass to the backend which is not // workflow properties // callback - f(err, workflow) - getWorkflow: function getWorkflow(uuid, meta, callback) { + getWorkflow: function (uuid, meta, callback) { if (typeof (meta) === 'function') { callback = meta; meta = {}; } - if (workflows[uuid]) { return callback(null, clone(workflows[uuid])); } else { return callback(new e.BackendResourceNotFoundError(sprintf( - 'Workflow with uuid \'%s\' does not exist', uuid))); + 'Workflow with uuid \'%s\' does not exist', uuid))); } }, + // usage: API // workflow - the workflow object // meta - Any additional information to pass to the backend which is not // workflow properties // callback - f(err, boolean) - deleteWorkflow: function deleteWorkflow(workflow, meta, callback) { + deleteWorkflow: function (workflow, meta, callback) { if (typeof (meta) === 'function') { callback = meta; meta = {}; } - - if (workflows[workflow.uuid]) { - return callback(null, (delete workflows[workflow.uuid])); - } else { - return callback(null, false); - } + return callback(null, workflows[workflow.uuid] ? (delete workflows[workflow.uuid]) : false); }, + // usage: API // workflow - update workflow object. // meta - Any additional information to pass to the backend which is not // workflow properties // callback - f(err, workflow) - updateWorkflow: function updateWorkflow(workflow, meta, callback) { + updateWorkflow: function (workflow, meta, callback) { if (typeof (meta) === 'function') { callback = meta; meta = {}; } - if (workflows[workflow.uuid]) { if (_wfNames().indexOf(workflow.name) !== -1 && workflows[workflow.uuid].name !== workflow.name) { return callback(new e.BackendInvalidArgumentError( - 'Workflow.name must be unique. A workflow with name "' + - workflow.name + '" already exists')); + 'Workflow.name must be unique. A workflow with name "' + + workflow.name + '" already exists')); } else { workflows[workflow.uuid] = clone(workflow); return callback(null, workflow); } } else { return callback(new e.BackendResourceNotFoundError( - 'Workflow does not exist. Cannot Update.')); + 'Workflow does not exist. Cannot Update.')); } }, + // usage: Factory // job - Job object // meta - Any additional information to pass to the backend which is not // job properties // callback - f(err, job) - createJob: function createJob(job, meta, callback) { + createJob: function (job, meta, callback) { if (typeof (meta) === 'function') { callback = meta; meta = {}; } - job.created_at = job.created_at || new Date().toISOString(); jobs[job.uuid] = clone(job); queued_jobs.push(job.uuid); @@ -223,12 +185,25 @@ var Backend = module.exports = function (config) { return callback(null, job); }, + // usage: Runner // uuid - Job.uuid // meta - Any additional information to pass to the backend which is not // job properties // callback - f(err, job) - getJob: getJob, + getJob: function (uuid, meta, callback) { + if (typeof (meta) === 'function') { + callback = meta; + meta = {}; + } + if (jobs[uuid]) { + return callback(null, clone(jobs[uuid])); + } else { + return callback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', uuid))); + } + }, + // usage: Internal & Test // Get a single job property // uuid - Job uuid. // prop - (String) property name @@ -238,78 +213,67 @@ var Backend = module.exports = function (config) { return cb(null, jobs[uuid][prop]); } else { return cb(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', uuid))); + 'Job with uuid \'%s\' does not exist', uuid))); } }, + // usage: Factory // job - the job object // callback - f(err) called with error in case there is a duplicated // job with the same target and same params - validateJobTarget: function validateJobTarget(job, callback) { + validateJobTarget: function (job, callback) { // If no target is given, we don't care: if (!job.target) { return callback(null); } - var locked = _lockedTargets().some(function (r) { var re = new RegExp(r); return (re.test(job.target)); }); - if (locked) { return callback(new e.BackendInvalidArgumentError( 'Job target is currently locked by another job')); } - if (_jobTargets().indexOf(job.target) === -1) { return callback(null); } - var filtered = Object.keys(jobs).filter(function (uuid) { return ( - uuid !== job.uuid && - jobs[uuid].target === job.target && - Object.keys(job.params).every(function (p) { + uuid !== job.uuid && + jobs[uuid].target === job.target && + Object.keys(job.params).every(function (p) { return (jobs[uuid].params[p] && - jobs[uuid].params[p] === job.params[p]); + jobs[uuid].params[p] === job.params[p]); }) && - (jobs[uuid].execution === 'queued' || - jobs[uuid].execution === 'running')); + (jobs[uuid].execution === 'queued' || + jobs[uuid].execution === 'running')); }); - - if (filtered.length !== 0) { - return callback(new e.BackendInvalidArgumentError( - 'Another job with the same target' + - ' and params is already queued')); - } else { - return callback(null); - } + return callback(filtered.length !== 0 ? new e.BackendInvalidArgumentError('Another job with the same target and params is already queued') : null); }, + // usage: - (test-only) // Get the next queued job. // index - Integer, optional. When given, it'll get the job at index // position (when not given, it'll return the job at position // zero). // callback - f(err, job) - nextJob: function nextJob(index, callback) { + nextJob: function (index, callback) { if (typeof (index) === 'function') { callback = index; index = 0; } - if (queued_jobs.length === 0) { return callback(null, null); } - var slice = queued_jobs.slice(index, index + 1); - if (slice.length === 0) { return callback(null, null); } else { - return getJob(slice[0], callback); + return this.getJob(slice[0], callback); } }, + // usage: Runner // Lock a job, mark it as running by the given runner, update job // status. // uuid - the job uuid (String) @@ -317,11 +281,10 @@ var Backend = module.exports = function (config) { // callback - f(err, job) callback will be called with error if // something fails, otherwise it'll return the updated job // using getJob. - runJob: function runJob(uuid, runner_id, callback) { + runJob: function (uuid, runner_id, callback) { var idx = queued_jobs.indexOf(uuid); if (idx === -1) { - return callback(new e.BackendPreconditionFailedError( - 'Only queued jobs can be run')); + return callback(new e.BackendPreconditionFailedError('Only queued jobs can be run')); } else { queued_jobs.splice(idx, 1); jobs[uuid].runner_id = runner_id; @@ -330,6 +293,7 @@ var Backend = module.exports = function (config) { } }, + // usage: Runner // Unlock the job, mark it as finished, update the status, add the // results for every job's task. // job - the job object. It'll be saved to the backend with the provided @@ -337,14 +301,14 @@ var Backend = module.exports = function (config) { // callback - f(err, job) callback will be called with error if // something fails, otherwise it'll return the updated job // using getJob. - finishJob: function finishJob(job, callback) { + finishJob: function (job, callback) { if (!jobs[job.uuid]) { return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', job.uuid))); + 'Job with uuid \'%s\' does not exist', job.uuid))); } else if (jobs[job.uuid].execution !== 'running' && jobs[job.uuid].execution !== 'canceled') { return callback(new e.BackendPreconditionFailedError( - 'Only running jobs can be finished')); + 'Only running jobs can be finished')); } else { if (job.execution === 'running') { job.execution = 'succeeded'; @@ -362,6 +326,7 @@ var Backend = module.exports = function (config) { } }, + // usage: API // Update the job while it is running with information regarding // progress // job - the job object. It'll be saved to the backend with the @@ -371,31 +336,29 @@ var Backend = module.exports = function (config) { // callback - f(err, job) callback will be called with error if // something fails, otherwise it'll return the updated job // using getJob. - updateJob: function updateJob(job, meta, callback) { + updateJob: function (job, meta, callback) { if (typeof (meta) === 'function') { callback = meta; meta = {}; } - if (!jobs[job.uuid]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', job.uuid))); + return callback(new e.BackendResourceNotFoundError(sprintf('Job with uuid \'%s\' does not exist', job.uuid))); } else { jobs[job.uuid] = clone(job); return callback(null, job); } }, + // usage: Runner // Unlock the job, mark it as canceled, and remove the runner_id // uuid - string, the job uuid. // cb - f(err, job) callback will be called with error if something // fails, otherwise it'll return the updated job using getJob. - cancelJob: function cancelJob(uuid, cb) { + cancelJob: function (uuid, cb) { if (typeof (uuid) === 'undefined') { return cb(new e.BackendInternalError( - 'cancelJob uuid(String) required')); + 'cancelJob uuid(String) required')); } - jobs[uuid].execution = 'canceled'; delete jobs[uuid].runner_id; if (typeof (jobs[uuid].locks) !== 'undefined') { @@ -404,6 +367,7 @@ var Backend = module.exports = function (config) { return cb(null, jobs[uuid]); }, + // usage: API & Runner // Update only the given Job property. Intendeed to prevent conflicts // with two sources updating the same job at the same time, but // different properties: @@ -414,28 +378,21 @@ var Backend = module.exports = function (config) { // not job properties // - callback - f(err) called with error if something fails, otherwise // with null. - updateJobProperty: function updateJobProperty( - uuid, - prop, - val, - meta, - callback) - { - + updateJobProperty: function (uuid, prop, val, meta, callback) { if (typeof (meta) === 'function') { callback = meta; meta = {}; } - if (!jobs[uuid]) { return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', uuid))); + 'Job with uuid \'%s\' does not exist', uuid))); } else { jobs[uuid][prop] = val; return callback(null); } }, + // usage: Runner // Queue a job which has been running; i.e, due to whatever the reason, // re-queue the job. It'll unlock the job, update the status, add the // results for every finished task so far ... @@ -444,13 +401,13 @@ var Backend = module.exports = function (config) { // callback - f(err, job) callback will be called with error if // something fails, otherwise it'll return the updated job // using getJob. - queueJob: function queueJob(job, callback) { + queueJob: function (job, callback) { if (!jobs[job.uuid]) { return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', job.uuid))); + 'Job with uuid \'%s\' does not exist', job.uuid))); } else if (jobs[job.uuid].execution !== 'running') { return callback(new e.BackendPreconditionFailedError( - 'Only running jobs can be queued again')); + 'Only running jobs can be queued again')); } else { job.runner_id = null; job.execution = 'queued'; @@ -460,6 +417,7 @@ var Backend = module.exports = function (config) { } }, + // usage: Runner // Pause a job which has been running; i.e, tell the job to wait for // something external to happen. It'll unlock the job, update the // status, add the results for every finished task so far ... @@ -468,13 +426,13 @@ var Backend = module.exports = function (config) { // callback - f(err, job) callback will be called with error if // something fails, otherwise it'll return the updated job // using getJob. - pauseJob: function pauseJob(job, callback) { + pauseJob: function (job, callback) { if (!jobs[job.uuid]) { return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', job.uuid))); + 'Job with uuid \'%s\' does not exist', job.uuid))); } else if (jobs[job.uuid].execution !== 'running') { return callback(new e.BackendPreconditionFailedError( - 'Only running jobs can be paused')); + 'Only running jobs can be paused')); } else { job.runner_id = null; job.execution = 'waiting'; @@ -484,6 +442,7 @@ var Backend = module.exports = function (config) { } }, + // usage: - (test only) // Tell a waiting job that whatever it has been waiting for has // happened and it can run again. // job - the job Object. It'll be saved to the backend with the provided @@ -491,13 +450,13 @@ var Backend = module.exports = function (config) { // callback - f(err, job) callback will be called with error if // something fails, otherwise it'll return the updated job // using getJob. - resumeJob: function resumeJob(job, callback) { + resumeJob: function (job, callback) { if (!jobs[job.uuid]) { return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', job.uuid))); + 'Job with uuid \'%s\' does not exist', job.uuid))); } else if (jobs[job.uuid].execution !== 'waiting') { return callback(new e.BackendPreconditionFailedError( - 'Only waiting jobs can be resumed')); + 'Only waiting jobs can be resumed')); } else { job.runner_id = null; job.execution = 'queued'; @@ -510,17 +469,16 @@ var Backend = module.exports = function (config) { } }, + // usage: Runner // Get the given number of queued jobs uuids. // - start - Integer - Position of the first job to retrieve // - stop - Integer - Position of the last job to retrieve, _included_ // - callback - f(err, jobs) - nextJobs: function nextJobs(start, stop, callback) { + nextJobs: function (start, stop, callback) { if (queued_jobs.length === 0) { return callback(null, null); } - var slice = queued_jobs.slice(start, stop + 1); - if (slice.length === 0) { return callback(null, null); } else { @@ -528,102 +486,116 @@ var Backend = module.exports = function (config) { } }, + // usage: Runner // Register a runner on the backend and report it's active: // - runner_id - String, unique identifier for runner. // - active_at - ISO String timestamp. Optional. If none is given, // current time // - callback - f(err) - registerRunner: registerRunner, + registerRunner: function (runner_id, active_at, callback) { + if (typeof (active_at) === 'function') { + callback = active_at; + active_at = new Date(); + } else if (typeof (active_at) === 'string') { + active_at = new Date(active_at); + } + runners[runner_id] = { + runner_id: runner_id, + active_at: active_at, + idle: false + }; + return callback(null); + }, + // usage: Runner // Report a runner remains active: // - runner_id - String, unique identifier for runner. Required. // - active_at - ISO String timestamp. Optional. If none is given, // current time // - callback - f(err) - runnerActive: function runnerActive(runner_id, active_at, callback) { - return registerRunner(runner_id, active_at, callback); + runnerActive: function (runner_id, active_at, callback) { + return this.registerRunner(runner_id, active_at, callback); }, + // usage: - (test only) // Get the given runner id details // - runner_id - String, unique identifier for runner. Required. // - callback - f(err, runner) - getRunner: function getRunner(runner_id, callback) { + getRunner: function (runner_id, callback) { if (!runners[runner_id]) { return callback(new e.BackendResourceNotFoundError(sprintf( - 'Runner with uuid \'%s\' does not exist', runner_id))); + 'Runner with uuid \'%s\' does not exist', runner_id))); } else { return callback(null, runners[runner_id].active_at); } }, + // usage: API & Runner // Get all the registered runners: // - callback - f(err, runners) - getRunners: function getRunners(callback) { + getRunners: function (callback) { var theRunners = {}; - Object.keys(runners).forEach(function (uuid) { theRunners[uuid] = runners[uuid].active_at; }); - return callback(null, theRunners); }, + // usage: - (test only) // Set a runner as idle: // - runner_id - String, unique identifier for runner // - callback - f(err) - idleRunner: function idleRunner(runner_id, callback) { + idleRunner: function (runner_id, callback) { if (!runners[runner_id]) { return callback(new e.BackendResourceNotFoundError(sprintf( - 'Runner with uuid \'%s\' does not exist', runner_id))); + 'Runner with uuid \'%s\' does not exist', runner_id))); } else { runners[runner_id].idle = true; return callback(null); } }, + // usage: Runner // Check if the given runner is idle // - runner_id - String, unique identifier for runner // - callback - f(boolean) - isRunnerIdle: function isRunnerIdle(runner_id, callback) { - if (!runners[runner_id] || (runners[runner_id].idle === true)) { - return callback(true); - } else { - return callback(false); - } + isRunnerIdle: function (runner_id, callback) { + return callback((!runners[runner_id] || (runners[runner_id].idle === true))); }, + // usage: - (test only) // Remove idleness of the given runner // - runner_id - String, unique identifier for runner // - callback - f(err) - wakeUpRunner: function wakeUpRunner(runner_id, callback) { + wakeUpRunner: function (runner_id, callback) { if (!runners[runner_id]) { return callback(new e.BackendResourceNotFoundError(sprintf( - 'Runner with uuid \'%s\' does not exist', runner_id))); + 'Runner with uuid \'%s\' does not exist', runner_id))); } else { runners[runner_id].idle = false; return callback(null); } }, + // usage: Runner // Get all jobs associated with the given runner_id // - runner_id - String, unique identifier for runner // - callback - f(err, jobs). `jobs` is an array of job's UUIDs. // Note `jobs` will be an array, even when empty. - getRunnerJobs: function getRunnerJobs(runner_id, callback) { + getRunnerJobs: function (runner_id, callback) { var wf_runner_jobs = Object.keys(jobs).filter(function (uuid) { - return jobs[uuid].runner_id === runner_id; - }); - + return jobs[uuid].runner_id === runner_id; + }); return callback(null, wf_runner_jobs); }, + // usage: API // Get all the workflows: // - params - JSON Object (Optional). Can include the value of the // workflow's "name", and any other key/value pair to search for // into workflow's definition. // - callback - f(err, workflows) - getWorkflows: function getWorkflows(params, callback) { - + getWorkflows: function (params, callback) { if (typeof (params) === 'function') { callback = params; params = {}; @@ -635,13 +607,14 @@ var Backend = module.exports = function (config) { }); rWorkflows.forEach(function (wf) { - if (hasPropsAndVals(wf, params)) { + if (_hasPropsAndVals(wf, params)) { wfs.push(wf); } }); return callback(null, wfs); }, + // usage: API // Get all the jobs: // - params - JSON Object. Can include the value of the job's // "execution" status, and any other key/value pair to search for @@ -649,7 +622,7 @@ var Backend = module.exports = function (config) { // - execution - String, the execution status for the jobs to return. // Return all jobs if no execution status is given. // - callback - f(err, jobs) - getJobs: function getJobs(params, callback) { + getJobs: function (params, callback) { var executions = [ 'queued', 'failed', @@ -664,7 +637,6 @@ var Backend = module.exports = function (config) { var limit; var rJobs = []; var theJobs = []; - if (typeof (params) === 'object') { execution = params.execution; delete params.execution; @@ -673,20 +645,16 @@ var Backend = module.exports = function (config) { limit = params.limit; delete params.limit; } - if (typeof (params) === 'function') { callback = params; params = {}; } - - if ((typeof (execution) !== 'undefined') && - (executions.indexOf(execution) === -1)) { + (executions.indexOf(execution) === -1)) { return callback(new e.BackendInvalidArgumentError( - 'excution is required and must be one of "' + - executions.join('", "') + '"')); + 'excution is required and must be one of "' + + executions.join('", "') + '"')); } - if (typeof (execution) !== 'undefined') { rJobs = Object.keys(jobs).filter(function (uuid) { return (jobs[uuid].execution === execution); @@ -698,23 +666,22 @@ var Backend = module.exports = function (config) { return clone(jobs[uuid]); }); } - rJobs.forEach(function (job) { - if (hasPropsAndVals(job.params, params)) { + if (_hasPropsAndVals(job.params, params)) { theJobs.push(job); } }); - if (typeof (offset) !== 'undefined' && - typeof (limit) !== 'undefined') { + typeof (limit) !== 'undefined') { return callback(null, theJobs.slice(offset, limit)); } else { return callback(null, theJobs); } }, - - countJobs: function countJobs(callback) { + // usage: API + // TODO: missing specs + countJobs: function (callback) { var executions = [ 'queued', 'failed', @@ -781,13 +748,14 @@ var Backend = module.exports = function (config) { }, + // usage: API & Runner // Add progress information to an existing job: // - uuid - String, the Job's UUID. // - info - Object, {'key' => 'Value'} // - meta - Any additional information to pass to the backend which is // not job info // - callback - f(err) - addInfo: function addInfo(uuid, info, meta, callback) { + addInfo: function (uuid, info, meta, callback) { if (typeof (meta) === 'function') { callback = meta; @@ -796,7 +764,7 @@ var Backend = module.exports = function (config) { if (!jobs[uuid]) { return callback(new e.BackendResourceNotFoundError( - 'Job does not exist. Cannot Update.')); + 'Job does not exist. Cannot Update.')); } else { if (!util.isArray(jobs[uuid].info)) { jobs[uuid].info = []; @@ -806,10 +774,11 @@ var Backend = module.exports = function (config) { } }, + // usage: API // Get progress information from an existing job: // - uuid - String, the Job's UUID. // - callback - f(err, info) - getInfo: function getInfo(uuid, meta, callback) { + getInfo: function (uuid, meta, callback) { if (typeof (meta) === 'function') { callback = meta; meta = {}; @@ -817,7 +786,7 @@ var Backend = module.exports = function (config) { if (!jobs[uuid]) { return callback(new e.BackendResourceNotFoundError( - 'Job does not exist. Cannot get info.')); + 'Job does not exist. Cannot get info.')); } else { if (!util.isArray(jobs[uuid].info)) { jobs[uuid].info = []; @@ -827,7 +796,7 @@ var Backend = module.exports = function (config) { } }; + return baseBackend(backend); +} - makeEmitter(backend); - return backend; -}; +module.exports = MemoryBackend; \ No newline at end of file From a6323e15cbfea0726e2c39e51f9fefbda33ed23b Mon Sep 17 00:00:00 2001 From: "d.poellath" Date: Thu, 5 Feb 2015 15:20:12 +0100 Subject: [PATCH 2/8] ! mark optional parameters --- lib/base-backend.js | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/lib/base-backend.js b/lib/base-backend.js index 2be62de..12720f8 100644 --- a/lib/base-backend.js +++ b/lib/base-backend.js @@ -12,7 +12,7 @@ var baseBackend = { }, // usage: Factory // workflow - Workflow object - // meta - Any additional information to pass to the backend which is not + // meta (optional) - Any additional information to pass to the backend which is not // workflow properties // callback - f(err, workflow) createWorkflow: function (workflow, meta, callback) { @@ -20,7 +20,7 @@ var baseBackend = { }, // usage: API & Factory // uuid - Workflow.uuid - // meta - Any additional information to pass to the backend which is not + // meta (optional) - Any additional information to pass to the backend which is not // workflow properties // callback - f(err, workflow) getWorkflow: function (uuid, meta, callback) { @@ -28,7 +28,7 @@ var baseBackend = { }, // usage: API // workflow - the workflow object - // meta - Any additional information to pass to the backend which is not + // meta (optional) - Any additional information to pass to the backend which is not // workflow properties // callback - f(err, boolean) deleteWorkflow: function (workflow, meta, callback) { @@ -36,7 +36,7 @@ var baseBackend = { }, // usage: API // workflow - update workflow object. - // meta - Any additional information to pass to the backend which is not + // meta (optional) - Any additional information to pass to the backend which is not // workflow properties // callback - f(err, workflow) updateWorkflow: function (workflow, meta, callback) { @@ -45,7 +45,7 @@ var baseBackend = { // usage: Factory // job - Job object - // meta - Any additional information to pass to the backend which is not + // meta (optional) - Any additional information to pass to the backend which is not // job properties // callback - f(err, job) createJob: function (job, meta, callback) { @@ -54,7 +54,7 @@ var baseBackend = { // usage: Runner // uuid - Job.uuid - // meta - Any additional information to pass to the backend which is not + // meta (optional) - Any additional information to pass to the backend which is not // job properties // callback - f(err, job) getJob: function (uuid, meta, callback) { @@ -80,7 +80,7 @@ var baseBackend = { // usage: - (test-only) // Get the next queued job. - // index - Integer, optional. When given, it'll get the job at index + // index (optional) - Integer, optional. When given, it'll get the job at index // position (when not given, it'll return the job at position // zero). // callback - f(err, job) @@ -117,7 +117,7 @@ var baseBackend = { // progress // job - the job object. It'll be saved to the backend with the // provided properties. - // meta - Any additional information to pass to the backend which is + // meta (optional) - Any additional information to pass to the backend which is // not job properties // callback - f(err, job) callback will be called with error if // something fails, otherwise it'll return the updated job @@ -142,7 +142,7 @@ var baseBackend = { // - uuid - the job's uuid // - prop - the name of the property to update // - val - value to assign to such property - // - meta - Any additional information to pass to the backend which is + // - meta (optional) - Any additional information to pass to the backend which is // not job properties // - callback - f(err) called with error if something fails, otherwise // with null. @@ -200,7 +200,7 @@ var baseBackend = { // usage: Runner // Register a runner on the backend and report it's active: // - runner_id - String, unique identifier for runner. - // - active_at - ISO String timestamp. Optional. If none is given, + // - active_at (optional) - ISO String timestamp. Optional. If none is given, // current time // - callback - f(err) registerRunner: function (runner_id, active_at, callback) { @@ -210,7 +210,7 @@ var baseBackend = { // usage: Runner // Report a runner remains active: // - runner_id - String, unique identifier for runner. Required. - // - active_at - ISO String timestamp. Optional. If none is given, + // - active_at (optional) - ISO String timestamp. Optional. If none is given, // current time // - callback - f(err) runnerActive: function (runner_id, active_at, callback) { @@ -267,7 +267,7 @@ var baseBackend = { // usage: API // Get all the workflows: - // - params - JSON Object (Optional). Can include the value of the + // - params (optional) - JSON Object (Optional). Can include the value of the // workflow's "name", and any other key/value pair to search for // into workflow's definition. // - callback - f(err, workflows) @@ -277,17 +277,19 @@ var baseBackend = { // usage: API // Get all the jobs: - // - params - JSON Object. Can include the value of the job's + // - params (optional) - JSON Object. Can include the value of the job's // "execution" status, and any other key/value pair to search for // into job'sparams. // - execution - String, the execution status for the jobs to return. // Return all jobs if no execution status is given. - // - callback - f(err, jobs) + // - callback - f(err, jobs, count) getJobs: function (params, callback) { throw new e.BackendInternal('Backend.getJobs() not implemented yet'); }, // usage: API + // Get count of jobs: + // - callback - f(err, stats) countJobs: function (callback) { throw new e.BackendInternal('Backend.countJobs() not implemented yet'); }, @@ -296,7 +298,7 @@ var baseBackend = { // Add progress information to an existing job: // - uuid - String, the Job's UUID. // - info - Object, {'key' => 'Value'} - // - meta - Any additional information to pass to the backend which is + // - meta (optional) - Any additional information to pass to the backend which is // not job info // - callback - f(err) addInfo: function (uuid, info, meta, callback) { @@ -306,6 +308,7 @@ var baseBackend = { // usage: API // Get progress information from an existing job: // - uuid - String, the Job's UUID. + // - meta (optional) // - callback - f(err, info) getInfo: function (uuid, meta, callback) { throw new e.BackendInternal('Backend.getInfo() not implemented yet'); From c7b95fb859ead9a8e1cce8a649c2d50392af0871 Mon Sep 17 00:00:00 2001 From: "d.poellath" Date: Wed, 11 Mar 2015 15:26:53 +0100 Subject: [PATCH 3/8] ! preparing base-backend to split logic & persistence --- lib/base-backend.js | 650 ++++++++++++++++++++---- lib/workflow-in-memory-backend.js | 816 +++--------------------------- package.json | 87 ++-- 3 files changed, 652 insertions(+), 901 deletions(-) diff --git a/lib/base-backend.js b/lib/base-backend.js index 12720f8..80415c1 100644 --- a/lib/base-backend.js +++ b/lib/base-backend.js @@ -1,81 +1,228 @@ var e = require('./errors'); var util = require("util"); -var makeEmitter = require("./make-emitter"); +var makeEmitter = require("./make-emitter"), + clone = require('clone'), + _ = require('lodash'), + sprintf = util.format; + + +/* + Using Filter with Query Language (ORM) + http://sailsjs.org/#!/documentation/concepts/ORM/Querylanguage.html + */ var baseBackend = { - init: function (callback) { - throw new e.BackendInternal('Backend.init() not implemented yet'); + TYPES: { + WORKFLOW: 'workflow', + JOB: 'job', + RUNNER: 'runner' + }, + + EXECUTION: { + RUNNING: 'running', + QUEUED: 'queued', + CANCELED: 'canceled', + SUCCEEDED: 'succeeded', + WAITING: 'waiting', + FAILED: 'failed', + RETRIED: 'retried' + }, + + init: function (pCallback) { + if (pCallback) + return pCallback(); }, // usage: - (test only) - quit: function (callback) { - throw new e.BackendInternal('Backend.quit() not implemented yet'); + quit: function (pCallback) { + if (pCallback) + return pCallback(); + }, + // usage: internal + // should save obj to persistence + // pType - type, TYPES + // pObj - Object + // pCallback - f(err, obj) + save: function (pType, pObj, pCallback) { + throw new e.BackendInternal('Backend.save() not implemented yet'); + }, + // usage: internal + // should find object from persistence + // pType - type, TYPES + // pFilterObj - Filter for search, e.g. { 'where': { 'attr': 'value' }} + // pCallback - f(err, objs), objs is an array even if empty + find: function (pType, pFilterObj, pCallback) { + throw new e.BackendInternal('Backend.find() not implemented yet'); + }, + // usage: internal + // should remove object from persistence + // pType - type, TYPES + // pObj - Object + // pCallback - f(err, boolean) + remove: function (pType, pObj, pCallback) { + throw new e.BackendInternal('Backend.remove() not implemented yet'); }, // usage: Factory // workflow - Workflow object // meta (optional) - Any additional information to pass to the backend which is not // workflow properties - // callback - f(err, workflow) - createWorkflow: function (workflow, meta, callback) { - throw new e.BackendInternal('Backend.createWorkflow() not implemented yet'); + // pCallback - f(err, workflow) + createWorkflow: function (pWorkflow, pMeta, pCallback) { + if (typeof (pMeta) === 'function') { + pCallback = pMeta; + pMeta = {}; + } + var _self = this; + this.find(this.TYPES.WORKFLOW, {'name': pWorkflow.name}, function (pError, pWorkflows) { + if (pWorkflows.length > 0) + return pCallback(new e.BackendInvalidArgumentError( + 'Workflow.name must be unique. A workflow with name "' + + pWorkflow.name + '" already exists')); + _self.save(_self.TYPES.WORKFLOW, clone(pWorkflow), function (pError) { + return pCallback(pError, pWorkflow); + }); + }); }, // usage: API & Factory // uuid - Workflow.uuid // meta (optional) - Any additional information to pass to the backend which is not // workflow properties - // callback - f(err, workflow) - getWorkflow: function (uuid, meta, callback) { - throw new e.BackendInternal('Backend.getWorkflow() not implemented yet'); + // pCallback - f(err, workflow) + getWorkflow: function (uuid, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + this.find(this.TYPES.WORKFLOW, {'uuid': uuid}, function (pError, pWorkflows) { + if (pWorkflows.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Workflow with uuid \'%s\' does not exist', uuid))); + return pCallback(null, clone(pWorkflows[0])); + }); }, // usage: API // workflow - the workflow object // meta (optional) - Any additional information to pass to the backend which is not // workflow properties - // callback - f(err, boolean) - deleteWorkflow: function (workflow, meta, callback) { - throw new e.BackendInternal('Backend.deleteWorkflow() not implemented yet'); + // pCallback - f(err, boolean) + deleteWorkflow: function (workflow, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + this.remove(this.TYPES.WORKFLOW, workflow, pCallback); }, // usage: API // workflow - update workflow object. // meta (optional) - Any additional information to pass to the backend which is not // workflow properties - // callback - f(err, workflow) - updateWorkflow: function (workflow, meta, callback) { - throw new e.BackendInternal('Backend.updateWorkflow() not implemented yet'); + // pCallback - f(err, workflow) + updateWorkflow: function (workflow, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + var _self = this; + this.getWorkflow(workflow.uuid, meta, function (pError, pWorkflow) { + if (!pWorkflow) + return pCallback(new e.BackendResourceNotFoundError( + 'Workflow does not exist. Cannot Update.')); + _self.find(_self.TYPES.WORKFLOW, {'name': pWorkflow.name}, function (pError, pWorkflows) { + if (pWorkflows.length > 0) + for (var i = 0; i < pWorkflows.length; i++) { + if (pWorkflows[i].uuid !== workflow.uuid) + return pCallback(new e.BackendInvalidArgumentError( + 'Workflow.name must be unique. A workflow with name "' + + workflow.name + '" already exists')); + } + _self.save(_self.TYPES.WORKFLOW, clone(workflow), function (pError) { + return pCallback(pError, workflow); + }); + }); + }); }, // usage: Factory // job - Job object // meta (optional) - Any additional information to pass to the backend which is not // job properties - // callback - f(err, job) - createJob: function (job, meta, callback) { - throw new e.BackendInternal('Backend.createJob() not implemented yet'); + // pCallback - f(err, job) + createJob: function (job, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + job.created_at = job.created_at || new Date().toISOString(); + this.save(this.TYPES.JOB, clone(job), function (pError) { + return pCallback(pError, job); + }); }, // usage: Runner // uuid - Job.uuid // meta (optional) - Any additional information to pass to the backend which is not // job properties - // callback - f(err, job) - getJob: function (uuid, meta, callback) { - throw new e.BackendInternal('Backend.getJob() not implemented yet'); + // pCallback - f(err, job) + getJob: function (uuid, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + console.error('getJob(): uuid=%j', uuid); + this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', uuid))); + return pCallback(pError, clone(pJobs[0])); + }); }, + // DEPRECATED // usage: Internal & Test // Get a single job property // uuid - Job uuid. // prop - (String) property name - // cb - callback f(err, value) - getJobProperty: function (uuid, prop, cb) { - throw new e.BackendInternal('Backend.getJobProperty() not implemented yet'); + // cb - pCallback f(err, value) + getJobProperty: function (uuid, prop, pCallback) { + console.error('getJobProperty(): uuid=%j', uuid); + this.getJob(uuid, function (pError, pJob) { + if (pError) + return pCallback(pError); + if (!pJob) + return cb(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', uuid))); + return pCallback(null, pJob[prop]); + }); }, // usage: Factory // job - the job object - // callback - f(err) called with error in case there is a duplicated + // pCallback - f(err) called with error in case there is a duplicated // job with the same target and same params - validateJobTarget: function (job, callback) { - throw new e.BackendInternal('Backend.validateJobTarget() not implemented yet'); + validateJobTarget: function (pJob, pCallback) { + // If no target is given, we don't care: + if (!pJob.target) { + return pCallback(null); + } + var _self = this; + _self.find(_self.TYPES.JOB, {'locks': pJob.target}, function (pError, pJobs) { + if (pJobs.length > 0) + return pCallback(new e.BackendInvalidArgumentError( + 'Job target is currently locked by another job')); + _self.find(_self.TYPES.JOB, {'target': pJob.target}, function (pError, pJobs) { + if (pJobs.length === 0) // needed? own job will be found + return pCallback(null); + for (var i = 0; i < pJobs.length; i++) { + var job = pJobs[i]; + if (job.uuid !== pJob.uuid && job.target === pJob.target && + Object.keys(pJob.params).every(function (p) { + return (job.params[p] && job.params[p] === pJob.params[p]); + }) && (job.execution === _self.EXECUTION.QUEUED || + job.execution === _self.EXECUTION.RUNNING)) + return pCallback(new e.BackendInvalidArgumentError('Another job (' + job.uuid + ') with the same target and params is already queued')); + } + return pCallback(null); + }); + }); }, // usage: - (test-only) @@ -83,9 +230,16 @@ var baseBackend = { // index (optional) - Integer, optional. When given, it'll get the job at index // position (when not given, it'll return the job at position // zero). - // callback - f(err, job) - nextJob: function (index, callback) { - throw new e.BackendInternal('Backend.nextJob() not implemented yet'); + // pCallback - f(err, job) + nextJob: function (index, pCallback) { + if (typeof (index) === 'function') { + pCallback = index; + index = 0; + } + this.find(this.TYPES.JOB, {'execution': this.EXECUTION.QUEUED}, function (pError, pJobs) { + return pCallback(null, (pJobs.length === 0 || index >= pJobs.length) ? null : + clone(pJobs.slice(index, index + 1)[0])); + }); }, // usage: Runner @@ -93,11 +247,22 @@ var baseBackend = { // status. // uuid - the job uuid (String) // runner_id - the runner identifier (String) - // callback - f(err, job) callback will be called with error if + // pCallback - f(err, job) pCallback will be called with error if // something fails, otherwise it'll return the updated job // using getJob. - runJob: function (uuid, runner_id, callback) { - throw new e.BackendInternal('Backend.runJob() not implemented yet'); + runJob: function (uuid, runner_id, pCallback) { + var _self = this; + console.error('runJob(): uuid=%j', uuid); + this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { + if (pJobs.length === 0 || pJobs[0].execution != _self.EXECUTION.QUEUED) + return pCallback(new e.BackendPreconditionFailedError('Only queued jobs can be run')); + var job = pJobs[0]; + job.runner_id = runner_id; + job.execution = _self.EXECUTION.RUNNING; + return _self.save(_self.TYPES.JOB, clone(job), function (pError) { + return pCallback(pError, job); + }); + }); }, // usage: Runner @@ -105,11 +270,27 @@ var baseBackend = { // results for every job's task. // job - the job object. It'll be saved to the backend with the provided // properties. - // callback - f(err, job) callback will be called with error if + // pCallback - f(err, job) pCallback will be called with error if // something fails, otherwise it'll return the updated job // using getJob. - finishJob: function (job, callback) { - throw new e.BackendInternal('Backend.finishJob() not implemented yet'); + finishJob: function (pJob, pCallback) { + var _self = this; + console.error('finishJob(): uuid=%j', pJob.uuid); + this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', pJob.uuid))); + if (pJobs[0].execution !== _self.EXECUTION.RUNNING && + pJobs[0].execution !== _self.EXECUTION.CANCELED) + return pCallback(new e.BackendPreconditionFailedError( + 'Only running jobs can be finished')); + if (pJob.execution === _self.EXECUTION.RUNNING) + pJob.execution = _self.EXECUTION.SUCCEEDED; + delete pJob.runner_id; + _self.save(_self.TYPES.JOB, clone(pJob), function (pError) { + return pCallback(pError, pJob); + }); + }); }, // usage: API @@ -119,22 +300,50 @@ var baseBackend = { // provided properties. // meta (optional) - Any additional information to pass to the backend which is // not job properties - // callback - f(err, job) callback will be called with error if + // pCallback - f(err, job) pCallback will be called with error if // something fails, otherwise it'll return the updated job // using getJob. - updateJob: function (job, meta, callback) { - throw new e.BackendInternal('Backend.updateJob() not implemented yet'); + updateJob: function (job, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + var _self = this; + console.error('updateJob(): uuid=%j', job.uuid); + this.find(this.TYPES.JOB, {'uuid': job.uuid}, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf('Job with uuid \'%s\' does not exist', job.uuid))); + _self.save(_self.TYPES.JOB, clone(job), function (pError) { + return pCallback(pError, job); + }); + }); }, // usage: Runner // Unlock the job, mark it as canceled, and remove the runner_id // uuid - string, the job uuid. - // cb - f(err, job) callback will be called with error if something + // cb - f(err, job) pCallback will be called with error if something // fails, otherwise it'll return the updated job using getJob. - cancelJob: function (uuid, cb) { - throw new e.BackendInternal('Backend.cancelJob() not implemented yet'); + cancelJob: function (uuid, pCallback) { + if (typeof (uuid) === 'undefined') { + return pCallback(new e.BackendInternalError( + 'cancelJob uuid(String) required')); + } + var _self = this; + console.error('cancelJob(): uuid=%j', uuid); + this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf('Job with uuid \'%s\' does not exist', job.uuid))); + var job = pJobs[0]; + job.execution = _self.EXECUTION.CANCELED; + delete job.runner_id; + _self.save(_self.TYPES.JOB, clone(job), function (pError) { + return pCallback(pError, job); + }); + }); }, + // DEPRICATED -> use getJob & saveJob // usage: API & Runner // Update only the given Job property. Intendeed to prevent conflicts // with two sources updating the same job at the same time, but @@ -144,10 +353,25 @@ var baseBackend = { // - val - value to assign to such property // - meta (optional) - Any additional information to pass to the backend which is // not job properties - // - callback - f(err) called with error if something fails, otherwise + // - pCallback - f(err) called with error if something fails, otherwise // with null. - updateJobProperty: function (uuid, prop, val, meta, callback) { - throw new e.BackendInternal('Backend.updateJobProperty() not implemented yet'); + updateJobProperty: function (uuid, prop, val, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + var _self = this; + console.error('updateJobProperty(): uuid=%j', uuid); + this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', uuid))); + var job = pJobs[0]; + job[prop] = val; + _self.save(_self.TYPES.JOB, clone(job), function (pError) { + return pCallback(pError); + }); + }); }, // usage: Runner @@ -156,11 +380,26 @@ var baseBackend = { // results for every finished task so far ... // job - the job Object. It'll be saved to the backend with the provided // properties to ensure job status persistence. - // callback - f(err, job) callback will be called with error if + // pCallback - f(err, job) pCallback will be called with error if // something fails, otherwise it'll return the updated job // using getJob. - queueJob: function (job, callback) { - throw new e.BackendInternal('Backend.queueJob() not implemented yet'); + queueJob: function (pJob, pCallback) { + var _self = this; + console.error('queueJob(): uuid=%j', pJob.uuid); + this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', pJob.uuid))); + //var job = pJobs[0]; + if (pJob.execution !== _self.EXECUTION.RUNNING) + return pCallback(new e.BackendPreconditionFailedError( + 'Only running jobs can be queued again')); + delete pJob.runner_id; + pJob.execution = _self.EXECUTION.QUEUED; + _self.save(_self.TYPES.JOB, clone(pJob), function (pError) { + return pCallback(pError, pJob); + }); + }); }, // usage: Runner @@ -169,11 +408,26 @@ var baseBackend = { // status, add the results for every finished task so far ... // job - the job Object. It'll be saved to the backend with the provided // properties to ensure job status persistence. - // callback - f(err, job) callback will be called with error if + // pCallback - f(err, job) pCallback will be called with error if // something fails, otherwise it'll return the updated job // using getJob. - pauseJob: function (job, callback) { - throw new e.BackendInternal('Backend.pauseJob() not implemented yet'); + pauseJob: function (pJob, pCallback) { + var _self = this; + console.error('pauseJob(): uuid=%j', pJob.uuid); + this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', pJob.uuid))); + //var job = pJobs[0]; + if (pJob.execution !== _self.EXECUTION.RUNNING) + return pCallback(new e.BackendPreconditionFailedError( + 'Only running jobs can be paused')); + delete pJob.runner_id; + pJob.execution = _self.EXECUTION.WAITING; + _self.save(_self.TYPES.JOB, clone(pJob), function (pError) { + return pCallback(pError, pJob); + }); + }); }, // usage: - (test only) @@ -181,20 +435,45 @@ var baseBackend = { // happened and it can run again. // job - the job Object. It'll be saved to the backend with the provided // properties to ensure job status persistence. - // callback - f(err, job) callback will be called with error if + // pCallback - f(err, job) pCallback will be called with error if // something fails, otherwise it'll return the updated job // using getJob. - resumeJob: function (job, callback) { - throw new e.BackendInternal('Backend.resumeJob() not implemented yet'); + resumeJob: function (pJob, pCallback) { + var _self = this; + console.error('resumeJob(): uuid=%j', pJob.uuid); + this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', pJob.uuid))); + //var job = pJobs[0]; + if (pJob.execution !== _self.EXECUTION.WAITING) + return pCallback(new e.BackendPreconditionFailedError( + 'Only waiting jobs can be resumed')); + delete pJob.runner_id; + pJob.execution = _self.EXECUTION.QUEUED; + _self.save(_self.TYPES.JOB, clone(pJob), function (pError) { + return pCallback(pError, pJob); + }); + }); }, // usage: Runner // Get the given number of queued jobs uuids. // - start - Integer - Position of the first job to retrieve // - stop - Integer - Position of the last job to retrieve, _included_ - // - callback - f(err, jobs) - nextJobs: function (start, stop, callback) { - throw new e.BackendInternal('Backend.nextJobs() not implemented yet'); + // - pCallback - f(err, jobs). `jobs` is an array of job's UUIDs. + // Note `jobs` will be an array, even when empty. + nextJobs: function (start, stop, pCallback) { + this.find(this.TYPES.JOB, {'execution': this.EXECUTION.QUEUED}, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(pError, null); + var slice = pJobs.slice(start, stop + 1); + if (slice.length === 0) { + return pCallback(pError, null); + } else { + return pCallback(pError, _.map(slice, 'uuid')); + } + }); }, // usage: Runner @@ -202,67 +481,117 @@ var baseBackend = { // - runner_id - String, unique identifier for runner. // - active_at (optional) - ISO String timestamp. Optional. If none is given, // current time - // - callback - f(err) - registerRunner: function (runner_id, active_at, callback) { - throw new e.BackendInternal('Backend.registerRunner() not implemented yet'); + // - pCallback - f(err) + registerRunner: function (runner_id, active_at, pCallback) { + if (typeof (active_at) === 'function') { + pCallback = active_at; + active_at = new Date(); + } else if (typeof (active_at) === 'string') { + active_at = new Date(active_at); + } + this.save(this.TYPES.RUNNER, { + uuid: runner_id, + active_at: active_at, + idle: false + }, function (pError, pRunner) { + return pCallback(pError); + }); }, + // DEPRECATED, use registerRunner // usage: Runner // Report a runner remains active: // - runner_id - String, unique identifier for runner. Required. // - active_at (optional) - ISO String timestamp. Optional. If none is given, // current time - // - callback - f(err) - runnerActive: function (runner_id, active_at, callback) { - throw new e.BackendInternal('Backend.runnerActive() not implemented yet'); + // - pCallback - f(err) + runnerActive: function (runner_id, active_at, pCallback) { + return this.registerRunner(runner_id, active_at, pCallback); }, // usage: - (test only) // Get the given runner id details // - runner_id - String, unique identifier for runner. Required. - // - callback - f(err, runner) - getRunner: function (runner_id, callback) { - throw new e.BackendInternal('Backend.getRunner() not implemented yet'); + // - pCallback - f(err, runner) + getRunner: function (runner_id, pCallback) { + this.find(this.TYPES.RUNNER, {'uuid': runner_id}, function (pError, pRunners) { + if (pRunners.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Runner with uuid \'%s\' does not exist', runner_id))); + return pCallback(pError, pRunners[0].active_at); + }); }, // usage: API & Runner // Get all the registered runners: - // - callback - f(err, runners) - getRunners: function (callback) { - throw new e.BackendInternal('Backend.getRunners() not implemented yet'); + // - pCallback - f(err, runners) + getRunners: function (pCallback) { + this.find(this.TYPES.RUNNER, null, function (pError, pRunners) { + var theRunners = {}; + for (var i = 0; i < pRunners.length; i++) { + theRunners[pRunners.uuid] = pRunners.active_at; + } + return pCallback(pError, theRunners); + }); + }, // usage: - (test only) // Set a runner as idle: // - runner_id - String, unique identifier for runner - // - callback - f(err) - idleRunner: function (runner_id, callback) { - throw new e.BackendInternal('Backend.idleRunner() not implemented yet'); + // - pCallback - f(err, runner-active_at) + idleRunner: function (runner_id, pCallback) { + var _self = this; + this.find(this.TYPES.RUNNER, {'uuid': runner_id}, function (pError, pRunners) { + if (pRunners.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Runner with uuid \'%s\' does not exist', runner_id))); + var runner = pRunners[0]; + runner.idle = true; + _self.save(_self.TYPES.RUNNER, runner, function (pError, pRunner) { + return pCallback(pError, pRunner.active_at); + }); + }); }, // usage: Runner // Check if the given runner is idle // - runner_id - String, unique identifier for runner - // - callback - f(boolean) - isRunnerIdle: function (runner_id, callback) { - throw new e.BackendInternal('Backend.isRunnerIdle() not implemented yet'); + // - pCallback - f(boolean) + isRunnerIdle: function (runner_id, pCallback) { + this.find(this.TYPES.RUNNER, {'uuid': runner_id}, function (pError, pRunners) { + return pCallback((pRunners.length === 0 || pRunners[0].idle === true)); + }); }, // usage: - (test only) // Remove idleness of the given runner // - runner_id - String, unique identifier for runner - // - callback - f(err) - wakeUpRunner: function (runner_id, callback) { - throw new e.BackendInternal('Backend.wakeUpRunner() not implemented yet'); + // - pCallback - f(err) + wakeUpRunner: function (runner_id, pCallback) { + var _self = this; + this.find(this.TYPES.RUNNER, {'uuid': runner_id}, function (pError, pRunners) { + if (pRunners.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Runner with uuid \'%s\' does not exist', runner_id))); + var runner = pRunners[0]; + runner.idle = false; + _self.save(_self.TYPES.RUNNER, runner, function (pError, pRunner) { + return pCallback(pError, pRunner.active_at); + }); + }); }, // usage: Runner // Get all jobs associated with the given runner_id // - runner_id - String, unique identifier for runner - // - callback - f(err, jobs). `jobs` is an array of job's UUIDs. + // - pCallback - f(err, jobs). `jobs` is an array of job's UUIDs. // Note `jobs` will be an array, even when empty. - getRunnerJobs: function (runner_id, callback) { - throw new e.BackendInternal('Backend.getRunnerJobs() not implemented yet'); + getRunnerJobs: function (runner_id, pCallback) { + this.find(this.TYPES.JOB, {'runner_id': runner_id}, function (pError, pJobs) { + console.error("getRunnerJobs(): %j", _.map(pJobs, 'uuid')); + return pCallback(pError, _.map(pJobs, 'uuid')); + }); }, // usage: API @@ -270,9 +599,15 @@ var baseBackend = { // - params (optional) - JSON Object (Optional). Can include the value of the // workflow's "name", and any other key/value pair to search for // into workflow's definition. - // - callback - f(err, workflows) - getWorkflows: function (params, callback) { - throw new e.BackendInternal('Backend.getWorkflows() not implemented yet'); + // - pCallback - f(err, workflows) + getWorkflows: function (params, pCallback) { + if (typeof (params) === 'function') { + pCallback = params; + params = {}; + } + this.find(this.TYPES.WORKFLOW, params, function (pError, pWorkflows) { + return pCallback(pError, clone(pWorkflows)); + }); }, // usage: API @@ -280,18 +615,103 @@ var baseBackend = { // - params (optional) - JSON Object. Can include the value of the job's // "execution" status, and any other key/value pair to search for // into job'sparams. - // - execution - String, the execution status for the jobs to return. + // - execution - String, the execution status for the jobs to return. // Return all jobs if no execution status is given. - // - callback - f(err, jobs, count) - getJobs: function (params, callback) { - throw new e.BackendInternal('Backend.getJobs() not implemented yet'); + // - pCallback - f(err, jobs, count) + getJobs: function (params, pCallback) { + if (typeof (params) === 'function') { + pCallback = params; + params = {}; + } + // TODO usage: Waterline Query Language! + // TODO -> ORM should handle offset & limit + var offset; + var limit; + if (typeof (params) === 'object') { + offset = params.offset; + delete params.offset; + limit = params.limit; + delete params.limit; + } + if (typeof (params) === 'function') { + pCallback = params; + params = {}; + } + var _self = this; + var executions = Object.keys(this.EXECUTION).map(function (k) { + return _self.EXECUTION[k] + }); + if ((typeof (params.execution) !== 'undefined') && + (executions.indexOf(params.execution) === -1)) { + return pCallback(new e.BackendInvalidArgumentError( + 'execution is required and must be one of "' + + executions.join('", "') + '"')); + } + console.error("getJobs(): params=%j", params); + this.find(this.TYPES.JOB, params, function (pError, pJobs) { + if (pError) + return pCallback(pError, clone(pJobs)); + if (typeof (offset) !== 'undefined' && + typeof (limit) !== 'undefined') { + return pCallback(pError, clone(pJobs.slice(offset, limit))); + } else { + return pCallback(pError, clone(pJobs)); + } + }); }, // usage: API // Get count of jobs: - // - callback - f(err, stats) - countJobs: function (callback) { - throw new e.BackendInternal('Backend.countJobs() not implemented yet'); + // - pCallback - f(err, stats) + countJobs: function (pCallback) { + var _self = this; + var executions = Object.keys(this.EXECUTION).map(function (k) { + return _self.EXECUTION[k] + }); + var stats = { + all_time: {}, + past_24h: {}, + past_hour: {}, + current: {} + }; + executions.forEach(function (e) { + stats.all_time[e] = 0; + stats.past_24h[e] = 0; + stats.past_hour[e] = 0; + stats.current[e] = 0; + }); + this.find(this.TYPES.JOB, null, function (pError, pJobs) { + var yesterday = (function (d) { + d.setDate(d.getDate() - 1); + return d; + })(new Date()).getTime(); + var _1hr = (function (d) { + d.setHours(d.getHours() - 1); + return d; + })(new Date()).getTime(); + var _2hr = (function (d) { + d.setHours(d.getHours() - 2); + return d; + })(new Date()).getTime(); + pJobs = pJobs.map(function (job) { + return ({ + execution: job.execution, + created_at: new Date(job.created_at).getTime() + }); + }); + pJobs.forEach(function (j) { + if (j.created_at < yesterday) { + stats.all_time[j.execution] += 1; + } else if (j.created_at > yesterday && j.created_at < _2hr) { + stats.past_24h[j.execution] += 1; + } else if (j.created_at > _2hr && j.created_at < _1hr) { + stats.past_hour[j.execution] += 1; + } else { + stats.current[j.execution] += 1; + } + }); + return pCallback(null, stats); + }); }, // usage: API & Runner @@ -300,18 +720,50 @@ var baseBackend = { // - info - Object, {'key' => 'Value'} // - meta (optional) - Any additional information to pass to the backend which is // not job info - // - callback - f(err) - addInfo: function (uuid, info, meta, callback) { - throw new e.BackendInternal('Backend.addInfo() not implemented yet'); + // - pCallback - f(err) + addInfo: function (uuid, info, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + var _self = this; + console.error('addInfo(): uuid=%j', uuid); + this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError( + 'Job does not exist. Cannot Update.')); + var job = pJobs[0]; + if (!util.isArray(job.info)) { + job.info = []; + } + job.info.push(info); + _self.save(_self.TYPES.JOB, job, function (pError, pJob) { + return pCallback(pError, clone(pJob)); + }); + }); }, // usage: API // Get progress information from an existing job: // - uuid - String, the Job's UUID. // - meta (optional) - // - callback - f(err, info) - getInfo: function (uuid, meta, callback) { - throw new e.BackendInternal('Backend.getInfo() not implemented yet'); + // - pCallback - f(err, info) + getInfo: function (uuid, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + console.error('getInfo(): uuid=%j', uuid); + this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError( + 'Job does not exist. Cannot get info.')); + var job = pJobs[0]; + if (!util.isArray(job.info)) { + job.info = []; + } + return pCallback(null, clone(job.info)); + }); } }; diff --git a/lib/workflow-in-memory-backend.js b/lib/workflow-in-memory-backend.js index cf60b04..6c2866f 100644 --- a/lib/workflow-in-memory-backend.js +++ b/lib/workflow-in-memory-backend.js @@ -1,27 +1,11 @@ // Copyright 2012 Pedro P. Candel . All rights reserved. -var util = require('util') - , Logger = require('bunyan') - , e = require('./errors') - , clone = require('clone') - , sprintf = util.format - , baseBackend = require('./base-backend'); - -// Returns true when "obj" (Object) has all the properties "kv" (Object) has, -// and with exactly the same values, otherwise, false -function _hasPropsAndVals(obj, kv) { - if (typeof (obj) !== 'object' || typeof (kv) !== 'object') { - return (false); - } - - if (Object.keys(kv).length === 0) { - return (true); - } - - return (Object.keys(kv).every(function (k) { - return (obj[k] && obj[k] === kv[k]); - })); -} +var util = require('util'), + Logger = require('bunyan'), + e = require('./errors'), + _ = require('lodash'), + clone = require('clone'), + baseBackend = require('./base-backend'); function MemoryBackend(config) { @@ -44,43 +28,13 @@ function MemoryBackend(config) { log = new Logger(config.logger); } - var workflows = null; - var jobs = null; - var runners = null; - var queued_jobs = null; - var waiting_jobs = null; - var locked_targets = {}; - - function _lockedTargets() { - var targets = Object.keys(locked_targets).map(function (t) { - return locked_targets[t]; - }); - return targets; - } - - function _wfNames() { - var wf_names = Object.keys(workflows).map(function (uuid) { - return workflows[uuid].name; - }); - return wf_names; - } - - function _jobTargets() { - var wf_job_targets = Object.keys(jobs).map(function (uuid) { - return jobs[uuid].target; - }); - return wf_job_targets; - } + var _store = null; var backend = require('./base-backend'); backend = { log: log, init: function (callback) { - workflows = {}; - jobs = {}; - runners = {}; - queued_jobs = []; - waiting_jobs = []; + _store = {}; return callback(); }, @@ -89,712 +43,56 @@ function MemoryBackend(config) { return callback(); }, - // usage: Factory - // workflow - Workflow object - // meta - Any additional information to pass to the backend which is not - // workflow properties - // callback - f(err, workflow) - createWorkflow: function (workflow, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - if (_wfNames().indexOf(workflow.name) !== -1) { - return callback(new e.BackendInvalidArgumentError( - 'Workflow.name must be unique. A workflow with name "' + - workflow.name + '" already exists')); - } else { - workflows[workflow.uuid] = clone(workflow); - return callback(null, workflow); - } - }, - - // usage: API & Factory - // uuid - Workflow.uuid - // meta - Any additional information to pass to the backend which is not - // workflow properties - // callback - f(err, workflow) - getWorkflow: function (uuid, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - if (workflows[uuid]) { - return callback(null, clone(workflows[uuid])); - } else { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Workflow with uuid \'%s\' does not exist', uuid))); - } - }, - - // usage: API - // workflow - the workflow object - // meta - Any additional information to pass to the backend which is not - // workflow properties - // callback - f(err, boolean) - deleteWorkflow: function (workflow, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - return callback(null, workflows[workflow.uuid] ? (delete workflows[workflow.uuid]) : false); - }, - - // usage: API - // workflow - update workflow object. - // meta - Any additional information to pass to the backend which is not - // workflow properties - // callback - f(err, workflow) - updateWorkflow: function (workflow, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - if (workflows[workflow.uuid]) { - if (_wfNames().indexOf(workflow.name) !== -1 && - workflows[workflow.uuid].name !== workflow.name) { - return callback(new e.BackendInvalidArgumentError( - 'Workflow.name must be unique. A workflow with name "' + - workflow.name + '" already exists')); - } else { - workflows[workflow.uuid] = clone(workflow); - return callback(null, workflow); - } - } else { - return callback(new e.BackendResourceNotFoundError( - 'Workflow does not exist. Cannot Update.')); - } - }, - - // usage: Factory - // job - Job object - // meta - Any additional information to pass to the backend which is not - // job properties - // callback - f(err, job) - createJob: function (job, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - job.created_at = job.created_at || new Date().toISOString(); - jobs[job.uuid] = clone(job); - queued_jobs.push(job.uuid); - if (typeof (job.locks) !== 'undefined') { - locked_targets[job.uuid] = job.locks; - } - return callback(null, job); - }, - - // usage: Runner - // uuid - Job.uuid - // meta - Any additional information to pass to the backend which is not - // job properties - // callback - f(err, job) - getJob: function (uuid, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - if (jobs[uuid]) { - return callback(null, clone(jobs[uuid])); - } else { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', uuid))); - } - }, - - // usage: Internal & Test - // Get a single job property - // uuid - Job uuid. - // prop - (String) property name - // cb - callback f(err, value) - getJobProperty: function (uuid, prop, cb) { - if (jobs[uuid]) { - return cb(null, jobs[uuid][prop]); - } else { - return cb(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', uuid))); - } - }, - - // usage: Factory - // job - the job object - // callback - f(err) called with error in case there is a duplicated - // job with the same target and same params - validateJobTarget: function (job, callback) { - // If no target is given, we don't care: - if (!job.target) { - return callback(null); - } - var locked = _lockedTargets().some(function (r) { - var re = new RegExp(r); - return (re.test(job.target)); - }); - if (locked) { - return callback(new e.BackendInvalidArgumentError( - 'Job target is currently locked by another job')); - } - if (_jobTargets().indexOf(job.target) === -1) { - return callback(null); - } - var filtered = Object.keys(jobs).filter(function (uuid) { - return ( - uuid !== job.uuid && - jobs[uuid].target === job.target && - Object.keys(job.params).every(function (p) { - return (jobs[uuid].params[p] && - jobs[uuid].params[p] === job.params[p]); - }) && - (jobs[uuid].execution === 'queued' || - jobs[uuid].execution === 'running')); - }); - return callback(filtered.length !== 0 ? new e.BackendInvalidArgumentError('Another job with the same target and params is already queued') : null); - }, - - // usage: - (test-only) - // Get the next queued job. - // index - Integer, optional. When given, it'll get the job at index - // position (when not given, it'll return the job at position - // zero). - // callback - f(err, job) - nextJob: function (index, callback) { - if (typeof (index) === 'function') { - callback = index; - index = 0; - } - if (queued_jobs.length === 0) { - return callback(null, null); - } - var slice = queued_jobs.slice(index, index + 1); - if (slice.length === 0) { - return callback(null, null); - } else { - return this.getJob(slice[0], callback); - } - }, - - // usage: Runner - // Lock a job, mark it as running by the given runner, update job - // status. - // uuid - the job uuid (String) - // runner_id - the runner identifier (String) - // callback - f(err, job) callback will be called with error if - // something fails, otherwise it'll return the updated job - // using getJob. - runJob: function (uuid, runner_id, callback) { - var idx = queued_jobs.indexOf(uuid); - if (idx === -1) { - return callback(new e.BackendPreconditionFailedError('Only queued jobs can be run')); - } else { - queued_jobs.splice(idx, 1); - jobs[uuid].runner_id = runner_id; - jobs[uuid].execution = 'running'; - return callback(null, clone(jobs[uuid])); - } - }, - - // usage: Runner - // Unlock the job, mark it as finished, update the status, add the - // results for every job's task. - // job - the job object. It'll be saved to the backend with the provided - // properties. - // callback - f(err, job) callback will be called with error if - // something fails, otherwise it'll return the updated job - // using getJob. - finishJob: function (job, callback) { - if (!jobs[job.uuid]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', job.uuid))); - } else if (jobs[job.uuid].execution !== 'running' && - jobs[job.uuid].execution !== 'canceled') { - return callback(new e.BackendPreconditionFailedError( - 'Only running jobs can be finished')); - } else { - if (job.execution === 'running') { - job.execution = 'succeeded'; - } - var info = jobs[job.uuid].info; - job.runner_id = null; - jobs[job.uuid] = clone(job); - if (info) { - jobs[job.uuid].info = info; - } - if (typeof (job.locks) !== 'undefined') { - delete locked_targets[job.uuid]; - } - return callback(null, job); - } - }, - - // usage: API - // Update the job while it is running with information regarding - // progress - // job - the job object. It'll be saved to the backend with the - // provided properties. - // meta - Any additional information to pass to the backend which is - // not job properties - // callback - f(err, job) callback will be called with error if - // something fails, otherwise it'll return the updated job - // using getJob. - updateJob: function (job, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - if (!jobs[job.uuid]) { - return callback(new e.BackendResourceNotFoundError(sprintf('Job with uuid \'%s\' does not exist', job.uuid))); - } else { - jobs[job.uuid] = clone(job); - return callback(null, job); - } - }, - - // usage: Runner - // Unlock the job, mark it as canceled, and remove the runner_id - // uuid - string, the job uuid. - // cb - f(err, job) callback will be called with error if something - // fails, otherwise it'll return the updated job using getJob. - cancelJob: function (uuid, cb) { - if (typeof (uuid) === 'undefined') { - return cb(new e.BackendInternalError( - 'cancelJob uuid(String) required')); - } - jobs[uuid].execution = 'canceled'; - delete jobs[uuid].runner_id; - if (typeof (jobs[uuid].locks) !== 'undefined') { - delete locked_targets[uuid]; - } - return cb(null, jobs[uuid]); - }, - - // usage: API & Runner - // Update only the given Job property. Intendeed to prevent conflicts - // with two sources updating the same job at the same time, but - // different properties: - // - uuid - the job's uuid - // - prop - the name of the property to update - // - val - value to assign to such property - // - meta - Any additional information to pass to the backend which is - // not job properties - // - callback - f(err) called with error if something fails, otherwise - // with null. - updateJobProperty: function (uuid, prop, val, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - if (!jobs[uuid]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', uuid))); - } else { - jobs[uuid][prop] = val; - return callback(null); - } - }, - - // usage: Runner - // Queue a job which has been running; i.e, due to whatever the reason, - // re-queue the job. It'll unlock the job, update the status, add the - // results for every finished task so far ... - // job - the job Object. It'll be saved to the backend with the provided - // properties to ensure job status persistence. - // callback - f(err, job) callback will be called with error if - // something fails, otherwise it'll return the updated job - // using getJob. - queueJob: function (job, callback) { - if (!jobs[job.uuid]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', job.uuid))); - } else if (jobs[job.uuid].execution !== 'running') { - return callback(new e.BackendPreconditionFailedError( - 'Only running jobs can be queued again')); - } else { - job.runner_id = null; - job.execution = 'queued'; - jobs[job.uuid] = clone(job); - queued_jobs.push(job.uuid); - return callback(null, job); - } - }, - - // usage: Runner - // Pause a job which has been running; i.e, tell the job to wait for - // something external to happen. It'll unlock the job, update the - // status, add the results for every finished task so far ... - // job - the job Object. It'll be saved to the backend with the provided - // properties to ensure job status persistence. - // callback - f(err, job) callback will be called with error if - // something fails, otherwise it'll return the updated job - // using getJob. - pauseJob: function (job, callback) { - if (!jobs[job.uuid]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', job.uuid))); - } else if (jobs[job.uuid].execution !== 'running') { - return callback(new e.BackendPreconditionFailedError( - 'Only running jobs can be paused')); - } else { - job.runner_id = null; - job.execution = 'waiting'; - jobs[job.uuid] = clone(job); - waiting_jobs.push(job.uuid); - return callback(null, job); - } - }, - - // usage: - (test only) - // Tell a waiting job that whatever it has been waiting for has - // happened and it can run again. - // job - the job Object. It'll be saved to the backend with the provided - // properties to ensure job status persistence. - // callback - f(err, job) callback will be called with error if - // something fails, otherwise it'll return the updated job - // using getJob. - resumeJob: function (job, callback) { - if (!jobs[job.uuid]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', job.uuid))); - } else if (jobs[job.uuid].execution !== 'waiting') { - return callback(new e.BackendPreconditionFailedError( - 'Only waiting jobs can be resumed')); - } else { - job.runner_id = null; - job.execution = 'queued'; - jobs[job.uuid] = clone(job); - waiting_jobs = waiting_jobs.filter(function (j) { - return (j !== job.uuid); - }); - queued_jobs.push(job.uuid); - return callback(null, job); - } - }, - - // usage: Runner - // Get the given number of queued jobs uuids. - // - start - Integer - Position of the first job to retrieve - // - stop - Integer - Position of the last job to retrieve, _included_ - // - callback - f(err, jobs) - nextJobs: function (start, stop, callback) { - if (queued_jobs.length === 0) { - return callback(null, null); - } - var slice = queued_jobs.slice(start, stop + 1); - if (slice.length === 0) { - return callback(null, null); - } else { - return callback(null, slice); - } - }, - - // usage: Runner - // Register a runner on the backend and report it's active: - // - runner_id - String, unique identifier for runner. - // - active_at - ISO String timestamp. Optional. If none is given, - // current time - // - callback - f(err) - registerRunner: function (runner_id, active_at, callback) { - if (typeof (active_at) === 'function') { - callback = active_at; - active_at = new Date(); - } else if (typeof (active_at) === 'string') { - active_at = new Date(active_at); - } - runners[runner_id] = { - runner_id: runner_id, - active_at: active_at, - idle: false - }; - return callback(null); - }, - - // usage: Runner - // Report a runner remains active: - // - runner_id - String, unique identifier for runner. Required. - // - active_at - ISO String timestamp. Optional. If none is given, - // current time - // - callback - f(err) - runnerActive: function (runner_id, active_at, callback) { - return this.registerRunner(runner_id, active_at, callback); - }, - - // usage: - (test only) - // Get the given runner id details - // - runner_id - String, unique identifier for runner. Required. - // - callback - f(err, runner) - getRunner: function (runner_id, callback) { - if (!runners[runner_id]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Runner with uuid \'%s\' does not exist', runner_id))); - } else { - return callback(null, runners[runner_id].active_at); - } - }, - - // usage: API & Runner - // Get all the registered runners: - // - callback - f(err, runners) - getRunners: function (callback) { - var theRunners = {}; - Object.keys(runners).forEach(function (uuid) { - theRunners[uuid] = runners[uuid].active_at; - }); - return callback(null, theRunners); - }, - - // usage: - (test only) - // Set a runner as idle: - // - runner_id - String, unique identifier for runner - // - callback - f(err) - idleRunner: function (runner_id, callback) { - if (!runners[runner_id]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Runner with uuid \'%s\' does not exist', runner_id))); - } else { - runners[runner_id].idle = true; - return callback(null); - } - }, - - // usage: Runner - // Check if the given runner is idle - // - runner_id - String, unique identifier for runner - // - callback - f(boolean) - isRunnerIdle: function (runner_id, callback) { - return callback((!runners[runner_id] || (runners[runner_id].idle === true))); - }, - - // usage: - (test only) - // Remove idleness of the given runner - // - runner_id - String, unique identifier for runner - // - callback - f(err) - wakeUpRunner: function (runner_id, callback) { - if (!runners[runner_id]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Runner with uuid \'%s\' does not exist', runner_id))); - } else { - runners[runner_id].idle = false; - return callback(null); - } - }, - - // usage: Runner - // Get all jobs associated with the given runner_id - // - runner_id - String, unique identifier for runner - // - callback - f(err, jobs). `jobs` is an array of job's UUIDs. - // Note `jobs` will be an array, even when empty. - getRunnerJobs: function (runner_id, callback) { - var wf_runner_jobs = Object.keys(jobs).filter(function (uuid) { - return jobs[uuid].runner_id === runner_id; - }); - return callback(null, wf_runner_jobs); - }, - - // usage: API - // Get all the workflows: - // - params - JSON Object (Optional). Can include the value of the - // workflow's "name", and any other key/value pair to search for - // into workflow's definition. - // - callback - f(err, workflows) - getWorkflows: function (params, callback) { - if (typeof (params) === 'function') { - callback = params; - params = {}; - } - - var wfs = []; - var rWorkflows = Object.keys(workflows).map(function (uuid) { - return clone(workflows[uuid]); - }); - - rWorkflows.forEach(function (wf) { - if (_hasPropsAndVals(wf, params)) { - wfs.push(wf); - } - }); - return callback(null, wfs); - }, - - // usage: API - // Get all the jobs: - // - params - JSON Object. Can include the value of the job's - // "execution" status, and any other key/value pair to search for - // into job'sparams. - // - execution - String, the execution status for the jobs to return. - // Return all jobs if no execution status is given. - // - callback - f(err, jobs) - getJobs: function (params, callback) { - var executions = [ - 'queued', - 'failed', - 'succeeded', - 'canceled', - 'running', - 'retried', - 'waiting' - ]; - var execution; - var offset; - var limit; - var rJobs = []; - var theJobs = []; - if (typeof (params) === 'object') { - execution = params.execution; - delete params.execution; - offset = params.offset; - delete params.offset; - limit = params.limit; - delete params.limit; - } - if (typeof (params) === 'function') { - callback = params; - params = {}; - } - if ((typeof (execution) !== 'undefined') && - (executions.indexOf(execution) === -1)) { - return callback(new e.BackendInvalidArgumentError( - 'excution is required and must be one of "' + - executions.join('", "') + '"')); - } - if (typeof (execution) !== 'undefined') { - rJobs = Object.keys(jobs).filter(function (uuid) { - return (jobs[uuid].execution === execution); - }).map(function (uuid) { - return clone(jobs[uuid]); - }); - } else { - rJobs = Object.keys(jobs).map(function (uuid) { - return clone(jobs[uuid]); - }); - } - rJobs.forEach(function (job) { - if (_hasPropsAndVals(job.params, params)) { - theJobs.push(job); - } - }); - if (typeof (offset) !== 'undefined' && - typeof (limit) !== 'undefined') { - return callback(null, theJobs.slice(offset, limit)); - } else { - return callback(null, theJobs); - } - }, - - // usage: API - // TODO: missing specs - countJobs: function (callback) { - var executions = [ - 'queued', - 'failed', - 'succeeded', - 'canceled', - 'running', - 'retried', - 'waiting' - ]; - - var rJobs = []; - var stats = { - all_time: {}, - past_24h: {}, - past_hour: {}, - current: {} - }; - - executions.forEach(function (e) { - stats.all_time[e] = 0; - stats.past_24h[e] = 0; - stats.past_hour[e] = 0; - stats.current[e] = 0; - }); - - rJobs = Object.keys(jobs).map(function (uuid) { - return clone(jobs[uuid]); - }); - - var yesterday = (function (d) { - d.setDate(d.getDate() - 1); - return d; - })(new Date()).getTime(); - - var _1hr = (function (d) { - d.setHours(d.getHours() - 1); - return d; - })(new Date()).getTime(); - - var _2hr = (function (d) { - d.setHours(d.getHours() - 2); - return d; - })(new Date()).getTime(); - - rJobs = rJobs.map(function (job) { - return ({ - execution: job.execution, - created_at: new Date(job.created_at).getTime() - }); - }); - - rJobs.forEach(function (j) { - if (j.created_at < yesterday) { - stats.all_time[j.execution] += 1; - } else if (j.created_at > yesterday && j.created_at < _2hr) { - stats.past_24h[j.execution] += 1; - } else if (j.created_at > _2hr && j.created_at < _1hr) { - stats.past_hour[j.execution] += 1; - } else { - stats.current[j.execution] += 1; - } - }); - return callback(null, stats); - }, - - - // usage: API & Runner - // Add progress information to an existing job: - // - uuid - String, the Job's UUID. - // - info - Object, {'key' => 'Value'} - // - meta - Any additional information to pass to the backend which is - // not job info - // - callback - f(err) - addInfo: function (uuid, info, meta, callback) { - - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - - if (!jobs[uuid]) { - return callback(new e.BackendResourceNotFoundError( - 'Job does not exist. Cannot Update.')); - } else { - if (!util.isArray(jobs[uuid].info)) { - jobs[uuid].info = []; - } - jobs[uuid].info.push(info); - return callback(null); - } - }, - - // usage: API - // Get progress information from an existing job: - // - uuid - String, the Job's UUID. - // - callback - f(err, info) - getInfo: function (uuid, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - - if (!jobs[uuid]) { - return callback(new e.BackendResourceNotFoundError( - 'Job does not exist. Cannot get info.')); - } else { - if (!util.isArray(jobs[uuid].info)) { - jobs[uuid].info = []; - } - return callback(null, clone(jobs[uuid].info)); - } + // usage: internal + // should save obj to persistence + // pType - type, TYPES + // pObj - Object + // pCallback - f(err, obj) + save: function (pType, pObj, pCallback) { + if (!_.has(_store, pType)) + _store[pType] = []; + // remove old elements + //if (!pObj.uuid) + // pObj.uuid = require('node-uuid')(); + //_store[pType] = _.without(_store[pType], _.where(_store[pType], {'uuid': pObj.uuid})); + var item = _.find(_store[pType], {'uuid': pObj.uuid}); + if (item) + _store[pType] = _.without(_store[pType], item); + // store new element in array + _store[pType].push(pObj); + // call back + console.error("save(): %j %j %j", pType, pObj.uuid, _store[pType].length); + if (pCallback) + pCallback(null, pObj); + }, + // usage: internal + // should find object from persistence + // pType - type, TYPES + // pFilterObj - Filter for search, e.g. { 'where': { 'attr': 'value' }} + // pCallback - f(err, objs), objs is an array even if empty + find: function (pType, pFilterObj, pCallback) { + if (!_.has(_store, pType)) + return pCallback(null, []); + if (!pFilterObj) + return pCallback(null, _store[pType]); + // find & return elements + console.error("find(): %j %j %j", pType, pFilterObj, _store[pType].length); + pCallback(null, _.where(_store[pType], pFilterObj.where || pFilterObj)); + }, + // usage: internal + // should remove object from persistence + // pType - type, TYPES + // pObj - Object + // pCallback - f(err, boolean) + remove: function (pType, pObj, pCallback) { + if (!_.has(_store, pType)) + return pCallback(null, false); + var item = _.find(_store[pType], {'uuid': pObj.uuid}); + if (item) + _store[pType] = _.without(_store[pType], item); + console.error("remove(): %j %j %j", pType, pObj.uuid, _store[pType].length); + return pCallback(null, item !== null); } - }; return baseBackend(backend); } diff --git a/package.json b/package.json index 2a806d9..fe3670c 100644 --- a/package.json +++ b/package.json @@ -1,45 +1,46 @@ { - "name": "wf", - "description": "Tasks Workflows orchestration API and runners", - "version": "0.10.0", - "repository": { - "type": "git", - "url": "git://github.com/kusor/node-workflow.git" - }, - "author": "Pedro Palazón Candel (http://www.joyent.com)", - "contributors": [ - "Mark Cavage", - "Trent Mick", - "Josh Wilsdon", - "Bryan Cantrill", - "Andrés Rodríquez", - "Rob Gulewich", - "Fred Kuo" - ], - "bin": { - "workflow-api": "./bin/workflow-api", - "workflow-runner": "./bin/workflow-runner" - }, - "main": "lib/index.js", - "dependencies": { - "node-uuid": "1.4.0", - "bunyan": "0.23.1", - "vasync": "1.6.1", - "backoff": "1.2.0", - "clone": "0.1.6", - "restify": "2.6.1", - "sigyan": "0.2.0" - }, - "scripts": { - "test": "./node_modules/.bin/tap ./test/*.test.js" - }, - "devDependencies": { - "tap": "~0.3" - }, - "optionalDependencies": { - "dtrace-provider": "0.2.8" - }, - "engines": { - "node": ">=0.8" - } + "name": "wf", + "description": "Tasks Workflows orchestration API and runners", + "version": "0.10.0", + "repository": { + "type": "git", + "url": "git://github.com/kusor/node-workflow.git" + }, + "author": "Pedro Palazón Candel (http://www.joyent.com)", + "contributors": [ + "Mark Cavage", + "Trent Mick", + "Josh Wilsdon", + "Bryan Cantrill", + "Andrés Rodríquez", + "Rob Gulewich", + "Fred Kuo" + ], + "bin": { + "workflow-api": "./bin/workflow-api", + "workflow-runner": "./bin/workflow-runner" + }, + "main": "lib/index.js", + "dependencies": { + "backoff": "1.2.0", + "bunyan": "0.23.1", + "clone": "0.1.6", + "lodash": "^3.5.0", + "node-uuid": "1.4.0", + "restify": "2.6.1", + "sigyan": "0.2.0", + "vasync": "1.6.1" + }, + "scripts": { + "test": "./node_modules/.bin/tap ./test/*.test.js" + }, + "devDependencies": { + "tap": "~0.3" + }, + "optionalDependencies": { + "dtrace-provider": "0.2.8" + }, + "engines": { + "node": ">=0.8" + } } From 878ac47438eb988afd0aed874d74f310073b8dfd Mon Sep 17 00:00:00 2001 From: "d.poellath" Date: Mon, 16 Mar 2015 13:33:39 +0100 Subject: [PATCH 4/8] ! removed getRunners error ! checking for waiting & running jobs was incorrect (should check on db-obj) ! saving assigned/extended objects back to database so no values getting lost --- lib/base-backend.js | 243 +++++++++++++++++------------- lib/workflow-in-memory-backend.js | 29 ++-- 2 files changed, 154 insertions(+), 118 deletions(-) diff --git a/lib/base-backend.js b/lib/base-backend.js index 80415c1..b76356a 100644 --- a/lib/base-backend.js +++ b/lib/base-backend.js @@ -1,11 +1,23 @@ var e = require('./errors'); var util = require("util"); var makeEmitter = require("./make-emitter"), - clone = require('clone'), _ = require('lodash'), sprintf = util.format; +// 11-03-2015 15:31 - 739/775 +// 12-03-2015 14:32 - 689/721 +// 12-03-2015 16:03 - 680/717 - finishJob: delete locks +// 12-03-2015 16:03 - 694/721 - validateJobTarget: remove checks without locks +// 12-03-2015 16:03 - 703/722 - queueJob: not given job have to be checked if running +// 13-03-2015 09:44 - 704/722 - corrected typo of error message +// 13-03-2015 11:39 - 708/722 - fixed getJobs finding (params are in a sub-object) +// 13-03-2015 11:39 - 709/722 - return value of removing now counts items +// 16-03-2015 09:30 - 712/722 - removed getRunners error +// 16-03-2015 09:30 - 771/779 - checking for waiting & running jobs was incorrect (should check on db-obj) +// 16-03-2015 09:30 - 784/791 - saving assigned/extended objects back to database so no values getting lost + + /* Using Filter with Query Language (ORM) http://sailsjs.org/#!/documentation/concepts/ORM/Querylanguage.html @@ -77,9 +89,7 @@ var baseBackend = { return pCallback(new e.BackendInvalidArgumentError( 'Workflow.name must be unique. A workflow with name "' + pWorkflow.name + '" already exists')); - _self.save(_self.TYPES.WORKFLOW, clone(pWorkflow), function (pError) { - return pCallback(pError, pWorkflow); - }); + _self.save(_self.TYPES.WORKFLOW, pWorkflow, pCallback); }); }, // usage: API & Factory @@ -96,7 +106,7 @@ var baseBackend = { if (pWorkflows.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Workflow with uuid \'%s\' does not exist', uuid))); - return pCallback(null, clone(pWorkflows[0])); + return pCallback(null, pWorkflows[0]); }); }, // usage: API @@ -126,17 +136,16 @@ var baseBackend = { if (!pWorkflow) return pCallback(new e.BackendResourceNotFoundError( 'Workflow does not exist. Cannot Update.')); + _.assign(pWorkflow, workflow); _self.find(_self.TYPES.WORKFLOW, {'name': pWorkflow.name}, function (pError, pWorkflows) { if (pWorkflows.length > 0) for (var i = 0; i < pWorkflows.length; i++) { - if (pWorkflows[i].uuid !== workflow.uuid) + if (pWorkflows[i].uuid !== pWorkflow.uuid) return pCallback(new e.BackendInvalidArgumentError( 'Workflow.name must be unique. A workflow with name "' + - workflow.name + '" already exists')); + pWorkflow.name + '" already exists')); } - _self.save(_self.TYPES.WORKFLOW, clone(workflow), function (pError) { - return pCallback(pError, workflow); - }); + _self.save(_self.TYPES.WORKFLOW, pWorkflow, pCallback); }); }); }, @@ -152,9 +161,7 @@ var baseBackend = { meta = {}; } job.created_at = job.created_at || new Date().toISOString(); - this.save(this.TYPES.JOB, clone(job), function (pError) { - return pCallback(pError, job); - }); + this.save(this.TYPES.JOB, job, pCallback); }, // usage: Runner @@ -167,12 +174,12 @@ var baseBackend = { pCallback = meta; meta = {}; } - console.error('getJob(): uuid=%j', uuid); + //console.error('getJob(): uuid=%j', uuid); this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', uuid))); - return pCallback(pError, clone(pJobs[0])); + return pCallback(pError, pJobs[0]); }); }, @@ -183,7 +190,7 @@ var baseBackend = { // prop - (String) property name // cb - pCallback f(err, value) getJobProperty: function (uuid, prop, pCallback) { - console.error('getJobProperty(): uuid=%j', uuid); + //console.error('getJobProperty(): uuid=%j', uuid); this.getJob(uuid, function (pError, pJob) { if (pError) return pCallback(pError); @@ -199,29 +206,48 @@ var baseBackend = { // pCallback - f(err) called with error in case there is a duplicated // job with the same target and same params validateJobTarget: function (pJob, pCallback) { + if (typeof (pJob) === 'undefined') + return pCallback(new e.BackendInternalError('WorkflowRedisBackend.validateJobTarget job(Object) required')); // If no target is given, we don't care: - if (!pJob.target) { + if (!pJob.target) return pCallback(null); - } var _self = this; - _self.find(_self.TYPES.JOB, {'locks': pJob.target}, function (pError, pJobs) { - if (pJobs.length > 0) - return pCallback(new e.BackendInvalidArgumentError( - 'Job target is currently locked by another job')); - _self.find(_self.TYPES.JOB, {'target': pJob.target}, function (pError, pJobs) { - if (pJobs.length === 0) // needed? own job will be found - return pCallback(null); - for (var i = 0; i < pJobs.length; i++) { - var job = pJobs[i]; - if (job.uuid !== pJob.uuid && job.target === pJob.target && - Object.keys(pJob.params).every(function (p) { - return (job.params[p] && job.params[p] === pJob.params[p]); - }) && (job.execution === _self.EXECUTION.QUEUED || - job.execution === _self.EXECUTION.RUNNING)) - return pCallback(new e.BackendInvalidArgumentError('Another job (' + job.uuid + ') with the same target and params is already queued')); - } + + _self.find(_self.TYPES.JOB, null, function (pError, pJobs) { + if (pError || !pJobs) return pCallback(null); - }); + //console.error("validateJobTarget() found:%j", pJobs.length); + if (_.some(pJobs, function (job) { + //console.error("validateJobTarget() check:%j;%j==%j", job, job.locks, pJob.target); + if ([_self.EXECUTION.FAILED, _self.EXECUTION.SUCCEEDED].indexOf(job.execution) !== -1 || !job.locks) + return false; + var re = new RegExp(job.locks); + return (re.test(pJob.target)); + })) { + //console.error("validateJobTarget() locked:%j", pJob); + return pCallback(new e.BackendInvalidArgumentError('Job target is currently locked by another job')); + } + + var sameTargets = _.where(pJobs, {'target': pJob.target}); + //console.error("validateJobTarget() sameTargets:%j", sameTargets); + + if (sameTargets.length === 0) + return pCallback(null); + for (var i = 0; i < sameTargets.length; i++) { + var job = sameTargets[i]; + if (job.uuid !== pJob.uuid && + job.workflow_uuid === pJob.workflow_uuid && + JSON.stringify(job.params) === JSON.stringify(pJob.params) && + [_self.EXECUTION.QUEUED, _self.EXECUTION.RUNNING, _self.EXECUTION.WAITING].indexOf(job.execution) !== -1) + //if (job.uuid !== pJob.uuid && job.target === pJob.target && + // Object.keys(pJob.params).every(function (p) { + // return (job.params[p] && job.params[p] === pJob.params[p]); + // }) && (job.execution === _self.EXECUTION.QUEUED || + // job.execution === _self.EXECUTION.RUNNING)) + return pCallback(new e.BackendInvalidArgumentError('Another job with the same target and params is already queued')); + } + return pCallback(null); + }); }, @@ -236,9 +262,12 @@ var baseBackend = { pCallback = index; index = 0; } - this.find(this.TYPES.JOB, {'execution': this.EXECUTION.QUEUED}, function (pError, pJobs) { + this.find(this.TYPES.JOB, { + 'where': {'execution': this.EXECUTION.QUEUED}, + 'sort': 'created_at' + }, function (pError, pJobs) { return pCallback(null, (pJobs.length === 0 || index >= pJobs.length) ? null : - clone(pJobs.slice(index, index + 1)[0])); + pJobs.slice(index, index + 1)[0]); }); }, @@ -252,16 +281,14 @@ var baseBackend = { // using getJob. runJob: function (uuid, runner_id, pCallback) { var _self = this; - console.error('runJob(): uuid=%j', uuid); + //console.error('runJob(): uuid=%j', uuid); this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { if (pJobs.length === 0 || pJobs[0].execution != _self.EXECUTION.QUEUED) return pCallback(new e.BackendPreconditionFailedError('Only queued jobs can be run')); var job = pJobs[0]; job.runner_id = runner_id; job.execution = _self.EXECUTION.RUNNING; - return _self.save(_self.TYPES.JOB, clone(job), function (pError) { - return pCallback(pError, job); - }); + return _self.save(_self.TYPES.JOB, job, pCallback); }); }, @@ -275,21 +302,22 @@ var baseBackend = { // using getJob. finishJob: function (pJob, pCallback) { var _self = this; - console.error('finishJob(): uuid=%j', pJob.uuid); + //console.error('finishJob(): uuid=%j', pJob.uuid); this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', pJob.uuid))); - if (pJobs[0].execution !== _self.EXECUTION.RUNNING && - pJobs[0].execution !== _self.EXECUTION.CANCELED) + var job = pJobs[0]; + if (job.execution !== _self.EXECUTION.RUNNING && + job.execution !== _self.EXECUTION.CANCELED) return pCallback(new e.BackendPreconditionFailedError( 'Only running jobs can be finished')); - if (pJob.execution === _self.EXECUTION.RUNNING) - pJob.execution = _self.EXECUTION.SUCCEEDED; - delete pJob.runner_id; - _self.save(_self.TYPES.JOB, clone(pJob), function (pError) { - return pCallback(pError, pJob); - }); + _.assign(job, pJob); + if (job.execution === _self.EXECUTION.RUNNING) + job.execution = _self.EXECUTION.SUCCEEDED; + delete job.runner_id; + //delete job.locks; + _self.save(_self.TYPES.JOB, job, pCallback); }); }, @@ -303,19 +331,19 @@ var baseBackend = { // pCallback - f(err, job) pCallback will be called with error if // something fails, otherwise it'll return the updated job // using getJob. - updateJob: function (job, meta, pCallback) { + updateJob: function (pJob, meta, pCallback) { if (typeof (meta) === 'function') { pCallback = meta; meta = {}; } var _self = this; - console.error('updateJob(): uuid=%j', job.uuid); - this.find(this.TYPES.JOB, {'uuid': job.uuid}, function (pError, pJobs) { + //console.error('updateJob(): uuid=%j', pJob.uuid); + this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { if (pJobs.length === 0) - return pCallback(new e.BackendResourceNotFoundError(sprintf('Job with uuid \'%s\' does not exist', job.uuid))); - _self.save(_self.TYPES.JOB, clone(job), function (pError) { - return pCallback(pError, job); - }); + return pCallback(new e.BackendResourceNotFoundError(sprintf('Job with uuid \'%s\' does not exist', pJob.uuid))); + var job = pJobs[0]; + _.assign(job, pJob); + _self.save(_self.TYPES.JOB, job, pCallback); }); }, @@ -330,16 +358,14 @@ var baseBackend = { 'cancelJob uuid(String) required')); } var _self = this; - console.error('cancelJob(): uuid=%j', uuid); + //console.error('cancelJob(): uuid=%j', uuid); this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf('Job with uuid \'%s\' does not exist', job.uuid))); var job = pJobs[0]; job.execution = _self.EXECUTION.CANCELED; delete job.runner_id; - _self.save(_self.TYPES.JOB, clone(job), function (pError) { - return pCallback(pError, job); - }); + _self.save(_self.TYPES.JOB, job, pCallback); }); }, @@ -361,16 +387,14 @@ var baseBackend = { meta = {}; } var _self = this; - console.error('updateJobProperty(): uuid=%j', uuid); + //console.error('updateJobProperty(): uuid=%j', uuid); this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', uuid))); var job = pJobs[0]; job[prop] = val; - _self.save(_self.TYPES.JOB, clone(job), function (pError) { - return pCallback(pError); - }); + _self.save(_self.TYPES.JOB, job, pCallback); }); }, @@ -385,20 +409,19 @@ var baseBackend = { // using getJob. queueJob: function (pJob, pCallback) { var _self = this; - console.error('queueJob(): uuid=%j', pJob.uuid); + //console.error('queueJob(): uuid=%j', pJob.uuid); this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', pJob.uuid))); - //var job = pJobs[0]; - if (pJob.execution !== _self.EXECUTION.RUNNING) + var job = pJobs[0]; + if (job.execution !== _self.EXECUTION.RUNNING) return pCallback(new e.BackendPreconditionFailedError( 'Only running jobs can be queued again')); - delete pJob.runner_id; - pJob.execution = _self.EXECUTION.QUEUED; - _self.save(_self.TYPES.JOB, clone(pJob), function (pError) { - return pCallback(pError, pJob); - }); + _.assign(job, pJob); + delete job.runner_id; + job.execution = _self.EXECUTION.QUEUED; + _self.save(_self.TYPES.JOB, job, pCallback); }); }, @@ -413,20 +436,20 @@ var baseBackend = { // using getJob. pauseJob: function (pJob, pCallback) { var _self = this; - console.error('pauseJob(): uuid=%j', pJob.uuid); + //console.error('pauseJob(): uuid=%j', pJob.uuid); this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', pJob.uuid))); - //var job = pJobs[0]; - if (pJob.execution !== _self.EXECUTION.RUNNING) + var job = pJobs[0]; + //console.error('pauseJob(): execution=%j', pJob.execution); + if (job.execution !== _self.EXECUTION.RUNNING) return pCallback(new e.BackendPreconditionFailedError( 'Only running jobs can be paused')); - delete pJob.runner_id; - pJob.execution = _self.EXECUTION.WAITING; - _self.save(_self.TYPES.JOB, clone(pJob), function (pError) { - return pCallback(pError, pJob); - }); + _.assign(job, pJob); + delete job.runner_id; + job.execution = _self.EXECUTION.WAITING; + _self.save(_self.TYPES.JOB, job, pCallback); }); }, @@ -440,20 +463,19 @@ var baseBackend = { // using getJob. resumeJob: function (pJob, pCallback) { var _self = this; - console.error('resumeJob(): uuid=%j', pJob.uuid); + //console.error('resumeJob(): uuid=%j', pJob.uuid); this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', pJob.uuid))); - //var job = pJobs[0]; - if (pJob.execution !== _self.EXECUTION.WAITING) + var job = pJobs[0]; + if (job.execution !== _self.EXECUTION.WAITING) return pCallback(new e.BackendPreconditionFailedError( 'Only waiting jobs can be resumed')); - delete pJob.runner_id; - pJob.execution = _self.EXECUTION.QUEUED; - _self.save(_self.TYPES.JOB, clone(pJob), function (pError) { - return pCallback(pError, pJob); - }); + _.assign(job, pJob); + delete job.runner_id; + job.execution = _self.EXECUTION.QUEUED; + _self.save(_self.TYPES.JOB, job, pCallback); }); }, @@ -464,7 +486,10 @@ var baseBackend = { // - pCallback - f(err, jobs). `jobs` is an array of job's UUIDs. // Note `jobs` will be an array, even when empty. nextJobs: function (start, stop, pCallback) { - this.find(this.TYPES.JOB, {'execution': this.EXECUTION.QUEUED}, function (pError, pJobs) { + this.find(this.TYPES.JOB, { + 'where': {'execution': this.EXECUTION.QUEUED}, + 'sort': 'created_at' + }, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(pError, null); var slice = pJobs.slice(start, stop + 1); @@ -529,11 +554,11 @@ var baseBackend = { this.find(this.TYPES.RUNNER, null, function (pError, pRunners) { var theRunners = {}; for (var i = 0; i < pRunners.length; i++) { - theRunners[pRunners.uuid] = pRunners.active_at; + var runner = pRunners[i]; + theRunners[runner.uuid] = runner.active_at; } return pCallback(pError, theRunners); }); - }, // usage: - (test only) @@ -589,7 +614,7 @@ var baseBackend = { // Note `jobs` will be an array, even when empty. getRunnerJobs: function (runner_id, pCallback) { this.find(this.TYPES.JOB, {'runner_id': runner_id}, function (pError, pJobs) { - console.error("getRunnerJobs(): %j", _.map(pJobs, 'uuid')); + //console.error("getRunnerJobs(): %j", _.map(pJobs, 'uuid')); return pCallback(pError, _.map(pJobs, 'uuid')); }); }, @@ -605,9 +630,7 @@ var baseBackend = { pCallback = params; params = {}; } - this.find(this.TYPES.WORKFLOW, params, function (pError, pWorkflows) { - return pCallback(pError, clone(pWorkflows)); - }); + this.find(this.TYPES.WORKFLOW, params, pCallback); }, // usage: API @@ -647,15 +670,22 @@ var baseBackend = { 'execution is required and must be one of "' + executions.join('", "') + '"')); } - console.error("getJobs(): params=%j", params); - this.find(this.TYPES.JOB, params, function (pError, pJobs) { + //console.error("getJobs(): params=%j", params); + var whereObj = {}; + if (params.execution) { + whereObj.execution = params.execution; + delete params.execution; + } + this.find(this.TYPES.JOB, whereObj, function (pError, pJobs) { if (pError) - return pCallback(pError, clone(pJobs)); + return pCallback(pError, pJobs); + if (_.keys(params).length > 0) + pJobs = _.where(pJobs, {'params': params}); if (typeof (offset) !== 'undefined' && typeof (limit) !== 'undefined') { - return pCallback(pError, clone(pJobs.slice(offset, limit))); + return pCallback(pError, pJobs.slice(offset, limit)); } else { - return pCallback(pError, clone(pJobs)); + return pCallback(pError, pJobs); } }); }, @@ -727,7 +757,7 @@ var baseBackend = { meta = {}; } var _self = this; - console.error('addInfo(): uuid=%j', uuid); + //console.error('addInfo(): uuid=%j info=%j', uuid, info); this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError( @@ -737,9 +767,7 @@ var baseBackend = { job.info = []; } job.info.push(info); - _self.save(_self.TYPES.JOB, job, function (pError, pJob) { - return pCallback(pError, clone(pJob)); - }); + _self.save(_self.TYPES.JOB, job, pCallback); }); }, @@ -753,7 +781,7 @@ var baseBackend = { pCallback = meta; meta = {}; } - console.error('getInfo(): uuid=%j', uuid); + //console.error('getInfo(): uuid=%j', uuid); this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError( @@ -762,7 +790,8 @@ var baseBackend = { if (!util.isArray(job.info)) { job.info = []; } - return pCallback(null, clone(job.info)); + //console.error('getInfo(): uuid=%j info=%j', uuid, job.info); + return pCallback(null, job.info); }); } }; diff --git a/lib/workflow-in-memory-backend.js b/lib/workflow-in-memory-backend.js index 6c2866f..7965507 100644 --- a/lib/workflow-in-memory-backend.js +++ b/lib/workflow-in-memory-backend.js @@ -52,16 +52,13 @@ function MemoryBackend(config) { if (!_.has(_store, pType)) _store[pType] = []; // remove old elements - //if (!pObj.uuid) - // pObj.uuid = require('node-uuid')(); - //_store[pType] = _.without(_store[pType], _.where(_store[pType], {'uuid': pObj.uuid})); var item = _.find(_store[pType], {'uuid': pObj.uuid}); if (item) _store[pType] = _.without(_store[pType], item); // store new element in array - _store[pType].push(pObj); + _store[pType].push(clone(pObj)); // call back - console.error("save(): %j %j %j", pType, pObj.uuid, _store[pType].length); + //console.error("save(): %j %j %j", pType, pObj.uuid, _store[pType].length); if (pCallback) pCallback(null, pObj); }, @@ -74,10 +71,17 @@ function MemoryBackend(config) { if (!_.has(_store, pType)) return pCallback(null, []); if (!pFilterObj) - return pCallback(null, _store[pType]); + return pCallback(null, clone(_store[pType])); + var objs = clone(_.where(_store[pType], pFilterObj.where || pFilterObj)); + if (pFilterObj.sort) { + var sortByArray = pFilterObj.sort.split(' '); + objs = _.sortBy(objs, sortByArray[0]); + if (sortByArray.length > 1 && sortByArray[1] == 'DESC') + objs.reverse(); + } // find & return elements - console.error("find(): %j %j %j", pType, pFilterObj, _store[pType].length); - pCallback(null, _.where(_store[pType], pFilterObj.where || pFilterObj)); + //console.error("find(): %j %j %j", pType, pFilterObj, _store[pType].length); + pCallback(null, objs); }, // usage: internal // should remove object from persistence @@ -87,11 +91,14 @@ function MemoryBackend(config) { remove: function (pType, pObj, pCallback) { if (!_.has(_store, pType)) return pCallback(null, false); + var before = _store[pType].length; var item = _.find(_store[pType], {'uuid': pObj.uuid}); - if (item) + if (item) { _store[pType] = _.without(_store[pType], item); - console.error("remove(): %j %j %j", pType, pObj.uuid, _store[pType].length); - return pCallback(null, item !== null); + //console.error("remove(): item found & removed: %j", (item !== null)); + } + //console.error("remove(): %j %j %j", pType, pObj.uuid, _store[pType].length); + return pCallback(null, before !== _store[pType].length); } }; return baseBackend(backend); From bc776c10b13d8eb446ba9809f0cbba94da62892b Mon Sep 17 00:00:00 2001 From: "d.poellath" Date: Mon, 16 Mar 2015 13:38:16 +0100 Subject: [PATCH 5/8] ! finished base-backend preparation for waterline query interface --- lib/base-backend.js | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/lib/base-backend.js b/lib/base-backend.js index b76356a..0576bd1 100644 --- a/lib/base-backend.js +++ b/lib/base-backend.js @@ -5,19 +5,6 @@ var makeEmitter = require("./make-emitter"), sprintf = util.format; -// 11-03-2015 15:31 - 739/775 -// 12-03-2015 14:32 - 689/721 -// 12-03-2015 16:03 - 680/717 - finishJob: delete locks -// 12-03-2015 16:03 - 694/721 - validateJobTarget: remove checks without locks -// 12-03-2015 16:03 - 703/722 - queueJob: not given job have to be checked if running -// 13-03-2015 09:44 - 704/722 - corrected typo of error message -// 13-03-2015 11:39 - 708/722 - fixed getJobs finding (params are in a sub-object) -// 13-03-2015 11:39 - 709/722 - return value of removing now counts items -// 16-03-2015 09:30 - 712/722 - removed getRunners error -// 16-03-2015 09:30 - 771/779 - checking for waiting & running jobs was incorrect (should check on db-obj) -// 16-03-2015 09:30 - 784/791 - saving assigned/extended objects back to database so no values getting lost - - /* Using Filter with Query Language (ORM) http://sailsjs.org/#!/documentation/concepts/ORM/Querylanguage.html From 64ad37df1f98f864210b2aafb227636f6e5599a5 Mon Sep 17 00:00:00 2001 From: "d.poellath" Date: Tue, 17 Mar 2015 08:07:11 +0100 Subject: [PATCH 6/8] ! removed debug-comments --- lib/base-backend.js | 42 +++++------------------------------------- 1 file changed, 5 insertions(+), 37 deletions(-) diff --git a/lib/base-backend.js b/lib/base-backend.js index 0576bd1..cbd8f9e 100644 --- a/lib/base-backend.js +++ b/lib/base-backend.js @@ -161,7 +161,6 @@ var baseBackend = { pCallback = meta; meta = {}; } - //console.error('getJob(): uuid=%j', uuid); this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( @@ -177,7 +176,6 @@ var baseBackend = { // prop - (String) property name // cb - pCallback f(err, value) getJobProperty: function (uuid, prop, pCallback) { - //console.error('getJobProperty(): uuid=%j', uuid); this.getJob(uuid, function (pError, pJob) { if (pError) return pCallback(pError); @@ -199,25 +197,18 @@ var baseBackend = { if (!pJob.target) return pCallback(null); var _self = this; - _self.find(_self.TYPES.JOB, null, function (pError, pJobs) { if (pError || !pJobs) return pCallback(null); - //console.error("validateJobTarget() found:%j", pJobs.length); if (_.some(pJobs, function (job) { - //console.error("validateJobTarget() check:%j;%j==%j", job, job.locks, pJob.target); if ([_self.EXECUTION.FAILED, _self.EXECUTION.SUCCEEDED].indexOf(job.execution) !== -1 || !job.locks) return false; var re = new RegExp(job.locks); return (re.test(pJob.target)); })) { - //console.error("validateJobTarget() locked:%j", pJob); return pCallback(new e.BackendInvalidArgumentError('Job target is currently locked by another job')); } - var sameTargets = _.where(pJobs, {'target': pJob.target}); - //console.error("validateJobTarget() sameTargets:%j", sameTargets); - if (sameTargets.length === 0) return pCallback(null); for (var i = 0; i < sameTargets.length; i++) { @@ -226,15 +217,9 @@ var baseBackend = { job.workflow_uuid === pJob.workflow_uuid && JSON.stringify(job.params) === JSON.stringify(pJob.params) && [_self.EXECUTION.QUEUED, _self.EXECUTION.RUNNING, _self.EXECUTION.WAITING].indexOf(job.execution) !== -1) - //if (job.uuid !== pJob.uuid && job.target === pJob.target && - // Object.keys(pJob.params).every(function (p) { - // return (job.params[p] && job.params[p] === pJob.params[p]); - // }) && (job.execution === _self.EXECUTION.QUEUED || - // job.execution === _self.EXECUTION.RUNNING)) return pCallback(new e.BackendInvalidArgumentError('Another job with the same target and params is already queued')); } return pCallback(null); - }); }, @@ -268,7 +253,6 @@ var baseBackend = { // using getJob. runJob: function (uuid, runner_id, pCallback) { var _self = this; - //console.error('runJob(): uuid=%j', uuid); this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { if (pJobs.length === 0 || pJobs[0].execution != _self.EXECUTION.QUEUED) return pCallback(new e.BackendPreconditionFailedError('Only queued jobs can be run')); @@ -289,7 +273,6 @@ var baseBackend = { // using getJob. finishJob: function (pJob, pCallback) { var _self = this; - //console.error('finishJob(): uuid=%j', pJob.uuid); this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( @@ -303,7 +286,6 @@ var baseBackend = { if (job.execution === _self.EXECUTION.RUNNING) job.execution = _self.EXECUTION.SUCCEEDED; delete job.runner_id; - //delete job.locks; _self.save(_self.TYPES.JOB, job, pCallback); }); }, @@ -324,10 +306,10 @@ var baseBackend = { meta = {}; } var _self = this; - //console.error('updateJob(): uuid=%j', pJob.uuid); this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { if (pJobs.length === 0) - return pCallback(new e.BackendResourceNotFoundError(sprintf('Job with uuid \'%s\' does not exist', pJob.uuid))); + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', pJob.uuid))); var job = pJobs[0]; _.assign(job, pJob); _self.save(_self.TYPES.JOB, job, pCallback); @@ -345,10 +327,10 @@ var baseBackend = { 'cancelJob uuid(String) required')); } var _self = this; - //console.error('cancelJob(): uuid=%j', uuid); this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { if (pJobs.length === 0) - return pCallback(new e.BackendResourceNotFoundError(sprintf('Job with uuid \'%s\' does not exist', job.uuid))); + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', job.uuid))); var job = pJobs[0]; job.execution = _self.EXECUTION.CANCELED; delete job.runner_id; @@ -374,7 +356,6 @@ var baseBackend = { meta = {}; } var _self = this; - //console.error('updateJobProperty(): uuid=%j', uuid); this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( @@ -396,7 +377,6 @@ var baseBackend = { // using getJob. queueJob: function (pJob, pCallback) { var _self = this; - //console.error('queueJob(): uuid=%j', pJob.uuid); this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( @@ -423,13 +403,11 @@ var baseBackend = { // using getJob. pauseJob: function (pJob, pCallback) { var _self = this; - //console.error('pauseJob(): uuid=%j', pJob.uuid); this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', pJob.uuid))); var job = pJobs[0]; - //console.error('pauseJob(): execution=%j', pJob.execution); if (job.execution !== _self.EXECUTION.RUNNING) return pCallback(new e.BackendPreconditionFailedError( 'Only running jobs can be paused')); @@ -450,7 +428,6 @@ var baseBackend = { // using getJob. resumeJob: function (pJob, pCallback) { var _self = this; - //console.error('resumeJob(): uuid=%j', pJob.uuid); this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( @@ -601,7 +578,6 @@ var baseBackend = { // Note `jobs` will be an array, even when empty. getRunnerJobs: function (runner_id, pCallback) { this.find(this.TYPES.JOB, {'runner_id': runner_id}, function (pError, pJobs) { - //console.error("getRunnerJobs(): %j", _.map(pJobs, 'uuid')); return pCallback(pError, _.map(pJobs, 'uuid')); }); }, @@ -657,7 +633,6 @@ var baseBackend = { 'execution is required and must be one of "' + executions.join('", "') + '"')); } - //console.error("getJobs(): params=%j", params); var whereObj = {}; if (params.execution) { whereObj.execution = params.execution; @@ -744,7 +719,6 @@ var baseBackend = { meta = {}; } var _self = this; - //console.error('addInfo(): uuid=%j info=%j', uuid, info); this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError( @@ -768,17 +742,11 @@ var baseBackend = { pCallback = meta; meta = {}; } - //console.error('getInfo(): uuid=%j', uuid); this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError( 'Job does not exist. Cannot get info.')); - var job = pJobs[0]; - if (!util.isArray(job.info)) { - job.info = []; - } - //console.error('getInfo(): uuid=%j info=%j', uuid, job.info); - return pCallback(null, job.info); + return pCallback(null, !util.isArray(pJobs[0].info) ? [] : pJobs[0].info); }); } }; From d1cbb232cbe72ad60fc079edb35f116b8cb67c46 Mon Sep 17 00:00:00 2001 From: "d.poellath" Date: Tue, 31 Mar 2015 11:10:35 +0200 Subject: [PATCH 7/8] bugfix --- lib/index.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/index.js b/lib/index.js index 6b9715a..7174866 100644 --- a/lib/index.js +++ b/lib/index.js @@ -35,8 +35,8 @@ module.exports = { var MemoryBackend = require('./workflow-in-memory-backend'); return MemoryBackend(config); }, - BaseBackend: function () { - return require('./base-backend'); + BaseBackend: function (config) { + return require('./base-backend')(config); }, API: function (config) { if (typeof (config) !== 'object') { From 7f63f8c807553271436dff054971e04ccfbd872e Mon Sep 17 00:00:00 2001 From: "d.poellath" Date: Tue, 31 Mar 2015 11:02:35 +0200 Subject: [PATCH 8/8] minor typo changes ! correcting typos (using plural) ! use direct uuid/id find without filter-object --- lib/base-backend.js | 96 +++++++++++++++++++++++---------------------- 1 file changed, 49 insertions(+), 47 deletions(-) diff --git a/lib/base-backend.js b/lib/base-backend.js index cbd8f9e..0973997 100644 --- a/lib/base-backend.js +++ b/lib/base-backend.js @@ -12,9 +12,9 @@ var makeEmitter = require("./make-emitter"), var baseBackend = { TYPES: { - WORKFLOW: 'workflow', - JOB: 'job', - RUNNER: 'runner' + WORKFLOWS: 'workflows', + JOBS: 'jobs', + RUNNERS: 'runners' }, EXECUTION: { @@ -71,12 +71,12 @@ var baseBackend = { pMeta = {}; } var _self = this; - this.find(this.TYPES.WORKFLOW, {'name': pWorkflow.name}, function (pError, pWorkflows) { + this.find(this.TYPES.WORKFLOWS, {'name': pWorkflow.name}, function (pError, pWorkflows) { if (pWorkflows.length > 0) return pCallback(new e.BackendInvalidArgumentError( 'Workflow.name must be unique. A workflow with name "' + pWorkflow.name + '" already exists')); - _self.save(_self.TYPES.WORKFLOW, pWorkflow, pCallback); + _self.save(_self.TYPES.WORKFLOWS, pWorkflow, pCallback); }); }, // usage: API & Factory @@ -89,7 +89,7 @@ var baseBackend = { pCallback = meta; meta = {}; } - this.find(this.TYPES.WORKFLOW, {'uuid': uuid}, function (pError, pWorkflows) { + this.find(this.TYPES.WORKFLOWS, uuid, function (pError, pWorkflows) { if (pWorkflows.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Workflow with uuid \'%s\' does not exist', uuid))); @@ -106,7 +106,7 @@ var baseBackend = { pCallback = meta; meta = {}; } - this.remove(this.TYPES.WORKFLOW, workflow, pCallback); + this.remove(this.TYPES.WORKFLOWS, workflow, pCallback); }, // usage: API // workflow - update workflow object. @@ -124,7 +124,7 @@ var baseBackend = { return pCallback(new e.BackendResourceNotFoundError( 'Workflow does not exist. Cannot Update.')); _.assign(pWorkflow, workflow); - _self.find(_self.TYPES.WORKFLOW, {'name': pWorkflow.name}, function (pError, pWorkflows) { + _self.find(_self.TYPES.WORKFLOWS, {'name': pWorkflow.name}, function (pError, pWorkflows) { if (pWorkflows.length > 0) for (var i = 0; i < pWorkflows.length; i++) { if (pWorkflows[i].uuid !== pWorkflow.uuid) @@ -132,7 +132,7 @@ var baseBackend = { 'Workflow.name must be unique. A workflow with name "' + pWorkflow.name + '" already exists')); } - _self.save(_self.TYPES.WORKFLOW, pWorkflow, pCallback); + _self.save(_self.TYPES.WORKFLOWS, pWorkflow, pCallback); }); }); }, @@ -148,7 +148,7 @@ var baseBackend = { meta = {}; } job.created_at = job.created_at || new Date().toISOString(); - this.save(this.TYPES.JOB, job, pCallback); + this.save(this.TYPES.JOBS, job, pCallback); }, // usage: Runner @@ -161,7 +161,7 @@ var baseBackend = { pCallback = meta; meta = {}; } - this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { + this.find(this.TYPES.JOBS, uuid, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', uuid))); @@ -197,7 +197,7 @@ var baseBackend = { if (!pJob.target) return pCallback(null); var _self = this; - _self.find(_self.TYPES.JOB, null, function (pError, pJobs) { + _self.find(_self.TYPES.JOBS, null, function (pError, pJobs) { if (pError || !pJobs) return pCallback(null); if (_.some(pJobs, function (job) { @@ -211,6 +211,7 @@ var baseBackend = { var sameTargets = _.where(pJobs, {'target': pJob.target}); if (sameTargets.length === 0) return pCallback(null); + // TODO async.each for (var i = 0; i < sameTargets.length; i++) { var job = sameTargets[i]; if (job.uuid !== pJob.uuid && @@ -234,7 +235,7 @@ var baseBackend = { pCallback = index; index = 0; } - this.find(this.TYPES.JOB, { + this.find(this.TYPES.JOBS, { 'where': {'execution': this.EXECUTION.QUEUED}, 'sort': 'created_at' }, function (pError, pJobs) { @@ -253,13 +254,13 @@ var baseBackend = { // using getJob. runJob: function (uuid, runner_id, pCallback) { var _self = this; - this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { + this.find(this.TYPES.JOBS, uuid, function (pError, pJobs) { if (pJobs.length === 0 || pJobs[0].execution != _self.EXECUTION.QUEUED) return pCallback(new e.BackendPreconditionFailedError('Only queued jobs can be run')); var job = pJobs[0]; job.runner_id = runner_id; job.execution = _self.EXECUTION.RUNNING; - return _self.save(_self.TYPES.JOB, job, pCallback); + return _self.save(_self.TYPES.JOBS, job, pCallback); }); }, @@ -273,7 +274,7 @@ var baseBackend = { // using getJob. finishJob: function (pJob, pCallback) { var _self = this; - this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { + this.find(this.TYPES.JOBS, pJob.uuid, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', pJob.uuid))); @@ -286,7 +287,7 @@ var baseBackend = { if (job.execution === _self.EXECUTION.RUNNING) job.execution = _self.EXECUTION.SUCCEEDED; delete job.runner_id; - _self.save(_self.TYPES.JOB, job, pCallback); + _self.save(_self.TYPES.JOBS, job, pCallback); }); }, @@ -306,13 +307,13 @@ var baseBackend = { meta = {}; } var _self = this; - this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { + this.find(this.TYPES.JOBS, pJob.uuid, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', pJob.uuid))); var job = pJobs[0]; _.assign(job, pJob); - _self.save(_self.TYPES.JOB, job, pCallback); + _self.save(_self.TYPES.JOBS, job, pCallback); }); }, @@ -327,14 +328,14 @@ var baseBackend = { 'cancelJob uuid(String) required')); } var _self = this; - this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { + this.find(this.TYPES.JOBS, uuid, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', job.uuid))); var job = pJobs[0]; job.execution = _self.EXECUTION.CANCELED; delete job.runner_id; - _self.save(_self.TYPES.JOB, job, pCallback); + _self.save(_self.TYPES.JOBS, job, pCallback); }); }, @@ -356,13 +357,13 @@ var baseBackend = { meta = {}; } var _self = this; - this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { + this.find(this.TYPES.JOBS, uuid, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', uuid))); var job = pJobs[0]; job[prop] = val; - _self.save(_self.TYPES.JOB, job, pCallback); + _self.save(_self.TYPES.JOBS, job, pCallback); }); }, @@ -377,7 +378,7 @@ var baseBackend = { // using getJob. queueJob: function (pJob, pCallback) { var _self = this; - this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { + this.find(this.TYPES.JOBS, pJob.uuid, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', pJob.uuid))); @@ -388,7 +389,7 @@ var baseBackend = { _.assign(job, pJob); delete job.runner_id; job.execution = _self.EXECUTION.QUEUED; - _self.save(_self.TYPES.JOB, job, pCallback); + _self.save(_self.TYPES.JOBS, job, pCallback); }); }, @@ -403,7 +404,7 @@ var baseBackend = { // using getJob. pauseJob: function (pJob, pCallback) { var _self = this; - this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { + this.find(this.TYPES.JOBS, pJob.uuid, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', pJob.uuid))); @@ -414,7 +415,7 @@ var baseBackend = { _.assign(job, pJob); delete job.runner_id; job.execution = _self.EXECUTION.WAITING; - _self.save(_self.TYPES.JOB, job, pCallback); + _self.save(_self.TYPES.JOBS, job, pCallback); }); }, @@ -428,7 +429,7 @@ var baseBackend = { // using getJob. resumeJob: function (pJob, pCallback) { var _self = this; - this.find(this.TYPES.JOB, {'uuid': pJob.uuid}, function (pError, pJobs) { + this.find(this.TYPES.JOBS, pJob.uuid, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Job with uuid \'%s\' does not exist', pJob.uuid))); @@ -439,7 +440,7 @@ var baseBackend = { _.assign(job, pJob); delete job.runner_id; job.execution = _self.EXECUTION.QUEUED; - _self.save(_self.TYPES.JOB, job, pCallback); + _self.save(_self.TYPES.JOBS, job, pCallback); }); }, @@ -450,7 +451,7 @@ var baseBackend = { // - pCallback - f(err, jobs). `jobs` is an array of job's UUIDs. // Note `jobs` will be an array, even when empty. nextJobs: function (start, stop, pCallback) { - this.find(this.TYPES.JOB, { + this.find(this.TYPES.JOBS, { 'where': {'execution': this.EXECUTION.QUEUED}, 'sort': 'created_at' }, function (pError, pJobs) { @@ -478,7 +479,7 @@ var baseBackend = { } else if (typeof (active_at) === 'string') { active_at = new Date(active_at); } - this.save(this.TYPES.RUNNER, { + this.save(this.TYPES.RUNNERS, { uuid: runner_id, active_at: active_at, idle: false @@ -503,7 +504,7 @@ var baseBackend = { // - runner_id - String, unique identifier for runner. Required. // - pCallback - f(err, runner) getRunner: function (runner_id, pCallback) { - this.find(this.TYPES.RUNNER, {'uuid': runner_id}, function (pError, pRunners) { + this.find(this.TYPES.RUNNERS, runner_id, function (pError, pRunners) { if (pRunners.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Runner with uuid \'%s\' does not exist', runner_id))); @@ -515,7 +516,7 @@ var baseBackend = { // Get all the registered runners: // - pCallback - f(err, runners) getRunners: function (pCallback) { - this.find(this.TYPES.RUNNER, null, function (pError, pRunners) { + this.find(this.TYPES.RUNNERS, null, function (pError, pRunners) { var theRunners = {}; for (var i = 0; i < pRunners.length; i++) { var runner = pRunners[i]; @@ -531,13 +532,13 @@ var baseBackend = { // - pCallback - f(err, runner-active_at) idleRunner: function (runner_id, pCallback) { var _self = this; - this.find(this.TYPES.RUNNER, {'uuid': runner_id}, function (pError, pRunners) { + this.find(this.TYPES.RUNNERS, runner_id, function (pError, pRunners) { if (pRunners.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Runner with uuid \'%s\' does not exist', runner_id))); var runner = pRunners[0]; runner.idle = true; - _self.save(_self.TYPES.RUNNER, runner, function (pError, pRunner) { + _self.save(_self.TYPES.RUNNERS, runner, function (pError, pRunner) { return pCallback(pError, pRunner.active_at); }); }); @@ -548,8 +549,8 @@ var baseBackend = { // - runner_id - String, unique identifier for runner // - pCallback - f(boolean) isRunnerIdle: function (runner_id, pCallback) { - this.find(this.TYPES.RUNNER, {'uuid': runner_id}, function (pError, pRunners) { - return pCallback((pRunners.length === 0 || pRunners[0].idle === true)); + this.find(this.TYPES.RUNNERS, runner_id, function (pError, pRunners) { + return pCallback((pRunners.length === 1 && pRunners[0].idle === true)); }); }, @@ -559,13 +560,13 @@ var baseBackend = { // - pCallback - f(err) wakeUpRunner: function (runner_id, pCallback) { var _self = this; - this.find(this.TYPES.RUNNER, {'uuid': runner_id}, function (pError, pRunners) { + this.find(this.TYPES.RUNNERS, runner_id, function (pError, pRunners) { if (pRunners.length === 0) return pCallback(new e.BackendResourceNotFoundError(sprintf( 'Runner with uuid \'%s\' does not exist', runner_id))); var runner = pRunners[0]; runner.idle = false; - _self.save(_self.TYPES.RUNNER, runner, function (pError, pRunner) { + _self.save(_self.TYPES.RUNNERS, runner, function (pError, pRunner) { return pCallback(pError, pRunner.active_at); }); }); @@ -577,7 +578,7 @@ var baseBackend = { // - pCallback - f(err, jobs). `jobs` is an array of job's UUIDs. // Note `jobs` will be an array, even when empty. getRunnerJobs: function (runner_id, pCallback) { - this.find(this.TYPES.JOB, {'runner_id': runner_id}, function (pError, pJobs) { + this.find(this.TYPES.JOBS, {'runner_id': runner_id}, function (pError, pJobs) { return pCallback(pError, _.map(pJobs, 'uuid')); }); }, @@ -593,7 +594,7 @@ var baseBackend = { pCallback = params; params = {}; } - this.find(this.TYPES.WORKFLOW, params, pCallback); + this.find(this.TYPES.WORKFLOWS, params, pCallback); }, // usage: API @@ -633,12 +634,13 @@ var baseBackend = { 'execution is required and must be one of "' + executions.join('", "') + '"')); } - var whereObj = {}; + var whereObj = null; if (params.execution) { + whereObj = {}; whereObj.execution = params.execution; delete params.execution; } - this.find(this.TYPES.JOB, whereObj, function (pError, pJobs) { + this.find(this.TYPES.JOBS, whereObj, function (pError, pJobs) { if (pError) return pCallback(pError, pJobs); if (_.keys(params).length > 0) @@ -672,7 +674,7 @@ var baseBackend = { stats.past_hour[e] = 0; stats.current[e] = 0; }); - this.find(this.TYPES.JOB, null, function (pError, pJobs) { + this.find(this.TYPES.JOBS, null, function (pError, pJobs) { var yesterday = (function (d) { d.setDate(d.getDate() - 1); return d; @@ -719,7 +721,7 @@ var baseBackend = { meta = {}; } var _self = this; - this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { + this.find(this.TYPES.JOBS, uuid, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError( 'Job does not exist. Cannot Update.')); @@ -728,7 +730,7 @@ var baseBackend = { job.info = []; } job.info.push(info); - _self.save(_self.TYPES.JOB, job, pCallback); + _self.save(_self.TYPES.JOBS, job, pCallback); }); }, @@ -742,7 +744,7 @@ var baseBackend = { pCallback = meta; meta = {}; } - this.find(this.TYPES.JOB, {'uuid': uuid}, function (pError, pJobs) { + this.find(this.TYPES.JOBS, uuid, function (pError, pJobs) { if (pJobs.length === 0) return pCallback(new e.BackendResourceNotFoundError( 'Job does not exist. Cannot get info.'));