Open
Description
Detail Bug Report
Summary
- Context: `BatchTransform` is a `TransformStream` that batches `AppendRecord` objects based on time (linger), record count, and byte size before sending them to the S2 service.
- Bug: The `lingerTimer` (a `setTimeout` call) is not cleared when the stream is cancelled or errors, and its callback does not handle errors when enqueuing to a closed or errored stream. Additionally, setting `lingerDurationMillis: 0` prevents the timer from ever starting, causing records to be stalled indefinitely.
- Actual vs. expected: When a stream is cancelled (e.g., due to a client disconnect) or enters an error state, the linger timer remains active; when it fires and attempts to enqueue a batch, it throws a `TypeError` which, being in an asynchronous callback, causes an unhandled exception and crashes the process. Furthermore, a linger duration of 0 should result in an immediate (or next-tick) flush, but currently it disables the timer check.
- Impact: Unhandled exceptions in asynchronous callbacks are critical as they can crash the entire Node.js/Bun process, leading to service unavailability. Stalled records due to the linger 0 bug lead to data being held in memory indefinitely until the stream is closed.
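The crash mechanism can be reproduced in isolation. The following is a minimal sketch, not code from the repository: it uses a bare `TransformStream` rather than `BatchTransform`, and the function name `enqueueAfterCancel` is made up for illustration. It assumes a runtime that exposes WHATWG streams as globals, such as a recent Node.js or Bun.

```typescript
// Illustrative only: shows that enqueuing after the readable side has been
// cancelled throws a TypeError. `enqueueAfterCancel` is a hypothetical name.
function enqueueAfterCancel(): string {
  let controller!: TransformStreamDefaultController<string>;
  const stream = new TransformStream<string, string>({
    // start() runs synchronously inside the constructor.
    start: (c) => {
      controller = c;
    },
  });

  // Cancelling the reader closes the readable side right away; the returned
  // promise only signals that cancellation has finished.
  void stream.readable.getReader().cancel();

  try {
    controller.enqueue("late batch");
    return "enqueued";
  } catch (err) {
    return err instanceof TypeError ? "TypeError" : "other error";
  }
}
```

Here the throw is caught by the surrounding `try`. Inside a `setTimeout` callback there is no caller to catch it, which is why the same error takes down the process.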
Code with bug
super({
  start: (c) => {
    controller = c; // <-- BUG 🔴 Dead code; should set this.controller
  },
  transform: (chunk, c) => {
    // Store controller reference on first transform
    if (!this.controller) {
      this.controller = c;
    }
    this.handleRecord(chunk);
  },
  flush: () => {
    this.flush();
  },
  // Missing cancel hook to stop the linger timer! // <-- BUG 🔴
});
...
if (this.currentBatch.length === 0 && this.lingerDuration > 0) { // <-- BUG 🔴 Should be >= 0
  this.startLingerTimer();
}
...
this.lingerTimer = setTimeout(() => {
  this.lingerTimer = null;
  if (this.currentBatch.length > 0) {
    this.flush(); // <-- BUG 🔴 Can throw and crash if stream is closed/errored
  }
}, this.lingerDuration);
Failing test
it("crashes when linger timer fires after reader cancel", async () => {
  const batcher = new BatchTransform({ lingerDurationMillis: 50 });
  const writer = batcher.writable.getWriter();
  const reader = batcher.readable.getReader();
  writer.write(AppendRecord.string({ body: "a" })); // Starts timer
  await reader.cancel(); // Close reader
  await new Promise((resolve) => setTimeout(resolve, 100));
  // Result: Vitest catches an unhandled rejection and process exits with code 1.
});
Recommended fix
- Implement the `cancel` hook in the `TransformStream` transformer to call `this.cancelLingerTimer()`.
- Wrap the `this.controller.enqueue()` call in a `try...catch` block or check for stream readiness to prevent asynchronous crashes.
- Update the linger check in `handleRecord` to `this.lingerDuration >= 0` to allow `setTimeout(..., 0)` which correctly batches synchronous writes and flushes in the next tick.
- Properly initialize `this.controller` in the `start` hook instead of the `transform` hook.
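The four recommendations could fit together roughly as follows. This is a hedged sketch under stated assumptions, not the project's implementation: `LingerBatcher`, `flushBatch`, `onLingerFired`, `pendingCount`, and `timerActive` are hypothetical names, and the class batches by linger time only (no record-count or byte-size limits). One deviation from the last bullet: `start` runs while `super()` is still executing, when `this` is not yet usable in a subclass, so the sketch captures the controller in a local variable and assigns it right after `super()` returns.

```typescript
// Hypothetical, simplified stand-in for BatchTransform (linger batching only).
class LingerBatcher<T> extends TransformStream<T, T[]> {
  private controller: TransformStreamDefaultController<T[]>;
  private currentBatch: T[] = [];
  private lingerTimer: ReturnType<typeof setTimeout> | null = null;
  private lingerDuration: number;

  constructor(lingerDurationMillis: number) {
    // Captured in a local because `this` is unavailable until super() returns.
    let captured!: TransformStreamDefaultController<T[]>;
    super({
      start: (c) => {
        captured = c;
      },
      transform: (chunk) => {
        this.handleRecord(chunk);
      },
      flush: () => {
        this.flushBatch();
      },
      // The cancel hook is a newer addition to the Streams standard; runtimes
      // without it ignore this property, so the try/catch below still matters.
      cancel: () => {
        this.cancelLingerTimer();
      },
    });
    this.controller = captured;
    this.lingerDuration = lingerDurationMillis;
  }

  get pendingCount(): number {
    return this.currentBatch.length;
  }

  get timerActive(): boolean {
    return this.lingerTimer !== null;
  }

  handleRecord(record: T): void {
    // `>= 0` so that a linger of 0 still schedules a flush via setTimeout(..., 0).
    if (this.currentBatch.length === 0 && this.lingerDuration >= 0) {
      this.lingerTimer = setTimeout(() => this.onLingerFired(), this.lingerDuration);
    }
    this.currentBatch.push(record);
  }

  // Body of the timer callback, kept as a method so it can be exercised directly.
  onLingerFired(): void {
    this.lingerTimer = null;
    if (this.currentBatch.length > 0) {
      this.flushBatch();
    }
  }

  cancelLingerTimer(): void {
    if (this.lingerTimer !== null) {
      clearTimeout(this.lingerTimer);
      this.lingerTimer = null;
    }
  }

  flushBatch(): void {
    this.cancelLingerTimer();
    const batch = this.currentBatch;
    this.currentBatch = [];
    if (batch.length === 0) return;
    try {
      this.controller.enqueue(batch);
    } catch {
      // The readable side is closed or errored; nobody can receive the batch,
      // so it is dropped instead of crashing the process.
    }
  }
}
```

Whether a dropped batch should be logged or surfaced some other way is a design decision this sketch leaves open. The guard around `enqueue` is kept even with the `cancel` hook in place, since it also covers the errored-stream case and runtimes that do not call the hook.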