Skip to content

Commit 42b62b9

Browse files
committed
Implement promises library with non-blocking methods
1 parent f727907 commit 42b62b9

File tree

2 files changed

+104
-0
lines changed

2 files changed

+104
-0
lines changed

promises/index.js

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
const path = require("path");
2+
const { Worker } = require("worker_threads");
3+
4+
let workerPromise;
5+
async function initWorker() {
6+
if (workerPromise) return workerPromise;
7+
const worker = new Worker(path.join(__dirname, "worker.js"));
8+
workerPromise = new Promise((resolve) => {
9+
function onReady() {
10+
worker.off("message", onReady);
11+
let pendingPromises = {};
12+
worker.on("message", ({ id, res, err }) => {
13+
if (err)
14+
pendingPromises[id].reject(err);
15+
else
16+
pendingPromises[id].resolve(res);
17+
delete pendingPromises[id];
18+
});
19+
const wrappedWorker = {
20+
sendMessage: ({ isClass, self, method, args }) => {
21+
const id = Math.random().toString(36).substring(2);
22+
return new Promise((resolve, reject) => {
23+
pendingPromises[id] = { resolve, reject };
24+
worker.postMessage({ id, isClass, self, method, args });
25+
});
26+
}
27+
}
28+
resolve(wrappedWorker);
29+
}
30+
worker.on("message", onReady);
31+
});
32+
33+
return workerPromise;
34+
}
35+
36+
function proxify(worker, res) {
37+
return new Proxy(res, {
38+
get(obj, method) {
39+
if (method === "then" || method === "catch" || method === "finally") return obj[method];
40+
return (...args) => worker.sendMessage({ self: obj, method, args });
41+
}
42+
});
43+
}
44+
45+
function wrapClass(className) {
46+
return async (...args) => {
47+
const worker = await initWorker();
48+
const res = await worker.sendMessage({ isClass: true, method: className, args });
49+
return proxify(worker, res);
50+
}
51+
}
52+
53+
function wrapMethod(methodName) {
54+
return async (...args) => {
55+
const worker = await initWorker();
56+
return worker.sendMessage({ method: methodName, args });
57+
}
58+
}
59+
60+
exports.Wallet = wrapClass("Wallet");
61+
exports.dropOnline = wrapMethod("dropOnline");
62+
exports.generateKeys = wrapMethod("generateKeys");
63+
exports.restoreKeys = wrapMethod("restoreKeys");

promises/worker.js

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
const { parentPort } = require('worker_threads');
2+
const wrapper = require('../wrapper');
3+
4+
const registry = {};
5+
parentPort.on('message', ({ id, isClass, self, method, args }) => {
6+
try {
7+
if (self)
8+
self = getArgValue(self);
9+
else
10+
self = wrapper;
11+
if (typeof self[method] !== 'function') {
12+
throw new Error(`${self[method]} is not a function (calling ${method})`);
13+
}
14+
15+
let res;
16+
if (isClass)
17+
res = new self[method](...args.map(getArgValue));
18+
else
19+
res = self[method](...args.map(getArgValue));
20+
try {
21+
parentPort.postMessage({ id, res });
22+
}
23+
catch (e) {
24+
if (e.name === 'DataCloneError') {
25+
registry[id] = res;
26+
parentPort.postMessage({ id, res: { _id: id } });
27+
}
28+
else {
29+
throw e;
30+
}
31+
}
32+
}
33+
catch (err) {
34+
parentPort.postMessage({ id, err });
35+
}
36+
});
37+
parentPort.postMessage(null);
38+
39+
function getArgValue(a) {
40+
return registry[a?._id] || a;
41+
}

0 commit comments

Comments
 (0)