Skip to content

Commit

Permalink
Add workerfarm ID to enable separate capacity control for large messa…
Browse files Browse the repository at this point in the history
…ges (aurora-opensource#637)

This adds a workerId field to parseXVIZMessage() that allows for separate worker pools.

Arbitrarily dropping an XVIZ Message in the default pool can
result in loosing a stream that contains the most recent data. However, in a situation where
you know what streams a message contains you can apply a custom settings to
the worker capacity.
  • Loading branch information
twojtasz authored Apr 20, 2021
1 parent 2ce7949 commit 800c5c9
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 26 deletions.
40 changes: 25 additions & 15 deletions modules/parser/src/parsers/parse-xviz-message-workerfarm.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,29 @@ import {WorkerFarm} from '../utils/worker-utils';
import {getXVIZConfig, subscribeXVIZConfigChange} from '../config/xviz-config';
import streamDataWorker from '../../dist/workers/stream-data.worker.js';

let workerFarm = null;
const workerFarm = {};

export function getWorkerFarm() {
return workerFarm;
export function getWorkerFarm(id = 'default') {
return workerFarm[id];
}

// Mainly for testing
export function destroyWorkerFarm() {
if (workerFarm) {
workerFarm.destroy();
workerFarm = null;
}
Object.keys(workerFarm).forEach(id => {
if (workerFarm[id]) {
workerFarm[id].destroy();
delete workerFarm[id];
}
});
}

export function initializeWorkerFarm({worker, maxConcurrency = 4, capacity = null}) {
if (!workerFarm) {
export function initializeWorkerFarm({
worker,
maxConcurrency = 4,
capacity = null,
id = 'default'
}) {
if (!workerFarm[id]) {
const xvizConfig = {...getXVIZConfig()};
delete xvizConfig.preProcessPrimitive;
let workerURL;
Expand All @@ -46,7 +53,8 @@ export function initializeWorkerFarm({worker, maxConcurrency = 4, capacity = nul
workerURL = URL.createObjectURL(blob);
}

workerFarm = new WorkerFarm({
workerFarm[id] = new WorkerFarm({
id,
workerURL,
maxConcurrency,
capacity,
Expand All @@ -56,12 +64,14 @@ export function initializeWorkerFarm({worker, maxConcurrency = 4, capacity = nul
}

export function updateWorkerXVIZVersion() {
if (workerFarm) {
const xvizConfig = {...getXVIZConfig()};
delete xvizConfig.preProcessPrimitive;
Object.keys(workerFarm).forEach(id => {
if (workerFarm[id]) {
const xvizConfig = {...getXVIZConfig()};
delete xvizConfig.preProcessPrimitive;

workerFarm.broadcast({xvizConfig});
}
workerFarm[id].broadcast({xvizConfig});
}
});
}

// Subscribe to XVIZConfig changes so we can
Expand Down
12 changes: 7 additions & 5 deletions modules/parser/src/parsers/parse-xviz-message.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import {postDeserialize} from './serialize';
import {getWorkerFarm, initializeWorkerFarm} from './parse-xviz-message-workerfarm';

// Public function for initializing workers
export function initializeWorkers({worker, maxConcurrency = 4, capacity = null}) {
initializeWorkerFarm({worker, maxConcurrency, capacity});
export function initializeWorkers({id, worker, maxConcurrency = 4, capacity = null}) {
initializeWorkerFarm({id, worker, maxConcurrency, capacity});
}

export function parseXVIZMessage({
Expand All @@ -29,16 +29,18 @@ export function parseXVIZMessage({
debug,
// worker options
worker = false,
workerId = 'default',
maxConcurrency = 4,
capacity = null,
opts = {}
}) {
if (worker) {
if (!getWorkerFarm()) {
initializeWorkers({worker, maxConcurrency, capacity});
const id = workerId;
if (!getWorkerFarm(id)) {
initializeWorkers({id, worker, maxConcurrency, capacity});
}

const workerFarm = getWorkerFarm();
const workerFarm = getWorkerFarm(id);

if (debug) {
workerFarm.debug = debug;
Expand Down
6 changes: 5 additions & 1 deletion modules/parser/src/utils/worker-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ export class WorkerFarm {
maxConcurrency = 1,
debug = () => {},
initialMessage = null,
capacity = null
capacity = null,
id = 'default'
}) {
this.id = id;
this.workerURL = workerURL;
this.workers = [];
this.queue = [];
Expand Down Expand Up @@ -141,6 +143,7 @@ export class WorkerFarm {
const job = queue.shift();

this.debug({
id: this.id,
message: 'processing',
worker: worker.metadata.name,
backlog: queue.length,
Expand All @@ -153,6 +156,7 @@ export class WorkerFarm {
.catch(job.onError)
.then(() => {
this.debug({
id: this.id,
message: 'waiting',
worker: worker.metadata.name,
backlog: queue.length,
Expand Down
55 changes: 55 additions & 0 deletions test/modules/parser/parsers/parse-xviz-message.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,58 @@ tape('parseXVIZMessage#parseMetadata worker', t => {
t.end();
}
});

tape('parseXVIZMessage#parseMetadata multiple worker', t => {
if (isBrowser) {
// XVIZ Version of workers would be set to 1 by default
initializeWorkers({id: 'A', worker: true, maxConcurrency: 1});
initializeWorkers({id: 'B', worker: true, maxConcurrency: 1});

// After parsing on main thread, this will call setXVIZConfig, which
// should trigger workers to have their XVIZ version updated
parseXVIZMessage({
message: metadataMessageV2,
onResult: result => {
t.equal(result.type, 'METADATA', 'Message type detected as metadata');
t.equal(getXVIZConfig().currentMajorVersion, 2, 'XVIZ Version set to 2');
},
onError: err => t.fail(err),
debug: msg => t.comment(msg),
worker: false
});

// Verify the XVIZ v2 messages are properly parsed in different worker pools
let resultCalled = 0;
parseXVIZMessage({
message: xvizUpdateV2,
onResult: result => {
t.equal(result.type, 'TIMESLICE', 'XVIZ message properly parsed on worker');
resultCalled++;
if (resultCalled === 2) {
t.end();
}
},
onError: err => t.fail(err),
debug: msg => t.comment(JSON.stringify(msg)),
worker: true,
workerId: 'A'
});
parseXVIZMessage({
message: xvizUpdateV2,
onResult: result => {
t.equal(result.type, 'TIMESLICE', 'XVIZ message properly parsed on worker');
resultCalled++;
if (resultCalled === 2) {
t.end();
}
},
onError: err => t.fail(err),
debug: msg => t.comment(JSON.stringify(msg)),
worker: true,
workerId: 'B'
});
} else {
t.comment('-- browser only test');
t.end();
}
});
63 changes: 61 additions & 2 deletions test/modules/parser/utils/worker-utils.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function shouldRunTest(t) {
return true;
}

function runWorkerTest(t, total, workerFarmConfig, onFinished) {
function runWorkerTest(t, total, workerFarmConfig, onFinished, skipEnd) {
let processed = 0;

const workerFarm = new WorkerFarm(workerFarmConfig);
Expand All @@ -52,7 +52,9 @@ function runWorkerTest(t, total, workerFarmConfig, onFinished) {
onFinished(workerFarm);
}
workerFarm.destroy();
t.end();
if (!skipEnd) {
t.end();
}
}
};

Expand Down Expand Up @@ -128,3 +130,60 @@ test('WorkerFarm#Capped', t => {

runWorkerTest(t, CHUNKS_TOTAL, workerFarmConfig, onFinished);
});

class TestWorkerFarmer {
constructor(t, id, chunks, concurrency, capacity) {
this.t = t;
this.id = id;
this.chunks = chunks;
this.concurrency = concurrency;
this.capacity = capacity;

this.updates = [];
}

config() {
return {
id: this.id,
workerURL: testWorker,
maxConcurrency: this.concurrency,
debug: message => {
this.t.comment(`Processing with worker ${message.worker}, backlog ${message.backlog}`);
this.updates.push(message);
}
};
}
}

test('WorkerFarm#Multiple', t => {
if (!shouldRunTest(t)) {
return;
}

const farmerA = new TestWorkerFarmer(t, 'A', 6, 3, 1000);
const farmerB = new TestWorkerFarmer(t, 'B', 5, 2, 1000);

let resultCalled = 0;
const onFinished = farmer => {
return workerFarm => {
t.equals(0, workerFarm.dropped, 'No data dropped');

const messageCounts = {};
farmer.updates.forEach(u => {
messageCounts[u.message] = messageCounts[u.message] + 1 || 1;
});

t.equals(messageCounts.processing, farmer.chunks, 'worker sends processing messages');
t.ok(messageCounts.waiting >= farmer.concurrency, 'worker sends waiting messages');

resultCalled++;
if (resultCalled === 2) {
t.end();
}
};
};

const skipEnd = true;
runWorkerTest(t, farmerA.chunks, farmerA.config(), onFinished(farmerA), skipEnd);
runWorkerTest(t, farmerB.chunks, farmerB.config(), onFinished(farmerB), skipEnd);
});
6 changes: 3 additions & 3 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4232,9 +4232,9 @@ camelcase@^6.0.0:
integrity sha512-8KMDF1Vz2gzOq54ONPJS65IvTUaB1cHJ2DMM7MbPmLZljDH1qpzzLsWdiN9pHh6qvkRVDTi/07+eNGch/oLU4w==

caniuse-lite@^1.0.30001093:
version "1.0.30001094"
resolved "https://registry.yarnpkg.com/caniuse-lite/-/caniuse-lite-1.0.30001094.tgz#0b11d02e1cdc201348dbd8e3e57bd9b6ce82b175"
integrity sha512-ufHZNtMaDEuRBpTbqD93tIQnngmJ+oBknjvr0IbFympSdtFpAUFmNv4mVKbb53qltxFx0nK3iy32S9AqkLzUNA==
version "1.0.30001211"
resolved "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001211.tgz"
integrity sha512-v3GXWKofIkN3PkSidLI5d1oqeKNsam9nQkqieoMhP87nxOY0RPDC8X2+jcv8pjV4dRozPLSoMqNii9sDViOlIg==

caseless@~0.12.0:
version "0.12.0"
Expand Down

0 comments on commit 800c5c9

Please sign in to comment.