Skip to content

Common shared WorkerPool for both Potree1 and Potree2 pointclouds #182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions src/loading/binary-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Box3, BufferAttribute, BufferGeometry, Uint8BufferAttribute, Vector3 }
import { PointAttributeName, PointAttributeType } from '../point-attributes';
import { PointCloudOctreeGeometryNode } from '../point-cloud-octree-geometry-node';
import { handleEmptyBuffer, handleFailedRequest } from '../utils/utils';
import { WorkerPool } from '../utils/worker-pool';
import { WorkerPool, WorkerType } from '../utils/worker-pool';
import { Version } from '../version';
import { GetUrlFn, XhrRequest } from './types';

Expand Down Expand Up @@ -48,10 +48,7 @@ export class BinaryLoader {
xhrRequest: XhrRequest;
callbacks: Callback[];

public static readonly WORKER_POOL = new WorkerPool(
32,
require('../workers/binary-decoder.worker.js').default,
);
public static readonly WORKER_POOL = WorkerPool.getInstance();

constructor({
getUrl = s => Promise.resolve(s),
Expand Down Expand Up @@ -111,7 +108,7 @@ export class BinaryLoader {
return;
}

BinaryLoader.WORKER_POOL.getWorker().then(autoTerminatingWorker => {
BinaryLoader.WORKER_POOL.getWorker(WorkerType.BINARY_DECODER_WORKER).then(autoTerminatingWorker => {
const pointAttributes = node.pcoGeometry.pointAttributes;
const numPoints = buffer.byteLength / pointAttributes.byteSize;

Expand All @@ -122,7 +119,7 @@ export class BinaryLoader {
autoTerminatingWorker.worker.onmessage = (e: WorkerResponse) => {
if (this.disposed) {
resolve();
BinaryLoader.WORKER_POOL.releaseWorker(autoTerminatingWorker);
BinaryLoader.WORKER_POOL.releaseWorker(WorkerType.BINARY_DECODER_WORKER, autoTerminatingWorker);
return;
}

Expand All @@ -145,7 +142,7 @@ export class BinaryLoader {

this.callbacks.forEach(callback => callback(node));
resolve();
BinaryLoader.WORKER_POOL.releaseWorker(autoTerminatingWorker);
BinaryLoader.WORKER_POOL.releaseWorker(WorkerType.BINARY_DECODER_WORKER, autoTerminatingWorker);
};

const message = {
Expand Down
12 changes: 6 additions & 6 deletions src/loading2/octree-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { GetUrlFn, XhrRequest } from '../loading/types';
import { OctreeGeometry } from './octree-geometry';
import { OctreeGeometryNode } from './octree-geometry-node';
import { PointAttribute, PointAttributes, PointAttributeTypes } from './point-attributes';
import { WorkerPool, WorkerType } from './worker-pool';
import { WorkerPool, WorkerType } from '../utils/worker-pool';
import { buildUrl, extractBasePath } from './utils';

// Buffer files for DEFAULT encoding
Expand Down Expand Up @@ -95,14 +95,14 @@ export class NodeLoader {
}

const workerType = this.metadata.encoding === 'GLTF' ? WorkerType.DECODER_WORKER_GLTF : WorkerType.DECODER_WORKER;
const worker = this.workerPool.getWorker(workerType);
const autoTerminatingWorker = await this.workerPool.getWorker(workerType);

worker.onmessage = (e) => {
autoTerminatingWorker.worker.onmessage = (e) => {

const data = e.data;
const buffers = data.attributeBuffers;

this.workerPool.returnWorker(workerType, worker);
this.workerPool.releaseWorker(workerType, autoTerminatingWorker);

const geometry = new BufferGeometry();

Expand Down Expand Up @@ -168,7 +168,7 @@ export class NodeLoader {
numPoints: numPoints
};

worker.postMessage(message, [message.buffer]);
autoTerminatingWorker.worker.postMessage(message, [message.buffer]);
} catch (e) {
node.loaded = false;
node.loading = false;
Expand Down Expand Up @@ -371,7 +371,7 @@ export interface Metadata {

export class OctreeLoader {

workerPool: WorkerPool = new WorkerPool();
workerPool: WorkerPool = WorkerPool.getInstance();

basePath = '';
hierarchyPath = '';
Expand Down
52 changes: 0 additions & 52 deletions src/loading2/worker-pool.ts

This file was deleted.

7 changes: 4 additions & 3 deletions src/potree.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
PERSPECTIVE_CAMERA,
} from './constants';
import { FEATURES } from './features';
import { BinaryLoader, GetUrlFn, loadPOC } from './loading';
import { GetUrlFn, loadPOC } from './loading';
import { loadOctree } from './loading2/load-octree';
import { ClipMode } from './materials';
import { PointCloudOctree } from './point-cloud-octree';
Expand All @@ -28,6 +28,7 @@ import { IPointCloudGeometryNode, IPointCloudTreeNode, IPotree, IVisibilityUpdat
import { BinaryHeap } from './utils/binary-heap';
import { Box3Helper } from './utils/box3-helper';
import { LRU } from './utils/lru';
import { WorkerPool } from './utils/worker-pool';

export class QueueItem {
constructor(
Expand Down Expand Up @@ -117,11 +118,11 @@ export class Potree implements IPotree {
}

static set maxLoaderWorkers(value: number) {
BinaryLoader.WORKER_POOL.maxWorkers = value;
WorkerPool.getInstance().maxWorkersPerPool = value;
}

static get maxLoaderWorkers(): number {
return BinaryLoader.WORKER_POOL.maxWorkers;
return WorkerPool.getInstance().maxWorkersPerPool;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously setting maxLoaderWorkers had no impact on Potree2 pointclouds, which could spawn an unlimited number of workers. Now, we can set the maximum number of workers that each pool is allowed to spawn

}

private updateVisibility(
Expand Down
102 changes: 41 additions & 61 deletions src/utils/worker-pool.ts
Original file line number Diff line number Diff line change
@@ -1,74 +1,54 @@
import { AsyncBlockingQueue } from './async-blocking-queue';
import { AutoTerminatingWorker, WorkerQueue } from './worker-queue';

export class AutoTerminatingWorker {
private timeoutId: number | undefined = undefined;
private terminated: boolean = false;
export enum WorkerType {
// Potree 1 workers
BINARY_DECODER_WORKER = 'BINARY_DECODER_WORKER',

constructor(private wrappedWorker: Worker, private maxIdle: number) {}
// Potree 2 workers
DECODER_WORKER = 'DECODER_WORKER',
DECODER_WORKER_GLTF = 'DECODER_WORKER_GLTF',
}

public get worker(): Worker {
return this.wrappedWorker;
}
export const DEFAULT_MAX_WORKERS_PER_POOL = 32;

get isTerminated(): boolean {
return this.terminated;
}
export class WorkerPool {
Copy link
Contributor Author

@Jordan-Lane Jordan-Lane Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new WorkerPool is a Singleton and has a pool that consists of multiple WorkerQueues for each worker type. We set a maximum number of workers per queue, and this allows us to manage all the different Queues in a single place

public _maxWorkersPerPool = DEFAULT_MAX_WORKERS_PER_POOL;

private static instance: WorkerPool | undefined;
private constructor() {}

private pool: { [key in WorkerType]: WorkerQueue } = {
BINARY_DECODER_WORKER: new WorkerQueue(
this._maxWorkersPerPool,
require('../workers/binary-decoder.worker.js').default,
),
DECODER_WORKER: new WorkerQueue(
this._maxWorkersPerPool,
require('../loading2/decoder.worker.js').default,
),
DECODER_WORKER_GLTF: new WorkerQueue(
this._maxWorkersPerPool,
require('../loading2/gltf-decoder.worker.js').default,
),
};

static getInstance(): WorkerPool {
if (!this.instance) {
this.instance = new WorkerPool();
}

markIdle(): void {
this.timeoutId = window.setTimeout(() => {
this.terminated = true;
this.wrappedWorker.terminate();
}, this.maxIdle);
return this.instance;
}

markInUse(): void {
if (this.timeoutId) {
window.clearTimeout(this.timeoutId);
}
set maxWorkersPerPool(count: number) {
Object.entries(this.pool).forEach(([_, pool]) => (pool.maxWorkers = count));
}
}

export class WorkerPool {
/**
* The maximum amount of idle time that can elapse before a worker from this pool is automatically terminated
*/
private static readonly POOL_MAX_IDLE = 7000;

private pool = new AsyncBlockingQueue<AutoTerminatingWorker>();
private poolSize = 0;

constructor(public maxWorkers: number, private workerType: any) {}

/**
* Returns a worker promise which is resolved when one is available.
*/
public getWorker(): Promise<AutoTerminatingWorker> {
// If the number of active workers is smaller than the maximum, return a new one.
// Otherwise, return a promise for worker from the pool.
if (this.poolSize < this.maxWorkers) {
this.poolSize++;
return Promise.resolve(
new AutoTerminatingWorker(new this.workerType(), WorkerPool.POOL_MAX_IDLE),
);
} else {
return this.pool.dequeue().then(worker => {
worker.markInUse();
// If the dequeued worker has been terminated, decrease the pool size and make a recursive call to get a new worker
if (worker.isTerminated) {
this.poolSize--;
return this.getWorker();
}
return worker;
});
}
public getWorker(workerType: WorkerType): Promise<AutoTerminatingWorker> {
return this.pool[workerType].getWorker();
}

/**
* Releases a Worker back into the pool
* @param worker
*/
public releaseWorker(worker: AutoTerminatingWorker): void {
worker.markIdle();
this.pool.enqueue(worker);
public releaseWorker(workerType: WorkerType, worker: AutoTerminatingWorker): void {
return this.pool[workerType].releaseWorker(worker);
}
}
74 changes: 74 additions & 0 deletions src/utils/worker-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { AsyncBlockingQueue } from './async-blocking-queue';

export class AutoTerminatingWorker {
private timeoutId: number | undefined = undefined;
private terminated: boolean = false;

constructor(private wrappedWorker: Worker, private maxIdle: number) {}

public get worker(): Worker {
return this.wrappedWorker;
}

get isTerminated(): boolean {
return this.terminated;
}

markIdle(): void {
this.timeoutId = window.setTimeout(() => {
this.terminated = true;
this.wrappedWorker.terminate();
}, this.maxIdle);
}

markInUse(): void {
if (this.timeoutId) {
window.clearTimeout(this.timeoutId);
}
}
}

export class WorkerQueue {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed and moved the previous Potree1 WorkerPool -> WorkerQueue. The WorkerQueue is responsible for returning a single worker type

The AsyncBlockingQueue with AutoTerminatingWorkers are awesome. By reusing this functionality as a Queue, we know that workers will be cleaned up when they are not being used

/**
* The maximum amount of idle time that can elapse before a worker from this pool is automatically terminated
*/
private static readonly QUEUE_MAX_IDLE = 7000;

private queue = new AsyncBlockingQueue<AutoTerminatingWorker>();
private queueSize = 0;

constructor(public maxWorkers: number, private workerType: any) {}

/**
* Returns a worker promise which is resolved when one is available.
*/
public getWorker(): Promise<AutoTerminatingWorker> {
// If the number of active workers is smaller than the maximum, return a new one.
// Otherwise, return a promise for worker from the pool.
if (this.queueSize < this.maxWorkers) {
this.queueSize++;
return Promise.resolve(
new AutoTerminatingWorker(new this.workerType(), WorkerQueue.QUEUE_MAX_IDLE),
);
} else {
return this.queue.dequeue().then(worker => {
worker.markInUse();
// If the dequeued worker has been terminated, decrease the pool size and make a recursive call to get a new worker
if (worker.isTerminated) {
this.queueSize--;
return this.getWorker();
}
return worker;
});
}
}

/**
* Releases a Worker back into the pool
* @param worker
*/
public releaseWorker(worker: AutoTerminatingWorker): void {
worker.markIdle();
this.queue.enqueue(worker);
}
}