Skip to content

Commit 354d70f

Browse files
committed
update node.js streams implementation
1 parent aeaff54 commit 354d70f

30 files changed

+3049
-2150
lines changed

.bazelproject

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,5 @@ test_sources:
2222
src/workerd/api/*-test.js
2323
src/workerd/api/*-test.wd-test
2424
src/workerd/api/*-test.c++
25+
26+
use_query_sync: true

src/node/internal/internal_errors.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -762,7 +762,7 @@ export class ConnResetException extends NodeError {
762762

763763
export function aggregateTwoErrors(
764764
innerError: unknown,
765-
outerError: Error | null
765+
outerError: Error | null | undefined
766766
): AggregateError {
767767
if (innerError && outerError && innerError !== outerError) {
768768
if ('errors' in outerError && Array.isArray(outerError.errors)) {

src/node/internal/internal_fs_streams.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ import type {
4040

4141
import type { FileHandle } from 'node-internal:internal_fs_promises';
4242

43-
import { errorOrDestroy, eos } from 'node-internal:streams_util';
43+
import { errorOrDestroy } from 'node-internal:streams_destroy';
44+
import { eos } from 'node-internal:streams_end_of_stream';
4445

4546
import {
4647
ERR_INVALID_ARG_VALUE,
@@ -541,7 +542,7 @@ function readImpl(this: ReadStream, n: number): void {
541542
}
542543

543544
if (er) {
544-
errorOrDestroy(this, er);
545+
errorOrDestroy(this, er as Error);
545546
return;
546547
}
547548

src/node/internal/internal_http_client.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import {
2929
validateNumber,
3030
} from 'node-internal:validators';
3131
import { getTimerDuration } from 'node-internal:internal_net';
32-
import { addAbortSignal } from 'node-internal:streams_util';
32+
import { addAbortSignal } from 'node-internal:streams_add_abort_signal';
3333
import { Writable } from 'node-internal:streams_writable';
3434
import type {
3535
ClientRequest as _ClientRequest,
@@ -43,7 +43,7 @@ import {
4343
import { OutgoingMessage } from 'node-internal:internal_http_outgoing';
4444
import { Agent, globalAgent } from 'node-internal:internal_http_agent';
4545
import type { IncomingMessageCallback } from 'node-internal:internal_http_util';
46-
import type { Socket } from 'net';
46+
import type { Socket } from 'node:net';
4747

4848
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/;
4949

@@ -84,7 +84,7 @@ export class ClientRequest extends OutgoingMessage implements _ClientRequest {
8484
agent: Agent | undefined;
8585

8686
// Unused fields required to be Node.js compatible.
87-
aborted: boolean = false;
87+
override aborted: boolean = false;
8888
reusedSocket: boolean = false;
8989
maxHeadersCount: number = Infinity;
9090
connection: Socket | null = null;
@@ -173,7 +173,7 @@ export class ClientRequest extends OutgoingMessage implements _ClientRequest {
173173

174174
const signal = options.signal;
175175
if (signal) {
176-
addAbortSignal(signal, this);
176+
addAbortSignal(signal, this as unknown as Writable);
177177
}
178178
let method = options.method;
179179
const methodIsString = typeof method === 'string';

src/node/internal/internal_http_incoming.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
// https://opensource.org/licenses/Apache-2.0
44
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
55

6-
// this.aborted attribute is set as deprecated in @types/node package.
7-
/* eslint-disable @typescript-eslint/no-deprecated */
8-
96
import { EventEmitter } from 'node-internal:events';
107
import { Readable } from 'node-internal:streams_readable';
118
import { isIPv4, Socket } from 'node-internal:internal_net';
@@ -43,7 +40,7 @@ export class IncomingMessage extends Readable implements _IncomingMessage {
4340
#socket: unknown;
4441
#stream: ReadableStream | null = null;
4542

46-
aborted = false;
43+
override aborted = false;
4744
url: string = '';
4845
// @ts-expect-error TS2416 Type-inconsistencies
4946
method: string | null = null;

src/node/internal/internal_http_outgoing.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
import { validateString } from 'node-internal:validators';
1010
import { Writable } from 'node-internal:streams_writable';
11-
import { getDefaultHighWaterMark } from 'node-internal:streams_util';
11+
import { getDefaultHighWaterMark } from 'node-internal:streams_state';
1212
import type { DataWrittenEvent } from 'node-internal:internal_http_server';
1313
import {
1414
ERR_HTTP_HEADERS_SENT,
@@ -228,7 +228,7 @@ export class OutgoingMessage extends Writable implements _OutgoingMessage {
228228
maxRequestsOnConnectionReached = false;
229229

230230
// These are attributes provided by the Node.js implementation.
231-
_closed = false;
231+
override _closed = false;
232232
_headerSent = false;
233233
_onPendingData: (delta: number) => void = () => {};
234234
_header: string | null = null;

src/node/internal/internal_http_server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {
2424
ERR_SERVER_ALREADY_LISTEN,
2525
} from 'node-internal:internal_errors';
2626
import { EventEmitter } from 'node-internal:events';
27-
import { getDefaultHighWaterMark } from 'node-internal:streams_util';
27+
import { getDefaultHighWaterMark } from 'node-internal:streams_state';
2828
import {
2929
kUniqueHeaders,
3030
OutgoingMessage,

src/node/internal/internal_zlib_base.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import {
2222
NodeError,
2323
} from 'node-internal:internal_errors';
2424
import { Transform, type DuplexOptions } from 'node-internal:streams_transform';
25-
import { eos as finished } from 'node-internal:streams_util';
25+
import { eos as finished } from 'node-internal:streams_end_of_stream';
2626
import {
2727
isArrayBufferView,
2828
isAnyArrayBuffer,
@@ -439,6 +439,7 @@ export class ZlibBase extends Transform {
439439
this._maxOutputLength = maxOutputLength;
440440
}
441441

442+
// @ts-expect-error TS2611 This should be a property according to types.
442443
get _closed(): boolean {
443444
return this._handle == null;
444445
}

src/node/internal/public_process.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,12 @@ function chunkToBuffer(
6161
// For stdout, we emulate `nohup node foo.js`
6262
class SyncWriteStream extends Writable {
6363
fd: number;
64-
readable: boolean;
64+
override readable: boolean = false;
6565
_type = 'fs';
6666
_isStdio = true;
6767
constructor(fd: number) {
6868
super({ autoDestroy: true });
6969
this.fd = fd;
70-
this.readable = false;
7170
}
7271
override _write(
7372
chunk: string | Buffer | ArrayBufferView | DataView,
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright (c) 2017-2022 Cloudflare, Inc.
2+
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
3+
// https://opensource.org/licenses/Apache-2.0
4+
//
5+
// Copyright Joyent, Inc. and other Node contributors.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a
8+
// copy of this software and associated documentation files (the
9+
// "Software"), to deal in the Software without restriction, including
10+
// without limitation the rights to use, copy, modify, merge, publish,
11+
// distribute, sublicense, and/or sell copies of the Software, and to permit
12+
// persons to whom the Software is furnished to do so, subject to the
13+
// following conditions:
14+
//
15+
// The above copyright notice and this permission notice shall be included
16+
// in all copies or substantial portions of the Software.
17+
//
18+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
19+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
20+
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
21+
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
22+
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
23+
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
24+
// USE OR OTHER DEALINGS IN THE SOFTWARE.
25+
26+
import {
27+
AbortError,
28+
ERR_INVALID_ARG_TYPE,
29+
} from 'node-internal:internal_errors';
30+
31+
import {
32+
isNodeStream,
33+
isWebStream,
34+
kControllerErrorFunction,
35+
} from 'node-internal:streams_util';
36+
37+
import { eos } from 'node-internal:streams_end_of_stream';
38+
import type { Readable } from 'node-internal:streams_readable';
39+
import type { Writable } from 'node-internal:streams_writable';
40+
import type { Transform } from 'node-internal:streams_transform';
41+
import { addAbortListener } from 'node-internal:events';
42+
43+
// This method is inlined here for readable-stream
44+
// It also does not allow for signal to not exist on the stream
45+
// https://github.com/nodejs/node/pull/36061#discussion_r533718029
46+
function validateAbortSignal(
47+
signal: unknown,
48+
name: string
49+
): asserts signal is AbortSignal {
50+
if (signal == null || typeof signal !== 'object' || !('aborted' in signal)) {
51+
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal);
52+
}
53+
}
54+
55+
type StreamType =
56+
| Readable
57+
| Writable
58+
| Transform
59+
| ReadableStream
60+
| WritableStream;
61+
62+
export function addAbortSignal<T extends StreamType>(
63+
signal: unknown,
64+
stream: T
65+
): T {
66+
validateAbortSignal(signal, 'signal');
67+
if (!isNodeStream(stream) && !isWebStream(stream)) {
68+
throw new ERR_INVALID_ARG_TYPE(
69+
'stream',
70+
['ReadableStream', 'WritableStream', 'Stream'],
71+
stream
72+
);
73+
}
74+
return addAbortSignalNoValidate(signal, stream);
75+
}
76+
77+
export function addAbortSignalNoValidate<T extends StreamType>(
78+
signal: AbortSignal | null | undefined,
79+
stream: T
80+
): T {
81+
if (signal == null || typeof signal !== 'object' || !('aborted' in signal)) {
82+
return stream;
83+
}
84+
const onAbort = isNodeStream(stream)
85+
? (): void => {
86+
stream.destroy(new AbortError(undefined, { cause: signal.reason }));
87+
}
88+
: (): void => {
89+
(
90+
stream as ReadableStream & {
91+
[kControllerErrorFunction]: (err: Error) => void;
92+
}
93+
)[kControllerErrorFunction](
94+
new AbortError(undefined, { cause: signal.reason })
95+
);
96+
};
97+
if (signal.aborted) {
98+
onAbort();
99+
} else {
100+
const disposable = addAbortListener(signal, onAbort);
101+
eos(
102+
stream as Readable | Writable | Transform,
103+
disposable[Symbol.dispose] as () => void
104+
);
105+
}
106+
return stream;
107+
}

0 commit comments

Comments
 (0)