diff --git a/README.md b/README.md index fea8a3d..cc505e3 100644 --- a/README.md +++ b/README.md @@ -125,7 +125,7 @@ If you don't provide an `options` object then the following defaults will be use * **maxConcurrentCalls** allows you to control the maximum number of calls in the queue—either actively being processed or waiting for a worker to be processed. `Infinity` indicates no limit but if you have conditions that may endlessly queue jobs and you need to set a limit then provide a `>0` value and any calls that push the limit will return on their callback with a `MaxConcurrentCallsError` error (check `err.type == 'MaxConcurrentCallsError'`). - * **maxCallTime** *(use with caution, understand what this does before you use it!)* when `!== Infinity`, will cap a time, in milliseconds, that *any single call* can take to execute in a worker. If this time limit is exceeded by just a single call then the worker running that call will be killed and any calls running on that worker will have their callbacks returned with a `TimeoutError` (check `err.type == 'TimeoutError'`). If you are running with `maxConcurrentCallsPerWorker` value greater than `1` then **all calls currently executing** will fail and will be automatically resubmitted unless you've changed the `maxRetries` option. Use this if you have jobs that may potentially end in infinite loops that you can't programatically end with your child code. Preferably run this with a `maxConcurrentCallsPerWorker` so you don't interrupt other calls when you have a timeout. This timeout operates on a per-call basis but will interrupt a whole worker. + * **maxCallTime** *(use with caution, understand what this does before you use it!)* when `!== Infinity`, will cap a time, in milliseconds, that *any single call* can take to execute in a worker. If this time limit is exceeded by just a single call then the worker running that call will be killed and any calls running on that worker will have their callbacks returned with a `TimeoutError` (check `err.type == 'TimeoutError'`). If you are running with `maxConcurrentCallsPerWorker` value greater than `1` then **all calls currently executing** will fail and will be automatically resubmitted unless you've changed the `maxRetries` option. Use this if you have jobs that may potentially end in infinite loops that you can't programatically end with your child code. Preferably run this with a `maxConcurrentCallsPerWorker=1` and `maxCallsPerWorker=1` so you don't interrupt other calls when you have a timeout. This timeout operates on a per-call basis but will interrupt a whole worker. * **maxRetries** allows you to control the max number of call requeues after worker termination (unexpected or timeout). By default this option is set to `Infinity` which means that each call of each terminated worker will always be auto requeued. When the number of retries exceeds `maxRetries` value, the job callback will be executed with a `ProcessTerminatedError`. Note that if you are running with finite `maxCallTime` and `maxConcurrentCallsPerWorkers` greater than `1` then any `TimeoutError` will increase the retries counter *for each* concurrent call of the terminated worker. diff --git a/tests/child.js b/tests/child.js index 71cb728..6827805 100644 --- a/tests/child.js +++ b/tests/child.js @@ -32,6 +32,14 @@ module.exports.killable = function (id, callback) { callback(null, id, process.pid) } +module.exports.crashable = function (id, callback) { + if (Math.random() < 0.5) { + const array = [] + while(true) array.push('this array is gonna make child to run out of memory') + } + callback(null, id, process.pid) +} + module.exports.err = function (type, message, data, callback) { if (typeof data == 'function') { diff --git a/tests/index.js b/tests/index.js index e6be7bb..c109fee 100644 --- a/tests/index.js +++ b/tests/index.js @@ -338,7 +338,7 @@ tape('multiple concurrent calls', function (t) { // call a method that will die with a probability of 0.5 but expect that // we'll get results for each of our calls anyway -tape('durability', function (t) { +tape('durability due "process.exit(-1)"', function (t) { t.plan(3) let child = workerFarm({ maxConcurrentWorkers: 2 }, childPath, [ 'killable' ]) @@ -363,6 +363,89 @@ tape('durability', function (t) { } }) +// call a method that will die with a probability of 0.5 but expect that +// all of them will be called when using maxCallsPerWorker=1 & maxRetries=0 +// `maxConcurrentCallsPerWorker=1` instead of `maxCallsPerWorker=1` fails this test! Why? +tape('don\'t skip jobs with flaky "process.exit(-1)"', function (t) { + t.plan(2) + + let child = workerFarm({ maxConcurrentWorkers: 2, maxRetries: 0, maxCallsPerWorker: 1}, childPath, [ 'killable' ]) + , errors = [] + , count = 20 + , i = count + , counter = 0 + + while (i--) { + child.killable(i, function (err, id, pid) { + counter++ + if(err) errors.push(err) + if (counter == count) { + t.ok(errors.length > 2, 'got ' + (count - errors.length) + ' successes and ' + (errors.length) + ' errors, but run all of them!') + workerFarm.end(child, function () { + t.ok(true, 'workerFarm ended') + }) + } + }) + } +}) + +// call a method that will die with a probability of 0.5 but expect that +// we'll get results for each of our calls anyway +tape('durability due out-of-memory', function (t) { + t.plan(3) + + let child = workerFarm({ + workerOptions:{stdio: 'ignore'}, + maxConcurrentWorkers: 1, + env: {NODE_OPTIONS: "--max-old-space-size=256"}, // Run out of memory faster. + }, childPath, [ 'crashable' ]) + , ids = [] + , pids = [] + , count = 10 + , i = count + + while (i--) { + child.crashable(i, function (err, id, pid) { + ids.push(id) + pids.push(pid) + if (ids.length == count) { + t.ok(uniq(pids).length > 2, 'processed by many (' + uniq(pids).length + ') workers, but got there in the end!') + t.ok(uniq(ids).length == count, 'received a single result for each unique call') + workerFarm.end(child, function () { + t.ok(true, 'workerFarm ended') + }) + } else if (ids.length > count) + t.fail('too many callbacks!') + }) + } +}) + +// call a method that will crash with a probability of 0.5 but expect that +// all of them will be called when using maxCallsPerWorker=1 & maxRetries=0 +// `maxConcurrentCallsPerWorker=1` instead of `maxCallsPerWorker=1` fails this test! Why? +tape('don\'t skip jobs with flaky out-of-memory', function (t) { + t.plan(2) + + let child = workerFarm({ workerOptions:{stdio: 'ignore'}, maxConcurrentWorkers: 2, maxRetries: 0, maxCallsPerWorker: 1}, childPath, [ 'crashable' ]) + , errors = [] + , count = 20 + , i = count + , counter = 0 + + while (i--) { + child.crashable(i, function (err, id, pid) { + counter++ + if(err) errors.push(err) + if (counter == count) { + t.ok(errors.length > 2, 'got ' + (count - errors.length) + ' successes and ' + (errors.length) + ' errors, but run all of them!') + workerFarm.end(child, function () { + t.ok(true, 'workerFarm ended') + }) + } + }) + } +}) + // a callback provided to .end() can and will be called (uses "simple, exports=function test" to create a child) tape('simple, end callback', function (t) {