Skip to content

Commit 2170d90

Browse files
test(NODE-7180): Migrate test/integration/change-streams tests (#4684)
1 parent 15cebd7 commit 2170d90

File tree

2 files changed

+77
-111
lines changed

2 files changed

+77
-111
lines changed

test/integration/change-streams/change_stream.test.ts

Lines changed: 61 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { strict as assert } from 'assert';
2-
import { UUID } from 'bson';
2+
import { Long, UUID } from 'bson';
33
import { expect } from 'chai';
44
import { on, once } from 'events';
55
import { gte, lt } from 'semver';
@@ -11,19 +11,16 @@ import {
1111
type ChangeStream,
1212
type ChangeStreamDocument,
1313
type ChangeStreamOptions,
14-
type Collection,
15-
type CommandStartedEvent,
16-
type Db,
17-
isHello,
18-
LEGACY_HELLO_COMMAND,
19-
Long,
20-
MongoAPIError,
21-
MongoChangeStreamError,
22-
type MongoClient,
23-
MongoServerError,
24-
ReadPreference,
2514
type ResumeToken
26-
} from '../../mongodb';
15+
} from '../../../src/change_stream';
16+
import { type CommandStartedEvent } from '../../../src/cmap/command_monitoring_events';
17+
import { type Collection } from '../../../src/collection';
18+
import { LEGACY_HELLO_COMMAND } from '../../../src/constants';
19+
import { type Db } from '../../../src/db';
20+
import { MongoAPIError, MongoChangeStreamError, MongoServerError } from '../../../src/error';
21+
import { type MongoClient } from '../../../src/mongo_client';
22+
import { ReadPreference } from '../../../src/read_preference';
23+
import { isHello } from '../../../src/utils';
2724
import * as mock from '../../tools/mongodb-mock/index';
2825
import { TestBuilder, UnifiedTestSuiteBuilder } from '../../tools/unified_suite_builder';
2926
import { type FailCommandFailPoint, sleep } from '../../tools/utils';
@@ -323,30 +320,25 @@ describe('Change Streams', function () {
323320
it('should properly close ChangeStream cursor', {
324321
metadata: { requires: { topology: 'replicaset' } },
325322

326-
test: function (done) {
323+
test: async function () {
327324
const configuration = this.configuration;
328325
const client = configuration.newClient();
329326

330-
client.connect((err, client) => {
331-
expect(err).to.not.exist;
332-
this.defer(() => client.close());
327+
await client.connect();
328+
const database = client.db('integration_tests');
329+
const changeStream = database.collection('changeStreamCloseTest').watch(pipeline);
333330

334-
const database = client.db('integration_tests');
335-
const changeStream = database.collection('changeStreamCloseTest').watch(pipeline);
336-
this.defer(() => changeStream.close());
331+
assert.equal(changeStream.closed, false);
332+
assert.equal(changeStream.cursor.closed, false);
337333

338-
assert.equal(changeStream.closed, false);
339-
assert.equal(changeStream.cursor.closed, false);
334+
await changeStream.close();
340335

341-
changeStream.close(err => {
342-
expect(err).to.not.exist;
336+
// Check the cursor is closed
337+
expect(changeStream.closed).to.be.true;
338+
expect(changeStream.cursor).property('closed', true);
343339

344-
// Check the cursor is closed
345-
expect(changeStream.closed).to.be.true;
346-
expect(changeStream.cursor).property('closed', true);
347-
done();
348-
});
349-
});
340+
await changeStream.close();
341+
await client.close();
350342
}
351343
});
352344

@@ -355,32 +347,28 @@ describe('Change Streams', function () {
355347
{
356348
metadata: { requires: { topology: 'replicaset' } },
357349

358-
test: function (done) {
350+
test: async function () {
359351
const configuration = this.configuration;
360352
const client = configuration.newClient();
361353

362-
client.connect((err, client) => {
363-
expect(err).to.not.exist;
364-
this.defer(() => client.close());
354+
await client.connect();
365355

366-
const forbiddenStage = {};
367-
const forbiddenStageName = '$alksdjfhlaskdfjh';
368-
forbiddenStage[forbiddenStageName] = 2;
356+
const forbiddenStage = {};
357+
const forbiddenStageName = '$alksdjfhlaskdfjh';
358+
forbiddenStage[forbiddenStageName] = 2;
369359

370-
const database = client.db('integration_tests');
371-
const changeStream = database.collection('forbiddenStageTest').watch([forbiddenStage]);
372-
this.defer(() => changeStream.close());
360+
const database = client.db('integration_tests');
361+
const changeStream = database.collection('forbiddenStageTest').watch([forbiddenStage]);
373362

374-
changeStream.next(err => {
375-
assert.ok(err);
376-
assert.ok(err.message);
377-
assert.ok(
378-
err.message.indexOf(`Unrecognized pipeline stage name: '${forbiddenStageName}'`) > -1
379-
);
363+
const err = await changeStream.next().catch(e => e);
364+
assert.ok(err);
365+
assert.ok(err.message);
366+
assert.ok(
367+
err.message.indexOf(`Unrecognized pipeline stage name: '${forbiddenStageName}'`) > -1
368+
);
380369

381-
done();
382-
});
383-
});
370+
await changeStream.close();
371+
await client.close();
384372
}
385373
}
386374
);
@@ -459,37 +447,25 @@ describe('Change Streams', function () {
459447

460448
it('should error if resume token projected out of change stream document using iterator', {
461449
metadata: { requires: { topology: 'replicaset' } },
462-
test(done) {
450+
async test() {
463451
const configuration = this.configuration;
464452
const client = configuration.newClient();
465453

466-
client.connect((err, client) => {
467-
expect(err).to.not.exist;
454+
await client.connect();
468455

469-
const database = client.db('integration_tests');
470-
const collection = database.collection('resumetokenProjectedOutCallback');
471-
const changeStream = collection.watch([{ $project: { _id: false } }]);
456+
const database = client.db('integration_tests');
457+
const collection = database.collection('resumetokenProjectedOutCallback');
458+
const changeStream = collection.watch([{ $project: { _id: false } }]);
472459

473-
changeStream.hasNext(() => {
474-
// trigger initialize
475-
});
460+
await initIteratorMode(changeStream);
476461

477-
changeStream.cursor.on('init', () => {
478-
collection.insertOne({ b: 2 }, (err, res) => {
479-
expect(err).to.be.undefined;
480-
expect(res).to.exist;
481-
482-
changeStream.next(err => {
483-
expect(err).to.exist;
484-
changeStream.close(() => {
485-
client.close(() => {
486-
done();
487-
});
488-
});
489-
});
490-
});
491-
});
492-
});
462+
const res = await collection.insertOne({ b: 2 });
463+
expect(res).to.exist;
464+
465+
const err = await changeStream.next().catch(e => e);
466+
expect(err).to.exist;
467+
await changeStream.close();
468+
await client.close();
493469
}
494470
});
495471

@@ -1291,7 +1267,7 @@ describe('Change Streams', function () {
12911267
await mock.cleanup();
12921268
});
12931269

1294-
it('changeStream should close if cursor id for initial aggregate is Long.ZERO', function (done) {
1270+
it('changeStream should close if cursor id for initial aggregate is Long.ZERO', async function () {
12951271
mockServer.setMessageHandler(req => {
12961272
const doc = req.document;
12971273
if (isHello(doc)) {
@@ -1320,17 +1296,16 @@ describe('Change Streams', function () {
13201296
const client = this.configuration.newClient(`mongodb://${mockServer.uri()}/`, {
13211297
serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
13221298
});
1323-
client.connect(err => {
1324-
expect(err).to.not.exist;
1325-
const collection = client.db('cs').collection('test');
1326-
const changeStream = collection.watch();
1327-
changeStream.next((err, doc) => {
1328-
expect(err).to.exist;
1329-
expect(doc).to.not.exist;
1330-
expect(err?.message).to.equal('ChangeStream is closed');
1331-
changeStream.close(() => client.close(done));
1332-
});
1333-
});
1299+
await client.connect();
1300+
const collection = client.db('cs').collection('test');
1301+
const changeStream = collection.watch();
1302+
1303+
const err = await changeStream.next().catch(e => e);
1304+
expect(err).to.exist;
1305+
expect(err?.message).to.equal('ChangeStream is closed');
1306+
1307+
await changeStream.close();
1308+
await client.close();
13341309
});
13351310
});
13361311
});

test/integration/change-streams/change_streams.prose.test.ts

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { expect } from 'chai';
2-
import { once } from 'events';
2+
import { on, once } from 'events';
33
import * as sinon from 'sinon';
44
import { setTimeout } from 'timers';
55

@@ -57,9 +57,7 @@ function triggerResumableError(
5757
nextStub.restore();
5858
});
5959

60-
changeStream.next(() => {
61-
// ignore
62-
});
60+
changeStream.next();
6361
}
6462

6563
if (typeof delay === 'number') {
@@ -78,12 +76,6 @@ const initIteratorMode = async (cs: ChangeStream) => {
7876
return;
7977
};
8078

81-
/** Waits for a change stream to start */
82-
async function waitForStarted(changeStream, callback) {
83-
await once(changeStream.cursor, 'init');
84-
await callback();
85-
}
86-
8779
describe('Change Stream prose tests', function () {
8880
before(async function () {
8981
return await setupDatabase(this.configuration, ['integration_tests']);
@@ -643,12 +635,10 @@ describe('Change Stream prose tests', function () {
643635
// when resuming a change stream.
644636
it('$changeStream with results must include resumeAfter and not startAfter', {
645637
metadata: { requires: { topology: 'replicaset' } },
646-
test: function (done) {
638+
test: async function () {
647639
let events = [];
648640
client.on('commandStarted', e => recordEvent(events, e));
649641
const changeStream = coll.watch([], { startAfter });
650-
changeStream.on('error', done);
651-
this.defer(() => changeStream.close());
652642

653643
changeStream.on('change', change => {
654644
events.push({ change: { insert: { x: change.fullDocument.x } } });
@@ -658,21 +648,22 @@ describe('Change Stream prose tests', function () {
658648
events = [];
659649
triggerResumableError(changeStream, () => events.push('error'));
660650
break;
661-
case 3:
662-
expect(events).to.be.an('array').with.lengthOf(3);
663-
expect(events[0]).to.equal('error');
664-
expect(events[1]).nested.property('$changeStream.resumeAfter').to.exist;
665-
expect(events[2]).to.eql({ change: { insert: { x: 3 } } });
666-
done();
667-
break;
668651
}
669652
});
670653

671-
waitForStarted(changeStream, () =>
672-
coll
673-
.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } })
674-
.then(() => coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } }))
675-
);
654+
await once(changeStream.cursor, 'init');
655+
const changes = on(changeStream, 'change');
656+
await coll.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } });
657+
await changes.next();
658+
await coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } });
659+
await changes.next();
660+
661+
expect(events).to.be.an('array').with.lengthOf(3);
662+
expect(events[0]).to.equal('error');
663+
expect(events[1]).nested.property('$changeStream.resumeAfter').to.exist;
664+
expect(events[2]).to.eql({ change: { insert: { x: 3 } } });
665+
666+
await changeStream.close();
676667
}
677668
});
678669
});

0 commit comments

Comments
 (0)