From b42ec939ff06a521b596da8d36c352e27f71b22e Mon Sep 17 00:00:00 2001 From: tmalahie Date: Mon, 14 Oct 2024 23:22:00 +0200 Subject: [PATCH 1/3] Implement promises library with non-blocking methods --- promises/index.js | 66 ++++++++++++++++++++++++++++++++++++++++++++++ promises/worker.js | 36 +++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 promises/index.js create mode 100644 promises/worker.js diff --git a/promises/index.js b/promises/index.js new file mode 100644 index 0000000..f6dfeac --- /dev/null +++ b/promises/index.js @@ -0,0 +1,66 @@ +const path = require("path"); +const { Worker } = require("worker_threads"); + +let workerPromise; +async function initWorker() { + if (workerPromise) return workerPromise; + const worker = new Worker(path.join(__dirname, "worker.js")); + workerPromise = new Promise((resolve) => { + function onReady() { + worker.off("message", onReady); + let pendingPromises = {}; + worker.on("message", ({ id, res, err }) => { + if (err) pendingPromises[id].reject(err); + else pendingPromises[id].resolve(res); + delete pendingPromises[id]; + }); + const wrappedWorker = { + sendMessage: ({ isClass, self, method, args }) => { + const id = Math.random().toString(36).substring(2); + return new Promise((resolve, reject) => { + pendingPromises[id] = { resolve, reject }; + worker.postMessage({ id, isClass, self, method, args }); + }); + }, + }; + resolve(wrappedWorker); + } + worker.on("message", onReady); + }); + + return workerPromise; +} + +function proxify(worker, res) { + return new Proxy(res, { + get(obj, method) { + if (method === "then" || method === "catch" || method === "finally") + return obj[method]; + return (...args) => worker.sendMessage({ self: obj, method, args }); + }, + }); +} + +function wrapClass(className) { + return async (...args) => { + const worker = await initWorker(); + const res = await worker.sendMessage({ + isClass: true, + method: className, + args, + }); + return proxify(worker, res); + }; +} + +function wrapMethod(methodName) { + return async (...args) => { + const worker = await initWorker(); + return worker.sendMessage({ method: methodName, args }); + }; +} + +exports.Wallet = wrapClass("Wallet"); +exports.dropOnline = wrapMethod("dropOnline"); +exports.generateKeys = wrapMethod("generateKeys"); +exports.restoreKeys = wrapMethod("restoreKeys"); diff --git a/promises/worker.js b/promises/worker.js new file mode 100644 index 0000000..db5378f --- /dev/null +++ b/promises/worker.js @@ -0,0 +1,36 @@ +const { parentPort } = require("worker_threads"); +const wrapper = require("../wrapper"); + +const registry = {}; +parentPort.on("message", ({ id, isClass, self, method, args }) => { + try { + if (self) self = getArgValue(self); + else self = wrapper; + if (typeof self[method] !== "function") { + throw new Error( + `${self[method]} is not a function (calling ${method})`, + ); + } + + let res; + if (isClass) res = new self[method](...args.map(getArgValue)); + else res = self[method](...args.map(getArgValue)); + try { + parentPort.postMessage({ id, res }); + } catch (e) { + if (e.name === "DataCloneError") { + registry[id] = res; + parentPort.postMessage({ id, res: { _id: id } }); + } else { + throw e; + } + } + } catch (err) { + parentPort.postMessage({ id, err }); + } +}); +parentPort.postMessage(null); + +function getArgValue(a) { + return registry[a?._id] || a; +} From f124f4d80150389297afcd2fc165c4db9d08bb3d Mon Sep 17 00:00:00 2001 From: tmalahie Date: Sun, 16 Feb 2025 19:54:19 +0100 Subject: [PATCH 2/3] Fix non-clonable objects not handled properly since node 22 --- promises/worker.js | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/promises/worker.js b/promises/worker.js index db5378f..75b82d8 100644 --- a/promises/worker.js +++ b/promises/worker.js @@ -15,15 +15,10 @@ parentPort.on("message", ({ id, isClass, self, method, args }) => { let res; if (isClass) res = new self[method](...args.map(getArgValue)); else res = self[method](...args.map(getArgValue)); - try { - parentPort.postMessage({ id, res }); - } catch (e) { - if (e.name === "DataCloneError") { - registry[id] = res; - parentPort.postMessage({ id, res: { _id: id } }); - } else { - throw e; - } + if (isTransferable(res)) parentPort.postMessage({ id, res }); + else { + registry[id] = res; + parentPort.postMessage({ id, res: { _id: id } }); } } catch (err) { parentPort.postMessage({ id, err }); @@ -34,3 +29,11 @@ parentPort.postMessage(null); function getArgValue(a) { return registry[a?._id] || a; } +function isTransferable(obj) { + return !( + obj != null && + typeof obj === "object" && + obj.constructor !== Object && + obj.constructor !== Array + ); +} From b271c9faeec382865c7af2af963186f452fbdfdc Mon Sep 17 00:00:00 2001 From: tmalahie Date: Sun, 16 Feb 2025 19:57:24 +0100 Subject: [PATCH 3/3] Fix process not exiting - add method to clear resources --- promises/index.js | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/promises/index.js b/promises/index.js index f6dfeac..84330fa 100644 --- a/promises/index.js +++ b/promises/index.js @@ -22,6 +22,7 @@ async function initWorker() { worker.postMessage({ id, isClass, self, method, args }); }); }, + terminate: () => worker.terminate(), }; resolve(wrappedWorker); } @@ -31,6 +32,14 @@ async function initWorker() { return workerPromise; } +async function clearResources() { + if (workerPromise) { + const worker = await workerPromise; + workerPromise = null; + await worker.terminate(); + } +} + function proxify(worker, res) { return new Proxy(res, { get(obj, method) { @@ -64,3 +73,4 @@ exports.Wallet = wrapClass("Wallet"); exports.dropOnline = wrapMethod("dropOnline"); exports.generateKeys = wrapMethod("generateKeys"); exports.restoreKeys = wrapMethod("restoreKeys"); +exports.clearResources = clearResources;