Skip to content

Commit 60fb554

Browse files
authored
Merge pull request #59 from jsonjoy-com/ws
feat: 🎸 add WebSocket codec
2 parents e8a68dc + b83dcb4 commit 60fb554

File tree

10 files changed

+1023
-1
lines changed

10 files changed

+1023
-1
lines changed

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@
115115
"tslint": "^6.1.3",
116116
"tslint-config-common": "^1.6.2",
117117
"typedoc": "^0.25.12",
118-
"typescript": "^5.3.3"
118+
"typescript": "^5.3.3",
119+
"websocket": "^1.0.35"
119120
},
120121
"jest": {
121122
"verbose": true,

src/ws/WsFrameDecoder.ts

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import {StreamingOctetReader} from '@jsonjoy.com/util/lib/buffers/StreamingOctetReader';
2+
import {WsFrameOpcode} from './constants';
3+
import {WsFrameDecodingError} from './errors';
4+
import {WsCloseFrame, WsFrameHeader, WsPingFrame, WsPongFrame} from './frames';
5+
6+
export class WsFrameDecoder {
7+
public readonly reader = new StreamingOctetReader();
8+
9+
public push(uint8: Uint8Array): void {
10+
this.reader.push(uint8);
11+
}
12+
13+
public readFrameHeader(): WsFrameHeader | undefined {
14+
try {
15+
const reader = this.reader;
16+
if (reader.size() < 2) return undefined;
17+
const b0 = reader.u8();
18+
const b1 = reader.u8();
19+
const fin = <0 | 1>(b0 >>> 7);
20+
const opcode = b0 & 0b1111;
21+
const maskBit = b1 >>> 7;
22+
let length = b1 & 0b01111111;
23+
if (length === 126) {
24+
if (reader.size() < 2) return undefined;
25+
length = (reader.u8() << 8) | reader.u8();
26+
} else if (length === 127) {
27+
if (reader.size() < 8) return undefined;
28+
reader.skip(4);
29+
length = reader.u32();
30+
}
31+
let mask: undefined | [number, number, number, number];
32+
if (maskBit) {
33+
if (reader.size() < 4) return undefined;
34+
mask = [reader.u8(), reader.u8(), reader.u8(), reader.u8()];
35+
}
36+
if (opcode >= WsFrameOpcode.MIN_CONTROL_OPCODE) {
37+
switch (opcode) {
38+
case WsFrameOpcode.CLOSE: {
39+
return new WsCloseFrame(fin, opcode, length, mask, 0, '');
40+
}
41+
case WsFrameOpcode.PING: {
42+
if (length > 125) throw new WsFrameDecodingError();
43+
const data = mask ? reader.bufXor(length, mask, 0) : reader.buf(length);
44+
return new WsPingFrame(fin, opcode, length, mask, data);
45+
}
46+
case WsFrameOpcode.PONG: {
47+
if (length > 125) throw new WsFrameDecodingError();
48+
const data = mask ? reader.bufXor(length, mask, 0) : reader.buf(length);
49+
return new WsPongFrame(fin, opcode, length, mask, data);
50+
}
51+
default: {
52+
throw new WsFrameDecodingError();
53+
}
54+
}
55+
}
56+
return new WsFrameHeader(fin, opcode, length, mask);
57+
} catch (err) {
58+
if (err instanceof RangeError) return undefined;
59+
throw err;
60+
}
61+
}
62+
63+
/**
64+
* Read application data of a frame and copy it to the destination buffer.
65+
* Receives the frame header and the number of bytes that still need to be
66+
* copied, returns back the number of bytes that still need to be copied in
67+
* subsequent calls.
68+
*
69+
* @param frame Frame header.
70+
* @param remaining How many bytes are remaining to be copied.
71+
* @param dst The destination buffer to write to.
72+
* @param pos Position in the destination buffer to start writing to.
73+
* @returns The number of bytes that still need to be copied in the next call.
74+
*/
75+
public readFrameData(frame: WsFrameHeader, remaining: number, dst: Uint8Array, pos: number): number {
76+
const reader = this.reader;
77+
const mask = frame.mask;
78+
const readSize = Math.min(reader.size(), remaining);
79+
if (!mask) reader.copy(readSize, dst, pos);
80+
else {
81+
const alreadyRead = frame.length - remaining;
82+
reader.copyXor(readSize, dst, pos, mask, alreadyRead);
83+
}
84+
return remaining - readSize;
85+
}
86+
87+
public copyFrameData(frame: WsFrameHeader, dst: Uint8Array, pos: number): void {
88+
const reader = this.reader;
89+
const mask = frame.mask;
90+
const readSize = frame.length;
91+
if (!mask) reader.copy(readSize, dst, pos);
92+
else reader.copyXor(readSize, dst, pos, mask, 0);
93+
}
94+
95+
/**
96+
* Reads application data of the CLOSE frame and sets the code and reason
97+
* properties of the frame.
98+
*
99+
* @param frame Close frame.
100+
*/
101+
public readCloseFrameData(frame: WsCloseFrame): void {
102+
let length = frame.length;
103+
if (length > 125) throw new WsFrameDecodingError();
104+
let code = 0;
105+
let reason = '';
106+
if (length > 0) {
107+
if (length < 2) throw new WsFrameDecodingError();
108+
const reader = this.reader;
109+
const mask = frame.mask;
110+
const octet1 = reader.u8() ^ (mask ? mask[0] : 0);
111+
const octet2 = reader.u8() ^ (mask ? mask[1] : 0);
112+
code = (octet1 << 8) | octet2;
113+
length -= 2;
114+
if (length) reason = reader.utf8(length, mask ?? [0, 0, 0, 0], 2);
115+
}
116+
frame.code = code;
117+
frame.reason = reason;
118+
}
119+
}

src/ws/WsFrameEncoder.ts

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import {Writer} from '@jsonjoy.com/util/lib/buffers/Writer';
2+
import {WsFrameOpcode} from './constants';
3+
import {WsFrameEncodingError} from './errors';
4+
import type {IWriter, IWriterGrowable} from '@jsonjoy.com/util/lib/buffers';
5+
6+
const maskBuf = new Uint8Array(4);
7+
const maskBufView = new DataView(maskBuf.buffer, maskBuf.byteOffset, maskBuf.byteLength);
8+
9+
export class WsFrameEncoder<W extends IWriter & IWriterGrowable = IWriter & IWriterGrowable> {
10+
constructor(public readonly writer: W = new Writer() as any) {}
11+
12+
public encodePing(data: Uint8Array | null): Uint8Array {
13+
this.writePing(data);
14+
return this.writer.flush();
15+
}
16+
17+
public encodePong(data: Uint8Array | null): Uint8Array {
18+
this.writePong(data);
19+
return this.writer.flush();
20+
}
21+
22+
public encodeClose(reason: string, code = 0): Uint8Array {
23+
this.writeClose(reason, code);
24+
return this.writer.flush();
25+
}
26+
27+
public encodeHdr(fin: 0 | 1, opcode: WsFrameOpcode, length: number, mask: number): Uint8Array {
28+
this.writeHdr(fin, opcode, length, mask);
29+
return this.writer.flush();
30+
}
31+
32+
public encodeDataMsgHdrFast(length: number): Uint8Array {
33+
this.writeDataMsgHdrFast(length);
34+
return this.writer.flush();
35+
}
36+
37+
public writePing(data: Uint8Array | null): void {
38+
let length = 0;
39+
if (data && (length = data.length)) {
40+
this.writeHdr(1, WsFrameOpcode.PING, length, 0);
41+
this.writer.buf(data, length);
42+
} else {
43+
this.writeHdr(1, WsFrameOpcode.PING, 0, 0);
44+
}
45+
}
46+
47+
public writePong(data: Uint8Array | null): void {
48+
let length = 0;
49+
if (data && (length = data.length)) {
50+
this.writeHdr(1, WsFrameOpcode.PONG, length, 0);
51+
this.writer.buf(data, length);
52+
} else {
53+
this.writeHdr(1, WsFrameOpcode.PONG, 0, 0);
54+
}
55+
}
56+
57+
public writeClose(reason: string, code = 0): void {
58+
if (reason || code) {
59+
const reasonLength = reason.length;
60+
const length = 2 + reasonLength;
61+
const writer = this.writer;
62+
writer.ensureCapacity(
63+
2 + // Frame header
64+
2 + // Close code 2 bytes
65+
reasonLength * 4, // Close reason, max 4 bytes per UTF-8 char
66+
);
67+
const lengthX = writer.x + 1;
68+
this.writeHdr(1, WsFrameOpcode.CLOSE, length, 0);
69+
writer.u16(code);
70+
if (reasonLength) {
71+
const utf8Length = writer.utf8(reason);
72+
if (utf8Length !== reasonLength) {
73+
if (utf8Length > 126 - 2) throw new WsFrameEncodingError();
74+
writer.uint8[lengthX] = (writer.uint8[lengthX] & 0b10000000) | (utf8Length + 2);
75+
}
76+
}
77+
} else {
78+
this.writeHdr(1, WsFrameOpcode.CLOSE, 0, 0);
79+
}
80+
}
81+
82+
public writeHdr(fin: 0 | 1, opcode: WsFrameOpcode, length: number, mask: number): void {
83+
const octet1 = (fin << 7) | opcode;
84+
const maskBit = mask ? 0b10000000 : 0b00000000;
85+
const writer = this.writer;
86+
if (length < 126) {
87+
const octet2 = maskBit | length;
88+
writer.u16((octet1 << 8) | octet2);
89+
} else if (length < 0x10000) {
90+
const octet2 = maskBit | 126;
91+
writer.u32(((octet1 << 8) | octet2) * 0x10000 + length);
92+
} else {
93+
const octet2 = maskBit | 127;
94+
writer.u16((octet1 << 8) | octet2);
95+
writer.u32(0);
96+
writer.u32(length);
97+
}
98+
if (mask) writer.u32(mask);
99+
}
100+
101+
public writeDataMsgHdrFast(length: number): void {
102+
const writer = this.writer;
103+
if (length < 126) {
104+
writer.u16(0b10000010_00000000 + length);
105+
return;
106+
}
107+
if (length < 0x10000) {
108+
writer.u32(0b10000010_01111110_00000000_00000000 + length);
109+
return;
110+
}
111+
writer.u16(0b10000010_01111111);
112+
writer.u32(0);
113+
writer.u32(length);
114+
}
115+
116+
public writeBufXor(buf: Uint8Array, mask: number): void {
117+
maskBufView.setUint32(0, mask, false);
118+
const writer = this.writer;
119+
const length = buf.length;
120+
writer.ensureCapacity(length);
121+
let x = writer.x;
122+
const uint8 = writer.uint8;
123+
for (let i = 0; i < length; i++) uint8[x++] = buf[i] ^ maskBuf[i & 3];
124+
writer.x = x;
125+
}
126+
}

0 commit comments

Comments
 (0)