Skip to content

Commit 1702987

Browse files
feat(NODE-4808)!: remove support for stream() transform on cursors and change streams (#4728)
1 parent 9e9059a commit 1702987

File tree

6 files changed

+10
-84
lines changed

6 files changed

+10
-84
lines changed

src/change_stream.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import type { Readable } from 'stream';
33
import type { Binary, Document, Timestamp } from './bson';
44
import { Collection } from './collection';
55
import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants';
6-
import { type CursorStreamOptions, CursorTimeoutContext } from './cursor/abstract_cursor';
6+
import { CursorTimeoutContext } from './cursor/abstract_cursor';
77
import { ChangeStreamCursor, type ChangeStreamCursorOptions } from './cursor/change_stream_cursor';
88
import { Db } from './db';
99
import {
@@ -594,7 +594,6 @@ export class ChangeStream<
594594
type: symbol;
595595
/** @internal */
596596
private cursor: ChangeStreamCursor<TSchema, TChange>;
597-
streamOptions?: CursorStreamOptions;
598597
/** @internal */
599598
private cursorStream?: Readable & AsyncIterable<TChange>;
600599
/** @internal */
@@ -862,13 +861,12 @@ export class ChangeStream<
862861
*
863862
* @throws MongoChangeStreamError if the underlying cursor or the change stream is closed
864863
*/
865-
stream(options?: CursorStreamOptions): Readable & AsyncIterable<TChange> {
864+
stream(): Readable & AsyncIterable<TChange> {
866865
if (this.closed) {
867866
throw new MongoChangeStreamError(CHANGESTREAM_CLOSED_ERROR);
868867
}
869868

870-
this.streamOptions = options;
871-
return this.cursor.stream(options);
869+
return this.cursor.stream();
872870
}
873871

874872
/** @internal */

src/cursor/abstract_cursor.ts

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Readable, Transform } from 'stream';
1+
import { Readable } from 'stream';
22

33
import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson';
44
import { type OnDemandDocumentDeserializeOptions } from '../cmap/wire_protocol/on_demand/document';
@@ -58,12 +58,6 @@ export const CURSOR_FLAGS = [
5858
'partial'
5959
] as const;
6060

61-
/** @public */
62-
export interface CursorStreamOptions {
63-
/** A transformation method applied to each document emitted by the stream */
64-
transform?(this: void, doc: Document): Document;
65-
}
66-
6761
/** @public */
6862
export type CursorFlag = (typeof CURSOR_FLAGS)[number];
6963

@@ -519,7 +513,7 @@ export abstract class AbstractCursor<
519513
}
520514
}
521515

522-
stream(options?: CursorStreamOptions): Readable & AsyncIterable<TSchema> {
516+
stream(): Readable & AsyncIterable<TSchema> {
523517
const readable = new ReadableCursorStream(this);
524518
const abortListener = addAbortListener(this.signal, function () {
525519
readable.destroy(this.reason);
@@ -528,31 +522,6 @@ export abstract class AbstractCursor<
528522
abortListener?.[kDispose]();
529523
});
530524

531-
if (options?.transform) {
532-
const transform = options.transform;
533-
534-
const transformedStream = readable.pipe(
535-
new Transform({
536-
objectMode: true,
537-
highWaterMark: 1,
538-
transform(chunk, _, callback) {
539-
try {
540-
const transformed = transform(chunk);
541-
callback(undefined, transformed);
542-
} catch (err) {
543-
callback(err);
544-
}
545-
}
546-
})
547-
);
548-
549-
// Bubble errors to transformed stream, because otherwise no way
550-
// to handle this error.
551-
readable.on('error', err => transformedStream.emit('error', err));
552-
553-
return transformedStream;
554-
}
555-
556525
return readable;
557526
}
558527

src/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -365,8 +365,7 @@ export type {
365365
export type {
366366
AbstractCursorEvents,
367367
AbstractCursorOptions,
368-
CursorFlag,
369-
CursorStreamOptions
368+
CursorFlag
370369
} from './cursor/abstract_cursor';
371370
export type {
372371
CursorTimeoutContext,

test/integration/change-streams/change_stream.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -786,7 +786,8 @@ describe('Change Streams', function () {
786786

787787
const transform = doc => ({ doc: JSON.stringify(doc) });
788788
changeStream
789-
.stream({ transform })
789+
.stream()
790+
.map(transform)
790791
.on('error', () => null)
791792
.pipe(outStream)
792793
.on('error', () => null);

test/integration/crud/misc_cursors.test.js

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1823,9 +1823,7 @@ describe('Cursor', function () {
18231823

18241824
const filename = path.join(os.tmpdir(), '_nodemongodbnative_stream_out.txt');
18251825
const out = fs.createWriteStream(filename);
1826-
const stream = collection.find().stream({
1827-
transform: doc => JSON.stringify(doc)
1828-
});
1826+
const stream = collection.find().stream().map(JSON.stringify);
18291827

18301828
stream.pipe(out);
18311829
// Wait for output stream to close
@@ -3746,14 +3744,13 @@ describe('Cursor', function () {
37463744
{ _id: 2, a: { b: 1, c: 0 } }
37473745
];
37483746
const resultSet = new Set();
3749-
const transformParam = transformFunc != null ? { transform: transformFunc } : null;
37503747
Promise.resolve()
37513748
.then(() => db.createCollection(collectionName))
37523749
.then(() => (collection = db.collection(collectionName)))
37533750
.then(() => collection.insertMany(docs))
37543751
.then(() => {
37553752
cursor = collection.find();
3756-
return cursor.stream(transformParam);
3753+
return cursor.stream().map(transformFunc ?? (doc => doc));
37573754
})
37583755
.then(stream => {
37593756
stream.on('data', function (doc) {

test/integration/node-specific/abstract_cursor.test.ts

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { expect } from 'chai';
22
import { once } from 'events';
33
import * as sinon from 'sinon';
4-
import { Transform } from 'stream';
54
import { inspect } from 'util';
65

76
import {
@@ -16,7 +15,6 @@ import {
1615
type MongoClient,
1716
MongoCursorExhaustedError,
1817
MongoOperationTimeoutError,
19-
MongoServerError,
2018
TimeoutContext
2119
} from '../../mongodb';
2220
import { clearFailPoint, configureFailPoint } from '../../tools/utils';
@@ -317,42 +315,6 @@ describe('class AbstractCursor', function () {
317315
});
318316
});
319317

320-
describe('transform stream error handling', function () {
321-
let client: MongoClient;
322-
let collection: Collection;
323-
const docs = [{ count: 0 }];
324-
325-
beforeEach(async function () {
326-
client = this.configuration.newClient();
327-
328-
collection = client.db('abstract_cursor_integration').collection('test');
329-
330-
await collection.insertMany(docs);
331-
});
332-
333-
afterEach(async function () {
334-
await collection.deleteMany({});
335-
await client.close();
336-
});
337-
338-
it('propagates errors to transform stream', async function () {
339-
const transform = new Transform({
340-
transform(data, encoding, callback) {
341-
callback(null, data);
342-
}
343-
});
344-
345-
// MongoServerError: unknown operator: $bar
346-
const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform });
347-
348-
const error: Error | null = await new Promise(resolve => {
349-
stream.on('error', error => resolve(error));
350-
stream.on('end', () => resolve(null));
351-
});
352-
expect(error).to.be.instanceof(MongoServerError);
353-
});
354-
});
355-
356318
describe('cursor end state', function () {
357319
let client: MongoClient;
358320
let cursor: FindCursor;

0 commit comments

Comments
 (0)