Skip to content

Commit 10967f4

Browse files
authored
Merge pull request #60 from jsonjoy-com/rpc
Rpc
2 parents 60fb554 + 27d38d3 commit 10967f4

40 files changed

+13786
-76
lines changed

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@
6464
},
6565
"dependencies": {
6666
"@jsonjoy.com/base64": "^1.1.2",
67-
"@jsonjoy.com/buffers": "^1.0.0",
67+
"@jsonjoy.com/buffers": "^1.2.0",
6868
"@jsonjoy.com/codegen": "^1.0.0",
69-
"@jsonjoy.com/json-pointer": "^1.0.1",
69+
"@jsonjoy.com/json-pointer": "^1.0.2",
7070
"@jsonjoy.com/util": "^1.9.0",
7171
"hyperdyperid": "^1.2.0",
7272
"thingies": "^2.5.0"

src/rm/README.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Record Marking (RM) Protocol
2+
3+
Implements rm/tcp/ip protocol Record Marking (RM) Standard as specified in RFC 1057.
4+
The RM standard splits a byte stream into discrete messages by prefixing each
5+
message with a 4-byte header.
6+
7+
Excerpt from RFC 1057, Section 10:
8+
9+
```
10+
10. RECORD MARKING STANDARD
11+
12+
When RPC messages are passed on top of a byte stream transport
13+
protocol (like TCP), it is necessary to delimit one message from
14+
another in order to detect and possibly recover from protocol errors.
15+
This is called record marking (RM). Sun uses this RM/TCP/IP
16+
transport for passing RPC messages on TCP streams. One RPC message
17+
fits into one RM record.
18+
19+
A record is composed of one or more record fragments. A record
20+
21+
22+
23+
Sun Microsystems [Page 18]
24+
25+
RFC 1057 Remote Procedure Call, Version 2 June 1988
26+
27+
28+
fragment is a four-byte header followed by 0 to (2**31) - 1 bytes of
29+
fragment data. The bytes encode an unsigned binary number; as with
30+
XDR integers, the byte order is from highest to lowest. The number
31+
encodes two values -- a boolean which indicates whether the fragment
32+
is the last fragment of the record (bit value 1 implies the fragment
33+
is the last fragment) and a 31-bit unsigned binary value which is the
34+
length in bytes of the fragment's data. The boolean value is the
35+
highest-order bit of the header; the length is the 31 low-order bits.
36+
(Note that this record specification is NOT in XDR standard form!)
37+
```

src/rm/RmRecordDecoder.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import {StreamingReader} from '@jsonjoy.com/buffers/lib/StreamingReader';
2+
import {Reader} from '@jsonjoy.com/buffers/lib/Reader';
3+
import {concatList} from '@jsonjoy.com/buffers/lib/concat';
4+
5+
export class RmRecordDecoder {
6+
public readonly reader = new StreamingReader();
7+
protected fragments: Uint8Array[] = [];
8+
9+
public push(uint8: Uint8Array): void {
10+
this.reader.push(uint8);
11+
}
12+
13+
/**
14+
* @todo PERF: Make it return Slice instead of Uint8Array
15+
*/
16+
public readRecord(): Reader | undefined {
17+
const reader = this.reader;
18+
let size = reader.size();
19+
if (size < 4) return undefined;
20+
const x = reader.x;
21+
READ_FRAGMENT: {
22+
try {
23+
const header = reader.u32();
24+
size -= 4;
25+
const fin = !!(header & 0b10000000_00000000_00000000_00000000);
26+
const len = header & 0b01111111_11111111_11111111_11111111;
27+
if (size < len) break READ_FRAGMENT;
28+
reader.consume();
29+
const fragments = this.fragments;
30+
if (fin) {
31+
if (!fragments.length) return reader.cut(len);
32+
fragments.push(reader.buf(len));
33+
const record = concatList(fragments);
34+
this.fragments = [];
35+
return record.length ? new Reader(record) : undefined;
36+
} else {
37+
fragments.push(reader.buf(len));
38+
return undefined;
39+
}
40+
} catch (err) {
41+
reader.x = x;
42+
if (err instanceof RangeError) return undefined;
43+
else throw err;
44+
}
45+
}
46+
reader.x = x;
47+
return undefined;
48+
}
49+
}

src/rm/RmRecordEncoder.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import {Writer} from '@jsonjoy.com/util/lib/buffers/Writer';
2+
import type {IWriter, IWriterGrowable} from '@jsonjoy.com/util/lib/buffers';
3+
4+
export class RmRecordEncoder<W extends IWriter & IWriterGrowable = IWriter & IWriterGrowable> {
5+
constructor(public readonly writer: W = new Writer() as any) {}
6+
7+
public encodeHdr(fin: 0 | 1, length: number): Uint8Array {
8+
this.writeHdr(fin, length);
9+
return this.writer.flush();
10+
}
11+
12+
public encodeRecord(record: Uint8Array): Uint8Array {
13+
this.writeRecord(record);
14+
return this.writer.flush();
15+
}
16+
17+
public writeHdr(fin: 0 | 1, length: number): void {
18+
this.writer.u32((fin ? 0b10000000_00000000_00000000_00000000 : 0) + length);
19+
}
20+
21+
public writeRecord(record: Uint8Array): void {
22+
const length = record.length;
23+
if (length <= 2147483647) {
24+
const writer = this.writer;
25+
writer.u32(0b10000000_00000000_00000000_00000000 + length);
26+
writer.buf(record, length);
27+
return;
28+
}
29+
let offset = 0;
30+
while (offset < length) {
31+
const fragmentLength = Math.min(length - offset, 0x7fffffff);
32+
const fin = fragmentLength + offset >= length ? 1 : 0;
33+
this.writeFragment(record, offset, fragmentLength, fin);
34+
offset += fragmentLength;
35+
}
36+
}
37+
38+
public writeFragment(record: Uint8Array, offset: number, length: number, fin: 0 | 1): void {
39+
this.writeHdr(fin, length);
40+
const fragment = record.subarray(offset, offset + length);
41+
this.writer.buf(fragment, length);
42+
}
43+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import {RmRecordDecoder} from '../RmRecordDecoder';
2+
3+
describe('RmRecordDecoder', () => {
4+
describe('.readRecord()', () => {
5+
test('returns undefined when no data available', () => {
6+
const decoder = new RmRecordDecoder();
7+
const result = decoder.readRecord();
8+
expect(result).toBeUndefined();
9+
});
10+
11+
test('decodes empty record', () => {
12+
const decoder = new RmRecordDecoder();
13+
decoder.push(new Uint8Array([0, 0, 0, 0]));
14+
expect(decoder.readRecord()).toBeUndefined();
15+
});
16+
17+
test('decodes empty record', () => {
18+
const decoder = new RmRecordDecoder();
19+
decoder.push(new Uint8Array([0, 0, 0, 0, 0]));
20+
expect(decoder.readRecord()).toBeUndefined();
21+
});
22+
23+
test('decodes empty record - 2', () => {
24+
const decoder = new RmRecordDecoder();
25+
expect(decoder.readRecord()).toBeUndefined();
26+
decoder.push(new Uint8Array([0]));
27+
expect(decoder.readRecord()).toBeUndefined();
28+
decoder.push(new Uint8Array([0]));
29+
expect(decoder.readRecord()).toBeUndefined();
30+
decoder.push(new Uint8Array([0]));
31+
expect(decoder.readRecord()).toBeUndefined();
32+
decoder.push(new Uint8Array([0]));
33+
expect(decoder.readRecord()).toBeUndefined();
34+
});
35+
36+
test('decodes two records streamed one byte at a time', () => {
37+
const decoder = new RmRecordDecoder();
38+
expect(decoder.readRecord()).toBeUndefined();
39+
decoder.push(new Uint8Array([0b10000000]));
40+
expect(decoder.readRecord()).toBeUndefined();
41+
decoder.push(new Uint8Array([0]));
42+
expect(decoder.readRecord()).toBeUndefined();
43+
decoder.push(new Uint8Array([0]));
44+
expect(decoder.readRecord()).toBeUndefined();
45+
decoder.push(new Uint8Array([1]));
46+
expect(decoder.readRecord()).toBeUndefined();
47+
decoder.push(new Uint8Array([42]));
48+
expect(decoder.readRecord()?.buf()).toEqual(new Uint8Array([42]));
49+
expect(decoder.readRecord()).toBeUndefined();
50+
decoder.push(new Uint8Array([0b10000000, 0, 0]));
51+
expect(decoder.readRecord()).toBeUndefined();
52+
expect(decoder.readRecord()).toBeUndefined();
53+
decoder.push(new Uint8Array([1, 43]));
54+
expect(decoder.readRecord()?.buf()).toEqual(new Uint8Array([43]));
55+
expect(decoder.readRecord()).toBeUndefined();
56+
});
57+
58+
test('decodes single-byte record', () => {
59+
const decoder = new RmRecordDecoder();
60+
decoder.push(new Uint8Array([0b10000000, 0, 0, 1, 42]));
61+
const result = decoder.readRecord()?.buf();
62+
expect(result).toBeInstanceOf(Uint8Array);
63+
expect(result!.length).toBe(1);
64+
expect(result![0]).toBe(42);
65+
});
66+
67+
test('decodes multi-byte record', () => {
68+
const decoder = new RmRecordDecoder();
69+
const data = new Uint8Array([1, 2, 3, 4, 5]);
70+
decoder.push(new Uint8Array([0b10000000, 0, 0, data.length, ...data]));
71+
const result = decoder.readRecord()?.buf();
72+
expect(result).toBeInstanceOf(Uint8Array);
73+
expect(result!.length).toBe(data.length);
74+
expect(result).toEqual(data);
75+
});
76+
77+
test('decodes ASCII string data', () => {
78+
const text = 'hello world';
79+
const data = new TextEncoder().encode(text);
80+
const decoder = new RmRecordDecoder();
81+
decoder.push(new Uint8Array([0b10000000, 0, 0, data.length, ...data]));
82+
const result = decoder.readRecord()?.buf();
83+
expect(result).toBeInstanceOf(Uint8Array);
84+
expect(result!.length).toBe(data.length);
85+
expect(result).toEqual(data);
86+
});
87+
88+
test('decodes large record', () => {
89+
const size = 10000;
90+
const data = new Uint8Array(size);
91+
for (let i = 0; i < size; i++) data[i] = i % 256;
92+
const decoder = new RmRecordDecoder();
93+
decoder.push(new Uint8Array([0b10000000, (size >> 16) & 0xff, (size >> 8) & 0xff, size & 0xff, ...data]));
94+
const result = decoder.readRecord()?.buf();
95+
expect(result).toBeInstanceOf(Uint8Array);
96+
expect(result!.length).toBe(data.length);
97+
expect(result).toEqual(data);
98+
});
99+
});
100+
101+
describe('fragmented records', () => {
102+
test('decodes record with two fragments', () => {
103+
const part1 = new Uint8Array([1, 2, 3]);
104+
const part2 = new Uint8Array([4, 5, 6]);
105+
const decoder = new RmRecordDecoder();
106+
decoder.push(new Uint8Array([0b00000000, 0, 0, part1.length, ...part1]));
107+
expect(decoder.readRecord()).toBeUndefined();
108+
decoder.push(new Uint8Array([0b10000000, 0, 0, part2.length, ...part2]));
109+
const result = decoder.readRecord()?.buf();
110+
expect(result).toBeInstanceOf(Uint8Array);
111+
expect(result!.length).toBe(part1.length + part2.length);
112+
expect(result).toEqual(new Uint8Array([...part1, ...part2]));
113+
});
114+
115+
test('decodes record with three fragments', () => {
116+
const part1 = new Uint8Array([1, 2]);
117+
const part2 = new Uint8Array([3, 4]);
118+
const part3 = new Uint8Array([5, 6]);
119+
const decoder = new RmRecordDecoder();
120+
decoder.push(new Uint8Array([0b00000000, 0, 0, part1.length, ...part1]));
121+
expect(decoder.readRecord()).toBeUndefined();
122+
decoder.push(new Uint8Array([0b00000000, 0, 0, part2.length, ...part2]));
123+
expect(decoder.readRecord()).toBeUndefined();
124+
decoder.push(new Uint8Array([0b10000000, 0, 0, part3.length, ...part3]));
125+
const result = decoder.readRecord()?.buf();
126+
expect(result).toBeInstanceOf(Uint8Array);
127+
expect(result!.length).toBe(part1.length + part2.length + part3.length);
128+
expect(result).toEqual(new Uint8Array([...part1, ...part2, ...part3]));
129+
});
130+
});
131+
});

0 commit comments

Comments
 (0)