Skip to content

Commit 3a0f29f

Browse files
feat: implement the serverSideEmit functionality
This new API replaces the customRequest/customHook methods that were removed in v6. Syntax: ```js // server A io.serverSideEmit("hello", "world"); // server B io.on("hello", (arg) => { console.log(arg); // prints "world" }); ``` With acknowledgements: ```js // server A io.serverSideEmit("hello", "world", (err, responses) => { console.log(responses); // prints ["hi"] }); // server B io.on("hello", (arg, callback) => { callback("hi"); }); ``` Related: #370
1 parent c68a47c commit 3a0f29f

File tree

5 files changed

+211
-87
lines changed

5 files changed

+211
-87
lines changed

lib/index.ts

+110
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ enum RequestType {
1717
REMOTE_LEAVE = 3,
1818
REMOTE_DISCONNECT = 4,
1919
REMOTE_FETCH = 5,
20+
SERVER_SIDE_EMIT = 6,
2021
}
2122

2223
interface Request {
@@ -300,6 +301,37 @@ export class RedisAdapter extends Adapter {
300301
this.pubClient.publish(this.responseChannel, response);
301302
break;
302303

304+
case RequestType.SERVER_SIDE_EMIT:
305+
if (request.uid === this.uid) {
306+
debug("ignore same uid");
307+
return;
308+
}
309+
const withAck = request.requestId !== undefined;
310+
if (!withAck) {
311+
this.nsp._onServerSideEmit(request.data);
312+
return;
313+
}
314+
let called = false;
315+
const callback = (arg) => {
316+
// only one argument is expected
317+
if (called) {
318+
return;
319+
}
320+
called = true;
321+
debug("calling acknowledgement with %j", arg);
322+
this.pubClient.publish(
323+
this.responseChannel,
324+
JSON.stringify({
325+
type: RequestType.SERVER_SIDE_EMIT,
326+
requestId: request.requestId,
327+
data: arg,
328+
})
329+
);
330+
};
331+
request.data.push(callback);
332+
this.nsp._onServerSideEmit(request.data);
333+
break;
334+
303335
default:
304336
debug("ignoring unknown request type: %s", request.type);
305337
}
@@ -381,6 +413,23 @@ export class RedisAdapter extends Adapter {
381413
this.requests.delete(requestId);
382414
break;
383415

416+
case RequestType.SERVER_SIDE_EMIT:
417+
request.responses.push(response.data);
418+
419+
debug(
420+
"serverSideEmit: got %d responses out of %d",
421+
request.responses.length,
422+
request.numSub
423+
);
424+
if (request.responses.length === request.numSub) {
425+
clearTimeout(request.timeout);
426+
if (request.resolve) {
427+
request.resolve(null, request.responses);
428+
}
429+
this.requests.delete(requestId);
430+
}
431+
break;
432+
384433
default:
385434
debug("ignoring unknown request type: %s", request.type);
386435
}
@@ -733,6 +782,67 @@ export class RedisAdapter extends Adapter {
733782
this.pubClient.publish(this.requestChannel, request);
734783
}
735784

785+
public serverSideEmit(packet: any[]): void {
786+
const withAck = typeof packet[packet.length - 1] === "function";
787+
788+
if (withAck) {
789+
this.serverSideEmitWithAck(packet).catch(() => {
790+
// ignore errors
791+
});
792+
return;
793+
}
794+
795+
const request = JSON.stringify({
796+
uid: this.uid,
797+
type: RequestType.SERVER_SIDE_EMIT,
798+
data: packet,
799+
});
800+
801+
this.pubClient.publish(this.requestChannel, request);
802+
}
803+
804+
private async serverSideEmitWithAck(packet: any[]) {
805+
const ack = packet.pop();
806+
const numSub = (await this.getNumSub()) - 1; // ignore self
807+
808+
debug('waiting for %d responses to "serverSideEmit" request', numSub);
809+
810+
if (numSub <= 0) {
811+
return ack(null, []);
812+
}
813+
814+
const requestId = uid2(6);
815+
const request = JSON.stringify({
816+
uid: this.uid,
817+
requestId, // the presence of this attribute defines whether an acknowledgement is needed
818+
type: RequestType.SERVER_SIDE_EMIT,
819+
data: packet,
820+
});
821+
822+
const timeout = setTimeout(() => {
823+
const storedRequest = this.requests.get(requestId);
824+
if (storedRequest) {
825+
ack(
826+
new Error(
827+
`timeout reached: only ${storedRequest.responses.length} responses received out of ${storedRequest.numSub}`
828+
),
829+
storedRequest.responses
830+
);
831+
this.requests.delete(requestId);
832+
}
833+
}, this.requestsTimeout);
834+
835+
this.requests.set(requestId, {
836+
type: RequestType.SERVER_SIDE_EMIT,
837+
numSub,
838+
timeout,
839+
resolve: ack,
840+
responses: [],
841+
});
842+
843+
this.pubClient.publish(this.requestChannel, request);
844+
}
845+
736846
/**
737847
* Get the number of subscribers of the request channel
738848
*

package-lock.json

+28-83
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
"dependencies": {
2222
"debug": "~4.3.1",
2323
"notepack.io": "~2.2.0",
24-
"socket.io-adapter": "^2.2.0",
24+
"socket.io-adapter": "~2.3.0",
2525
"uid2": "0.0.3"
2626
},
2727
"devDependencies": {
@@ -35,8 +35,8 @@
3535
"nyc": "^15.1.0",
3636
"prettier": "^2.1.2",
3737
"redis": "^3.1.2",
38-
"socket.io": "^4.0.0",
39-
"socket.io-client": "^4.0.0",
38+
"socket.io": "^4.1.1",
39+
"socket.io-client": "^4.1.1",
4040
"ts-node": "^9.1.1",
4141
"typescript": "^4.0.5"
4242
},

0 commit comments

Comments
 (0)