Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/node/internal/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ export class EventEmitterAsyncResource
}
}

export function addAbortListener(signal: AbortSignal, listener: any) {
export function addAbortListener(
signal: AbortSignal | undefined,
listener: any
) {
if (signal === undefined) {
throw new ERR_INVALID_ARG_TYPE('signal', 'AbortSignal', signal);
}
Expand Down
4 changes: 2 additions & 2 deletions src/node/internal/internal_http_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import {
import { OutgoingMessage } from 'node-internal:internal_http_outgoing';
import { Agent, globalAgent } from 'node-internal:internal_http_agent';
import type { IncomingMessageCallback } from 'node-internal:internal_http_util';
import type { Socket } from 'net';
import type { Socket } from 'node:net';

const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/;

Expand Down Expand Up @@ -168,7 +168,7 @@ export class ClientRequest extends OutgoingMessage implements _ClientRequest {

const signal = options.signal;
if (signal) {
addAbortSignal(signal, this);
addAbortSignal(signal, this as unknown as Writable);
}
let method = options.method;
const methodIsString = typeof method === 'string';
Expand Down
4 changes: 3 additions & 1 deletion src/node/internal/process.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ export const platform: string;

declare global {
const Cloudflare: {
readonly compatibilityFlags: Record<string, boolean>;
readonly compatibilityFlags: Record<string, boolean> & {
enable_streams_nodejs_v24_compat: boolean;
};
};
}

Expand Down
3 changes: 1 addition & 2 deletions src/node/internal/public_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,12 @@ function chunkToBuffer(
// For stdout, we emulate `nohup node foo.js`
class SyncWriteStream extends Writable {
fd: number;
override readable: boolean;
override readable: boolean = false;
_type = 'fs';
_isStdio = true;
constructor(fd: number) {
super({ autoDestroy: true });
this.fd = fd;
this.readable = false;
}
override _write(
chunk: string | Buffer | ArrayBufferView | DataView,
Expand Down
76 changes: 60 additions & 16 deletions src/node/internal/streams_add_abort_signal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,85 @@
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

import { validateAbortSignal } from 'node-internal:validators';
import { isNodeStream } from 'node-internal:streams_util';
import { eos } from 'node-internal:streams_end_of_stream';
import {
AbortError,
ERR_INVALID_ARG_TYPE,
} from 'node-internal:internal_errors';
import { addAbortListener } from 'node-internal:events';

import {
isNodeStream,
isWebStream,
kControllerErrorFunction,
} from 'node-internal:streams_util';

import { eos } from 'node-internal:streams_end_of_stream';
import type { Readable } from 'node-internal:streams_readable';
import type { Writable } from 'node-internal:streams_writable';
import type { Transform } from 'node-internal:streams_transform';
import { addAbortListener } from 'node-internal:events';

// This method is inlined here for readable-stream
// It also does not allow for signal to not exist on the stream
// https://github.com/nodejs/node/pull/36061#discussion_r533718029
function validateAbortSignal(
signal: unknown,
name: string
): asserts signal is AbortSignal {
if (signal == null || typeof signal !== 'object' || !('aborted' in signal)) {
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal);
}
}

type NodeStream = Readable | Writable | Transform;
type StreamType =
| Readable
| Writable
| Transform
| ReadableStream
| WritableStream;

export function addAbortSignal<T extends { destroy: (err: Error) => void }>(
export function addAbortSignal<T extends StreamType>(
signal: unknown,
stream: T
): T {
validateAbortSignal(signal, 'signal');
if (!isNodeStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
}
const onAbort = (): void => {
stream.destroy(
new AbortError(undefined, {
cause: signal.reason,
})
if (!isNodeStream(stream) && !isWebStream(stream)) {
throw new ERR_INVALID_ARG_TYPE(
'stream',
['ReadableStream', 'WritableStream', 'Stream'],
stream
);
};
}
return addAbortSignalNoValidate(signal, stream);
}

export function addAbortSignalNoValidate<T extends StreamType>(
signal: AbortSignal | null | undefined,
stream: T
): T {
if (signal == null || typeof signal !== 'object' || !('aborted' in signal)) {
return stream;
}
const onAbort = isNodeStream(stream)
? (): void => {
stream.destroy(new AbortError(undefined, { cause: signal.reason }));
}
: (): void => {
(
stream as ReadableStream & {
[kControllerErrorFunction]: (err: Error) => void;
}
)[kControllerErrorFunction](
new AbortError(undefined, { cause: signal.reason })
);
};
if (signal.aborted) {
onAbort();
} else {
const disposable = addAbortListener(signal, onAbort);
eos(stream as NodeStream, disposable[Symbol.dispose]);
eos(
stream as Readable | Writable | Transform,
disposable[Symbol.dispose] as () => void
);
}
return stream;
}
Loading