diff --git a/.eslintignore b/.eslintignore old mode 100644 new mode 100755 diff --git a/.eslintrc.json b/.eslintrc.json old mode 100644 new mode 100755 diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml old mode 100644 new mode 100755 diff --git a/.github/ISSUE_TEMPLATE/peer-template.md b/.github/ISSUE_TEMPLATE/peer-template.md old mode 100644 new mode 100755 diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 diff --git a/.gitpod.yml b/.gitpod.yml old mode 100644 new mode 100755 diff --git a/.travis.yml b/.travis.yml old mode 100644 new mode 100755 diff --git a/Dockerfile b/Dockerfile old mode 100644 new mode 100755 diff --git a/LICENSE b/LICENSE old mode 100644 new mode 100755 diff --git a/README.md b/README.md old mode 100644 new mode 100755 diff --git a/app.json b/app.json old mode 100644 new mode 100755 diff --git a/bin/peerjs b/bin/peerjs index f6a04e67a..5a5674961 100755 --- a/bin/peerjs +++ b/bin/peerjs @@ -5,7 +5,7 @@ const path = require("path"); const pkg = require("../package.json"); const fs = require("fs"); const optimistUsageLength = 98; -const yargs = require("yargs") +const yargs = require("yargs"); const version = pkg.version; const { PeerServer } = require("../dist/src"); const opts = yargs @@ -16,57 +16,71 @@ const opts = yargs demandOption: false, alias: "t", describe: "timeout (milliseconds)", - default: 5000 + default: 5000, }, concurrent_limit: { demandOption: false, alias: "c", describe: "concurrent limit", - default: 5000 + default: 5000, }, alive_timeout: { demandOption: false, describe: "broken connection check timeout (milliseconds)", - default: 60000 + default: 60000, }, key: { demandOption: false, alias: "k", describe: "connection key", - default: "peerjs" + default: "peerjs", }, sslkey: { demandOption: false, - describe: "path to SSL key" + describe: "path to SSL key", }, sslcert: { demandOption: false, - describe: "path to SSL certificate" + describe: "path to SSL certificate", }, port: { demandOption: true, alias: "p", - describe: "port" + describe: "port", }, path: { demandOption: false, describe: "custom path", - default: "/" + default: "/", }, allow_discovery: { demandOption: false, - describe: "allow discovery of peers" + describe: "allow discovery of peers", }, proxied: { demandOption: false, describe: "Set true if PeerServer stays behind a reverse proxy", - default: false - } + default: false, + }, + redis: { + demandOption: false, + describe: "Should it use redis?", + default: false, + }, + redisHost: { + demandOption: false, + describe: "Redis Host to use", + default: "", + }, + redisPort: { + demandOption: false, + describe: "Redis Port to use", + default: 0, + }, }) - .boolean("allow_discovery") - .argv; + .boolean("allow_discovery").argv; -process.on("uncaughtException", function (e) { +process.on("uncaughtException", function(e) { console.error("Error: " + e); }); @@ -74,33 +88,38 @@ if (opts.sslkey || opts.sslcert) { if (opts.sslkey && opts.sslcert) { opts.ssl = { key: fs.readFileSync(path.resolve(opts.sslkey)), - cert: fs.readFileSync(path.resolve(opts.sslcert)) + cert: fs.readFileSync(path.resolve(opts.sslcert)), }; delete opts.sslkey; delete opts.sslcert; } else { - console.error("Warning: PeerServer will not run because either " + - "the key or the certificate has not been provided."); + console.error( + "Warning: PeerServer will not run because either " + + "the key or the certificate has not been provided." + ); process.exit(1); } } const userPath = opts.path; -const server = PeerServer(opts, server => { +const server = PeerServer(opts, (server) => { const host = server.address().address; const port = server.address().port; console.log( "Started PeerServer on %s, port: %s, path: %s (v. %s)", - host, port, userPath || "/", version + host, + port, + userPath || "/", + version ); }); -server.on("connection", client => { +server.on("connection", (client) => { console.log(`Client connected: ${client.getId()}`); }); -server.on("disconnect", client => { +server.on("disconnect", (client) => { console.log(`Client disconnected: ${client.getId()}`); }); diff --git a/changelog.md b/changelog.md old mode 100644 new mode 100755 diff --git a/dist/app.json b/dist/app.json old mode 100644 new mode 100755 diff --git a/dist/src/api/index.js b/dist/src/api/index.js old mode 100644 new mode 100755 diff --git a/dist/src/api/middleware/auth/index.js b/dist/src/api/middleware/auth/index.js old mode 100644 new mode 100755 diff --git a/dist/src/api/middleware/middleware.js b/dist/src/api/middleware/middleware.js old mode 100644 new mode 100755 diff --git a/dist/src/api/v1/calls/index.js b/dist/src/api/v1/calls/index.js old mode 100644 new mode 100755 index 17ae5e300..b8c8f3c71 --- a/dist/src/api/v1/calls/index.js +++ b/dist/src/api/v1/calls/index.js @@ -4,10 +4,11 @@ var __importDefault = (this && this.__importDefault) || function (mod) { }; Object.defineProperty(exports, "__esModule", { value: true }); const express_1 = __importDefault(require("express")); -exports.default = ({ realm, messageHandler }) => { +exports.default = ({ realm, messageHandler, }) => { const app = express_1.default.Router(); const handle = (req, res, next) => { const { id } = req.params; + console.log("Got request..."); if (!id) return next(); const client = realm.getClientById(id); @@ -19,7 +20,7 @@ exports.default = ({ realm, messageHandler }) => { type, src: id, dst, - payload + payload, }; messageHandler.handle(client, message); res.sendStatus(200); diff --git a/dist/src/api/v1/public/index.js b/dist/src/api/v1/public/index.js old mode 100644 new mode 100755 diff --git a/dist/src/config/index.js b/dist/src/config/index.js old mode 100644 new mode 100755 index 6a1771720..22e07132e --- a/dist/src/config/index.js +++ b/dist/src/config/index.js @@ -12,7 +12,10 @@ const defaultConfig = { cleanup_out_msgs: 1000, ssl: { key: "", - cert: "" - } + cert: "", + }, + redis: false, + redisHost: "", + redisPort: 0, }; exports.default = defaultConfig; diff --git a/dist/src/enums.js b/dist/src/enums.js old mode 100644 new mode 100755 diff --git a/dist/src/index.js b/dist/src/index.js old mode 100644 new mode 100755 index 7be958b6a..ae8e160c0 --- a/dist/src/index.js +++ b/dist/src/index.js @@ -16,8 +16,7 @@ function ExpressPeerServer(server, options) { } app.on("mount", () => { if (!server) { - throw new Error("Server is not passed to constructor - " + - "can't start PeerServer"); + throw new Error("Server is not passed to constructor - " + "can't start PeerServer"); } instance_1.createInstance({ app, server, options: newOptions }); }); diff --git a/dist/src/instance.js b/dist/src/instance.js old mode 100644 new mode 100755 index ca142e988..ca5c5dcd9 --- a/dist/src/instance.js +++ b/dist/src/instance.js @@ -10,32 +10,36 @@ const messagesExpire_1 = require("./services/messagesExpire"); const webSocketServer_1 = require("./services/webSocketServer"); const messageHandler_1 = require("./messageHandler"); const api_1 = require("./api"); -exports.createInstance = ({ app, server, options }) => { +exports.createInstance = ({ app, server, options, }) => { const config = options; const realm = new realm_1.Realm(); const messageHandler = new messageHandler_1.MessageHandler(realm); const api = api_1.Api({ config, realm, messageHandler }); - const messagesExpire = new messagesExpire_1.MessagesExpire({ realm, config, messageHandler }); + const messagesExpire = new messagesExpire_1.MessagesExpire({ + realm, + config, + messageHandler, + }); const checkBrokenConnections = new checkBrokenConnections_1.CheckBrokenConnections({ realm, config, - onClose: client => { + onClose: (client) => { app.emit("disconnect", client); - } + }, }); app.use(options.path, api); //use mountpath for WS server - const customConfig = Object.assign(Object.assign({}, config), { path: path_1.default.posix.join(app.path(), options.path, '/') }); + const customConfig = Object.assign(Object.assign({}, config), { path: path_1.default.posix.join(app.path(), options.path, "/") }); const wss = new webSocketServer_1.WebSocketServer({ server, realm, - config: customConfig + config: customConfig, }); wss.on("connection", (client) => { const messageQueue = realm.getMessageQueueById(client.getId()); if (messageQueue) { let message; - while (message = messageQueue.readMessage()) { + while ((message = messageQueue.readMessage())) { messageHandler.handle(client, message); } realm.clearMessageQueue(client.getId()); diff --git a/dist/src/messageHandler/handler.js b/dist/src/messageHandler/handler.js old mode 100644 new mode 100755 diff --git a/dist/src/messageHandler/handlers/heartbeat/index.js b/dist/src/messageHandler/handlers/heartbeat/index.js old mode 100644 new mode 100755 diff --git a/dist/src/messageHandler/handlers/index.js b/dist/src/messageHandler/handlers/index.js old mode 100644 new mode 100755 diff --git a/dist/src/messageHandler/handlers/transmission/index.js b/dist/src/messageHandler/handlers/transmission/index.js old mode 100644 new mode 100755 index 5183eb472..d57c69543 --- a/dist/src/messageHandler/handlers/transmission/index.js +++ b/dist/src/messageHandler/handlers/transmission/index.js @@ -1,7 +1,7 @@ "use strict"; Object.defineProperty(exports, "__esModule", { value: true }); const enums_1 = require("../../../enums"); -exports.TransmissionHandler = ({ realm }) => { +exports.TransmissionHandler = ({ realm, }) => { const handle = (client, message) => { const type = message.type; const srcId = message.src; @@ -33,7 +33,7 @@ exports.TransmissionHandler = ({ realm }) => { handle(client, { type: enums_1.MessageType.LEAVE, src: dstId, - dst: srcId + dst: srcId, }); } } diff --git a/dist/src/messageHandler/handlersRegistry.js b/dist/src/messageHandler/handlersRegistry.js old mode 100644 new mode 100755 diff --git a/dist/src/messageHandler/index.js b/dist/src/messageHandler/index.js old mode 100644 new mode 100755 diff --git a/dist/src/models/client.js b/dist/src/models/client.js old mode 100644 new mode 100755 diff --git a/dist/src/models/message.js b/dist/src/models/message.js old mode 100644 new mode 100755 diff --git a/dist/src/models/messageQueue.js b/dist/src/models/messageQueue.js old mode 100644 new mode 100755 diff --git a/dist/src/models/realm.js b/dist/src/models/realm.js old mode 100644 new mode 100755 diff --git a/dist/src/services/checkBrokenConnections/index.js b/dist/src/services/checkBrokenConnections/index.js old mode 100644 new mode 100755 diff --git a/dist/src/services/messagesExpire/index.js b/dist/src/services/messagesExpire/index.js old mode 100644 new mode 100755 diff --git a/dist/src/services/webSocketServer/index.js b/dist/src/services/webSocketServer/index.js old mode 100644 new mode 100755 index fde0b0309..665a0ce4e --- a/dist/src/services/webSocketServer/index.js +++ b/dist/src/services/webSocketServer/index.js @@ -8,18 +8,39 @@ const url_1 = __importDefault(require("url")); const ws_1 = __importDefault(require("ws")); const enums_1 = require("../../enums"); const client_1 = require("../../models/client"); -const WS_PATH = 'peerjs'; +const Redis = require("ioredis"); +const WS_PATH = "peerjs"; class WebSocketServer extends events_1.default { - constructor({ server, realm, config }) { + constructor({ server, realm, config, }) { super(); this.setMaxListeners(0); this.realm = realm; this.config = config; const path = this.config.path; - this.path = `${path}${path.endsWith('/') ? "" : "/"}${WS_PATH}`; + this.path = `${path}${path.endsWith("/") ? "" : "/"}${WS_PATH}`; this.socketServer = new ws_1.default.Server({ path: this.path, server }); this.socketServer.on("connection", (socket, req) => this._onSocketConnection(socket, req)); this.socketServer.on("error", (error) => this._onSocketError(error)); + if (config.redis) { + this.messagePublisher = new Redis(this.config.redisPort, this.config.redisHost); + this.messageSubscriber = new Redis(this.config.redisPort, this.config.redisHost); + this._configureRedis(); + } + } + _configureRedis() { + this.messageSubscriber.subscribe("transmission", (err) => { + if (!err) + console.log("Subscribed to Transmission messages"); + }); + this.messageSubscriber.on("message", (channel, tmessage) => { + if (channel === "transmission") { + const receivedMessage = JSON.parse(tmessage); + if (receivedMessage.dst && + this.realm.getClientById(receivedMessage.dst)) { + this.emit("message", undefined, receivedMessage); + } + } + }); } _onSocketConnection(socket, req) { const { query = {} } = url_1.default.parse(req.url, true); @@ -36,7 +57,7 @@ class WebSocketServer extends events_1.default { // ID-taken, invalid token socket.send(JSON.stringify({ type: enums_1.MessageType.ID_TAKEN, - payload: { msg: "ID is taken" } + payload: { msg: "ID is taken" }, })); return socket.close(); } @@ -48,12 +69,13 @@ class WebSocketServer extends events_1.default { // handle error this.emit("error", error); } - _registerClient({ socket, id, token }) { + _registerClient({ socket, id, token, }) { // Check concurrent limit const clientsCount = this.realm.getClientsIds().length; if (clientsCount >= this.config.concurrent_limit) { return this._sendErrorAndClose(socket, enums_1.Errors.CONNECTION_LIMIT_EXCEED); } + console.log("NEW CLIENT:::", id); const newClient = new client_1.Client({ id, token }); this.realm.setClient(newClient, id); socket.send(JSON.stringify({ type: enums_1.MessageType.OPEN })); @@ -73,6 +95,10 @@ class WebSocketServer extends events_1.default { try { const message = JSON.parse(data); message.src = client.getId(); + if (message.type !== "HEARTBEAT" && this.config.redis) { + this.messagePublisher.publish("transmission", JSON.stringify(message)); + return; + } this.emit("message", client, message); } catch (e) { @@ -84,7 +110,7 @@ class WebSocketServer extends events_1.default { _sendErrorAndClose(socket, msg) { socket.send(JSON.stringify({ type: enums_1.MessageType.ERROR, - payload: { msg } + payload: { msg }, })); socket.close(); } diff --git a/dist/src/services/webSocketServer/webSocket.js b/dist/src/services/webSocketServer/webSocket.js old mode 100644 new mode 100755 diff --git a/dist/src/start.js b/dist/src/start.js new file mode 100755 index 000000000..cdf92e55b --- /dev/null +++ b/dist/src/start.js @@ -0,0 +1,19 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +const index_1 = require("./index"); +const options = { + port: 9000, + expire_timeout: 5000, + alive_timeout: 60000, + key: "peerjs", + path: "/myapp", + concurrent_limit: 5000, + allow_discovery: false, + proxied: false, + cleanup_out_msgs: 1000, + redis: true, + redisHost: "127.0.0.1", + redisPort: 6379, +}; +index_1.PeerServer(options); +console.log("Server started at port 9000"); diff --git a/dist/src/utils.js b/dist/src/utils.js new file mode 100755 index 000000000..2b1788693 --- /dev/null +++ b/dist/src/utils.js @@ -0,0 +1,15 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +const chalk = require("chalk"); +const stackTrace = require("stack-trace"); +exports.clog = (message) => { + console.log(chalk.blue(message)); +}; +exports.trace = () => { + stackTrace + .get() + .filter((site) => !site.getFileName().includes("node_modules")) + .map((site) => { + console.log(chalk.red(`${site.getFileName()} --- ${site.getLineNumber()}`)); + }); +}; diff --git a/index.d.ts b/index.d.ts old mode 100644 new mode 100755 diff --git a/package-lock.json b/package-lock.json old mode 100644 new mode 100755 index 9e7ad046e..f87b09238 --- a/package-lock.json +++ b/package-lock.json @@ -22,6 +22,28 @@ "chalk": "^2.0.0", "esutils": "^2.0.2", "js-tokens": "^4.0.0" + }, + "dependencies": { + "chalk": { + "version": "2.4.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz", + "integrity": "sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ==", + "dev": true, + "requires": { + "ansi-styles": "^3.2.1", + "escape-string-regexp": "^1.0.5", + "supports-color": "^5.3.0" + } + }, + "supports-color": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz", + "integrity": "sha512-QjVjwdXIt408MIiAqCX4oUKsgU2EqAGzs2Ppkm4aQYbjm+ZEWEcW4SfFNTr4uMNZma0ey4f5lgLrkB0aX0QMow==", + "dev": true, + "requires": { + "has-flag": "^3.0.0" + } + } } }, "@sinonjs/commons": { @@ -570,6 +592,26 @@ "resolved": "https://registry.npmjs.org/camelcase/-/camelcase-4.1.0.tgz", "integrity": "sha1-1UVjW+HjPFQmScaRc+Xeas+uNN0=", "dev": true + }, + "chalk": { + "version": "2.4.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz", + "integrity": "sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ==", + "dev": true, + "requires": { + "ansi-styles": "^3.2.1", + "escape-string-regexp": "^1.0.5", + "supports-color": "^5.3.0" + } + }, + "supports-color": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz", + "integrity": "sha512-QjVjwdXIt408MIiAqCX4oUKsgU2EqAGzs2Ppkm4aQYbjm+ZEWEcW4SfFNTr4uMNZma0ey4f5lgLrkB0aX0QMow==", + "dev": true, + "requires": { + "has-flag": "^3.0.0" + } } } }, @@ -678,23 +720,47 @@ } }, "chalk": { - "version": "2.4.2", - "resolved": "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz", - "integrity": "sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ==", - "dev": true, + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.0.0.tgz", + "integrity": "sha512-N9oWFcegS0sFr9oh1oz2d7Npos6vNoWW9HvtCg5N1KRFpUhaAhvTv5Y58g880fZaEYSNm3qDz8SU1UrGvp+n7A==", "requires": { - "ansi-styles": "^3.2.1", - "escape-string-regexp": "^1.0.5", - "supports-color": "^5.3.0" + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" }, "dependencies": { + "ansi-styles": { + "version": "4.2.1", + "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.2.1.tgz", + "integrity": "sha512-9VGjrMsG1vePxcSweQsN20KY/c4zN0h9fLjqAbwbPfahM3t+NL+M9HC8xeXG2I8pX5NoamTGNuomEUFI7fcUjA==", + "requires": { + "@types/color-name": "^1.1.1", + "color-convert": "^2.0.1" + } + }, + "color-convert": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", + "integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==", + "requires": { + "color-name": "~1.1.4" + } + }, + "color-name": { + "version": "1.1.4", + "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", + "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==" + }, + "has-flag": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz", + "integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==" + }, "supports-color": { - "version": "5.5.0", - "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz", - "integrity": "sha512-QjVjwdXIt408MIiAqCX4oUKsgU2EqAGzs2Ppkm4aQYbjm+ZEWEcW4SfFNTr4uMNZma0ey4f5lgLrkB0aX0QMow==", - "dev": true, + "version": "7.1.0", + "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.1.0.tgz", + "integrity": "sha512-oRSIpR8pxT1Wr2FquTNnGet79b3BWljqOuoW/h4oBhxJ/HUbX5nX6JSruTkvXDCFMwDPvsaTTbvMLKZWSy0R5g==", "requires": { - "has-flag": "^3.0.0" + "has-flag": "^4.0.0" } } } @@ -844,6 +910,11 @@ } } }, + "cluster-key-slot": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.0.tgz", + "integrity": "sha512-2Nii8p3RwAPiFwsnZvukotvow2rIHM+yQ6ZcBXGHdniadkYGZYiGmkHJIbZPIV9nfv7m/U1IPMVVcAhoWFeklw==" + }, "collection-visit": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/collection-visit/-/collection-visit-1.0.0.tgz", @@ -1057,6 +1128,11 @@ } } }, + "denque": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/denque/-/denque-1.4.1.tgz", + "integrity": "sha512-OfzPuSZKGcgr96rf1oODnfjqBFmr1DVoc/TrItj3Ohe0Ah1C5WX5Baquw/9U9KovnQ88EqmJbD66rKYUQYN1tQ==" + }, "depd": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/depd/-/depd-1.1.2.tgz", @@ -1208,6 +1284,17 @@ "integrity": "sha512-1apePfXM1UOSqw0o9IiFAovVz9M5S1Dg+4TrDwfMewQ6p/rmMueb7tWZjQ1rx4Loy1ArBggoqGpfqqdI4rondg==", "dev": true }, + "chalk": { + "version": "2.4.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz", + "integrity": "sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ==", + "dev": true, + "requires": { + "ansi-styles": "^3.2.1", + "escape-string-regexp": "^1.0.5", + "supports-color": "^5.3.0" + } + }, "debug": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/debug/-/debug-4.1.1.tgz", @@ -1249,6 +1336,15 @@ "resolved": "https://registry.npmjs.org/strip-json-comments/-/strip-json-comments-3.0.1.tgz", "integrity": "sha512-VTyMAUfdm047mwKl+u79WIdrZxtFtn+nBxHeb844XBQ9uMNTuTHdx2hc5RiAJYqwTj3wc/xe5HLSdJSkJ+WfZw==", "dev": true + }, + "supports-color": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz", + "integrity": "sha512-QjVjwdXIt408MIiAqCX4oUKsgU2EqAGzs2Ppkm4aQYbjm+ZEWEcW4SfFNTr4uMNZma0ey4f5lgLrkB0aX0QMow==", + "dev": true, + "requires": { + "has-flag": "^3.0.0" + } } } }, @@ -2593,6 +2689,37 @@ } } }, + "ioredis": { + "version": "4.16.3", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.16.3.tgz", + "integrity": "sha512-Ejvcs2yW19Vq8AipvbtfcX3Ig8XG9EAyFOvGbhI/Q1QoVOK9ZdgY092kdOyOWIYBnPHjfjMJhU9qhsnp0i0K1w==", + "requires": { + "cluster-key-slot": "^1.1.0", + "debug": "^4.1.1", + "denque": "^1.1.0", + "lodash.defaults": "^4.2.0", + "lodash.flatten": "^4.4.0", + "redis-commands": "1.5.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.0.1" + }, + "dependencies": { + "debug": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.1.1.tgz", + "integrity": "sha512-pYAIzeRo8J6KPEaJ0VWOh5Pzkbw/RetuzehGM7QRRX5he4fPHx2rdKMB256ehJCkX+XRQm16eZLqLNS8RSZXZw==", + "requires": { + "ms": "^2.1.1" + } + }, + "ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" + } + } + }, "ipaddr.js": { "version": "1.9.0", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.0.tgz", @@ -2963,6 +3090,16 @@ "integrity": "sha512-8xOcRHvCjnocdS5cpwXQXVzmmh5e5+saE2QGoeQmbKmRS6J3VQppPOIt0MnmE+4xlZoumy0GPG0D0MVIQbNA1A==", "dev": true }, + "lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha1-0JF4cW/+pN3p5ft7N/bwgCJ0WAw=" + }, + "lodash.flatten": { + "version": "4.4.0", + "resolved": "https://registry.npmjs.org/lodash.flatten/-/lodash.flatten-4.4.0.tgz", + "integrity": "sha1-8xwiIlqWMtK7+OSt2+8kCqdlph8=" + }, "log-symbols": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/log-symbols/-/log-symbols-3.0.0.tgz", @@ -2970,6 +3107,28 @@ "dev": true, "requires": { "chalk": "^2.4.2" + }, + "dependencies": { + "chalk": { + "version": "2.4.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz", + "integrity": "sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ==", + "dev": true, + "requires": { + "ansi-styles": "^3.2.1", + "escape-string-regexp": "^1.0.5", + "supports-color": "^5.3.0" + } + }, + "supports-color": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz", + "integrity": "sha512-QjVjwdXIt408MIiAqCX4oUKsgU2EqAGzs2Ppkm4aQYbjm+ZEWEcW4SfFNTr4uMNZma0ey4f5lgLrkB0aX0QMow==", + "dev": true, + "requires": { + "has-flag": "^3.0.0" + } + } } }, "lolex": { @@ -3556,6 +3715,28 @@ "read-pkg": "^3.0.0", "shell-quote": "^1.6.1", "string.prototype.padend": "^3.0.0" + }, + "dependencies": { + "chalk": { + "version": "2.4.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz", + "integrity": "sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ==", + "dev": true, + "requires": { + "ansi-styles": "^3.2.1", + "escape-string-regexp": "^1.0.5", + "supports-color": "^5.3.0" + } + }, + "supports-color": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz", + "integrity": "sha512-QjVjwdXIt408MIiAqCX4oUKsgU2EqAGzs2Ppkm4aQYbjm+ZEWEcW4SfFNTr4uMNZma0ey4f5lgLrkB0aX0QMow==", + "dev": true, + "requires": { + "has-flag": "^3.0.0" + } + } } }, "npm-run-path": { @@ -4061,6 +4242,24 @@ "readable-stream": "^2.0.2" } }, + "redis-commands": { + "version": "1.5.0", + "resolved": "https://registry.npmjs.org/redis-commands/-/redis-commands-1.5.0.tgz", + "integrity": "sha512-6KxamqpZ468MeQC3bkWmCB1fp56XL64D4Kf0zJSwDZbVLLm7KFkoIcHrgRvQ+sk8dnhySs7+yBg94yIkAK7aJg==" + }, + "redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha1-62LSrbFeTq9GEMBK/hUpOEJQq60=" + }, + "redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha1-tm2CjNyv5rS4pCin3vTGvKwxyLQ=", + "requires": { + "redis-errors": "^1.0.0" + } + }, "regex-not": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/regex-not/-/regex-not-1.0.2.tgz", @@ -4564,6 +4763,17 @@ "integrity": "sha1-BOaSb2YolTVPPdAVIDYzuFcpfiw=", "dev": true }, + "stack-trace": { + "version": "0.0.10", + "resolved": "https://registry.npmjs.org/stack-trace/-/stack-trace-0.0.10.tgz", + "integrity": "sha1-VHxws0fo0ytOEI6hoqFZ5f3eGcA=", + "dev": true + }, + "standard-as-callback": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.0.1.tgz", + "integrity": "sha512-NQOxSeB8gOI5WjSaxjBgog2QFw55FV8TkS6Y07BiB3VJ8xNTvUYm0wl0s8ObgQ5NhdpnNfigMIKjgPESzgr4tg==" + }, "static-extend": { "version": "0.1.2", "resolved": "https://registry.npmjs.org/static-extend/-/static-extend-0.1.2.tgz", @@ -4995,6 +5205,28 @@ "latest-version": "^3.0.0", "semver-diff": "^2.0.0", "xdg-basedir": "^3.0.0" + }, + "dependencies": { + "chalk": { + "version": "2.4.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz", + "integrity": "sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ==", + "dev": true, + "requires": { + "ansi-styles": "^3.2.1", + "escape-string-regexp": "^1.0.5", + "supports-color": "^5.3.0" + } + }, + "supports-color": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz", + "integrity": "sha512-QjVjwdXIt408MIiAqCX4oUKsgU2EqAGzs2Ppkm4aQYbjm+ZEWEcW4SfFNTr4uMNZma0ey4f5lgLrkB0aX0QMow==", + "dev": true, + "requires": { + "has-flag": "^3.0.0" + } + } } }, "uri-js": { diff --git a/package.json b/package.json old mode 100644 new mode 100755 index 42bd9975a..ec1d2d766 --- a/package.json +++ b/package.json @@ -34,9 +34,9 @@ "tsc": "tsc", "prebuild": "npm run lint", "test": "npm run lint && mocha -r ts-node/register \"test/**/*\"", - "start": "bin/peerjs --port ${PORT:=9000}", + "start": "bin/peerjs --port ${PORT:=9000} --path /myapp", "dev:start": "npm-run-all build start", - "dev": "nodemon --watch src -e ts --exec npm run dev:start" + "dev": "NODE_ENV=development nodemon --watch src -e ts --exec npm run dev:start" }, "release": { "branch": "master" @@ -48,6 +48,7 @@ "body-parser": "^1.19.0", "cors": "^2.8.5", "express": "^4.17.1", + "ioredis": "^4.16.3", "uuid": "^3.4.0", "ws": "^7.2.3", "yargs": "^15.3.1" diff --git a/src/api/README.md b/src/api/README.md old mode 100644 new mode 100755 diff --git a/src/api/index.ts b/src/api/index.ts old mode 100644 new mode 100755 diff --git a/src/api/middleware/auth/index.ts b/src/api/middleware/auth/index.ts old mode 100644 new mode 100755 diff --git a/src/api/middleware/middleware.ts b/src/api/middleware/middleware.ts old mode 100644 new mode 100755 diff --git a/src/api/v1/calls/index.ts b/src/api/v1/calls/index.ts old mode 100644 new mode 100755 index ea42f8c13..9e48bd07f --- a/src/api/v1/calls/index.ts +++ b/src/api/v1/calls/index.ts @@ -3,12 +3,24 @@ import { IMessageHandler } from "../../../messageHandler"; import { IMessage } from "../../../models/message"; import { IRealm } from "../../../models/realm"; -export default ({ realm, messageHandler }: { realm: IRealm, messageHandler: IMessageHandler; }): express.Router => { +export default ({ + realm, + messageHandler, +}: { + realm: IRealm; + messageHandler: IMessageHandler; +}): express.Router => { const app = express.Router(); - const handle = (req: express.Request, res: express.Response, next: express.NextFunction): any => { + const handle = ( + req: express.Request, + res: express.Response, + next: express.NextFunction + ): any => { const { id } = req.params; + console.log("Got request..."); + if (!id) return next(); const client = realm.getClientById(id); @@ -23,7 +35,7 @@ export default ({ realm, messageHandler }: { realm: IRealm, messageHandler: IMes type, src: id, dst, - payload + payload, }; messageHandler.handle(client, message); diff --git a/src/api/v1/public/index.ts b/src/api/v1/public/index.ts old mode 100644 new mode 100755 diff --git a/src/config/index.ts b/src/config/index.ts old mode 100644 new mode 100755 index 6ac406ae0..6a269aa12 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -13,6 +13,9 @@ export interface IConfig { cert: string; }; readonly generateClientId?: () => string; + readonly redis?: boolean; + readonly redisHost?: string; + readonly redisPort?: number; } const defaultConfig: IConfig = { @@ -27,8 +30,11 @@ const defaultConfig: IConfig = { cleanup_out_msgs: 1000, ssl: { key: "", - cert: "" - } + cert: "", + }, + redis: false, + redisHost: "", + redisPort: 0, }; export default defaultConfig; diff --git a/src/enums.ts b/src/enums.ts old mode 100644 new mode 100755 diff --git a/src/index.ts b/src/index.ts old mode 100644 new mode 100755 index a71882888..d8763829e --- a/src/index.ts +++ b/src/index.ts @@ -7,7 +7,7 @@ import defaultConfig, { IConfig } from "./config"; import { createInstance } from "./instance"; type Optional = { - [P in keyof T]?: (T[P] | undefined); + [P in keyof T]?: T[P] | undefined; }; function ExpressPeerServer(server: Server, options?: IConfig) { @@ -15,17 +15,21 @@ function ExpressPeerServer(server: Server, options?: IConfig) { const newOptions: IConfig = { ...defaultConfig, - ...options + ...options, }; if (newOptions.proxied) { - app.set("trust proxy", newOptions.proxied === "false" ? false : !!newOptions.proxied); + app.set( + "trust proxy", + newOptions.proxied === "false" ? false : !!newOptions.proxied + ); } app.on("mount", () => { if (!server) { - throw new Error("Server is not passed to constructor - " + - "can't start PeerServer"); + throw new Error( + "Server is not passed to constructor - " + "can't start PeerServer" + ); } createInstance({ app, server, options: newOptions }); @@ -34,12 +38,15 @@ function ExpressPeerServer(server: Server, options?: IConfig) { return app; } -function PeerServer(options: Optional = {}, callback?: (server: Server) => void) { +function PeerServer( + options: Optional = {}, + callback?: (server: Server) => void +) { const app = express(); const newOptions: IConfig = { ...defaultConfig, - ...options + ...options, }; const port = newOptions.port; @@ -62,7 +69,4 @@ function PeerServer(options: Optional = {}, callback?: (server: Server) return peerjs; } -export { - ExpressPeerServer, - PeerServer -}; +export { ExpressPeerServer, PeerServer }; diff --git a/src/instance.ts b/src/instance.ts old mode 100644 new mode 100755 index 1cd032e97..04f4933e5 --- a/src/instance.ts +++ b/src/instance.ts @@ -1,6 +1,6 @@ import express from "express"; import { Server } from "net"; -import path from 'path'; +import path from "path"; import { IClient } from "./models/client"; import { IMessage } from "./models/message"; import { Realm } from "./models/realm"; @@ -12,9 +12,13 @@ import { MessageHandler } from "./messageHandler"; import { Api } from "./api"; import { IConfig } from "./config"; -export const createInstance = ({ app, server, options }: { - app: express.Application, - server: Server, +export const createInstance = ({ + app, + server, + options, +}: { + app: express.Application; + server: Server; options: IConfig; }): void => { const config = options; @@ -22,24 +26,31 @@ export const createInstance = ({ app, server, options }: { const messageHandler = new MessageHandler(realm); const api = Api({ config, realm, messageHandler }); - const messagesExpire: IMessagesExpire = new MessagesExpire({ realm, config, messageHandler }); + const messagesExpire: IMessagesExpire = new MessagesExpire({ + realm, + config, + messageHandler, + }); const checkBrokenConnections = new CheckBrokenConnections({ realm, config, - onClose: client => { + onClose: (client) => { app.emit("disconnect", client); - } + }, }); app.use(options.path, api); //use mountpath for WS server - const customConfig = { ...config, path: path.posix.join(app.path(), options.path, '/') }; + const customConfig = { + ...config, + path: path.posix.join(app.path(), options.path, "/"), + }; const wss: IWebSocketServer = new WebSocketServer({ server, realm, - config: customConfig + config: customConfig, }); wss.on("connection", (client: IClient) => { @@ -48,7 +59,7 @@ export const createInstance = ({ app, server, options }: { if (messageQueue) { let message: IMessage | undefined; - while (message = messageQueue.readMessage()) { + while ((message = messageQueue.readMessage())) { messageHandler.handle(client, message); } realm.clearMessageQueue(client.getId()); @@ -72,4 +83,4 @@ export const createInstance = ({ app, server, options }: { messagesExpire.startMessagesExpiration(); checkBrokenConnections.start(); -}; \ No newline at end of file +}; diff --git a/src/messageHandler/handler.ts b/src/messageHandler/handler.ts old mode 100644 new mode 100755 diff --git a/src/messageHandler/handlers/heartbeat/index.ts b/src/messageHandler/handlers/heartbeat/index.ts old mode 100644 new mode 100755 diff --git a/src/messageHandler/handlers/index.ts b/src/messageHandler/handlers/index.ts old mode 100644 new mode 100755 diff --git a/src/messageHandler/handlers/transmission/index.ts b/src/messageHandler/handlers/transmission/index.ts old mode 100644 new mode 100755 index bd31c707c..8de0347bc --- a/src/messageHandler/handlers/transmission/index.ts +++ b/src/messageHandler/handlers/transmission/index.ts @@ -3,7 +3,11 @@ import { IClient } from "../../../models/client"; import { IMessage } from "../../../models/message"; import { IRealm } from "../../../models/realm"; -export const TransmissionHandler = ({ realm }: { realm: IRealm; }): (client: IClient | undefined, message: IMessage) => boolean => { +export const TransmissionHandler = ({ + realm, +}: { + realm: IRealm; +}): ((client: IClient | undefined, message: IMessage) => boolean) => { const handle = (client: IClient | undefined, message: IMessage) => { const type = message.type; const srcId = message.src; @@ -36,7 +40,7 @@ export const TransmissionHandler = ({ realm }: { realm: IRealm; }): (client: ICl handle(client, { type: MessageType.LEAVE, src: dstId, - dst: srcId + dst: srcId, }); } } else { diff --git a/src/messageHandler/handlersRegistry.ts b/src/messageHandler/handlersRegistry.ts old mode 100644 new mode 100755 diff --git a/src/messageHandler/index.ts b/src/messageHandler/index.ts old mode 100644 new mode 100755 index 7f6c67a26..44150a865 --- a/src/messageHandler/index.ts +++ b/src/messageHandler/index.ts @@ -11,11 +11,17 @@ export interface IMessageHandler { } export class MessageHandler implements IMessageHandler { - constructor(realm: IRealm, private readonly handlersRegistry: IHandlersRegistry = new HandlersRegistry()) { + constructor( + realm: IRealm, + private readonly handlersRegistry: IHandlersRegistry = new HandlersRegistry() + ) { const transmissionHandler: Handler = TransmissionHandler({ realm }); const heartbeatHandler: Handler = HeartbeatHandler; - const handleTransmission: Handler = (client: IClient | undefined, { type, src, dst, payload }: IMessage): boolean => { + const handleTransmission: Handler = ( + client: IClient | undefined, + { type, src, dst, payload }: IMessage + ): boolean => { return transmissionHandler(client, { type, src, @@ -24,14 +30,33 @@ export class MessageHandler implements IMessageHandler { }); }; - const handleHeartbeat = (client: IClient | undefined, message: IMessage) => heartbeatHandler(client, message); + const handleHeartbeat = (client: IClient | undefined, message: IMessage) => + heartbeatHandler(client, message); - this.handlersRegistry.registerHandler(MessageType.HEARTBEAT, handleHeartbeat); - this.handlersRegistry.registerHandler(MessageType.OFFER, handleTransmission); - this.handlersRegistry.registerHandler(MessageType.ANSWER, handleTransmission); - this.handlersRegistry.registerHandler(MessageType.CANDIDATE, handleTransmission); - this.handlersRegistry.registerHandler(MessageType.LEAVE, handleTransmission); - this.handlersRegistry.registerHandler(MessageType.EXPIRE, handleTransmission); + this.handlersRegistry.registerHandler( + MessageType.HEARTBEAT, + handleHeartbeat + ); + this.handlersRegistry.registerHandler( + MessageType.OFFER, + handleTransmission + ); + this.handlersRegistry.registerHandler( + MessageType.ANSWER, + handleTransmission + ); + this.handlersRegistry.registerHandler( + MessageType.CANDIDATE, + handleTransmission + ); + this.handlersRegistry.registerHandler( + MessageType.LEAVE, + handleTransmission + ); + this.handlersRegistry.registerHandler( + MessageType.EXPIRE, + handleTransmission + ); } public handle(client: IClient | undefined, message: IMessage): boolean { diff --git a/src/models/client.ts b/src/models/client.ts old mode 100644 new mode 100755 index 4dee3ca89..969ccb694 --- a/src/models/client.ts +++ b/src/models/client.ts @@ -22,7 +22,7 @@ export class Client implements IClient { private socket: MyWebSocket | null = null; private lastPing: number = new Date().getTime(); - constructor({ id, token }: { id: string, token: string; }) { + constructor({ id, token }: { id: string; token: string; }) { this.id = id; this.token = token; } diff --git a/src/models/message.ts b/src/models/message.ts old mode 100644 new mode 100755 diff --git a/src/models/messageQueue.ts b/src/models/messageQueue.ts old mode 100644 new mode 100755 diff --git a/src/models/realm.ts b/src/models/realm.ts old mode 100644 new mode 100755 index 5b73fc2d9..bf25e39fb --- a/src/models/realm.ts +++ b/src/models/realm.ts @@ -45,11 +45,8 @@ export class Realm implements IRealm { public removeClientById(id: string): boolean { const client = this.getClientById(id); - if (!client) return false; - this.clients.delete(id); - return true; } @@ -70,7 +67,6 @@ export class Realm implements IRealm { } public generateClientId(generateClientId?: () => string): string { - const generateId = generateClientId ? generateClientId : uuidv4; let clientId = generateId(); diff --git a/src/services/checkBrokenConnections/index.ts b/src/services/checkBrokenConnections/index.ts old mode 100644 new mode 100755 diff --git a/src/services/messagesExpire/index.ts b/src/services/messagesExpire/index.ts old mode 100644 new mode 100755 diff --git a/src/services/webSocketServer/index.ts b/src/services/webSocketServer/index.ts old mode 100644 new mode 100755 index 3b237a17a..ec224b16a --- a/src/services/webSocketServer/index.ts +++ b/src/services/webSocketServer/index.ts @@ -8,6 +8,8 @@ import { Client, IClient } from "../../models/client"; import { IRealm } from "../../models/realm"; import { MyWebSocket } from "./webSocket"; +const Redis = require("ioredis"); + export interface IWebSocketServer extends EventEmitter { readonly path: string; } @@ -18,18 +20,30 @@ interface IAuthParams { key?: string; } -type CustomConfig = Pick; +type CustomConfig = Pick< + IConfig, + "path" | "key" | "concurrent_limit" | "redis" | "redisHost" | "redisPort" +>; -const WS_PATH = 'peerjs'; +const WS_PATH = "peerjs"; export class WebSocketServer extends EventEmitter implements IWebSocketServer { - public readonly path: string; private readonly realm: IRealm; private readonly config: CustomConfig; + private readonly messageSubscriber: any; + private readonly messagePublisher: any; public readonly socketServer: WebSocketLib.Server; - constructor({ server, realm, config }: { server: any, realm: IRealm, config: CustomConfig; }) { + constructor({ + server, + realm, + config, + }: { + server: any; + realm: IRealm; + config: CustomConfig; + }) { super(); this.setMaxListeners(0); @@ -38,12 +52,46 @@ export class WebSocketServer extends EventEmitter implements IWebSocketServer { this.config = config; const path = this.config.path; - this.path = `${path}${path.endsWith('/') ? "" : "/"}${WS_PATH}`; + this.path = `${path}${path.endsWith("/") ? "" : "/"}${WS_PATH}`; this.socketServer = new WebSocketLib.Server({ path: this.path, server }); - this.socketServer.on("connection", (socket: MyWebSocket, req) => this._onSocketConnection(socket, req)); + this.socketServer.on("connection", (socket: MyWebSocket, req) => + this._onSocketConnection(socket, req) + ); this.socketServer.on("error", (error: Error) => this._onSocketError(error)); + + if (config.redis) { + this.messagePublisher = new Redis( + this.config.redisPort, + this.config.redisHost + ); + this.messageSubscriber = new Redis( + this.config.redisPort, + this.config.redisHost + ); + this._configureRedis(); + } + } + + private _configureRedis() { + this.messageSubscriber.subscribe("transmission", (err: Error) => { + if (!err) console.log("Subscribed to Transmission messages"); + }); + this.messageSubscriber.on( + "message", + (channel: string, tmessage: string) => { + if (channel === "transmission") { + const receivedMessage = JSON.parse(tmessage); + if ( + receivedMessage.dst && + this.realm.getClientById(receivedMessage.dst) + ) { + this.emit("message", undefined, receivedMessage); + } + } + } + ); } private _onSocketConnection(socket: MyWebSocket, req: IncomingMessage): void { @@ -64,10 +112,12 @@ export class WebSocketServer extends EventEmitter implements IWebSocketServer { if (client) { if (token !== client.getToken()) { // ID-taken, invalid token - socket.send(JSON.stringify({ - type: MessageType.ID_TAKEN, - payload: { msg: "ID is taken" } - })); + socket.send( + JSON.stringify({ + type: MessageType.ID_TAKEN, + payload: { msg: "ID is taken" }, + }) + ); return socket.close(); } @@ -83,12 +133,15 @@ export class WebSocketServer extends EventEmitter implements IWebSocketServer { this.emit("error", error); } - private _registerClient({ socket, id, token }: - { - socket: MyWebSocket; - id: string; - token: string; - }): void { + private _registerClient({ + socket, + id, + token, + }: { + socket: MyWebSocket; + id: string; + token: string; + }): void { // Check concurrent limit const clientsCount = this.realm.getClientsIds().length; @@ -96,6 +149,8 @@ export class WebSocketServer extends EventEmitter implements IWebSocketServer { return this._sendErrorAndClose(socket, Errors.CONNECTION_LIMIT_EXCEED); } + console.log("NEW CLIENT:::", id); + const newClient: IClient = new Client({ id, token }); this.realm.setClient(newClient, id); socket.send(JSON.stringify({ type: MessageType.OPEN })); @@ -118,9 +173,14 @@ export class WebSocketServer extends EventEmitter implements IWebSocketServer { socket.on("message", (data: WebSocketLib.Data) => { try { const message = JSON.parse(data as string); - message.src = client.getId(); - + if (message.type !== "HEARTBEAT" && this.config.redis) { + this.messagePublisher.publish( + "transmission", + JSON.stringify(message) + ); + return; + } this.emit("message", client, message); } catch (e) { this.emit("error", e); @@ -134,7 +194,7 @@ export class WebSocketServer extends EventEmitter implements IWebSocketServer { socket.send( JSON.stringify({ type: MessageType.ERROR, - payload: { msg } + payload: { msg }, }) ); diff --git a/src/services/webSocketServer/webSocket.ts b/src/services/webSocketServer/webSocket.ts old mode 100644 new mode 100755 diff --git a/test/messageHandler/handlers/heartbeat/index.ts b/test/messageHandler/handlers/heartbeat/index.ts old mode 100644 new mode 100755 diff --git a/test/messageHandler/handlers/transmission/index.ts b/test/messageHandler/handlers/transmission/index.ts old mode 100644 new mode 100755 diff --git a/test/messageHandler/handlersRegistry.ts b/test/messageHandler/handlersRegistry.ts old mode 100644 new mode 100755 diff --git a/test/models/messageQueue.ts b/test/models/messageQueue.ts old mode 100644 new mode 100755 diff --git a/test/models/realm.ts b/test/models/realm.ts old mode 100644 new mode 100755 diff --git a/test/services/checkBrokenConnections/index.ts b/test/services/checkBrokenConnections/index.ts old mode 100644 new mode 100755 diff --git a/test/services/messagesExpire/index.ts b/test/services/messagesExpire/index.ts old mode 100644 new mode 100755 diff --git a/test/services/webSocketServer/index.ts b/test/services/webSocketServer/index.ts old mode 100644 new mode 100755 diff --git a/test/utils.ts b/test/utils.ts old mode 100644 new mode 100755 diff --git a/tsconfig.json b/tsconfig.json old mode 100644 new mode 100755