test: add flaky tests with maxRetries=0 #105

Open · wants to merge 1 commit into base: master
2 changes: 1 addition & 1 deletion README.md
@@ -125,7 +125,7 @@ If you don't provide an `options` object then the following defaults will be used

* **<code>maxConcurrentCalls</code>** allows you to control the maximum number of calls in the queue&mdash;either actively being processed or waiting for a worker to be processed. `Infinity` indicates no limit but if you have conditions that may endlessly queue jobs and you need to set a limit then provide a `>0` value and any calls that push the limit will return on their callback with a `MaxConcurrentCallsError` error (check `err.type == 'MaxConcurrentCallsError'`).

* **<code>maxCallTime</code>** *(use with caution, understand what this does before you use it!)* when `!== Infinity`, will cap a time, in milliseconds, that *any single call* can take to execute in a worker. If this time limit is exceeded by just a single call then the worker running that call will be killed and any calls running on that worker will have their callbacks returned with a `TimeoutError` (check `err.type == 'TimeoutError'`). If you are running with `maxConcurrentCallsPerWorker` value greater than `1` then **all calls currently executing** will fail and will be automatically resubmitted unless you've changed the `maxRetries` option. Use this if you have jobs that may potentially end in infinite loops that you can't programatically end with your child code. Preferably run this with a `maxConcurrentCallsPerWorker` so you don't interrupt other calls when you have a timeout. This timeout operates on a per-call basis but will interrupt a whole worker.
* **<code>maxCallTime</code>** *(use with caution, understand what this does before you use it!)* when `!== Infinity`, will cap the time, in milliseconds, that *any single call* can take to execute in a worker. If this time limit is exceeded by even a single call then the worker running that call will be killed and any calls running on that worker will have their callbacks returned with a `TimeoutError` (check `err.type == 'TimeoutError'`). If you are running with a `maxConcurrentCallsPerWorker` value greater than `1` then **all calls currently executing** will fail and will be automatically resubmitted unless you've changed the `maxRetries` option. Use this if you have jobs that may end up in infinite loops that you can't programmatically end from your child code. Preferably run this with `maxConcurrentCallsPerWorker=1` and `maxCallsPerWorker=1` so you don't interrupt other calls when you have a timeout. This timeout operates on a per-call basis but will interrupt a whole worker.

* **<code>maxRetries</code>** allows you to control the max number of call requeues after worker termination (unexpected or timeout). By default this option is set to `Infinity` which means that each call of each terminated worker will always be auto requeued. When the number of retries exceeds `maxRetries` value, the job callback will be executed with a `ProcessTerminatedError`. Note that if you are running with finite `maxCallTime` and `maxConcurrentCallsPerWorkers` greater than `1` then any `TimeoutError` will increase the retries counter *for each* concurrent call of the terminated worker.
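
Taken together, these options decide which error type a failed call's callback receives. Below is a minimal illustrative sketch of how they combine (not part of this diff), assuming a hypothetical `./child.js` module that exports a `work(input, callback)` method and that the package is installed as `worker-farm`:

```js
// Illustrative only: `./child.js` and its `work(input, callback)` export are assumed.
const workerFarm = require('worker-farm')

const farm = workerFarm({
    maxCallTime                 : 5000, // kill a worker whose call runs longer than 5s
    maxRetries                  : 2,    // requeue a call at most twice after its worker dies
    maxConcurrentCallsPerWorker : 1,    // a timeout then only interrupts the offending call
    maxConcurrentCalls          : 100   // queue cap; excess calls fail with MaxConcurrentCallsError
}, require.resolve('./child'), [ 'work' ])

farm.work('some input', function (err, result) {
  if (err) {
    // err.type distinguishes the failure modes described in the options above
    if (err.type === 'TimeoutError')                 console.error('call exceeded maxCallTime')
    else if (err.type === 'ProcessTerminatedError')  console.error('worker died and retries were exhausted')
    else if (err.type === 'MaxConcurrentCallsError') console.error('call queue is full')
  } else {
    console.log('result:', result)
  }

  workerFarm.end(farm, function () {
    console.log('farm shut down')
  })
})
```

With `maxConcurrentCallsPerWorker: 1`, a `TimeoutError` only affects the call that overran, which is the setup the updated README line above recommends.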

8 changes: 8 additions & 0 deletions tests/child.js
@@ -32,6 +32,14 @@ module.exports.killable = function (id, callback) {
callback(null, id, process.pid)
}

module.exports.crashable = function (id, callback) {
  if (Math.random() < 0.5) {
    // Allocate endlessly so the child process runs out of memory and is killed.
    const array = []
    while (true) array.push('this array is going to make the child run out of memory')
  }
  callback(null, id, process.pid)
}


module.exports.err = function (type, message, data, callback) {
if (typeof data == 'function') {
85 changes: 84 additions & 1 deletion tests/index.js
@@ -338,7 +338,7 @@ tape('multiple concurrent calls', function (t) {

// call a method that will die with a probability of 0.5 but expect that
// we'll get results for each of our calls anyway
tape('durability', function (t) {
tape('durability due to "process.exit(-1)"', function (t) {
t.plan(3)

let child = workerFarm({ maxConcurrentWorkers: 2 }, childPath, [ 'killable' ])
@@ -363,6 +363,89 @@ tape('durability', function (t) {
}
})

// call a method that will die with a probability of 0.5 but expect that
// every call still gets its callback when using maxCallsPerWorker=1 & maxRetries=0
// `maxConcurrentCallsPerWorker=1` instead of `maxCallsPerWorker=1` fails this test! Why?
tape('don\'t skip jobs with flaky "process.exit(-1)"', function (t) {
  t.plan(2)

  let child = workerFarm({ maxConcurrentWorkers: 2, maxRetries: 0, maxCallsPerWorker: 1 }, childPath, [ 'killable' ])
    , errors = []
    , count = 20
    , i = count
    , counter = 0

  while (i--) {
    child.killable(i, function (err, id, pid) {
      counter++
      if (err) errors.push(err)
      if (counter == count) {
        t.ok(errors.length > 2, 'got ' + (count - errors.length) + ' successes and ' + errors.length + ' errors, but ran all of them!')
        workerFarm.end(child, function () {
          t.ok(true, 'workerFarm ended')
        })
      }
    })
  }
})

// call a method that will die with a probability of 0.5 but expect that
// we'll get results for each of our calls anyway
tape('durability due to out-of-memory', function (t) {
  t.plan(3)

  let child = workerFarm({
        workerOptions: {
          stdio: 'ignore',
          // env is part of workerOptions so it reaches child_process.fork;
          // NODE_OPTIONS makes the child run out of memory faster.
          env: { NODE_OPTIONS: '--max-old-space-size=256' }
        },
        maxConcurrentWorkers: 1
      }, childPath, [ 'crashable' ])
    , ids = []
    , pids = []
    , count = 10
    , i = count

  while (i--) {
    child.crashable(i, function (err, id, pid) {
      ids.push(id)
      pids.push(pid)
      if (ids.length == count) {
        t.ok(uniq(pids).length > 2, 'processed by many (' + uniq(pids).length + ') workers, but got there in the end!')
        t.ok(uniq(ids).length == count, 'received a single result for each unique call')
        workerFarm.end(child, function () {
          t.ok(true, 'workerFarm ended')
        })
      } else if (ids.length > count)
        t.fail('too many callbacks!')
    })
  }
})

// call a method that will crash with a probability of 0.5 but expect that
// every call still gets its callback when using maxCallsPerWorker=1 & maxRetries=0
// `maxConcurrentCallsPerWorker=1` instead of `maxCallsPerWorker=1` fails this test! Why?
tape('don\'t skip jobs with flaky out-of-memory', function (t) {
  t.plan(2)

  let child = workerFarm({ workerOptions: { stdio: 'ignore' }, maxConcurrentWorkers: 2, maxRetries: 0, maxCallsPerWorker: 1 }, childPath, [ 'crashable' ])
    , errors = []
    , count = 20
    , i = count
    , counter = 0

  while (i--) {
    child.crashable(i, function (err, id, pid) {
      counter++
      if (err) errors.push(err)
      if (counter == count) {
        t.ok(errors.length > 2, 'got ' + (count - errors.length) + ' successes and ' + errors.length + ' errors, but ran all of them!')
        workerFarm.end(child, function () {
          t.ok(true, 'workerFarm ended')
        })
      }
    })
  }
})


// a callback provided to .end() can and will be called (uses "simple, exports=function test" to create a child)
tape('simple, end callback', function (t) {