diff --git a/.eslintrc.json b/.eslintrc.json index 8948a31958e..41c7c2f28f1 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -1,6 +1,7 @@ { "extends": "eslint:recommended", "rules": { +// "quotes": ["warn", "single", { "avoidEscape": true }], "require-atomic-updates": "off", "block-spacing": [ "error", @@ -118,6 +119,240 @@ "unicode-bom": [ "error", "never" + ], + "no-restricted-properties": [ + "warn", + { + "object": "Promise", + "property": "map", + "message": "Bluebird-specific method 'Promise.map' detected. Suggestion: use native arrays with Promise.all or a library like 'p-map'." + }, + { + "object": "Promise", + "property": "reduce", + "message": "Bluebird-specific method 'Promise.reduce' detected. Suggestion: use Array.reduce plus async/await or a concurrency library." + }, + { + "object": "Promise", + "property": "filter", + "message": "Bluebird-specific method 'Promise.filter' detected. Suggestion: use Array.filter plus async/await or a concurrency library." + }, + { + "object": "Promise", + "property": "each", + "message": "Bluebird-specific method 'Promise.each' detected. Suggestion: use a for-loop/forEach with async/await." + }, + { + "object": "Promise", + "property": "props", + "message": "Bluebird-specific method 'Promise.props' detected. Suggestion: use Promise.all with Object.entries or a custom approach." + }, + { + "object": "Promise", + "property": "join", + "message": "Bluebird-specific method 'Promise.join' detected. Suggestion: use Promise.all([...]) and destructuring in .then." + }, + { + "object": "Promise", + "property": "try", + "message": "Bluebird-specific method 'Promise.try' detected. Suggestion: use a try/catch block or an async function." + }, + { + "object": "Promise", + "property": "attempt", + "message": "Bluebird-specific method 'Promise.attempt' detected. Suggestion: same as 'Promise.try'—use try/catch or async." + }, + { + "object": "Promise", + "property": "method", + "message": "Bluebird-specific method 'Promise.method' detected. Suggestion: define an async function or return a native Promise." + }, + { + "object": "Promise", + "property": "promisify", + "message": "Bluebird-specific method 'Promise.promisify' detected. Suggestion: use native 'util.promisify' or wrap in a new Promise." + }, + { + "object": "Promise", + "property": "promisifyAll", + "message": "Bluebird-specific method 'Promise.promisifyAll' detected. Suggestion: consider 'util.promisify' for each function or a similar library." + }, + { + "object": "Promise", + "property": "fromCallback", + "message": "Bluebird-specific method 'Promise.fromCallback' detected. Suggestion: use new Promise(...) or 'util.promisify'." + }, + { + "object": "Promise", + "property": "coroutine", + "message": "Bluebird-specific method 'Promise.coroutine' detected. Suggestion: use native async/await." + }, + { + "object": "Promise", + "property": "spawn", + "message": "Bluebird-specific method 'Promise.spawn' detected. Suggestion: use native async/await." + }, + { + "object": "Promise", + "property": "using", + "message": "Bluebird-specific method 'Promise.using' detected. Suggestion: use try/finally or a resource-management library." + }, + { + "object": "Promise", + "property": "disposer", + "message": "Bluebird-specific method 'Promise.disposer' detected. Suggestion: use try/finally or a resource-management library." + }, + { + "object": "Promise", + "property": "settle", + "message": "Bluebird-specific method 'Promise.settle' detected. Suggestion: use native 'Promise.allSettled'." + }, + + /* ---------- Same methods on the Bluebird object itself ---------- */ + { + "object": "Bluebird", + "property": "map", + "message": "Bluebird-specific method 'Bluebird.map' detected. Suggestion: use array mapping + Promise.all or 'p-map'." + }, + { + "object": "Bluebird", + "property": "reduce", + "message": "Bluebird-specific method 'Bluebird.reduce' detected. Suggestion: use array reduce + async/await or concurrency library." + }, + { + "object": "Bluebird", + "property": "filter", + "message": "Bluebird-specific method 'Bluebird.filter' detected. Suggestion: use array filter + async/await or concurrency library." + }, + { + "object": "Bluebird", + "property": "each", + "message": "Bluebird-specific method 'Bluebird.each' detected. Suggestion: use a for-loop or forEach + async/await." + }, + { + "object": "Bluebird", + "property": "props", + "message": "Bluebird-specific method 'Bluebird.props' detected. Suggestion: use Promise.all with object entries or a custom approach." + }, + { + "object": "Bluebird", + "property": "join", + "message": "Bluebird-specific method 'Bluebird.join' detected. Suggestion: use Promise.all([...]) and destructuring." + }, + { + "object": "Bluebird", + "property": "try", + "message": "Bluebird-specific method 'Bluebird.try' detected. Suggestion: use a try/catch block or async function." + }, + { + "object": "Bluebird", + "property": "attempt", + "message": "Bluebird-specific method 'Bluebird.attempt' detected. Suggestion: use a try/catch block or async function." + }, + { + "object": "Bluebird", + "property": "method", + "message": "Bluebird-specific method 'Bluebird.method' detected. Suggestion: define an async function or return a native Promise." + }, + { + "object": "Bluebird", + "property": "promisify", + "message": "Bluebird-specific method 'Bluebird.promisify' detected. Suggestion: use native 'util.promisify' or wrap in a new Promise." + }, + { + "object": "Bluebird", + "property": "promisifyAll", + "message": "Bluebird-specific method 'Bluebird.promisifyAll' detected. Suggestion: consider 'util.promisify' or a similar library." + }, + { + "object": "Bluebird", + "property": "fromCallback", + "message": "Bluebird-specific method 'Bluebird.fromCallback' detected. Suggestion: use new Promise(...) or 'util.promisify'." + }, + { + "object": "Bluebird", + "property": "coroutine", + "message": "Bluebird-specific method 'Bluebird.coroutine' detected. Suggestion: use native async/await." + }, + { + "object": "Bluebird", + "property": "spawn", + "message": "Bluebird-specific method 'Bluebird.spawn' detected. Suggestion: use native async/await." + }, + { + "object": "Bluebird", + "property": "using", + "message": "Bluebird-specific method 'Bluebird.using' detected. Suggestion: use try/finally or a resource-management library." + }, + { + "object": "Bluebird", + "property": "disposer", + "message": "Bluebird-specific method 'Bluebird.disposer' detected. Suggestion: use try/finally or a resource-management library." + }, + { + "object": "Bluebird", + "property": "settle", + "message": "Bluebird-specific method 'Bluebird.settle' detected. Suggestion: use native 'Promise.allSettled'." + } + ], + "no-restricted-syntax": [ + "warn", + { + "selector": "CallExpression[callee.property.name='tap']", + "message": "Bluebird-specific instance method '.tap()' detected. Suggestion: use '.then(value => { ...; return value; })'." + }, + { + "selector": "CallExpression[callee.property.name='tapCatch']", + "message": "Bluebird-specific instance method '.tapCatch()' detected. Suggestion: use '.catch(error => { ...; throw error; })'." + }, + { + "selector": "CallExpression[callee.property.name='spread']", + "message": "Bluebird-specific instance method '.spread()' detected. Suggestion: use '.then(([a, b]) => ... )' with array destructuring." + }, + { + "selector": "CallExpression[callee.type='MemberExpression'][callee.property.name='bind'][callee.object.name=/^(Promise|Bluebird|BPromise)$/]", + "message": "Bluebird-specific '.bind()' detected on a Bluebird promise. Suggestion: manually bind 'this' or use arrow functions." + }, + { + "selector": "CallExpression[callee.property.name='delay']", + "message": "Bluebird-specific instance method '.delay()' detected. Suggestion: use setTimeout() or a library (e.g., p-delay)." + }, + { + "selector": "CallExpression[callee.property.name='timeout']", + "message": "Bluebird-specific instance method '.timeout()' detected. Suggestion: use p-timeout or similar library." + }, + { + "selector": "CallExpression[callee.property.name='return']", + "message": "Bluebird-specific instance method '.return()' detected. Suggestion: use '.then(() => someValue)' or rewrite chain." + }, + { + "selector": "CallExpression[callee.property.name='throw']", + "message": "Bluebird-specific instance method '.throw()' detected. Suggestion: use '.then(() => { throw error; })'." + }, + { + "selector": "CallExpression[callee.property.name='asCallback']", + "message": "Bluebird-specific instance method '.asCallback()' detected. Suggestion: use 'util.callbackify' or rewrite manually." + }, + { + "selector": "CallExpression[callee.property.name='nodeify']", + "message": "Bluebird-specific instance method '.nodeify()' detected. Suggestion: use 'util.callbackify' or rewrite manually." + }, + { + "selector": "CallExpression[callee.property.name='reflect']", + "message": "Bluebird-specific instance method '.reflect()' detected. Suggestion: use 'Promise.allSettled' or custom handling." + }, + { + "selector": "CallExpression[callee.property.name='caught']", + "message": "Bluebird-specific instance method '.caught()' detected. Suggestion: use '.catch()' with condition or separate logic." + }, + { + "selector": "CallExpression[callee.property.name='catchReturn']", + "message": "Bluebird-specific instance method '.catchReturn()' detected. Suggestion: use '.catch(err => fallbackValue)' or similar." + }, + { + "selector": "CallExpression[callee.property.name='catchThrow']", + "message": "Bluebird-specific instance method '.catchThrow()' detected. Suggestion: use '.catch(err => { throw newError; })'." + } ] }, "overrides": [ @@ -276,6 +511,7 @@ { "files": [ "api/**/*.js", + "jobServer/**/*.js", "frontend/express/*.js", "frontend/express/libs/*.js", "plugins/pluginManager.js", diff --git a/.husky/pre-commit b/.husky/pre-commit new file mode 100644 index 00000000000..4af22ddd479 --- /dev/null +++ b/.husky/pre-commit @@ -0,0 +1 @@ +npx lint-staged --verbose \ No newline at end of file diff --git a/api/api.js b/api/api.js index 667d6870b5c..bb7bb8c4f8f 100644 --- a/api/api.js +++ b/api/api.js @@ -1,14 +1,11 @@ const http = require('http'); -const cluster = require('cluster'); const formidable = require('formidable'); const countlyConfig = require('./config', 'dont-enclose'); const plugins = require('../plugins/pluginManager.js'); -const jobs = require('./parts/jobs'); const log = require('./utils/log.js')('core:api'); const common = require('./utils/common.js'); const {processRequest} = require('./utils/requestProcessor'); const frontendConfig = require('../frontend/express/config.js'); -const {CacheMaster, CacheWorker} = require('./parts/data/cache.js'); const {WriteBatcher, ReadBatcher, InsertBatcher} = require('./parts/data/batcher.js'); const pack = require('../package.json'); const versionInfo = require('../frontend/express/version.info.js'); @@ -19,18 +16,9 @@ var {MongoDbQueryRunner} = require('./utils/mongoDbQueryRunner.js'); var t = ["countly:", "api"]; common.processRequest = processRequest; -if (cluster.isMaster) { - console.log("Starting Countly", "version", versionInfo.version, "package", pack.version); - if (!common.checkDatabaseConfigMatch(countlyConfig.mongodb, frontendConfig.mongodb)) { - log.w('API AND FRONTEND DATABASE CONFIGS ARE DIFFERENT'); - } - t.push("master"); - t.push("node"); - t.push(process.argv[1]); -} -else { - t.push("worker"); - t.push("node"); +console.log("Starting Countly", "version", versionInfo.version, "package", pack.version); +if (!common.checkDatabaseConfigMatch(countlyConfig.mongodb, frontendConfig.mongodb)) { + log.w('API AND FRONTEND DATABASE CONFIGS ARE DIFFERENT'); } // Finaly set the visible title @@ -47,8 +35,6 @@ plugins.connectToAllDatabases().then(function() { common.drillQueryRunner = new MongoDbQueryRunner(common.drillDb); } - let workers = []; - /** * Set Max Sockets */ @@ -127,22 +113,15 @@ plugins.connectToAllDatabases().then(function() { /** * Set Plugins Logs Config */ - plugins.setConfigs('logs', { - debug: (countlyConfig.logging && countlyConfig.logging.debug) ? countlyConfig.logging.debug.join(', ') : '', - info: (countlyConfig.logging && countlyConfig.logging.info) ? countlyConfig.logging.info.join(', ') : '', - warn: (countlyConfig.logging && countlyConfig.logging.warn) ? countlyConfig.logging.warn.join(', ') : '', - error: (countlyConfig.logging && countlyConfig.logging.error) ? countlyConfig.logging.error.join(', ') : '', - default: (countlyConfig.logging && countlyConfig.logging.default) ? countlyConfig.logging.default : 'warn', - }, undefined, () => { - const cfg = plugins.getConfig('logs'), msg = { - cmd: 'log', - config: cfg - }; - if (process.send) { - process.send(msg); + plugins.setConfigs('logs', + { + debug: (countlyConfig.logging && countlyConfig.logging.debug) ? countlyConfig.logging.debug.join(', ') : '', + info: (countlyConfig.logging && countlyConfig.logging.info) ? countlyConfig.logging.info.join(', ') : '', + warn: (countlyConfig.logging && countlyConfig.logging.warn) ? countlyConfig.logging.warn.join(', ') : '', + error: (countlyConfig.logging && countlyConfig.logging.error) ? countlyConfig.logging.error.join(', ') : '', + default: (countlyConfig.logging && countlyConfig.logging.default) ? countlyConfig.logging.default : 'warn', } - require('./utils/log.js').ipcHandler(msg); - }); + ); /** * Initialize Plugins @@ -155,6 +134,19 @@ plugins.connectToAllDatabases().then(function() { */ async function storeBatchedData(code) { try { + + await new Promise((resolve) => { + server.close((err) => { + if (err) { + console.log("Error closing server:", err); + } + else { + console.log("Server closed successfully"); + resolve(); + } + }); + }); + await common.writeBatcher.flushAll(); await common.insertBatcher.flushAll(); console.log("Successfully stored batch state"); @@ -208,243 +200,113 @@ plugins.connectToAllDatabases().then(function() { console.trace(); }); - /** - * Pass To Master - * @param {cluster.Worker} worker - worker thatw as spawned by master - */ - const passToMaster = (worker) => { - worker.on('message', (msg) => { - if (msg.cmd === 'log') { - workers.forEach((w) => { - if (w !== worker) { - w.send({ - cmd: 'log', - config: msg.config - }); - } - }); - require('./utils/log.js').ipcHandler(msg); - } - else if (msg.cmd === "checkPlugins") { - plugins.checkPluginsMaster(); - } - else if (msg.cmd === "startPlugins") { - plugins.startSyncing(); - } - else if (msg.cmd === "endPlugins") { - plugins.stopSyncing(); - } - else if (msg.cmd === "batch_insert") { - const {collection, doc, db} = msg.data; - common.insertBatcher.insert(collection, doc, db); - } - else if (msg.cmd === "batch_write") { - const {collection, id, operation, db} = msg.data; - common.writeBatcher.add(collection, id, operation, db); - } - else if (msg.cmd === "batch_read") { - const {collection, query, projection, multi, msgId} = msg.data; - common.readBatcher.get(collection, query, projection, multi).then((data) => { - worker.send({ cmd: "batch_read", data: {msgId, data} }); - }) - .catch((err) => { - worker.send({ cmd: "batch_read", data: {msgId, err} }); - }); - } - else if (msg.cmd === "batch_invalidate") { - const {collection, query, projection, multi} = msg.data; - common.readBatcher.invalidate(collection, query, projection, multi); - } - else if (msg.cmd === "dispatchMaster" && msg.event) { - plugins.dispatch(msg.event, msg.data); - } - else if (msg.cmd === "dispatch" && msg.event) { - workers.forEach((w) => { - w.send(msg); - }); - } - }); - }; - - if (cluster.isMaster) { - plugins.installMissingPlugins(common.db); - common.runners = require('./parts/jobs/runner'); - common.cache = new CacheMaster(); - common.cache.start().then(() => { - setImmediate(() => { - plugins.dispatch('/cache/init', {}); - }); - }, e => { - console.log(e); - process.exit(1); - }); - - /*const workerCount = (countlyConfig.api.workers) - ? countlyConfig.api.workers - : os.cpus().length;*/ - - const workerCount = 1; - - for (let i = 0; i < workerCount; i++) { - // there's no way to define inspector port of a worker in the code. So if we don't - // pick a unique port for each worker, they conflict with each other. - let nodeOptions = {}; - if (countlyConfig?.symlinked !== true) { // countlyConfig.symlinked is passed when running in a symlinked setup - const inspectorPort = i + 1 + (common?.config?.masterInspectorPort || 9229); - nodeOptions = { NODE_OPTIONS: "--inspect-port=" + inspectorPort }; - } - const worker = cluster.fork(nodeOptions); - workers.push(worker); + var utcMoment = moment.utc(); + var incObj = {}; + incObj.r = 1; + incObj[`d.${utcMoment.format("D")}.${utcMoment.format("H")}.r`] = 1; + common.db.collection("diagnostic").updateOne({"_id": "no-segment_" + utcMoment.format("YYYY:M")}, {"$set": {"m": utcMoment.format("YYYY:M")}, "$inc": incObj}, {"upsert": true}, function(err) { + if (err) { + log.e(err); } + }); - workers.forEach(passToMaster); - cluster.on('exit', (worker) => { - workers = workers.filter((w) => { - return w !== worker; - }); - const newWorker = cluster.fork(); - workers.push(newWorker); - passToMaster(newWorker); - }); + plugins.installMissingPlugins(common.db); + const taskManager = require('./utils/taskmanager.js'); + //since process restarted mark running tasks as errored + taskManager.errorResults({db: common.db}); - plugins.dispatch("/master", {}); - - // Allow configs to load & scanner to find all jobs classes - setTimeout(() => { - jobs.job('api:topEvents').replace().schedule('at 00:01 am ' + 'every 1 day'); - jobs.job('api:ping').replace().schedule('every 1 day'); - jobs.job('api:clear').replace().schedule('every 1 day'); - jobs.job('api:clearTokens').replace().schedule('every 1 day'); - jobs.job('api:clearAutoTasks').replace().schedule('every 1 day'); - jobs.job('api:task').replace().schedule('every 5 minutes'); - jobs.job('api:userMerge').replace().schedule('every 10 minutes'); - jobs.job("api:ttlCleanup").replace().schedule("every 1 minute"); - //jobs.job('api:appExpire').replace().schedule('every 1 day'); - }, 10000); - - //Record as restarted - - var utcMoment = moment.utc(); - - var incObj = {}; - incObj.r = 1; - incObj[`d.${utcMoment.format("D")}.${utcMoment.format("H")}.r`] = 1; - common.db.collection("diagnostic").updateOne({"_id": "no-segment_" + utcMoment.format("YYYY:M")}, {"$set": {"m": utcMoment.format("YYYY:M")}, "$inc": incObj}, {"upsert": true}, function(err) { - if (err) { - log.e(err); - } - }); - } - else { - console.log("Starting worker", process.pid, "parent:", process.ppid); - const taskManager = require('./utils/taskmanager.js'); + plugins.dispatch("/master", {}); // init hook - common.cache = new CacheWorker(); - common.cache.start(); + const server = http.Server((req, res) => { + const params = { + qstring: {}, + res: res, + req: req + }; - //since process restarted mark running tasks as errored - taskManager.errorResults({db: common.db}); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('Keep-Alive', 'timeout=5, max=1000'); - process.on('message', common.log.ipcHandler); + if (req.method.toLowerCase() === 'post') { + const formidableOptions = {}; + if (countlyConfig.api.maxUploadFileSize) { + formidableOptions.maxFileSize = countlyConfig.api.maxUploadFileSize; + } - process.on('message', (msg) => { - if (msg.cmd === 'log') { - common.log.ipcHandler(msg); + const form = new formidable.IncomingForm(formidableOptions); + if (/crash_symbols\/(add_symbol|upload_symbol)/.test(req.url)) { + req.body = []; + req.on('data', (data) => { + req.body.push(data); + }); } - else if (msg.cmd === "dispatch" && msg.event) { - plugins.dispatch(msg.event, msg.data || {}); + else { + req.body = ''; + req.on('data', (data) => { + req.body += data; + }); } - }); - - process.on('exit', () => { - console.log('Exiting due to master exited'); - }); - plugins.dispatch("/worker", {common: common}); - - http.Server((req, res) => { - const params = { - qstring: {}, - res: res, - req: req - }; + let multiFormData = false; + // Check if we have 'multipart/form-data' + if (req.headers['content-type']?.startsWith('multipart/form-data')) { + multiFormData = true; + } - if (req.method.toLowerCase() === 'post') { - const formidableOptions = {}; - if (countlyConfig.api.maxUploadFileSize) { - formidableOptions.maxFileSize = countlyConfig.api.maxUploadFileSize; + form.parse(req, (err, fields, files) => { + //handle bakcwards compatability with formiddble v1 + for (let i in files) { + if (files[i].filepath) { + files[i].path = files[i].filepath; + } + if (files[i].mimetype) { + files[i].type = files[i].mimetype; + } + if (files[i].originalFilename) { + files[i].name = files[i].originalFilename; + } } - - const form = new formidable.IncomingForm(formidableOptions); - if (/crash_symbols\/(add_symbol|upload_symbol)/.test(req.url)) { - req.body = []; - req.on('data', (data) => { - req.body.push(data); - }); + params.files = files; + if (multiFormData) { + let formDataUrl = []; + for (const i in fields) { + params.qstring[i] = fields[i]; + formDataUrl.push(`${i}=${fields[i]}`); + } + params.formDataUrl = formDataUrl.join('&'); } else { - req.body = ''; - req.on('data', (data) => { - req.body += data; - }); + for (const i in fields) { + params.qstring[i] = fields[i]; + } } - - let multiFormData = false; - // Check if we have 'multipart/form-data' - if (req.headers['content-type']?.startsWith('multipart/form-data')) { - multiFormData = true; + if (!params.apiPath) { + processRequest(params); } + }); + } + else if (req.method.toLowerCase() === 'options') { + const headers = {}; + headers["Access-Control-Allow-Origin"] = "*"; + headers["Access-Control-Allow-Methods"] = "POST, GET, OPTIONS"; + headers["Access-Control-Allow-Headers"] = "countly-token, Content-Type"; + res.writeHead(200, headers); + res.end(); + } + //attempt process GET request + else if (req.method.toLowerCase() === 'get') { + processRequest(params); + } + else { + common.returnMessage(params, 405, "Method not allowed"); + } + }); + server.listen(common.config.api.port, common.config.api.host || ''); + server.timeout = common.config.api.timeout || 120000; + server.keepAliveTimeout = common.config.api.timeout || 120000; + server.headersTimeout = (common.config.api.timeout || 120000) + 1000; // Slightly higher - form.parse(req, (err, fields, files) => { - //handle bakcwards compatability with formiddble v1 - for (let i in files) { - if (files[i].filepath) { - files[i].path = files[i].filepath; - } - if (files[i].mimetype) { - files[i].type = files[i].mimetype; - } - if (files[i].originalFilename) { - files[i].name = files[i].originalFilename; - } - } - params.files = files; - if (multiFormData) { - let formDataUrl = []; - for (const i in fields) { - params.qstring[i] = fields[i]; - formDataUrl.push(`${i}=${fields[i]}`); - } - params.formDataUrl = formDataUrl.join('&'); - } - else { - for (const i in fields) { - params.qstring[i] = fields[i]; - } - } - if (!params.apiPath) { - processRequest(params); - } - }); - } - else if (req.method.toLowerCase() === 'options') { - const headers = {}; - headers["Access-Control-Allow-Origin"] = "*"; - headers["Access-Control-Allow-Methods"] = "POST, GET, OPTIONS"; - headers["Access-Control-Allow-Headers"] = "countly-token, Content-Type"; - res.writeHead(200, headers); - res.end(); - } - //attempt process GET request - else if (req.method.toLowerCase() === 'get') { - processRequest(params); - } - else { - common.returnMessage(params, 405, "Method not allowed"); - } - }).listen(common.config.api.port, common.config.api.host || '').timeout = common.config.api.timeout || 120000; - plugins.loadConfigs(common.db); - } + plugins.loadConfigs(common.db); }); diff --git a/api/config.sample.js b/api/config.sample.js index de042402ed0..e881e72c607 100644 --- a/api/config.sample.js +++ b/api/config.sample.js @@ -24,6 +24,7 @@ var countlyConfig = { db: "countly", port: 27017, max_pool_size: 500, + replicaName: "rs0", //username: test, //password: test, //mongos: false, diff --git a/api/jobs/appExpire.js b/api/jobs/appExpire.js index 7fb968c19f7..e33b9bae3df 100644 --- a/api/jobs/appExpire.js +++ b/api/jobs/appExpire.js @@ -1,15 +1,34 @@ -'use strict'; - -const job = require('../parts/jobs/job.js'), - async = require('async'), - plugins = require('../../plugins/pluginManager.js'), - log = require('../utils/log.js')('job:appExpire'), - common = require('../utils/common.js'), - crypto = require('crypto'); +const async = require('async'); +const plugins = require('../../plugins/pluginManager.js'); +const log = require('../utils/log.js')('job:appExpire'); +const common = require('../utils/common.js'); +const crypto = require('crypto'); +const {Job} = require("../../jobServer"); /** Class for the user mergind job **/ -class AppExpireJob extends job.Job { +class AppExpireJob extends Job { + + /** + * Determines if the job should be enabled when created + * @public + * @returns {boolean} True if job should be enabled by default, false otherwise + */ + getEnabled() { + return false; + } + + /** + * Get schedule for the job + * @returns {GetScheduleConfig} Schedule configuration object + */ + getSchedule() { + return { + type: 'schedule', + value: '15 4 * * *' // every day at 4:15 AM + }; + } + /** * Run the job * @param {Db} database connection diff --git a/api/jobs/clear.js b/api/jobs/clear.js deleted file mode 100644 index 1150e4c757a..00000000000 --- a/api/jobs/clear.js +++ /dev/null @@ -1,40 +0,0 @@ -'use strict'; - -const job = require('../parts/jobs/job.js'), - log = require('../utils/log.js')('job:clear'); - -/** Class for job of clearing old jobs **/ -class ClearJob extends job.Job { - /** - * Run the job - * @param {Db} db connection - * @param {done} done callback - */ - run(db, done) { - log.d('Clearing jobs ...'); - var query = { - $and: [ - {status: {$in: [job.STATUS.DONE, job.STATUS.CANCELLED]}}, - { - $or: [ - {finished: {$exists: false}}, - {finished: {$lt: Date.now() - 60 * 60 * 24 * 1000}}, - {finished: null} - ] - } - ] - }; - - db.collection('jobs').deleteMany(query, (err, result) => { - if (err) { - log.e('Error while clearing jobs: ', err); - } - else { - log.d('Done clearing old jobs done before %j:', query.$and[1].$or[1].finished.$lt, result.deletedCount); - } - done(err); - }); - } -} - -module.exports = ClearJob; \ No newline at end of file diff --git a/api/jobs/clearAutoTasks.js b/api/jobs/clearAutoTasks.js index f2bf6e929ac..8174626a96b 100644 --- a/api/jobs/clearAutoTasks.js +++ b/api/jobs/clearAutoTasks.js @@ -1,8 +1,10 @@ "use strict"; -const job = require("../parts/jobs/job.js"); +// const job = require("../parts/jobs/job.js"); const log = require('../utils/log.js')('job:clearAutoTasks'); const taskManager = require('../utils/taskmanager'); +const Job = require("../../jobServer/Job"); + /** * clear task record in db with task id * @param {string} taskId - the id of task in db. @@ -20,7 +22,18 @@ const clearTaskRecord = (taskId) => { }; /** Class for job of clearing auto tasks created long time ago **/ -class ClearAutoTasks extends job.Job { +class ClearAutoTasks extends Job { + /** + * Get the schedule configuration for this job + * @returns {GetScheduleConfig} schedule configuration + */ + getSchedule() { + return { + type: "schedule", + value: "0 2 * * *" // Every day at 2:00 AM + }; + } + /** * Run the job * @param {Db} db connection diff --git a/api/jobs/clearTokens.js b/api/jobs/clearTokens.js index 2a3932c2313..9587479acc7 100644 --- a/api/jobs/clearTokens.js +++ b/api/jobs/clearTokens.js @@ -1,10 +1,23 @@ 'use strict'; -const job = require('../parts/jobs/job.js'), - authorize = require('../utils/authorizer.js'); +// const job = require('../parts/jobs/job.js'), +const authorize = require('../utils/authorizer.js'); +const Job = require("../../jobServer/Job"); /** Class for job of clearing tokens **/ -class CleanTokensJob extends job.Job { +class CleanTokensJob extends Job { + + /** + * Get the schedule configuration for this job + * @returns {GetScheduleConfig} schedule configuration + */ + getSchedule() { + return { + type: "schedule", + value: "30 2 * * *" // Every day at 2:30 AM + }; + } + /** * Run the job * @param {Db} db connection diff --git a/api/jobs/ipcTest.js b/api/jobs/ipcTest.js deleted file mode 100644 index e7e63d39bd6..00000000000 --- a/api/jobs/ipcTest.js +++ /dev/null @@ -1,128 +0,0 @@ -'use strict'; - -const job = require('../parts/jobs/job.js'), - res = require('../parts/jobs/resource.js'), - log = require('../utils/log.js')('job:ipcTest'); - -/** Class for testing resource handling for jobs **/ -class TestResource extends res.Resource { - /** - * Open resource - * @returns {Promise} promise - **/ - open() { - this.a = 0; - return new Promise((resolve) => { - log.d('open'); - setTimeout(() => { - this.opened(); - resolve(); - }, 2000); - }); - } - - /** - * Close resource - * @returns {Promise} promise - **/ - close() { - return new Promise((resolve) => { - log.d('close'); - setTimeout(() => { - this.closed(); - resolve(); - }, 2000); - }); - } - - /** - * Check if resource is used - * @returns {Promise} promise - **/ - checkActive() { - return new Promise((resolve) => { - log.d('checkActive'); - setTimeout(() => { - resolve(this.a++ > 0 ? false : true); - }, 2000); - }); - } - -} -/** Class for testing ipc jobs **/ -class Test extends job.IPCJob { - /** - * Create resource - * @returns {Resource} resourse - **/ - createResource() { - return new TestResource(); - } - - /** - * Create between processes - * @returns {Promise} promise - **/ - divide() { - return new Promise((resolve) => { - log.d('dividing ', this._json); - - setTimeout(() => { - resolve([{ - smth: 1, - size: 40 - }], [{ - smth: 1, - size: 50 - }]); - }, 500); - }); - } - - /** - * Run the job - * @param {Db} db connection - * @param {done} done callback - * @param {function} progress to report progress of the job - */ - run(db, done, progress) { - log.d('running ', this._json); - log.d('resource is ', typeof this.resource); - log.d('resource is open? ', this.resource.isOpen); - log.d('resource is active? ', this.resource.isAssigned); - - if (this.done < 10) { - setTimeout(() => { - progress(this._json.size, 10, 'ten'); - }, 1000); - } - - if (this.done < 20) { - setTimeout(() => { - progress(this._json.size, 20, 'twenty'); - // a = b; - }, 5000); - } - - if (this.done < 30) { - setTimeout(() => { - progress(this._json.size, 30, 'thirty'); - }, 6000); - } - - setTimeout(() => { - progress(100, 100, 'sixty'); - done(); - }, 60000); - - setTimeout(() => { - log.d('after done resource is ', typeof this.resource); - done('Big fat error'); - db.collection('jobs').findOne({_id: this._json._id}, (err, obj) => { - log.d('after done job findOne ', err, obj); - }); - }, 120000); - } -} - -module.exports = Test; \ No newline at end of file diff --git a/api/jobs/ping.js b/api/jobs/ping.js index 984e82c566c..f9c81a5780d 100644 --- a/api/jobs/ping.js +++ b/api/jobs/ping.js @@ -1,28 +1,45 @@ 'use strict'; -const job = require('../parts/jobs/job.js'), - log = require('../utils/log.js')('job:ping'), - countlyConfig = require("../../frontend/express/config.js"), - versionInfo = require('../../frontend/express/version.info'), - plugins = require('../../plugins/pluginManager.js'), - request = require('countly-request')(plugins.getConfig("security")); +// const job = require('../parts/jobs/job.js'); +const log = require('../utils/log.js')('job:ping'); +const countlyConfig = require("../../frontend/express/config.js"); +const versionInfo = require('../../frontend/express/version.info'); +const plugins = require('../../plugins/pluginManager.js'); + + +const Job = require("../../jobServer/Job"); /** Class for the job of pinging servers **/ -class PingJob extends job.Job { +class PingJob extends Job { + + /** + * Get the schedule configuration for this job + * @returns {GetScheduleConfig} schedule configuration + */ + getSchedule() { + return { + type: "schedule", + value: "0 1 * * *" // Every day at 1:00 AM + }; + } + /** * Run the ping job * @param {Db} db connection * @param {done} done callback */ run(db, done) { - request({strictSSL: false, uri: (process.env.COUNTLY_CONFIG_PROTOCOL || "http") + "://" + (process.env.COUNTLY_CONFIG_HOSTNAME || "localhost") + (countlyConfig.path || "") + "/configs"}, function() {}); - var countlyConfigOrig = JSON.parse(JSON.stringify(countlyConfig)); - var url = "https://count.ly/configurations/ce/tracking"; - if (versionInfo.type !== "777a2bf527a18e0fffe22fb5b3e322e68d9c07a6") { - url = "https://count.ly/configurations/ee/tracking"; - } + plugins.loadConfigs(db, function() { + const request = require('countly-request')(plugins.getConfig("security")); + request({strictSSL: false, uri: (process.env.COUNTLY_CONFIG_PROTOCOL || "http") + "://" + (process.env.COUNTLY_CONFIG_HOSTNAME || "localhost") + (countlyConfig.path || "") + "/configs"}, function() {}); + var countlyConfigOrig = JSON.parse(JSON.stringify(countlyConfig)); + var url = "https://count.ly/configurations/ce/tracking"; + if (versionInfo.type !== "777a2bf527a18e0fffe22fb5b3e322e68d9c07a6") { + url = "https://count.ly/configurations/ee/tracking"; + } + const offlineMode = plugins.getConfig("api").offline_mode; const { countly_tracking } = plugins.getConfig('frontend'); if (!offlineMode) { @@ -56,12 +73,12 @@ class PingJob extends job.Job { let domain = plugins.getConfig('api').domain; try { - // try to extract hostname from full domain url + // try to extract hostname from full domain url const urlObj = new URL(domain); domain = urlObj.hostname; } catch (_) { - // do nothing, domain from config will be used as is + // do nothing, domain from config will be used as is } request({ diff --git a/api/jobs/task.js b/api/jobs/task.js index 06b6893a7e7..adf4d2cc294 100644 --- a/api/jobs/task.js +++ b/api/jobs/task.js @@ -1,9 +1,10 @@ 'use strict'; -const job = require('../parts/jobs/job.js'), - log = require('../utils/log.js')('api:task'), - asyncjs = require("async"), - plugins = require('../../plugins/pluginManager.js'); +// const job = require('../parts/jobs/job.js'); +const Job = require("../../jobServer/Job"); +const log = require('../utils/log.js')('api:task'); +const asyncjs = require("async"); +const plugins = require('../../plugins/pluginManager.js'); const common = require('../utils/common.js'); const taskmanager = require('../utils/taskmanager.js'); @@ -14,7 +15,19 @@ common.processRequest = processRequest; /** * Task Monitor Job extend from Countly Job */ -class MonitorJob extends job.Job { +class MonitorJob extends Job { + + /** + * Get the schedule configuration for this job + * @returns {GetScheduleConfig} schedule configuration + */ + getSchedule() { + return { + type: "schedule", + value: "*/5 * * * *" // Every 5 minutes + }; + } + /** * Run the job * @param {Db} db connection diff --git a/api/jobs/test.js b/api/jobs/test.js deleted file mode 100644 index b25d3825e99..00000000000 --- a/api/jobs/test.js +++ /dev/null @@ -1,129 +0,0 @@ -'use strict'; - -/* jshint ignore:start */ - -const J = require('../parts/jobs/job.js'), - R = require('../parts/jobs/resource.js'), - RET = require('../parts/jobs/retry.js'); - -/** Class for testing resource handling for jobs **/ -class TestResource extends R.Resource { - /** - * Open resource - * @returns {Promise} promise - **/ - open() { - console.log('resource: opening in %d', process.pid); - this.opened(); - this.openedTime = Date.now(); - return Promise.resolve(); - } - - /** - * Close resource - * @returns {Promise} promise - **/ - close() { - console.log('resource: closing in %d', process.pid); - this.closed(); - return Promise.resolve(); - } - - /** - * Kill resource - * @returns {Promise} promise - **/ - kill() { - console.log('resource: killed in %d', process.pid); - return Promise.resolve(); - } - - /** - * Check if resource is used - * @returns {Promise} promise - **/ - checkActive() { - console.log('resource: checkActive in %d', process.pid); - return Promise.resolve(Date.now() - this.openedTime < 20000); - } - - /** Start using resource **/ - start() { - this.openedTime = Date.now(); - super.start.apply(this, arguments); - } -} -/** Class for testing ipc jobs **/ -class IPCTestJob extends J.IPCJob { - /** - * Prepare the job - * @param {object} manager - resource manager - * @param {Db} db - db connection - */ - async prepare(manager, db) { - console.log('preparing in %d', process.pid); - await new Promise((res, rej) => db.collection('jobs').updateOne({_id: this._id}, {$set: {'data.prepared': 1}}, err => err ? rej(err) : res())); - } - - /** - * Get resource name - * @returns {string} resource name - **/ - resourceName() { - return 'resource:test'; - } - - /** - * Create resource - * @param {string} _id - resource _id - * @param {string} name - resource name - * @param {Db} db - db connection - * @returns {Resource} resource - */ - createResource(_id, name, db) { - return new TestResource(_id, name, db); - } - - /** - * Get retry policy - * @returns {RetryPolicy} retry policy - **/ - retryPolicy() { - return new RET.NoRetryPolicy(); - } - - /** - * Get concurrency - * @returns {number} concurency - **/ - getConcurrency() { - return this.data && this.data.concurrency || 0; - } - - /** - * Run the job - * @param {Db} db connection - */ - async run(db) { - console.log('running in %d', process.pid); - if (!this.resource) { - throw new Error('Resource should exist'); - } - if (!(this.resource instanceof TestResource)) { - throw new Error('Resource should be TestResource'); - } - await new Promise((res, rej) => db.collection('jobs').updateOne({_id: this._id}, {$set: {'data.run': 1}}, err => err ? rej(err) : res())); - - if (this.data && this.data.fail) { - throw new Error(this.data.fail); - } - - if (this.data && this.data.concurrency) { - await new Promise(res => setTimeout(res, 3000)); - } - - console.log('done running in %d', process.pid); - } -} - -module.exports = IPCTestJob; \ No newline at end of file diff --git a/api/jobs/topEvents.js b/api/jobs/topEvents.js index 20d3af998ac..961854d7311 100644 --- a/api/jobs/topEvents.js +++ b/api/jobs/topEvents.js @@ -1,4 +1,5 @@ -const job = require("../parts/jobs/job.js"); +// const job = require("../parts/jobs/job.js"); +const Job = require("../../jobServer/Job"); const crypto = require("crypto"); const Promise = require("bluebird"); const countlyApi = { @@ -14,7 +15,7 @@ const common = require('../utils/common.js'); const log = require('../utils/log.js')('job:topEvents'); /** Class for job of top events widget **/ -class TopEventsJob extends job.Job { +class TopEventsJob extends Job { /** * TopEvents initialize function @@ -231,6 +232,17 @@ class TopEventsJob extends job.Job { done(error); } } + + /** + * Get schedule + * @returns {GetScheduleConfig} schedule + */ + getSchedule() { + return { + type: "schedule", + value: "1 0 * * *" // every day at 00:01 + }; + } } /** diff --git a/api/jobs/ttlCleanup.js b/api/jobs/ttlCleanup.js index 1c168d38b21..5de0df26dbd 100644 --- a/api/jobs/ttlCleanup.js +++ b/api/jobs/ttlCleanup.js @@ -1,12 +1,25 @@ const plugins = require("../../plugins/pluginManager.js"); const common = require('../utils/common'); -const job = require("../parts/jobs/job.js"); +// const job = require("../parts/jobs/job.js"); const log = require("../utils/log.js")("job:ttlCleanup"); +const Job = require("../../jobServer/Job"); /** * Class for job of cleaning expired records inside ttl collections */ -class TTLCleanup extends job.Job { +class TTLCleanup extends Job { + + /** + * Get the schedule configuration for this job + * @returns {GetScheduleConfig} schedule configuration + */ + getSchedule() { + return { + type: "schedule", + value: "* * * * *" // Every minute + }; + } + /** * Run the job */ diff --git a/api/jobs/userMerge.js b/api/jobs/userMerge.js index 973dc4fbdc1..c8c4ffa5363 100644 --- a/api/jobs/userMerge.js +++ b/api/jobs/userMerge.js @@ -1,8 +1,10 @@ -const job = require('../parts/jobs/job.js'), - plugins = require('../../plugins/pluginManager.js'), - log = require('../utils/log.js')('job:userMerge'); -var Promise = require("bluebird"); -var usersApi = require('../parts/mgmt/app_users.js'); + +// const job = require('../parts/jobs/job.js'); +const Job = require("../../jobServer/Job"); +const plugins = require('../../plugins/pluginManager.js'); +const log = require('../utils/log.js')('job:userMerge'); +const Promise = require("bluebird"); +const usersApi = require('../parts/mgmt/app_users.js'); var getMergeDoc = function(data) { @@ -226,7 +228,19 @@ var handleMerges = function(db, callback) { }); }; /** Class for the user mergind job **/ -class UserMergeJob extends job.Job { +class UserMergeJob extends Job { + + /** + * Get the schedule configuration for this job + * @returns {GetScheduleConfig} schedule configuration + */ + getSchedule() { + return { + type: "schedule", + value: "*/5 * * * *" // Every 5 minutes + }; + } + /** * Run the job * @param {Db} db connection diff --git a/api/parts/data/batcher.js b/api/parts/data/batcher.js index 2b9b98aa110..a3bb7529aad 100644 --- a/api/parts/data/batcher.js +++ b/api/parts/data/batcher.js @@ -1,5 +1,4 @@ -const crypto = require('crypto'); -const cluster = require('cluster'); + const plugins = require('../../../plugins/pluginManager.js'); const log = require('../../utils/log.js')("batcher"); const common = require('../../utils/common.js'); @@ -59,7 +58,6 @@ class InsertBatcher { let config = plugins.getConfig("api"); this.period = config.batch_period * 1000; this.process = config.batch_processing; - this.shared = false; } /** @@ -152,26 +150,21 @@ class InsertBatcher { * @param {string} db - name of the database for which to write data */ insert(collection, doc, db = "countly") { - if (!this.shared || cluster.isMaster) { - if (!this.data[db][collection]) { - this.data[db][collection] = []; - } - if (Array.isArray(doc)) { - for (let i = 0; i < doc.length; i++) { - this.data[db][collection].push(doc[i]); - } - batcherStats.insert_queued += doc.length; - } - else { - batcherStats.insert_queued++; - this.data[db][collection].push(doc); - } - if (!this.process) { - this.flush(db, collection); + if (!this.data[db][collection]) { + this.data[db][collection] = []; + } + if (Array.isArray(doc)) { + for (let i = 0; i < doc.length; i++) { + this.data[db][collection].push(doc[i]); } + batcherStats.insert_queued += doc.length; } else { - process.send({ cmd: "batch_insert", data: {collection, doc, db} }); + batcherStats.insert_queued++; + this.data[db][collection].push(doc); + } + if (!this.process) { + this.flush(db, collection); } } } @@ -223,7 +216,6 @@ class WriteBatcher { let config = plugins.getConfig("api"); this.period = config.batch_period * 1000; this.process = config.batch_processing; - this.shared = false; } /** @@ -363,34 +355,29 @@ class WriteBatcher { */ add(collection, id, operation, db = "countly", options) { options = options || {}; - if (!this.shared || cluster.isMaster) { - if (!this.data[db][collection]) { - this.data[db][collection] = {data: {}, "t": options.token, "upsert": options.upsert}; + if (!this.data[db][collection]) { + this.data[db][collection] = {data: {}, "t": options.token, "upsert": options.upsert}; + } + else { + if (options.token) { + this.data[db][collection].t = options.token; } - else { - if (options.token) { - this.data[db][collection].t = options.token; - } - if (typeof options.upsert !== "undefined") { - this.data[db][collection].upsert = options.upsert; - } + if (typeof options.upsert !== "undefined") { + this.data[db][collection].upsert = options.upsert; } + } - if (id) { - if (!this.data[db][collection].data[id]) { - this.data[db][collection].data[id] = {id: id, value: operation}; - batcherStats.update_queued++; - } - else { - this.data[db][collection].data[id].value = common.mergeQuery(this.data[db][collection].data[id].value, operation); - } - if (!this.process) { - this.flush(db, collection); - } + if (id) { + if (!this.data[db][collection].data[id]) { + this.data[db][collection].data[id] = {id: id, value: operation}; + batcherStats.update_queued++; + } + else { + this.data[db][collection].data[id].value = common.mergeQuery(this.data[db][collection].data[id].value, operation); + } + if (!this.process) { + this.flush(db, collection); } - } - else { - process.send({ cmd: "batch_write", data: {collection, id, operation, db} }); } } } @@ -409,25 +396,10 @@ class ReadBatcher { constructor(db) { this.db = db; this.data = {}; - this.promises = {}; plugins.loadConfigs(db, () => { this.loadConfig(); this.schedule(); }); - - if (!cluster.isMaster) { - process.on("message", (msg) => { - if (msg.cmd === "batch_read" && msg.data && msg.data.msgId && this.promises[msg.data.msgId]) { - if (msg.data.data) { - this.promises[msg.data.msgId].resolve(msg.data.data); - } - else { - this.promises[msg.data.msgId].reject(msg.data.err); - } - delete this.promises[msg.data.msgId]; - } - }); - } } /** @@ -438,7 +410,6 @@ class ReadBatcher { this.period = config.batch_read_period * 1000; this.ttl = config.batch_read_ttl * 1000; this.process = config.batch_read_processing; - this.onMaster = false; } /** @@ -541,17 +512,12 @@ class ReadBatcher { * @param {bool} multi - true if multiple documents */ invalidate(collection, query, projection, multi) { - if (!this.onMaster || cluster.isMaster) { - var id = JSON.stringify(query) + "_" + multi; - if (!this.data[collection]) { - this.data[collection] = {}; - } - if (this.data[collection][id] && !this.data[collection][id].promise) { - delete this.data[collection][id]; - } + var id = JSON.stringify(query) + "_" + multi; + if (!this.data[collection]) { + this.data[collection] = {}; } - else { - process.send({ cmd: "batch_invalidate", data: {collection, query, projection, multi} }); + if (this.data[collection][id] && !this.data[collection][id].promise) { + delete this.data[collection][id]; } } @@ -564,65 +530,56 @@ class ReadBatcher { * @returns {Promise} promise */ get(collection, query, projection, multi) { - if (!this.onMaster || cluster.isMaster) { - var id = JSON.stringify(query) + "_" + multi; - if (!this.data[collection]) { - this.data[collection] = {}; - } - var good_projection = true; - var keysSaved = this.keysFromProjectionObject(this.data[collection][id] && this.data[collection][id].projection); - var keysNew = this.keysFromProjectionObject(projection); - - if (this.data[collection][id] && (keysSaved.have_projection || keysNew.have_projection)) { - if (keysSaved.have_projection) { - for (let p = 0; p < keysNew.keys.length; p++) { - if (keysSaved.keys.indexOf(keysNew.keys[p]) === -1) { - good_projection = false; - keysSaved.keys.push(keysNew.keys[p]); - } - } - } - if (!good_projection) { - projection = {}; - for (var p = 0; p < keysSaved.keys.length; p++) { - projection[keysSaved.keys[p]] = 1; + var id = JSON.stringify(query) + "_" + multi; + if (!this.data[collection]) { + this.data[collection] = {}; + } + var good_projection = true; + var keysSaved = this.keysFromProjectionObject(this.data[collection][id] && this.data[collection][id].projection); + var keysNew = this.keysFromProjectionObject(projection); + + if (this.data[collection][id] && (keysSaved.have_projection || keysNew.have_projection)) { + if (keysSaved.have_projection) { + for (let p = 0; p < keysNew.keys.length; p++) { + if (keysSaved.keys.indexOf(keysNew.keys[p]) === -1) { + good_projection = false; + keysSaved.keys.push(keysNew.keys[p]); } } } - - if (!this.process || !good_projection || !this.data[collection][id] || this.data[collection][id].last_updated < Date.now() - this.period) { - if (this.process) { - this.data[collection][id] = { - query: query, - promise: this.getData(collection, id, query, projection, multi), - projection: projection, - last_used: Date.now(), - last_updated: Date.now(), - multi: multi - }; - return this.data[collection][id].promise; - } - else { - return this.getData(collection, id, query, projection, multi); + if (!good_projection) { + projection = {}; + for (var p = 0; p < keysSaved.keys.length; p++) { + projection[keysSaved.keys[p]] = 1; } } - //we already have a read for this - else if (this.data[collection][id] && this.data[collection][id].promise) { + } + + if (!this.process || !good_projection || !this.data[collection][id] || this.data[collection][id].last_updated < Date.now() - this.period) { + if (this.process) { + this.data[collection][id] = { + query: query, + promise: this.getData(collection, id, query, projection, multi), + projection: projection, + last_used: Date.now(), + last_updated: Date.now(), + multi: multi + }; return this.data[collection][id].promise; } else { - this.data[collection][id].last_used = Date.now(); - - return new Promise((resolve) => { - resolve(this.data[collection][id].data); - }); + return this.getData(collection, id, query, projection, multi); } } + //we already have a read for this + else if (this.data[collection][id] && this.data[collection][id].promise) { + return this.data[collection][id].promise; + } else { - return new Promise((resolve, reject) => { - let msgId = getId(); - this.promises[msgId] = {resolve, reject}; - process.send({ cmd: "batch_read", data: {collection, query, projection, multi, msgId} }); + this.data[collection][id].last_used = Date.now(); + + return new Promise((resolve) => { + resolve(this.data[collection][id].data); }); } } @@ -706,12 +663,5 @@ function promiseOrCallback(promise, callback) { return promise; } -/** - * Generate random id - * @returns {string} randomly generated id - */ -function getId() { - return crypto.randomBytes(16).toString("hex"); -} module.exports = {WriteBatcher, ReadBatcher, InsertBatcher}; diff --git a/api/parts/data/cache.js b/api/parts/data/cache.js index a2da942aaf1..cc4263b6e8f 100644 --- a/api/parts/data/cache.js +++ b/api/parts/data/cache.js @@ -1,13 +1,11 @@ 'use strict'; -const log = require('../../utils/log.js')('cache:' + process.pid), - // common = require('../../utils/common.js'), - { CentralWorker, CentralMaster } = require('../jobs/ipc.js'), +const log = require('../../utils/log.js')('cache'), { Jsonable } = require('../../utils/models'), LRU = require('lru-cache'), config = require('../../config.js'); -const CENTRAL = 'cache', OP = {INIT: 'i', PURGE: 'p', READ: 'r', WRITE: 'w', UPDATE: 'u'}; +// const CENTRAL = 'cache', OP = {INIT: 'i', PURGE: 'p', READ: 'r', WRITE: 'w', UPDATE: 'u'}; // new job: {o: 2, k: 'ObjectId', g: 'jobs', d: '{"name": "jobs:clean", "status": 0, ...}'} // job update: {o: 3, k: 'ObjectId', g: 'jobs', d: '{"status": 1}'} // job retreival: {o: 1, k: 'ObjectId', g: 'jobs'} @@ -16,10 +14,12 @@ const CENTRAL = 'cache', OP = {INIT: 'i', PURGE: 'p', READ: 'r', WRITE: 'w', UPD /** * Get value in nested objects - * @param {object} obj - object to checl + * @param {object} obj - object to check * @param {string|array} is - keys for nested value * @param {any} value - if provided acts as setter setting this value in nested object * @return {varies} returns value in provided key in nested object + * @note + * TODO: change to iterative if profiling finds memory issues */ const dot = function(obj, is, value) { if (typeof is === 'string') { @@ -54,7 +54,13 @@ class DataStore { constructor(size, age, dispose, Cls) { this.size = size; this.age = age; - this.lru = new LRU({max: size || 10000, ttl: age || Number.MAX_SAFE_INTEGER, dispose: dispose, noDisposeOnSet: true, updateAgeOnGet: true}); + this.lru = new LRU({ + max: size || 10000, + ttl: age || Number.MAX_SAFE_INTEGER, + dispose: dispose, + noDisposeOnSet: true, + updateAgeOnGet: true + }); if (Cls) { this.Cls = Cls; this.Clas = require('../../../' + Cls[0])[Cls[1]]; @@ -151,415 +157,14 @@ class DataStore { * - notifies master about updates; * - loads data from master when local copy misses a particular key. */ -class CacheWorker { +class Cache { /** * Constructor - * * @param {Number} size max number of cache groups */ constructor(size = 100) { this.data = new DataStore(size); - this.started = false; - - this.ipc = new CentralWorker(CENTRAL, (m, reply) => { - let {o, k, g, d} = m || {}; - - if (!g) { - return; - } - log.d('handling %s: %j', reply ? 'reply' : 'broadcast', m); - - let store = this.data.read(g); - - if (o === OP.INIT) { - this.data.write(g, new DataStore(d.size, d.age, undefined, d.Cls)); - return; - } - else if (!store) { - log.d('Group store is not initialized'); - return; - } - - if (o === OP.PURGE) { - if (k) { - store.write(k, null); - } - else { // purgeAll - store.iterate(id => store.write(id, null)); - } - } - else if (o === OP.READ) { - store.write(k, d); - } - else if (o === OP.WRITE) { - store.write(k, d); - } - else if (o === OP.UPDATE) { - store.update(k, d); - } - else { - throw new Error(`Illegal cache operaion: ${o}, ${k}, ${g}, ${d}`); - } - - // store.iterate((k, v) => { - // log.d('have %s: %j', k, v); - // }); - }); - } - - /** - * Start listening to IPC messages - */ - async start() { - if (this.started === true) { - return; - } - - if (this.started === false) { - log.d('starting worker'); - this.started = new Promise((resolve, reject) => { - let timeout = setTimeout(() => { - reject(new Error('Failed to start CacheWorker on timeout')); - }, 10000); - this.ipc.attach(); - this.ipc.request({o: OP.INIT}).then(ret => { - log.d('got init response: %j', ret); - Object.keys(ret).forEach(g => { - if (!this.data.read(g)) { - this.data.write(g, new DataStore(ret[g].size, ret[g].age, undefined, ret[g].Cls)); - if (ret[g].data) { - log.d('got %d data objects in init response', Object.keys(ret[g].data).length); - for (let k in ret[g].data) { - this.data.read(g).write(k, ret[g].data[k]); - } - } - } - }); - this.started = true; - clearTimeout(timeout); - resolve(); - }); - }); - } - - await this.started; - } - - /** - * Stop worker - */ - async stop() { - this.ipc.detach(); - } - - /** - * Write data to cache: - * - send a write to the master; - * - wait for a response with write status; - * - write the data to local copy and return it in case of success, throw error otherwise. - * - * @param {String} group group key - * @param {String} id data key - * @param {Object} data data to store - * @return {Object} data if succeeded, null otherwise, throws in case of an error - */ - async write(group, id, data) { - await this.start(); - - if (!group || !id || !data || typeof id !== 'string') { - throw new Error('Where are my args?'); - } - else if (!this.data.read(group)) { - throw new Error('No such cache group'); - } - log.d(`writing ${group}:${id}`); - let rsp = await this.ipc.request({o: OP.WRITE, g: group, k: id, d: data instanceof Jsonable ? data.json : data}); - if (rsp) { - this.data.read(group).write(id, rsp); - } - - return this.has(group, id); - } - - /** - * Update data in the cache: - * - send an update to the master; - * - wait for a response with update status; - * - update the data in the local copy and return updated object in case of success, throw error otherwise. - * - * @param {String} group group key - * @param {String} id data key - * @param {Object} update data to store - * @return {Object} data if succeeded, null otherwise, throws in case of an error - */ - async update(group, id, update) { - await this.start(); - - if (!group || !id || !update || typeof id !== 'string') { - throw new Error('Where are my args?!'); - } - else if (!this.data.read(group)) { - throw new Error('No such cache group'); - } - log.d(`updating ${group}:${id}`); - let rsp = await this.ipc.request({o: OP.UPDATE, g: group, k: id, d: update}), - store = this.data.read(group); - if (rsp) { - store.update(id, rsp); - } - else { - store.remove(id); - - } - return this.has(group, id); - } - - /** - * Remove a record from cache. - * - * @param {String} group group key - * @param {String} id data key - * @return {Boolean} true if removed - */ - async remove(group, id) { - await this.start(); - - if (!group || !id || typeof id !== 'string') { - throw new Error('Where are my args?!'); - } - else if (!this.data.read(group)) { - throw new Error('No such cache group'); - } - log.d(`removing ${group}:${id}`); - await this.ipc.request({o: OP.WRITE, g: group, k: id, d: null}); - let store = this.data.read(group); - if (store) { - store.remove(id); - } - return this.has(group, id) === null; - } - - /** - * Remove a record from cache. - * - * @param {String} group group key - * @param {String} id data key - * @return {Boolean} true if removed - */ - async purge(group, id) { - await this.start(); - - if (!group || !id || typeof id !== 'string') { - throw new Error('Where are my args?!'); - } - else if (!this.data.read(group)) { - throw new Error('No such cache group'); - } - log.d(`purging ${group}:${id}`); - await this.ipc.request({o: OP.PURGE, g: group, k: id}); - let store = this.data.read(group); - if (store) { - store.remove(id); - } - return this.has(group, id) === null; - } - - /** - * Remove from cache all records for a given group. - * - * @param {String} group group key - */ - async purgeAll(group) { - await this.start(); - - if (!group) { - throw new Error('Where are my args?!'); - } - else if (!this.data.read(group)) { - throw new Error('No such cache group'); - } - log.d(`purging ${group}`); - await this.ipc.request({o: OP.PURGE, g: group}); - let store = this.data.read(group); - store.iterate(id => store.write(id, null)); - } - - /** - * Read a record from cache: - * - from local copy if exists; - * - send a read request to master otherwise. - * - * @param {String} group group key - * @param {String} id data key - * @return {Object} data if any, null otherwise - */ - async read(group, id) { - await this.start(); - - if (!group || !id) { - throw new Error('Where are my args?!'); - } - let data = this.has(group, id); - if (data) { - return data; - } - else { - let rsp = await this.ipc.request({o: OP.READ, g: group, k: id}); - if (rsp) { - let store = this.data.read(group); - if (!store) { - throw new Error(`No store for a group ${group}?!`); - // store = this.data.write(group, new DataStore(this.size)); - } - store.write(id, rsp); - } - return this.has(group, id); - } - } - - /** - * Check if local copy has data under the key. - * - * @param {String} group group key - * @param {String} id data key - * @return {Object} data if any, null otherwise - */ - has(group, id) { - if (!group) { - throw new Error('Where are my args?!'); - } - let store = this.data.read(group); - if (id) { - return store && store.read(id) || null; - } - else { - return store; - } - } - - /** - * Just a handy method which returns an object with partials with given group. - * - * @param {String} group group name - * @return {Object} object with all the {@code CacheWorker} methods without group - */ - cls(group) { - return { - read: this.read.bind(this, group), - write: this.write.bind(this, group), - update: this.update.bind(this, group), - remove: this.remove.bind(this, group), - purge: this.purge.bind(this, group), - purgeAll: this.purgeAll.bind(this, group), - has: this.has.bind(this, group), - iterate: f => { - let g = this.data.read(group); - if (g) { - g.iterate(f); - } - else { - log.e('no cache group %s to iterate on', group); - } - } - }; - } -} - -/** - * Cache instance for master process: - * - listen for requests from workers; - * - call group operators to read/write/udpate - */ -class CacheMaster { - /** - * Constructor - * - * @param {Number} size max number of cache groups - */ - constructor(size = 100) { - this.data = new DataStore(size, Number.MAX_SAFE_INTEGER); this.operators = {}; - this.initialized = {}; - this.delayed_messages = []; - this.ipc = new CentralMaster(CENTRAL, ({o, g, k, d}, reply, from) => { - log.d('handling %s: %j / %j / %j / %j', reply ? 'reply' : 'broadcast', o, g, k, d); - - if (o === OP.INIT) { - this.initialized[from] = true; - let ret = {}; - this.data.iterate((group, store) => { - let data = {}; - store.iterate((key, obj) => { - data[key] = obj instanceof Jsonable ? obj.json : obj; - }); - ret[group] = {size: store.size, age: store.age, Cls: store.Cls, data}; - }); - setImmediate(() => { - let remove = []; - this.delayed_messages.filter(arr => arr[0] === from).forEach(arr => { - remove.push(arr); - this.ipc.send(arr[0], arr[1]); - }); - if (remove.length) { - log.d('sent %d delayed messages after %d worker\'s init', remove.length, from); - remove.forEach(m => { - const i = this.delayed_messages.indexOf(m); - if (i !== -1) { - this.delayed_messages.splice(i, 1); - } - }); - } - }); - return ret; - } - - let store = this.data.read(g); - if (!store) { - log.d(`No store for group ${g}`); - throw new Error('No such store ' + g); - } - - if (o === OP.PURGE) { - if (k) { - return this.purge(g, k, from); - } - else { - return this.purgeAll(g, from); - } - } - else if (o === OP.READ) { - return this.read(g, k, from); - } - else if (o === OP.WRITE) { - return this.write(g, k, d, from); - } - else if (o === OP.UPDATE) { - return this.update(g, k, d, from); - } - else if (o === OP.REMOVE) { - return this.remove(g, k, from); - } - else { - throw new Error(`Illegal cache operaion: ${o}, ${k}, ${g}, ${d}`); - } - }); - } - - /** - * Attach to IPC - * - * @return {Promise} void - */ - async start() { - this.ipc.attach(); - log.d('started master'); - } - - /** - * Detaches IPC instance - */ - stop() { - this.ipc.detach(); } /** @@ -569,55 +174,44 @@ class CacheMaster { * @param {Function} options.init initializer - an "async () => [Object]" kind of function, preloads data to cache on startup * @param {string[]} options.Cls class - an optional array of ["require path", "export name"] which resolves to a Jsonable subclass to construct instances * @param {Function} options.read reader - an "async (key) => Object" kind of function, returns data to cache if any for the key supplied - * @param {Function} options.write writer - an "async (key, data) => Object" kind of function, persists the data cached if needed (must return the data persisted on success) + * @param {Function} options.write writer - an "async (key, data) => Object" kind of function, persists the data cached if needed * @param {Function} options.update updater - an "async (key, update) => Object" kind of function, updates persisted data if needed * @param {Function} options.remove remover - an "async (key) => Object" kind of function, removes persisted data if needed * @param {int} age how long in ms to keep records in memory for the group * @param {int} size how much records to keep in memory for the group */ - init(group, {init, Cls, read, write, update, remove}, size = null, age = null) { + async init(group, {init, Cls, read, write, update, remove}, size = null, age = null) { this.operators[group] = {init, Cls, read, write, update, remove}; if (!size && size !== 0) { - size = config.api && config.api.cache && config.api.cache[group] && config.api.cache[group].size !== undefined ? config.api.cache[group].size : 10000; + size = config.api?.cache?.[group]?.size ?? 10000; } if (!age && age !== 0) { - age = config.api && config.api.cache && config.api.cache[group] && config.api.cache[group].age !== undefined ? config.api.cache[group].age : Number.MAX_SAFE_INTEGER; + age = config.api?.cache?.[group]?.age ?? Number.MAX_SAFE_INTEGER; } - this.data.write(group, new DataStore(size, age, k => { - this.ipc.send(0, {o: OP.PURGE, g: group, k}); - }, Cls)); - - this.ipc.send(0, {o: OP.INIT, g: group, d: {size, age, Cls}}); + this.data.write(group, new DataStore(size, age, null, Cls)); - init().then(arr => { + try { + const arr = await init(); (arr || []).forEach(([k, d]) => { this.data.read(group).write(k, d); - const msg = {o: OP.READ, g: group, k, d: d && (d instanceof Jsonable) ? d.json : d}; - for (const pid in this.ipc.workers) { - if (this.initialized[pid]) { - this.ipc.send(parseInt(pid), msg); - } - else { - this.delayed_messages.push([parseInt(pid), msg]); - } - } }); - }, log.e.bind(log, 'Error during initialization of cache group %s', group)); + } + catch (err) { + log.e('Error during initialization of cache group %s', group, err); + } } /** * Write data to the cache - * * @param {String} group group key * @param {String} id data key * @param {Object} data data to store - * @param {int} from originating pid if any * @return {Object} data if succeeded, null otherwise, throws in case of an error */ - async write(group, id, data, from = 0) { + async write(group, id, data) { if (!group || !id || (data === undefined) || typeof id !== 'string') { throw new Error('Where are my args?!'); } @@ -627,38 +221,29 @@ class CacheMaster { log.d(`writing ${group}:${id}: %j`, data); if (group in this.operators) { - return this.operators[group][data === null ? 'remove' : 'write'](id, data).then(rc => { - if (rc) { - if (data === null) { - rc = null; - } - if (rc instanceof Jsonable) { - rc = rc.json; - } - this.data.read(group)[data === null ? 'remove' : 'write'](id, rc); - this.ipc.send(-from, {o: OP.WRITE, g: group, k: id, d: rc}); - return data === null ? true : rc; - } - else { - return null; + const rc = await this.operators[group][data === null ? 'remove' : 'write'](id, data); + if (rc) { + if (data === null) { + this.data.read(group).remove(id); + return true; } - }); - } - else { + const toStore = rc instanceof Jsonable ? rc.json : rc; + this.data.read(group).write(id, toStore); + return toStore; + } return null; } + return null; } /** * Update data in the cache - * * @param {String} group group key * @param {String} id data key * @param {Object} update data to store - * @param {int} from originating pid if any * @return {Object} data if succeeded, null otherwise, throws in case of an error */ - async update(group, id, update, from = 0) { + async update(group, id, update) { if (!group || !id || !update || typeof id !== 'string') { throw new Error('Where are my args?!'); } @@ -668,15 +253,11 @@ class CacheMaster { log.d(`updating ${group}:${id} with %j`, update); if (group in this.operators) { - return this.operators[group].update(id, update).then(() => { - this.data.read(group).update(id, update); - this.ipc.send(-from, {o: OP.UPDATE, g: group, k: id, d: update}); - return update; - }); - } - else { - return null; + await this.operators[group].update(id, update); + this.data.read(group).update(id, update); + return update; } + return null; } /** @@ -684,33 +265,18 @@ class CacheMaster { * * @param {String} group group key * @param {String} id data key - * @param {int} from originating pid if any * @return {Boolean} true if removed */ - async remove(group, id, from) { + async remove(group, id) { if (!group || !id || typeof id !== 'string') { throw new Error('Where are my args?!'); } else if (!this.data.read(group)) { throw new Error('No such cache group'); } - log.d(`removing ${group}:${id}`); - if (group in this.operators) { - return this.operators[group].remove(id).then(rc => { - if (rc) { - this.data.read(group).remove(id); - this.ipc.send(-from, {o: OP.WRITE, g: group, k: id, d: null}); - return true; - } - else { - return null; - } - }); - } - else { - return null; - } + this.data.read(group).remove(id); + return true; } /** @@ -718,10 +284,9 @@ class CacheMaster { * * @param {String} group group key * @param {String} id data key - * @param {int} from originating pid if any * @return {Boolean} true if removed */ - async purge(group, id, from = 0) { + async purge(group, id) { if (!group || !id || typeof id !== 'string') { throw new Error('Where are my args?!'); } @@ -729,42 +294,33 @@ class CacheMaster { throw new Error('No such cache group'); } log.d(`purging ${group}:${id}`); - this.data.read(group).write(id, null); - this.ipc.send(-from, {o: OP.PURGE, g: group, k: id}); return true; } /** - * Remove from cache all record for given group. + * Remove from cache all records for a given group. * * @param {String} group group key - * @param {int} from originating pid if any - * @return {Boolean} true if removed */ - async purgeAll(group, from = 0) { + async purgeAll(group) { if (!group) { throw new Error('Where are my args?!'); } + else if (!this.data.read(group)) { + throw new Error('No such cache group'); + } log.d(`purging ${group}`); - - let grp = this.data.read(group); - grp.iterate(k => grp.write(k, null)); - this.ipc.send(-from, {o: OP.PURGE, g: group}); - return true; + this.data.read(group).iterate(id => this.data.read(group).write(id, null)); } /** - * Read a record from cache: - * - from local copy if exists; - * - send a read request to master otherwise. - * + * Read data from the cache * @param {String} group group key * @param {String} id data key - * @param {int} from originating pid if any - * @return {Object} data if any, null otherwise + * @return {Object} data if succeeded, null otherwise, throws in case of an error */ - async read(group, id, from = 0) { + async read(group, id) { if (!group || !id || typeof id !== 'string') { throw new Error('Where are my args?!'); } @@ -772,75 +328,52 @@ class CacheMaster { throw new Error('No such cache group'); } - let store = this.data.read(group), - rc = store.read(id); - if (rc) { - return rc; - } - else if (group in this.operators) { - return this.operators[group].read(id).then(x => { - if (x) { - this.ipc.send(-from, {o: OP.READ, g: group, k: id, d: x instanceof Jsonable ? x.json : x}); - store.write(id, x); - return x; - } - else { - return null; - } - }); + let data = this.has(group, id); + if (data) { + return data; } - else { - return null; + + if (group in this.operators) { + const rc = await this.operators[group].read(id); + if (rc) { + const toStore = rc instanceof Jsonable ? rc.json : rc; + this.data.read(group).write(id, toStore); + return toStore; + } } + return null; } /** - * Check if local copy has data under the key. - * + * Check if data exists in cache * @param {String} group group key * @param {String} id data key - * @return {Object} data if any, undefined otherwise + * @return {Object|null} data if exists, null otherwise */ has(group, id) { - if (!group) { + if (!group || !id || typeof id !== 'string') { throw new Error('Where are my args?!'); } - let store = this.data.read(group); - if (id) { - return store && store.read(id) || null; - } - else { - return store; + else if (!this.data.read(group)) { + throw new Error('No such cache group'); } + + return this.data.read(group).read(id); } /** - * Just a handy method which returns an object with partials with given group. - * + * Get class interface for a group * @param {String} group group name - * @return {Object} object with all the {@code CacheWorker} methods without group + * @return {Object} object with read/write/update methods bound to the group */ cls(group) { return { read: this.read.bind(this, group), write: this.write.bind(this, group), update: this.update.bind(this, group), - remove: this.remove.bind(this, group), - purge: this.purge.bind(this, group), - purgeAll: this.purgeAll.bind(this, group), - has: this.has.bind(this, group), - iterate: f => { - let g = this.data.read(group); - if (g) { - g.iterate(f); - } - else { - log.e('no cache group %s to iterate on', group); - } - } + remove: this.remove.bind(this, group) }; } - } /** @@ -856,4 +389,4 @@ class TestDataClass extends Jsonable { } -module.exports = {CacheMaster, CacheWorker, TestDataClass}; +module.exports = {Cache, TestDataClass}; diff --git a/api/parts/data/fetch.js b/api/parts/data/fetch.js index b6779981365..5b3eaf469e6 100644 --- a/api/parts/data/fetch.js +++ b/api/parts/data/fetch.js @@ -20,7 +20,6 @@ var fetch = {}, _ = require('underscore'), crypto = require('crypto'), usage = require('./usage.js'), - STATUS_MAP = require('../jobs/job').STATUS_MAP, plugins = require('../../../plugins/pluginManager.js'); @@ -2161,130 +2160,6 @@ function union(x, y) { return res; } -/** -* Get data for jobs listing for jobs api -* @param {string} metric - name of the collection where to get data from -* @param {params} params - params object with app_id and date -*/ -fetch.fetchJobs = async function(metric, params) { - try { - if (params.qstring.name) { - await fetch.jobDetails(metric, params); - } - else { - await fetch.alljobs(metric, params); - } - } - catch (e) { - console.log(e); - common.returnOutput(params, 500, "Fetching jobs failed"); - } -}; - -/** -* Get all jobs grouped by job name for jobs api -* @param {string} metric - name of the collection where to get data from -* @param {params} params - params object with app_id and date -*/ -fetch.alljobs = async function(metric, params) { - const columns = ["name", "schedule", "next", "finished", "status", "total"]; - let sort = {}; - let total = await common.db.collection('jobs').aggregate([ - { - $group: { _id: "$name" } - }, - { - $count: 'total' - } - ]).toArray(); - total = total.length > 0 ? total[0].total : 0; - const pipeline = [ - { - $addFields: { - sortKey: { - $cond: { - if: { $eq: ["$status", 0] }, - then: 0, - else: { - $cond: { - if: { $eq: ["$status", 7] }, - then: 1, - else: 2 - } - } - } - } - } - }, - { - $sort: { - sortKey: 1, - finished: -1 - } - }, - { - $group: { - _id: "$name", - name: { $first: "$name" }, - status: { $first: "$status" }, - schedule: { $first: "$schedule" }, - next: { $first: "$next" }, - finished: { $first: "$finished" }, - total: { $sum: 1 }, - rowId: { $first: "$_id" } - } - } - ]; - if (params.qstring.sSearch) { - var rr; - try { - rr = new RegExp(params.qstring.sSearch, "i"); - pipeline.unshift({ - $match: { name: { $regex: rr } } - }); - } - catch (e) { - console.log('Could not use as regex:' + params.qstring.sSearch); - } - } - const cursor = common.db.collection('jobs').aggregate(pipeline, { allowDiskUse: true }); - sort[columns[params.qstring.iSortCol_0 || 0]] = (params.qstring.sSortDir_0 === "asc") ? 1 : -1; - cursor.sort(sort); - cursor.skip(Number(params.qstring.iDisplayStart || 0)); - cursor.limit(Number(params.qstring.iDisplayLength || 10)); - let items = await cursor.toArray(); - items = items.map((job) => { - job.status = STATUS_MAP[job.status]; - return job; - }); - cursor.close(); - common.returnOutput(params, { sEcho: params.qstring.sEcho, iTotalRecords: total, iTotalDisplayRecords: total, aaData: items || [] }); -}; - -/** -* Get all documents for a given job name -* @param {string} metric - name of the collection where to get data from -* @param {params} params - params object with app_id and date -*/ -fetch.jobDetails = async function(metric, params) { - const columns = ["schedule", "next", "finished", "status", "data", "duration"]; - let sort = {}; - const total = await common.db.collection('jobs').count({ name: params.qstring.name }); - const cursor = common.db.collection('jobs').find({ name: params.qstring.name }); - sort[columns[params.qstring.iSortCol_0 || 0]] = (params.qstring.sSortDir_0 === "asc") ? 1 : -1; - cursor.sort(sort); - cursor.skip(Number(params.qstring.iDisplayStart || 0)); - cursor.limit(Number(params.qstring.iDisplayLength || 10)); - let items = await cursor.toArray(); - items = items.map((job) => { - job.status = STATUS_MAP[job.status]; - return job; - }); - cursor.close(); - common.returnOutput(params, { sEcho: params.qstring.sEcho, iTotalRecords: total, iTotalDisplayRecords: total, aaData: items || [] }); -}; - - /** * Fetch data for tops * @param {params} params - params object diff --git a/api/parts/jobs/scanner.js b/api/parts/jobs/scanner.js index dad13d46845..2050a0eb1c7 100644 --- a/api/parts/jobs/scanner.js +++ b/api/parts/jobs/scanner.js @@ -2,7 +2,30 @@ const log = require('../../utils/log.js')('jobs:scanner'), manager = require('../../../plugins/pluginManager.js'), - fs = require('fs'); + fs = require('fs'), + {Job, IPCJob, IPCFaçadeJob, TransientJob} = require('./job.js'); + +/** + * Validates if a job class has the required methods + * @param {Function} JobClass - The job class to validate + * @returns {boolean} - True if valid, throws error if invalid + */ +const validateJobClass = (JobClass) => { + // Check if it's a class/constructor + if (typeof JobClass !== 'function') { + throw new Error('Job must be a class constructor'); + } + + // Check if it inherits from one of the valid base classes + if (!(JobClass.prototype instanceof Job || + JobClass.prototype instanceof IPCJob || + JobClass.prototype instanceof IPCFaçadeJob || + JobClass.prototype instanceof TransientJob)) { + throw new Error('Job class must extend Job, IPCJob, IPCFaçadeJob, or TransientJob'); + } + + return true; +}; module.exports = (db, filesObj, classesObj) => { return new Promise((resolve, reject) => { @@ -52,12 +75,20 @@ module.exports = (db, filesObj, classesObj) => { (arr || []).forEach(job => { try { let name = job.category + ':' + job.name; - filesObj[name] = job.file; - classesObj[name] = require(job.file); - log.d('Found job %j at %j', name, job.file); + const JobClass = require(job.file); + if (validateJobClass(JobClass)) { + filesObj[name] = job.file; + classesObj[name] = JobClass; + log.d('Found valid job %j at %j', name, job.file); + } } catch (e) { - log.e('Error when loading job %s: %j ', job.file, e, e.stack); + if (e.message === "Job class must extend Job, IPCJob, IPCFaçadeJob, or TransientJob") { + // do nothing + } + else { + log.e('Error when loading job %s: %j ', job.file, e, e.stack); + } } }); }); diff --git a/api/parts/mgmt/tracker.js b/api/parts/mgmt/tracker.js index d878ab7188f..835bc951c35 100644 --- a/api/parts/mgmt/tracker.js +++ b/api/parts/mgmt/tracker.js @@ -13,7 +13,6 @@ var tracker = {}, countlyConfig = require("../../../frontend/express/config.js"), versionInfo = require('../../../frontend/express/version.info'), ip = require('./ip.js'), - cluster = require('cluster'), os = require('os'), fs = require('fs'), asyncjs = require('async'), @@ -107,20 +106,19 @@ tracker.enable = function() { if (countlyConfig.web.track !== "none" && countlyConfig.web.server_track !== "none") { Countly.track_errors(); } - if (cluster.isMaster) { - setTimeout(function() { - if (countlyConfig.web.track !== "none" && countlyConfig.web.server_track !== "none") { - Countly.begin_session(true); - setTimeout(function() { - collectServerStats(); - collectServerData(); - }, 20000); - } - }, 1000); - //report app start trace - if (Countly.report_app_start) { - Countly.report_app_start(); + + setTimeout(function() { + if (countlyConfig.web.track !== "none" && countlyConfig.web.server_track !== "none") { + Countly.begin_session(true); + setTimeout(function() { + collectServerStats(); + collectServerData(); + }, 20000); } + }, 1000); + //report app start trace + if (Countly.report_app_start) { + Countly.report_app_start(); } }; diff --git a/api/utils/log.js b/api/utils/log.js index 40bdc3e6a3d..205141a0570 100644 --- a/api/utils/log.js +++ b/api/utils/log.js @@ -1,68 +1,36 @@ -'use strict'; +const pino = require('pino'); + +// Optional OpenTelemetry imports +let trace; +let context; +let metrics; +let semanticConventions; +try { + trace = require('@opentelemetry/api').trace; + context = require('@opentelemetry/api').context; + metrics = require('@opentelemetry/api').metrics; + semanticConventions = require('@opentelemetry/semantic-conventions'); + // eslint-disable-next-line no-empty +} +catch (e) { + // do nothing +} /** - * Log provides a wrapper over debug or console functions with log level filtering, module filtering and ability to store log in database. - * Uses configuration require('../config.js').logging: - * { - * 'info': ['app', 'auth', 'static'], // log info and higher level for modules 'app*', 'auth*', 'static*' - * 'debug': ['api.users'], // log debug and higher (in fact everything) for modules 'api.users*' - * 'default': 'warn', // log warn and higher for all other modules - * } - * Note that log levels supported are ['debug', 'info', 'warn', 'error'] - * - * Usage is quite simple: - * var log = require('common.js').log('module[:submodule[:subsubmodule]]'); - * log.i('something happened: %s, %j', 'string', {obj: 'ect'}); - * log.e('something really bad happened: %j', new Error('Oops')); - * - * Whenever DEBUG is in process.env, log outputs all filtered messages with debug module instead of console so you could have pretty colors in console. - * In other cases only log.d is logged using debug module. - * - * To control log level at runtime, call require('common.js').log.setLevel('events', 'debug'). From now on 'events' logger will log everything. - * - * There is also a handy method for generating standard node.js callbacks which log error. Only applicable if no actions in case of error needed: - * collection.find().toArray(log.callback(function(arg1, arg2){ // all good })); - * - if error didn't happen, function is called - * - if error happened, it will be logged, but function won't be called - * - if error happened, arg1 is a first argument AFTER error, it's not an error - * @module api/utils/log + * Mapping of short level codes to full level names + * @type {Object.} */ - -var prefs = require('../config.js', 'dont-enclose').logging || {}; -prefs.default = prefs.default || "warn"; -var colors = require('colors'); -var deflt = (prefs && prefs.default) ? prefs.default : 'error'; - -for (let level in prefs) { - if (prefs[level].sort) { - prefs[level].sort(); - } -} - -var styles = { - moduleColors: { - // 'push:*api': 0 // green - '[last]': -1 - }, - colors: ['green', 'yellow', 'blue', 'magenta', 'cyan', 'white', 'gray', 'red'], - stylers: { - warn: function(args) { - for (var i = 0; i < args.length; i++) { - if (typeof args[i] === 'string') { - args[i] = colors.bgYellow.black(args[i].black); - } - } - }, - error: function(args) { - for (var i = 0; i < args.length; i++) { - if (typeof args[i] === 'string') { - args[i] = colors.bgRed.white(args[i].white); - } - } - } - } +const LEVELS = { + d: 'debug', + i: 'info', + w: 'warn', + e: 'error' }; +/** + * Mapping of log levels to acceptable log levels + * @type {Object.} + */ const ACCEPTABLE = { d: ['debug'], i: ['debug', 'info'], @@ -70,61 +38,97 @@ const ACCEPTABLE = { e: ['debug', 'info', 'warn', 'error'], }; -const NAMES = { - d: 'DEBUG', - i: 'INFO', - w: 'WARN', - e: 'ERROR' -}; + +// Initialize configuration with defaults +let prefs = require('../config.js', 'dont-enclose').logging || {}; +prefs.default = prefs.default || "warn"; +let deflt = (prefs && prefs.default) ? prefs.default : 'error'; /** - * Returns logger function for given preferences - * @param {string} level - log level - * @param {string} prefix - add prefix to message - * @param {boolean} enabled - whether function should log anything - * @param {object} outer - this for @out - * @param {function} out - output function (console or debug) - * @param {function} styler - function to apply styles - * @returns {function} logger function + * Current levels for all modules + * @type {Object.} */ -var log = function(level, prefix, enabled, outer, out, styler) { - return function() { - // console.log(level, prefix, enabled(), arguments); - if (enabled()) { - var args = Array.prototype.slice.call(arguments, 0); - var color = styles.moduleColors[prefix]; - if (color === undefined) { - color = (++styles.moduleColors['[last]']) % styles.colors.length; - styles.moduleColors[prefix] = color; - } - color = styles.colors[color]; - if (styler) { - args[0] = new Date().toISOString() + ': ' + level + '\t' + '[' + (prefix || '') + ']\t' + args[0]; - styler(args); - } - else { - args[0] = (new Date().toISOString() + ': ' + level + '\t').gray + colors[color]('[' + (prefix || '') + ']\t') + args[0]; - } - // args[0] = (new Date().toISOString() + ': ' + (prefix || '')).gray + args[0]; - // console.log('Logging %j', args); - if (typeof out === 'function') { - out.apply(outer, args); - } - else { - for (var k in out) { - out[k].apply(outer, args); - } - } - } +const levels = {}; + +// Metrics setup if OpenTelemetry is available +let logCounter; +let logDurationHistogram; + +if (metrics) { + const meter = metrics.getMeter('logger'); + logCounter = meter.createCounter('log_entries_total', { + description: 'Number of log entries by level and module', + }); + logDurationHistogram = meter.createHistogram('log_duration_seconds', { + description: 'Duration of logging operations', + }); +} + +/** + * Gets the current trace context if OpenTelemetry is available + * @returns {Object|null} Trace context object or null if unavailable + */ +function getTraceContext() { + if (!trace) { + return null; + } + + const currentSpan = trace.getSpan(context.active()); + if (!currentSpan) { + return null; + } + + const spanContext = currentSpan.spanContext(); + return { + 'trace.id': spanContext.traceId, + 'span.id': spanContext.spanId, + 'trace.flags': spanContext.traceFlags.toString(16) }; -}; +} + +/** + * Creates a logging span if OpenTelemetry is available + * @param {string} name - The module name + * @param {string} level - The log level + * @param {string} message - The log message + * @returns {Span|null} The created span or null if unavailable + */ +function createLoggingSpan(name, level, message) { + if (!trace) { + return null; + } + + const tracer = trace.getTracer('logger'); + return tracer.startSpan(`log.${level}`, { + attributes: { + [semanticConventions.SemanticAttributes.CODE_FUNCTION]: name, + [semanticConventions.SemanticAttributes.CODE_NAMESPACE]: 'logger', + 'logging.level': level, + 'logging.message': message + } + }); +} + +/** + * Records metrics for logging operations + * @param {string} name - The module name + * @param {string} level - The log level + */ +function recordMetrics(name, level) { + if (logCounter) { + logCounter.add(1, { + module: name, + level: level + }); + } +} /** * Looks for logging level in config for a particular module - * @param {string} name - module name - * @returns {string} log level + * @param {string} name - The module name + * @returns {string} The configured log level */ -var logLevel = function(name) { +const logLevel = function(name) { if (typeof prefs === 'undefined') { return 'error'; } @@ -132,20 +136,17 @@ var logLevel = function(name) { return prefs; } else { - for (var level in prefs) { + for (let level in prefs) { if (typeof prefs[level] === 'string' && name.indexOf(prefs[level]) === 0) { return level; } if (typeof prefs[level] === 'object' && prefs[level].length) { - for (var i = prefs[level].length - 1; i >= 0; i--) { - var opt = prefs[level][i]; + for (let i = prefs[level].length - 1; i >= 0; i--) { + let opt = prefs[level][i]; if (opt === name || name.indexOf(opt) === 0) { return level; } } - // for (var m in prefs[level]) { - // if (name.indexOf(prefs[level][m]) === 0) { return level; } - // } } } return deflt; @@ -153,250 +154,220 @@ var logLevel = function(name) { }; /** - * Current levels for all modules + * Creates a Pino logger instance with the appropriate configuration + * @param {string} name - The module name + * @param {string} [level] - The log level + * @returns {Logger} Configured Pino logger instance */ -var levels = { - // mongo: 'info' -}; -/** -* Sets current logging level -* @static -* @param {string} module - name of the module for logging -* @param {string} level - level of logging, possible values are: debug, info, warn, error -**/ -var setLevel = function(module, level) { - levels[module] = level; -}; -/** -* Sets default logging level for all modules, that do not have specific level set -* @static -* @param {string} level - level of logging, possible values are: debug, info, warn, error -**/ -var setDefault = function(level) { - deflt = level; -}; -/** -* Get currently set logging level for module -* @static -* @param {string} module - name of the module for logging -* @returns {string} level of logging, possible values are: debug, info, warn, error -**/ -var getLevel = function(module) { - return levels[module] || deflt; +const createLogger = (name, level) => { + return pino({ + name, + level: level || deflt, + timestamp: pino.stdTimeFunctions.isoTime, + formatters: { + level: (label) => { + return { level: label.toUpperCase() }; + }, + log: (object) => { + const traceContext = getTraceContext(); + return traceContext ? { ...object, ...traceContext } : object; + } + } + }); }; -var getEnabledWithLevel = function(acceptable, module) { - return function() { - // if (acceptable.indexOf(levels[module]) === -1) { - // console.log('Won\'t log %j because %j doesn\'t have %j (%j)', module, acceptable, levels[module], levels); - // } - return acceptable.indexOf(levels[module] || deflt) !== -1; - }; -}; /** -* Handle messages from ipc -* @static -* @param {string} msg - message received from other processes -**/ -var ipcHandler = function(msg) { - var m, l, modules, i; - - if (!msg || msg.cmd !== 'log' || !msg.config) { - return; - } - - // console.log('%d: Setting logging config to %j (was %j)', process.pid, msg.config, levels); - - if (msg.config.default) { - deflt = msg.config.default; - } - - for (m in levels) { - var found = null; - for (l in msg.config) { - modules = msg.config[l].split(',').map(function(v) { - return v.trim(); - }); - - for (i = 0; i < modules.length; i++) { - if (modules[i] === m) { - found = l; + * Creates a logging function for a specific level + * @param {Logger} logger - The Pino logger instance + * @param {string} name - The module name + * @param {string} level - The log level code (d, i, w, e) + * @returns {Function} The logging function + */ +const createLogFunction = (logger, name, level) => { + return function(...args) { + const currentLevel = levels[name] || deflt; + if (ACCEPTABLE[level].indexOf(currentLevel) !== -1) { + const startTime = performance.now(); + const message = args[0]; + + // Create span for this logging operation + const span = createLoggingSpan(name, LEVELS[level], message); + + try { + if (args.length === 1) { + logger[LEVELS[level]](args[0]); + } + else { + const msg = args.shift(); + logger[LEVELS[level]](msg, ...args); } - } - } - if (found === null) { - for (l in msg.config) { - modules = msg.config[l].split(',').map(function(v) { - return v.trim(); - }); + // Record metrics + recordMetrics(name, LEVELS[level]); - for (i = 0; i < modules.length; i++) { - if (modules[i].indexOf('*') === -1 && modules[i] === m.split(':')[0]) { - found = l; - } - else if (modules[i].indexOf('*') !== -1 && modules[i].split(':')[1] === '*' && modules[i].split(':')[0] === m.split(':')[0]) { - found = l; - } + // Record duration + if (logDurationHistogram) { + const duration = (performance.now() - startTime) / 1000; // Convert to seconds + logDurationHistogram.record(duration, { + module: name, + level: LEVELS[level] + }); } } - } - - if (found !== null) { - levels[m] = found; - } - else { - levels[m] = deflt; - } - } - - for (l in msg.config) { - if (msg.config[l] && l !== 'default') { - modules = msg.config[l].split(',').map(function(v) { - return v.trim(); - }); - prefs[l] = modules; - - for (i in modules) { - m = modules[i]; - if (!(m in levels)) { - levels[m] = l; + finally { + if (span) { + span.end(); } } } - else { - prefs[l] = []; - } - } + }; +}; - prefs.default = msg.config.default; +/** + * Sets current logging level for a module + * @param {string} module - The module name + * @param {string} level - The log level + */ +const setLevel = function(module, level) { + levels[module] = level; +}; - // console.log('%d: Set logging config to %j (now %j)', process.pid, msg.config, levels); +/** + * Sets default logging level + * @param {string} level - The log level + */ +const setDefault = function(level) { + deflt = level; }; /** - * @typedef {Object} Logger - * @property {function(): string} id - Get the logger id - * @example - * const loggerId = logger.id(); - * console.log(`Current logger ID: ${loggerId}`); - * - * @property {function(...*): void} d - Log debug level messages - * @example - * logger.d('Debug message: %s', 'Some debug info'); - * - * @property {function(...*): void} i - Log information level messages - * @example - * logger.i('Info message: User %s logged in', username); - * - * @property {function(...*): void} w - Log warning level messages - * @example - * logger.w('Warning: %d attempts failed', attempts); - * - * @property {function(...*): void} e - Log error level messages - * @example - * logger.e('Error occurred: %o', errorObject); - * - * @property {function(string, function, string, ...*): boolean} f - Log variable level messages - * @example - * logger.f('d', (log) => { - * const expensiveOperation = performExpensiveCalculation(); - * log('Debug: Expensive operation result: %j', expensiveOperation); - * }, 'i', 'Skipped expensive debug logging'); - * - * @property {function(function=): function} callback - Create a callback function for logging - * @example - * const logCallback = logger.callback((result) => { - * console.log('Operation completed with result:', result); - * }); - * someAsyncOperation(logCallback); - * - * @property {function(string, function=, function=): function} logdb - Create a callback function for logging database operations - * @example - * const dbCallback = logger.logdb('insert user', - * (result) => { console.log('User inserted:', result); }, - * (error) => { console.error('Failed to insert user:', error); } - * ); - * database.insertUser(userData, dbCallback); - * - * @property {function(string): Logger} sub - Create a sub-logger - * @example - * const subLogger = logger.sub('database'); - * subLogger.i('Connected to database'); + * Gets current logging level for a module + * @param {string} module - The module name + * @returns {string} The current log level */ +const getLevel = function(module) { + return levels[module] || deflt; +}; /** - * Creates a new logger object for the provided module - * @module api/utils/log - * @param {string} name - Name of the module - * @returns {Logger} Logger object - * @example - * const logger = require('./log.js')('myModule'); - * logger.i('MyModule initialized'); + * Creates a new logger instance + * @param {string} name - The module name + * @returns {Object} Logger instance with various methods */ module.exports = function(name) { setLevel(name, logLevel(name)); - // console.log('Got level for ' + name + ': ' + levels[name] + ' ( prefs ', prefs); + const logger = createLogger(name, levels[name]); + /** - * @type Logger - **/ + * Creates a sub-logger with the parent's name as prefix + * @param {string} subname - The sub-logger name + * @returns {Object} Sub-logger instance + */ + const createSubLogger = (subname) => { + const full = name + ':' + subname; + setLevel(full, logLevel(full)); + const subLogger = createLogger(full, levels[full]); + + return { + /** + * Returns the full identifier of this sub-logger + * @returns {string} Full logger identifier + */ + id: () => full, + + /** + * Logs a debug message + * @param {...*} args - Message and optional parameters + */ + d: createLogFunction(subLogger, full, 'd'), + + /** + * Logs an info message + * @param {...*} args - Message and optional parameters + */ + i: createLogFunction(subLogger, full, 'i'), + + /** + * Logs a warning message + * @param {...*} args - Message and optional parameters + */ + w: createLogFunction(subLogger, full, 'w'), + + /** + * Logs an error message + * @param {...*} args - Message and optional parameters + */ + e: createLogFunction(subLogger, full, 'e'), + + /** + * Conditionally executes a function based on current log level + * @param {string} l - Log level code + * @param {Function} fn - Function to execute if level is enabled + * @param {string} [fl] - Fallback log level + * @param {...*} fargs - Arguments for fallback + * @returns {boolean} True if the function was executed + */ + f: function(l, fn, fl, ...fargs) { + if (ACCEPTABLE[l].indexOf(levels[full] || deflt) !== -1) { + fn(createLogFunction(subLogger, full, l)); + return true; + } + else if (fl) { + this[fl].apply(this, fargs); + } + }, + + /** + * Creates a nested sub-logger + * @param {string} subname - The nested sub-logger name + * @returns {Object} Nested sub-logger instance + */ + sub: createSubLogger + }; + }; + return { /** - * Get logger id - * @returns {string} id of this logger - * @example - * const loggerId = logger.id(); - * console.log(`Current logger ID: ${loggerId}`); - */ + * Returns the identifier of this logger + * @returns {string} Logger identifier + */ id: () => name, + /** - * Log debug level messages - * @param {...*} var_args - string and values to format string with - * @example - * logger.d('Debug message: %s', 'Some debug info'); - */ - d: log(NAMES.d, name, getEnabledWithLevel(ACCEPTABLE.d, name), this, console.log), + * Logs a debug message + * @param {...*} args - Message and optional parameters + */ + d: createLogFunction(logger, name, 'd'), /** - * Log information level messages - * @param {...*} var_args - string and values to format string with - * @example - * logger.i('Info message: User %s logged in', username); + * Logs an info message + * @param {...*} args - Message and optional parameters */ - i: log(NAMES.i, name, getEnabledWithLevel(ACCEPTABLE.i, name), this, console.info), + i: createLogFunction(logger, name, 'i'), /** - * Log warning level messages - * @param {...*} var_args - string and values to format string with - * @example - * logger.w('Warning: %d attempts failed', attempts); + * Logs a warning message + * @param {...*} args - Message and optional parameters */ - w: log(NAMES.w, name, getEnabledWithLevel(ACCEPTABLE.w, name), this, console.warn, styles.stylers.warn), + w: createLogFunction(logger, name, 'w'), /** - * Log error level messages - * @param {...*} var_args - string and values to format string with - * @example - * logger.e('Error occurred: %o', errorObject); + * Logs an error message + * @param {...*} args - Message and optional parameters */ - e: log(NAMES.e, name, getEnabledWithLevel(ACCEPTABLE.e, name), this, console.error, styles.stylers.error), + e: createLogFunction(logger, name, 'e'), /** - * Log variable level messages (for cases when logging parameters calculation are expensive enough and shouldn't be done unless the level is enabled) - * @param {string} l - log level (d, i, w, e) - * @param {function} fn - function to call with single argument - logging function - * @param {string} fl - fallback level if l is disabled - * @param {...*} fargs - fallback level arguments - * @returns {boolean} true if f() has been called - * @example - * logger.f('d', (log) => { - * const expensiveOperation = performExpensiveCalculation(); - * log('Debug: Expensive operation result: %j', expensiveOperation); - * }, 'i', 'Skipped expensive debug logging'); + * Conditionally executes a function based on current log level + * @param {string} l - Log level code + * @param {Function} fn - Function to execute if level is enabled + * @param {string} [fl] - Fallback log level + * @param {...*} fargs - Arguments for fallback + * @returns {boolean} True if the function was executed */ f: function(l, fn, fl, ...fargs) { if (ACCEPTABLE[l].indexOf(levels[name] || deflt) !== -1) { - fn(log(NAMES[l], name, getEnabledWithLevel(ACCEPTABLE[l], name), this, l === 'e' ? console.error : l === 'w' ? console.warn : console.log, l === 'w' ? styles.stylers.warn : l === 'e' ? styles.stylers.error : undefined)); + fn(createLogFunction(logger, name, l)); return true; } else if (fl) { @@ -405,42 +376,32 @@ module.exports = function(name) { }, /** - * Logging inside callbacks - * @param {function=} next - next function to call, after callback executed - * @returns {function} function to pass as callback - * @example - * const logCallback = logger.callback((result) => { - * console.log('Operation completed with result:', result); - * }); - * someAsyncOperation(logCallback); + * Creates a callback function that logs errors + * @param {Function} [next] - Function to call on success + * @returns {Function} Callback function */ callback: function(next) { - var self = this; + const self = this; return function(err) { if (err) { self.e(err); } else if (next) { - var args = Array.prototype.slice.call(arguments, 1); + const args = Array.prototype.slice.call(arguments, 1); next.apply(this, args); } }; }, + /** - * Logging database callbacks - * @param {string} opname - name of the performed operation - * @param {function=} next - next function to call, after callback executed - * @param {function=} nextError - function to pass error to - * @returns {function} function to pass as callback - * @example - * const dbCallback = logger.logdb('insert user', - * (result) => { console.log('User inserted:', result); }, - * (error) => { console.error('Failed to insert user:', error); } - * ); - * database.insertUser(userData, dbCallback); + * Creates a database operation callback that logs results + * @param {string} opname - Operation name + * @param {Function} [next] - Function to call on success + * @param {Function} [nextError] - Function to call on error + * @returns {Function} Database callback function */ logdb: function(opname, next, nextError) { - var self = this; + const self = this; return function(err) { if (err) { self.e('Error while %j: %j', opname, err); @@ -456,99 +417,39 @@ module.exports = function(name) { } }; }, + /** - * Add one more level to the logging output while leaving loglevel the same - * @param {string} subname - sublogger name - * @returns {Logger} new logger - * @example - * const subLogger = logger.sub('database'); - * subLogger.i('Connected to database'); + * Creates a sub-logger with the current logger's name as prefix + * @param {string} subname - The sub-logger name + * @returns {Object} Sub-logger instance */ - - sub: function(subname) { - let full = name + ':' + subname, - self = this; - - setLevel(full, logLevel(full)); - - return { - /** - * Get logger id - * @returns {string} id of this logger - */ - id: () => full, - /** - * Log debug level messages - * @memberof module:api/utils/log~Logger - * @param {...*} var_args - string and values to format string with - **/ - d: log(NAMES.d, full, getEnabledWithLevel(ACCEPTABLE.d, full), this, console.log), - - /** - * Log information level messages - * @memberof module:api/utils/log~Logger - * @param {...*} var_args - string and values to format string with - **/ - i: log(NAMES.i, full, getEnabledWithLevel(ACCEPTABLE.i, full), this, console.info), - - /** - * Log warning level messages - * @memberof module:api/utils/log~Logger - * @param {...*} var_args - string and values to format string with - **/ - w: log(NAMES.w, full, getEnabledWithLevel(ACCEPTABLE.w, full), this, console.warn, styles.stylers.warn), - - /** - * Log error level messages - * @memberof module:api/utils/log~Logger - * @param {...*} var_args - string and values to format string with - **/ - e: log(NAMES.e, full, getEnabledWithLevel(ACCEPTABLE.e, full), this, console.error, styles.stylers.error), - - /** - * Log variable level messages (for cases when logging parameters calculation are expensive enough and shouldn't be done unless the level is enabled) - * @param {String} l log level (d, i, w, e) - * @param {function} fn function to call with single argument - logging function - * @param {String} fl fallback level if l is disabled - * @param {any[]} fargs fallback level arguments - * @returns {boolean} true if f() has been called - */ - f: function(l, fn, fl, ...fargs) { - if (ACCEPTABLE[l].indexOf(levels[name] || deflt) !== -1) { - fn(log(NAMES[l], full, getEnabledWithLevel(ACCEPTABLE[l], full), this, l === 'e' ? console.error : l === 'w' ? console.warn : console.log, l === 'w' ? styles.stylers.warn : l === 'e' ? styles.stylers.error : undefined)); - return true; - } - else if (fl) { - this[fl].apply(this, fargs); - } - }, - - /** - * Pass sub one level up - */ - sub: self.sub.bind(self) - }; - } + sub: createSubLogger }; - // return { - // d: log('DEBUG\t', getEnabledWithLevel(ACCEPTABLE.d, name), this, debug(name)), - // i: log('INFO\t', getEnabledWithLevel(ACCEPTABLE.i, name), this, debug(name)), - // w: log('WARN\t', getEnabledWithLevel(ACCEPTABLE.w, name), this, debug(name)), - // e: log('ERROR\t', getEnabledWithLevel(ACCEPTABLE.e, name), this, debug(name)), - // callback: function(next){ - // var self = this; - // return function(err) { - // if (err) { self.e(err); } - // else if (next) { - // var args = Array.prototype.slice.call(arguments, 1); - // next.apply(this, args); - // } - // }; - // }, - // }; }; +// Export static methods +/** + * Sets logging level for a specific module + * @param {string} module - The module name + * @param {string} level - The log level + */ module.exports.setLevel = setLevel; + +/** + * Sets default logging level for all modules without explicit configuration + * @param {string} level - The log level + */ module.exports.setDefault = setDefault; + +/** + * Gets current logging level for a module + * @param {string} module - The module name + * @returns {string} The current log level + */ module.exports.getLevel = getLevel; -module.exports.ipcHandler = ipcHandler; \ No newline at end of file + +/** + * Indicates if OpenTelemetry integration is available + * @type {boolean} + */ +module.exports.hasOpenTelemetry = Boolean(trace && metrics); \ No newline at end of file diff --git a/api/utils/requestProcessor.js b/api/utils/requestProcessor.js index eb1be1a16d2..99332620f6a 100644 --- a/api/utils/requestProcessor.js +++ b/api/utils/requestProcessor.js @@ -24,7 +24,14 @@ const validateUserForDataWriteAPI = validateUserForWrite; const validateUserForGlobalAdmin = validateGlobalAdmin; const validateUserForMgmtReadAPI = validateUser; const request = require('countly-request')(plugins.getConfig("security")); -const Handle = require('../../api/parts/jobs/index.js'); + +try { + require('../../jobServer/api'); + log.i('Job api loaded'); +} +catch (ex) { + log.e('Job api not available'); +} var loaded_configs_time = 0; @@ -2676,104 +2683,6 @@ const processRequest = (params) => { } switch (params.qstring.method) { - case 'jobs': - /** - * @api {get} /o?method=jobs Get Jobs Table Information - * @apiName GetJobsTableInfo - * @apiGroup Jobs - * - * @apiDescription Get jobs information in the jobs table - * @apiQuery {String} method which kind jobs requested, it should be 'jobs' - * - * @apiSuccess {Number} iTotalRecords Total number of jobs - * @apiSuccess {Number} iTotalDisplayRecords Total number of jobs by filtering - * @apiSuccess {Objects[]} aaData Job details - * @apiSuccess {Number} sEcho DataTable's internal counter - * - * @apiSuccessExample {json} Success-Response: - * HTTP/1.1 200 OK - * { - * "sEcho": "0", - * "iTotalRecords": 14, - * "iTotalDisplayRecords": 14, - * "aaData": [{ - * "_id": "server-stats:stats", - * "name": "server-stats:stats", - * "status": "SCHEDULED", - * "schedule": "every 1 day", - * "next": 1650326400000, - * "finished": 1650240007917, - * "total": 1 - * }] - * } - */ - - /** - * @api {get} /o?method=jobs/name Get Job Details Table Information - * @apiName GetJobDetailsTableInfo - * @apiGroup Jobs - * - * @apiDescription Get the information of the filtered job in the table - * @apiQuery {String} method Which kind jobs requested, it should be 'jobs' - * @apiQuery {String} name The job name is required to redirect to the selected job - * - * @apiSuccess {Number} iTotalRecords Total number of jobs - * @apiSuccess {Number} iTotalDisplayRecords Total number of jobs by filtering - * @apiSuccess {Objects[]} aaData Job details - * @apiSuccess {Number} sEcho DataTable's internal counter - * - * @apiSuccessExample {json} Success-Response: - * HTTP/1.1 200 OK - * { - * "sEcho": "0", - * "iTotalRecords": 1, - * "iTotalDisplayRecords": 1, - * "aaData": [{ - * "_id": "62596cd41307dc89c269b5a8", - * "name": "api:ping", - * "created": 1650027732240, - * "status": "SCHEDULED", - * "started": 1650240000865, - * "finished": 1650240000891, - * "duration": 30, - * "data": {}, - * "schedule": "every 1 day", - * "next": 1650326400000, - * "modified": 1650240000895, - * "error": null - * }] - * } - */ - - validateUserForGlobalAdmin(params, countlyApi.data.fetch.fetchJobs, 'jobs'); - break; - case 'suspend_job': { - /** - * @api {get} /o?method=suspend_job Suspend Job - * @apiName SuspendJob - * @apiGroup Jobs - * - * @apiDescription Suspend the selected job - * * - * @apiSuccessExample {json} Success-Response: - * HTTP/1.1 200 OK - * { - * "result": true, - * "message": "Job suspended successfully" - * } - * - * @apiErrorExample {json} Error-Response: - * HTTP/1.1 400 Bad Request - * { - * "result": "Updating job status failed" - * } - * - */ - validateUserForGlobalAdmin(params, async() => { - await Handle.suspendJob(params); - }); - break; - } case 'total_users': validateUserForDataReadAPI(params, 'core', countlyApi.data.fetch.fetchTotalUsersObj, params.qstring.metric || 'users'); break; diff --git a/frontend/express/public/core/jobs/javascripts/countly.views.js b/frontend/express/public/core/jobs/javascripts/countly.views.js index 1bfc8c5ad48..e80e152976f 100644 --- a/frontend/express/public/core/jobs/javascripts/countly.views.js +++ b/frontend/express/public/core/jobs/javascripts/countly.views.js @@ -1,201 +1,514 @@ -/*global countlyAuth, countlyCommon, app, countlyVue, CV, countlyGlobal, CountlyHelpers, $ */ +/*global countlyAuth, countlyCommon, app, countlyVue, CV, countlyGlobal, CountlyHelpers, $, moment */ (function() { + /** + * Helper function to map the job status to a color tag + * @param {Object} row The job row object + * @returns {string} Color code for the status + */ var getColor = function(row) { - if (row.status === "SCHEDULED") { - return "yellow"; + // row is the merged job object + // Use _originalStatus if available, otherwise fall back to status + var status = row._originalStatus || row.status; + if (status) { + status = status.toUpperCase(); // Convert to uppercase for consistent comparison } - else if (row.status === "SUSPENDED") { + + // Use _originalLastRunStatus if available, otherwise fall back to lastRunStatus + var lastRunStatus = row._originalLastRunStatus || row.lastRunStatus; + if (lastRunStatus) { + lastRunStatus = lastRunStatus.toUpperCase(); // Convert to uppercase for consistent comparison + } + + // Check if job is disabled via config.enabled + if (!row.config || !row.config.enabled) { return "gray"; } - else if (row.status === "CANCELLED") { - return "red"; + + // Backend uses "COMPLETED", "FAILED", "RUNNING", "SCHEDULED" (see getJobStatus in api.js) + // But also "success", "failed", "pending" (see getRunStatus in api.js) + switch (status) { + case "RUNNING": return "green"; + case "COMPLETED": return "green"; + case "SUCCESS": return "green"; + case "SCHEDULED": + case "PENDING": return "yellow"; + case "SUSPENDED": return "gray"; + case "CANCELLED": return "red"; } - else if (row.status === "RUNNING") { - return "green"; + + // If status doesn't match, check lastRunStatus + if (lastRunStatus === "FAILED") { + return "red"; } + + return "gray"; }; + + /** + * Helper to update row display fields (like nextRunDate, nextRunTime, lastRun) + * @param {Object} row The job row object to update + * @returns {void} + */ var updateScheduleRow = function(row) { - var index; - row.nextRunDate = countlyCommon.getDate(row.next); - row.nextRunTime = countlyCommon.getTime(row.next); - row.lastRun = countlyCommon.formatTimeAgo(row.finished); - row.scheduleLabel = row.schedule || ""; - index = row.scheduleLabel.indexOf("starting on"); - if (index > (-1)) { - row.scheduleLabel = row.schedule.substring(0, index).trim(); - row.scheduleDetail = row.schedule.substring(index).trim(); - } - if (row.schedule && row.schedule.startsWith("at")) { - index = row.schedule.indexOf("every"); - row.scheduleDetail = row.schedule.substring(0, index).trim(); - row.scheduleLabel = row.schedule.substring(index).trim(); + // Store original values for sorting + if (row.name !== undefined) { + row._sortName = String(row.name); + } + + if (row.status !== undefined) { + // Store the original status value for color determination and sorting + row._originalStatus = row.status; + row._sortStatus = String(row.status); + } + + // Add sortBy properties for date fields to ensure correct sorting + if (row.nextRunAt) { + var nextRunMoment = moment(row.nextRunAt); + row.nextRunDate = nextRunMoment.format('YYYY-MM-DD'); + row.nextRunTime = nextRunMoment.format('HH:mm:ss'); + + // Store original value for sorting + row._sortNextRunAt = new Date(row.nextRunAt).getTime(); + } + else { + row.nextRunDate = ''; + row.nextRunTime = ''; + } + + if (row.lastFinishedAt) { + var lastFinishedMoment = moment(row.lastFinishedAt); + row.lastRun = lastFinishedMoment.fromNow(); + + // Store original value for sorting + row._sortLastFinishedAt = new Date(row.lastFinishedAt).getTime(); + } + else { + row.lastRun = ''; + } + + if (row.scheduleLabel !== undefined) { + row._sortScheduleLabel = String(row.scheduleLabel); + } + + if (row.lastRunStatus !== undefined) { + // Store the original lastRunStatus value for color determination and sorting + row._originalLastRunStatus = row.lastRunStatus; + row._sortLastRunStatus = String(row.lastRunStatus); + } + + if (row.total !== undefined) { + row._sortTotal = Number(row.total); + } + + // If the row has .config.defaultConfig.schedule.value, use it as its "default schedule" + if (row.config && row.config.defaultConfig && row.config.defaultConfig.schedule) { + row.schedule = row.config.defaultConfig.schedule.value; + row.configuredSchedule = row.config.schedule; + row.scheduleOverridden = row.configuredSchedule && (row.configuredSchedule !== row.schedule); } }; + + /** + * Main view for listing jobs + */ var JobsView = countlyVue.views.create({ - template: CV.T('/core/jobs/templates/jobs.html'), + template: CV.T('/core/jobs/templates/jobs.html'), // your HTML template path data: function() { var self = this; - var tableStore = countlyVue.vuex.getLocalStore(countlyVue.vuex.ServerDataTable("jobsTable", { - columns: ['name', "schedule", "next", "finished", "status", "total"], - onRequest: function() { - self.loaded = false; - return { - type: "GET", - url: countlyCommon.API_URL + "/o", - data: { - app_id: countlyCommon.ACTIVE_APP_ID, - method: 'jobs' + + // Create a local vuex store for the server data table + var tableStore = countlyVue.vuex.getLocalStore( + countlyVue.vuex.ServerDataTable("jobsTable", { + // columns: ['name', "schedule", "next", "finished", "status", "total"], + columns: ["name", "status", "scheduleLabel", "nextRunAt", "lastFinishedAt", "lastRunStatus", "total"], + + onRequest: function() { + // Called before making the request + self.loaded = false; + return { + type: "GET", + url: countlyCommon.API_URL + "/o/jobs", // no ?name= param => list mode + data: { + app_id: countlyCommon.ACTIVE_APP_ID, + iDisplayStart: 0, + iDisplayLength: 50 + } + }; + }, + onReady: function(context, rows) { + // Called when request completes successfully + self.loaded = true; + + // rows.aaData is an array: [ { job: {...}, config: {...} }, ... ] + // We merge job + config into a single row object + var processedRows = []; + for (var i = 0; i < rows.length; i++) { + var mergedJob = rows[i].job; // from "job" + var config = rows[i].config; // from "config" + + mergedJob.enabled = config.enabled; + mergedJob.config = config; + + // Do any schedule display updates, etc. + updateScheduleRow(mergedJob); + + processedRows.push(mergedJob); } - }; - }, - onReady: function(context, rows) { - self.loaded = true; - var row; - for (var i = 0; i < rows.length; i++) { - row = rows[i]; - updateScheduleRow(row); + return processedRows; } - return rows; - } - })); + }) + ); + return { loaded: true, + saving: false, + scheduleDialogVisible: false, + selectedJobConfig: { + name: '', + schedule: '', + defaultSchedule: '', + scheduleLabel: '', + enabled: true + }, tableStore: tableStore, - remoteTableDataSource: countlyVue.vuex.getServerDataSource(tableStore, "jobsTable") + remoteTableDataSource: countlyVue.vuex.getServerDataSource(tableStore, "jobsTable"), + jobsTablePersistKey: "cly-jobs-table" }; }, computed: { + /** + * Whether the current user can enable/disable jobs + * @returns {boolean} True if user has admin rights + */ canSuspendJob: function() { return countlyGlobal.member.global_admin || countlyGlobal.admin_apps[countlyCommon.ACTIVE_APP_ID]; }, }, methods: { + formatDateTime: function(date) { + return date ? moment(date).format('D MMM, YYYY HH:mm:ss') : '-'; + }, + getStatusColor: function(details) { + // Not strictly used in the listing, but you can keep it for reference + if (!details.config.enabled) { + return 'gray'; + } + if (details.currentState.status === 'RUNNING') { + return 'green'; + } + if (details.currentState.status === 'FAILED') { + return 'red'; + } + if (details.currentState.status === 'COMPLETED') { + return 'green'; + } + if (details.currentState.status === 'PENDING') { + return 'yellow'; + } + return 'yellow'; + }, + getRunStatusColor(status) { + // For run status + // Use _originalLastRunStatus if available + var statusValue = status && status._originalLastRunStatus ? status._originalLastRunStatus : status; + + // Convert to uppercase for consistent comparison with backend values + if (typeof statusValue === 'string') { + statusValue = statusValue.toUpperCase(); + } + + // Backend uses "COMPLETED", "FAILED", "RUNNING", "SCHEDULED" (see getJobStatus in api.js) + // But also "success", "failed", "pending" (see getRunStatus in api.js) + switch (statusValue) { + case "SUCCESS": + case "COMPLETED": return 'green'; + case "RUNNING": return 'green'; + case "FAILED": return 'red'; + case "PENDING": + case "SCHEDULED": return 'yellow'; + default: return 'gray'; + } + }, refresh: function(force) { if (this.loaded || force) { this.loaded = false; this.tableStore.dispatch("fetchJobsTable"); } }, + /** + * Navigates to job details page + * @param {Object} row The job row to navigate to + * @returns {void} + */ goTo: function(row) { app.navigate("#/manage/jobs/" + row.name, true); }, getColor: getColor, + /** + * Called from the row's more options, e.g. "enable", "disable", "schedule", "runNow" + * @param {string} command The command to execute + * @param {Object} row The job row + * @returns {void} + */ handleCommand: function(command, row) { - if (row.rowId) { + if (row.name) { var self = this; - if (command === "change-job-status") { - const suspend = row.status !== "SUSPENDED" ? true : false; - var notifyType = "ok"; - $.ajax({ - type: "GET", - url: countlyCommon.API_URL + "/o", - data: { - app_id: countlyCommon.ACTIVE_APP_ID, - method: 'suspend_job', - id: row.rowId, - suspend: suspend - }, - contentType: "application/json", - success: function(res) { - if (res.result) { - self.refresh(true); - } - else { - notifyType = "error"; - } + if (command === 'schedule') { + // Show the schedule dialog + this.selectedJobConfig = { + name: row.name, + schedule: row.configuredSchedule || row.schedule, + defaultSchedule: row.schedule, + enabled: row.enabled + }; + this.scheduleDialogVisible = true; + return; + } + + // For enable, disable, runNow, etc. => /i/jobs + var data = { + app_id: countlyCommon.ACTIVE_APP_ID, + jobName: row.name, + action: command + }; + + $.ajax({ + type: "GET", // or POST if your server expects that + url: countlyCommon.API_URL + "/i/jobs", + data: data, + success: function(res) { + if (res.result === "Success") { + self.refresh(true); CountlyHelpers.notify({ - type: notifyType, - message: res.message + type: "ok", + message: CV.i18n("jobs." + command + "-success") }); - }, - error: function(err) { + } + else { CountlyHelpers.notify({ type: "error", - message: err.responseJSON.error + message: res.result }); } + }, + error: function(err) { + CountlyHelpers.notify({ + type: "error", + message: err.responseJSON?.result || "Error" + }); + } + }); + } + }, + /** + * Called when user clicks "Save" on the schedule dialog + */ + saveSchedule: function() { + var self = this; + self.saving = true; + + $.ajax({ + type: "GET", + url: countlyCommon.API_URL + "/i/jobs", + data: { + app_id: countlyCommon.ACTIVE_APP_ID, + jobName: this.selectedJobConfig.name, + action: 'updateSchedule', + schedule: this.selectedJobConfig.schedule + }, + success: function() { + self.saving = false; + self.scheduleDialogVisible = false; + self.refresh(true); + CountlyHelpers.notify({ + type: "ok", + message: CV.i18n("jobs.schedule-updated") + }); + }, + error: function(err) { + self.saving = false; + CountlyHelpers.notify({ + type: "error", + message: err.responseJSON?.result || "Error" }); } - } + }); }, } }); - var JobDetailView = countlyVue.views.create({ - template: CV.T('/core/jobs/templates/jobs-details.html'), + /** + * Detailed view for a single job + */ + var JobDetailsView = countlyVue.views.BaseView.extend({ + template: "#jobs-details-template", data: function() { - var self = this; - var tableStore = countlyVue.vuex.getLocalStore(countlyVue.vuex.ServerDataTable("jobsTable", { - columns: ['name', "schedule", "next", "finished", "status", "total"], - onRequest: function() { - self.loaded = false; - return { - type: "GET", - url: countlyCommon.API_URL + "/o", - data: { - app_id: countlyCommon.ACTIVE_APP_ID, - method: 'jobs', - name: self.job_name - } - }; - }, - onReady: function(context, rows) { - self.loaded = true; - var row; - for (var i = 0; i < rows.length; i++) { - row = rows[i]; - row.dataAsString = JSON.stringify(row.data, null, 2); - row.durationInSeconds = (row.duration / 1000) + 's'; - updateScheduleRow(row); - } - return rows; - } - })); return { - job_name: this.$route.params.job_name, - loaded: true, - tableStore: tableStore, - remoteTableDataSource: countlyVue.vuex.getServerDataSource(tableStore, "jobsTable") + job_name: this.$route.params.jobName, + jobDetails: null, + jobRuns: [], + isLoading: false, + // columns for the run history table + jobRunColumns: [ + { prop: "lastRunAt", label: CV.i18n('jobs.run-time'), sortable: true }, + { prop: "status", label: CV.i18n('jobs.status'), sortable: true }, + { prop: "duration", label: CV.i18n('jobs.duration'), sortable: true }, + { prop: "result", label: CV.i18n('jobs.result') } + ] }; }, + computed: { + /** + * Check if there are any scheduleOverride or retryOverride in the config + * @returns {boolean} True if overrides exist + */ + hasOverrides: function() { + return this.jobDetails && + (this.jobDetails.config?.scheduleOverride || + this.jobDetails.config?.retryOverride); + } + }, methods: { - refresh: function(force) { - if (this.loaded || force) { - this.loaded = false; - this.tableStore.dispatch("fetchJobsTable"); + /** + * Fetches jobDetails + normal docs from /o/jobs?name= + */ + fetchJobDetails: function() { + var self = this; + self.isLoading = true; + + CV.$.ajax({ + type: "GET", + url: countlyCommon.API_PARTS.data.r + "/jobs", + data: { + "app_id": countlyCommon.ACTIVE_APP_ID, + "name": self.job_name, + "iDisplayStart": 0, + "iDisplayLength": 50 + }, + dataType: "json", + success: function(response) { + // jobDetails => the main scheduled doc + overrides + self.jobDetails = response.jobDetails; + + // aaData => the array of normal run docs + self.jobRuns = (response.aaData || []).map(function(run) { + return { + lastRunAt: run.lastRunAt, + status: run.status, + duration: run.duration, + result: run.result, + failReason: run.failReason, + dataAsString: run.dataAsString + }; + }); + + self.isLoading = false; + }, + error: function() { + self.isLoading = false; + CountlyHelpers.notify({ + title: CV.i18n("common.error"), + message: CV.i18n("jobs.details-fetch-error"), + type: "error" + }); + } + }); + }, + formatDateTime: function(date) { + return date ? moment(date).format('D MMM, YYYY HH:mm:ss') : '-'; + }, + calculateDuration: function(run) { + // (optional) if you want dynamic calculations + if (!run.lastRunAt || !run.lastFinishedAt) { + return '-'; } + return ((new Date(run.lastFinishedAt) - new Date(run.lastRunAt)) / 1000).toFixed(2); }, - navigate: function(id) { - app.navigate("#/manage/jobs/" + id); + /** + * Map jobDetails.currentState.status to a color + * @param {Object} jobDetails The job details object + * @returns {string} Color code for the status + */ + getStatusColor: function(jobDetails) { + if (!jobDetails.config?.enabled) { + return "gray"; + } + switch (jobDetails.currentState.status) { + case "RUNNING": return "green"; + case "FAILED": return "red"; + case "COMPLETED": return "green"; + case "PENDING": return "yellow"; + default: return "yellow"; + } }, - getColor: getColor + /** + * Map each run's status to a color + * @param {Object} run The job run object + * @returns {string} Color code for the status + */ + getRunStatusColor: function(run) { + // Handle both string and object status + // Use _originalLastRunStatus if available + var status = run._originalLastRunStatus || run.status; + + // Convert to uppercase for consistent comparison with backend values + if (typeof status === 'string') { + status = status.toUpperCase(); + } + + // Backend uses "COMPLETED", "FAILED", "RUNNING", "SCHEDULED" (see getJobStatus in api.js) + // But also "success", "failed", "pending" (see getRunStatus in api.js) + switch (status) { + case "SUCCESS": + case "COMPLETED": return "green"; + case "RUNNING": return "green"; + case "FAILED": return "red"; + case "PENDING": + case "SCHEDULED": return "yellow"; + default: return "gray"; + } + } + }, + mounted: function() { + // On load, fetch data + this.fetchJobDetails(); } }); + /** + * Wrap the JobsView as a Countly Backbone view + * @returns {Object} Backbone wrapper view + */ var getMainView = function() { return new countlyVue.views.BackboneWrapper({ component: JobsView, - vuex: [] //empty array if none - }); - }; - - var getDetailedView = function() { - return new countlyVue.views.BackboneWrapper({ - component: JobDetailView, - vuex: [] //empty array if none + vuex: [] // empty array if none }); }; + /** + * Define routes for #/manage/jobs and #/manage/jobs/:jobName + */ if (countlyAuth.validateGlobalAdmin()) { app.route("/manage/jobs", "manageJobs", function() { this.renderWhenReady(getMainView()); }); - app.route("/manage/jobs/:name", "manageJobName", function(name) { - var view = getDetailedView(); - view.params = {job_name: name}; - this.renderWhenReady(view); + app.route("/manage/jobs/:jobName", 'jobs-details', function(jobName) { + var jobDetailsView = new countlyVue.views.BackboneWrapper({ + component: JobDetailsView, + templates: [ + { + namespace: "jobs", + mapping: { + "details-template": "/core/jobs/templates/jobs-details.html" + } + } + ] + }); + jobDetailsView.params = { jobName: jobName }; + this.renderWhenReady(jobDetailsView); }); } -})(); \ No newline at end of file +})(); diff --git a/frontend/express/public/core/jobs/templates/jobs-details.html b/frontend/express/public/core/jobs/templates/jobs-details.html index 4062a534513..e67a5184964 100644 --- a/frontend/express/public/core/jobs/templates/jobs-details.html +++ b/frontend/express/public/core/jobs/templates/jobs-details.html @@ -1,49 +1,241 @@ +
- + + - - -