-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.js
146 lines (112 loc) · 4.01 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
const assert = require('assert');
const debug = require('debug')('jars:server');
const { promisify } = require('util');
const { EventEmitter } = require('events');
const { createDebugRaw } = require('./helpers');
const debugRaw = createDebugRaw('jars:server');
/**
 * Creates an RPC server that pops JSON-encoded requests from the Redis list
 * `jars.rpc.<identifier>` and passes each one to `handler`.
 *
 * Every request is acknowledged on its `meta.replyChannel` before the handler
 * runs. Replies are published on that same channel and always carry the
 * request `id`. Requests that cannot be parsed, or that lack
 * `meta.replyChannel`, are logged and dropped because there is no channel to
 * answer them on.
 *
 * @param {object} conn - Connection used for publishing replies. It is
 *   duplicated (`conn.duplicate()`) to obtain a second connection for the
 *   blocking list pops.
 * @param {string} identifier - Suffix of the list name to listen on.
 * @param {function} handler - Called with
 *   `{ method, params, id, meta, reply, replyWithResult, replyWithError, error }`.
 *   `error` is an alias of `replyWithError`. If the handler throws or rejects,
 *   an error reply is sent on its behalf.
 * @returns {Promise<EventEmitter>} Emitter that emits `'error'` when popping
 *   from the list fails, extended with an async `close()` method. `close()`
 *   ends the popping connection, waits for in-flight requests and then ends
 *   the publishing connection; repeated calls return the same promise.
 */
async function createRpcServer(conn, identifier, handler) {
  assert(conn, 'conn is required');
  assert.equal(typeof identifier, 'string', 'identifier must be a string');
  assert.equal(typeof handler, 'function', 'handler must be a function');

  const pub = conn;
  const sub = conn.duplicate();

  let closePromise;
  let isClosing = false;

  // In-flight request promises. A Set makes removal exact, with no index lookup.
  const pendingRequests = new Set();

  const publishAsync = promisify(pub.publish).bind(pub);

  const listName = `jars.rpc.${identifier}`;

  const handleRequest = function handleRequest(encoded) {
    debugRaw(`REQ <-- ${encoded}`);

    if (closePromise) {
      debug(`Received request while closing. Ignoring`);
      return;
    }

    // This runs inside a setImmediate callback, so anything thrown here would
    // surface as an uncaught exception. Validate defensively and drop bad input.
    let request;

    try {
      request = JSON.parse(encoded);
      assert(request && typeof request === 'object', 'request must be an object');
      assert(request.meta && request.meta.replyChannel, 'replyChannel is required');
    } catch (error) {
      debug(`Ignoring malformed request: ${error.message}`);
      return;
    }

    const { method, params, id, meta } = request;
    const { replyChannel } = meta;

    const reply = async message => {
      const encoded = JSON.stringify({ id, ...message });
      debugRaw(`RES --> ${replyChannel}: ${encoded}`);
      await publishAsync(replyChannel, encoded);
    };

    const ack = async () => {
      const encoded = JSON.stringify({ id, status: 'ack' });
      debugRaw(`ACK --> ${replyChannel}: ${id}`);
      await publishAsync(replyChannel, encoded);
    };

    const replyWithError = async (error, code, data) => {
      let errorAsString;

      if (error instanceof Error) {
        // Never leak internals of unexpected errors to the caller.
        debug(`Unhandled error: ${error.stack}`);
        errorAsString = 'Internal Server Error';
      } else {
        // String() also copes with null/undefined, where .toString() would throw.
        errorAsString = String(error);
      }

      return await reply({
        error: {
          code: code || 'InternalServerError',
          message: errorAsString,
          ...(data ? { data } : {}),
        },
      });
    };

    const replyWithResult = async result => reply({ result });

    const requestPromise = (async () => {
      try {
        await ack();
        await handler({ method, params, id, meta, reply, replyWithResult, replyWithError, error: replyWithError });
      } catch (error) {
        try {
          await replyWithError(error);
        } catch (replyError) {
          // Publishing the error reply failed as well. Log it rather than
          // leaving this promise rejected with nobody listening.
          debug(`Failed to send error reply: ${(replyError && replyError.stack) || replyError}`);
        }
      } finally {
        // Reached only after at least one await, so requestPromise is assigned.
        pendingRequests.delete(requestPromise);
        debug(`Pending request count reduced to ${pendingRequests.size}`);
      }
    })();

    pendingRequests.add(requestPromise);
    debug(`Pending request count increased to ${pendingRequests.size}`);
  };

  const emitter = new EventEmitter();

  // Blocking pop loop: each completed pop schedules its handler and re-arms itself.
  const popNextRequest = () =>
    sub.blpop(listName, 0, (error, result) => {
      if (isClosing) {
        debug('Will not pop another. Server is closing.');
        return;
      }

      if (error) {
        emitter.emit('error', error);
        return;
      }

      const [, encoded] = result;

      setImmediate(() => handleRequest(encoded));
      popNextRequest();
    });

  setImmediate(popNextRequest);

  debug(`Listening for RPC requests on list ${listName}`);

  return Object.assign(emitter, {
    close: async () => {
      if (!isClosing) {
        isClosing = true;

        closePromise = (async () => {
          debug(`Closing`);

          // Stop accepting new requests
          debug('Quitting subscription connection');
          sub.end(false);
          debug('Quit subscription connection');

          // Wait for all pending requests
          if (pendingRequests.size) {
            debug(`Waiting for ${pendingRequests.size} requests to finish`);
            await Promise.all([...pendingRequests].map(request => request.catch(() => true)));
          }

          // Quit publishing Redis connection
          debug('Quitting publishing connection');
          pub.end(false);
          debug('Quit publishing connection');
        })();
      }

      assert(closePromise, 'Race condition');

      return closePromise;
    },
  });
}
// The module's only export is the createRpcServer factory itself.
module.exports = createRpcServer;