Skip to content

Commit 5761703

Browse files
fix(NODE-4845): allocate sessions lazily in cursors (#4575)
1 parent 52ed3d1 commit 5761703

File tree

6 files changed

+143
-82
lines changed

6 files changed

+143
-82
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 61 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import { ReadConcern, type ReadConcernLike } from '../read_concern';
2020
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
2121
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
2222
import type { Server } from '../sdam/server';
23-
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
23+
import { type ClientSession, maybeClearPinnedConnection } from '../sessions';
2424
import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout';
2525
import {
2626
addAbortListener,
@@ -227,7 +227,7 @@ export abstract class AbstractCursor<
227227
/** @internal */
228228
private cursorId: Long | null;
229229
/** @internal */
230-
private cursorSession: ClientSession;
230+
private cursorSession: ClientSession | null;
231231
/** @internal */
232232
private selectedServer?: Server;
233233
/** @internal */
@@ -352,11 +352,7 @@ export abstract class AbstractCursor<
352352
this.cursorOptions.maxAwaitTimeMS = options.maxAwaitTimeMS;
353353
}
354354

355-
if (options.session instanceof ClientSession) {
356-
this.cursorSession = options.session;
357-
} else {
358-
this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false });
359-
}
355+
this.cursorSession = options.session ?? null;
360356

361357
this.deserializationOptions = {
362358
...this.cursorOptions,
@@ -413,7 +409,7 @@ export abstract class AbstractCursor<
413409
}
414410

415411
/** @internal */
416-
get session(): ClientSession {
412+
get session(): ClientSession | null {
417413
return this.cursorSession;
418414
}
419415

@@ -877,11 +873,12 @@ export abstract class AbstractCursor<
877873
this.trackCursor();
878874

879875
// We only want to end this session if we created it, and it hasn't ended yet
880-
if (this.cursorSession.explicit === false) {
876+
if (this.cursorSession?.explicit === false) {
881877
if (!this.cursorSession.hasEnded) {
882878
this.cursorSession.endSession().then(undefined, squashError);
883879
}
884-
this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false });
880+
881+
this.cursorSession = null;
885882
}
886883
}
887884

@@ -907,6 +904,13 @@ export abstract class AbstractCursor<
907904
'Unexpected null selectedServer. A cursor creating command should have set this'
908905
);
909906
}
907+
908+
if (this.cursorSession == null) {
909+
throw new MongoRuntimeError(
910+
'Unexpected null session. A cursor creating command should have set this'
911+
);
912+
}
913+
910914
const getMoreOptions = {
911915
...this.cursorOptions,
912916
session: this.cursorSession,
@@ -941,6 +945,7 @@ export abstract class AbstractCursor<
941945
);
942946
}
943947
try {
948+
this.cursorSession ??= this.cursorClient.startSession({ owner: this, explicit: false });
944949
const state = await this._initialize(this.cursorSession);
945950
// Set omitMaxTimeMS to the value needed for subsequent getMore calls
946951
this.cursorOptions.omitMaxTimeMS = this.cursorOptions.timeoutMS != null;
@@ -1032,41 +1037,57 @@ export abstract class AbstractCursor<
10321037
return this.timeoutContext?.refreshed();
10331038
}
10341039
};
1035-
try {
1036-
if (
1037-
!this.isKilled &&
1038-
this.cursorId &&
1039-
!this.cursorId.isZero() &&
1040-
this.cursorNamespace &&
1041-
this.selectedServer &&
1042-
!this.cursorSession.hasEnded
1043-
) {
1044-
this.isKilled = true;
1045-
const cursorId = this.cursorId;
1046-
this.cursorId = Long.ZERO;
1047-
1048-
await executeOperation(
1049-
this.cursorClient,
1050-
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
1051-
session: this.cursorSession
1052-
}),
1053-
timeoutContextForKillCursors()
1054-
);
1040+
1041+
const withEmitClose = async (fn: () => Promise<void>) => {
1042+
try {
1043+
await fn();
1044+
} finally {
1045+
this.emitClose();
10551046
}
1056-
} catch (error) {
1057-
squashError(error);
1058-
} finally {
1047+
};
1048+
1049+
const close = async () => {
1050+
// if no session has been defined on the cursor, the cursor was never initialized
1051+
// or the cursor was re-wound and never re-iterated. In either case, we
1052+
// 1. do not need to end the session (there is no session after all)
1053+
// 2. do not need to kill the cursor server-side
1054+
const session = this.cursorSession;
1055+
if (!session) return;
1056+
10591057
try {
1060-
if (this.cursorSession?.owner === this) {
1061-
await this.cursorSession.endSession({ error });
1062-
}
1063-
if (!this.cursorSession?.inTransaction()) {
1064-
maybeClearPinnedConnection(this.cursorSession, { error });
1058+
if (
1059+
!this.isKilled &&
1060+
this.cursorId &&
1061+
!this.cursorId.isZero() &&
1062+
this.cursorNamespace &&
1063+
this.selectedServer &&
1064+
!session.hasEnded
1065+
) {
1066+
this.isKilled = true;
1067+
const cursorId = this.cursorId;
1068+
this.cursorId = Long.ZERO;
1069+
1070+
await executeOperation(
1071+
this.cursorClient,
1072+
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
1073+
session
1074+
}),
1075+
timeoutContextForKillCursors()
1076+
);
10651077
}
1078+
} catch (error) {
1079+
squashError(error);
10661080
} finally {
1067-
this.emitClose();
1081+
if (session.owner === this) {
1082+
await session.endSession({ error });
1083+
}
1084+
if (!session.inTransaction()) {
1085+
maybeClearPinnedConnection(session, { error });
1086+
}
10681087
}
1069-
}
1088+
};
1089+
1090+
await withEmitClose(close);
10701091
}
10711092

10721093
/** @internal */

src/cursor/run_command_cursor.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { BSONSerializeOptions, Document } from '../bson';
22
import { CursorResponse } from '../cmap/wire_protocol/responses';
33
import type { Db } from '../db';
4-
import { MongoAPIError } from '../error';
4+
import { MongoAPIError, MongoRuntimeError } from '../error';
55
import { executeOperation } from '../operations/execute_operation';
66
import { GetMoreOperation } from '../operations/get_more';
77
import { RunCommandOperation } from '../operations/run_command';
@@ -161,6 +161,12 @@ export class RunCommandCursor extends AbstractCursor {
161161

162162
/** @internal */
163163
override async getMore(_batchSize: number): Promise<CursorResponse> {
164+
if (!this.session) {
165+
throw new MongoRuntimeError(
166+
'Unexpected null session. A cursor creating command should have set this'
167+
);
168+
}
169+
164170
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
165171
const getMoreOperation = new GetMoreOperation(this.namespace, this.id!, this.server!, {
166172
...this.cursorOptions,

test/integration/crud/misc_cursors.test.js

Lines changed: 34 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1676,22 +1676,24 @@ describe('Cursor', function () {
16761676
const collection = await client.db().collection('test');
16771677

16781678
const cursor = collection.find({});
1679+
await cursor.next();
1680+
16791681
const clonedCursor = cursor.clone();
16801682

1681-
expect(cursor).to.have.property('session');
1682-
expect(clonedCursor).to.have.property('session');
1683-
expect(cursor.session).to.not.equal(clonedCursor.session);
1683+
expect(cursor).to.have.property('session').not.to.be.null;
1684+
expect(clonedCursor).to.have.property('session').to.be.null;
16841685
});
16851686

16861687
it('removes session when cloning an aggregation cursor', async function () {
16871688
const collection = await client.db().collection('test');
16881689

16891690
const cursor = collection.aggregate([{ $match: {} }]);
1691+
await cursor.next();
1692+
16901693
const clonedCursor = cursor.clone();
16911694

1692-
expect(cursor).to.have.property('session');
1693-
expect(clonedCursor).to.have.property('session');
1694-
expect(cursor.session).to.not.equal(clonedCursor.session);
1695+
expect(cursor).to.have.property('session').not.to.be.null;
1696+
expect(clonedCursor).to.have.property('session').to.be.null;
16951697
});
16961698

16971699
it('destroying a stream stops it', async function () {
@@ -3598,42 +3600,38 @@ describe('Cursor', function () {
35983600
});
35993601

36003602
context('when executing on a find cursor', function () {
3601-
it('removes the existing session from the cloned cursor', function () {
3603+
it('removes the existing session from the cloned cursor', async function () {
36023604
const docs = [{ name: 'test1' }, { name: 'test2' }];
3603-
return collection.insertMany(docs).then(() => {
3604-
const cursor = collection.find({}, { batchSize: 1 });
3605-
return cursor
3606-
.next()
3607-
.then(doc => {
3608-
expect(doc).to.exist;
3609-
const clonedCursor = cursor.clone();
3610-
expect(clonedCursor.cursorOptions.session).to.not.exist;
3611-
expect(clonedCursor.session).to.have.property('_serverSession', null); // session is brand new and has not been used
3612-
})
3613-
.finally(() => {
3614-
return cursor.close();
3615-
});
3616-
});
3605+
await collection.insertMany(docs);
3606+
3607+
const cursor = collection.find({}, { batchSize: 1 });
3608+
try {
3609+
const doc = await cursor.next();
3610+
expect(doc).to.exist;
3611+
3612+
const clonedCursor = cursor.clone();
3613+
expect(clonedCursor.session).to.be.null;
3614+
} finally {
3615+
await cursor.close();
3616+
}
36173617
});
36183618
});
36193619

36203620
context('when executing on an aggregation cursor', function () {
3621-
it('removes the existing session from the cloned cursor', function () {
3621+
it('removes the existing session from the cloned cursor', async function () {
36223622
const docs = [{ name: 'test1' }, { name: 'test2' }];
3623-
return collection.insertMany(docs).then(() => {
3624-
const cursor = collection.aggregate([{ $match: {} }], { batchSize: 1 });
3625-
return cursor
3626-
.next()
3627-
.then(doc => {
3628-
expect(doc).to.exist;
3629-
const clonedCursor = cursor.clone();
3630-
expect(clonedCursor.cursorOptions.session).to.not.exist;
3631-
expect(clonedCursor.session).to.have.property('_serverSession', null); // session is brand new and has not been used
3632-
})
3633-
.finally(() => {
3634-
return cursor.close();
3635-
});
3636-
});
3623+
await collection.insertMany(docs);
3624+
3625+
const cursor = collection.aggregate([{ $match: {} }], { batchSize: 1 });
3626+
try {
3627+
const doc = await cursor.next();
3628+
expect(doc).to.exist;
3629+
3630+
const clonedCursor = cursor.clone();
3631+
expect(clonedCursor.session).to.be.null;
3632+
} finally {
3633+
await cursor.close();
3634+
}
36373635
});
36383636
});
36393637
});

test/integration/node-specific/abstract_cursor.test.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,43 @@ import { clearFailPoint, configureFailPoint } from '../../tools/utils';
2323
import { filterForCommands } from '../shared';
2424

2525
describe('class AbstractCursor', function () {
26+
describe('lazy implicit session acquisition', function () {
27+
let client: MongoClient;
28+
let collection: Collection;
29+
const docs = [{ count: 0 }, { count: 10 }];
30+
31+
beforeEach(async function () {
32+
client = this.configuration.newClient();
33+
34+
collection = client.db('abstract_cursor_integration').collection('test');
35+
36+
await collection.insertMany(docs);
37+
});
38+
39+
afterEach(async function () {
40+
await collection.deleteMany({});
41+
await client.close();
42+
});
43+
44+
it('does not allocate a session when the cursor is constructed', function () {
45+
const cursor = collection.find();
46+
expect(cursor.session).to.be.null;
47+
});
48+
49+
it('allocates a session once the cursor is initialized', async function () {
50+
const cursor = collection.find({}, { batchSize: 1 });
51+
await cursor.next();
52+
expect(cursor.session).not.to.be.null;
53+
});
54+
55+
it('sets the session to `null` when rewound', async function () {
56+
const cursor = collection.find({}, { batchSize: 1 });
57+
await cursor.next();
58+
cursor.rewind();
59+
expect(cursor.session).to.be.null;
60+
});
61+
});
62+
2663
describe('regression tests NODE-5372', function () {
2764
let client: MongoClient;
2865
let collection: Collection;

test/unit/cursor/abstract_cursor.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
AbstractCursor,
55
type AbstractCursorOptions,
66
type Callback,
7-
ClientSession,
7+
type ClientSession,
88
type ExecutionResult,
99
MongoClient,
1010
ns,
@@ -32,9 +32,9 @@ describe('class AbstractCursor', () => {
3232
});
3333

3434
context('#constructor', () => {
35-
it('creates a session if none passed in', () => {
35+
it('does not create a session if none passed in', () => {
3636
const cursor = new ConcreteCursor(client);
37-
expect(cursor).to.have.property('session').that.is.instanceOf(ClientSession);
37+
expect(cursor).to.have.property('session').that.is.null;
3838
});
3939

4040
it('uses the passed in session', async () => {

test/unit/cursor/aggregation_cursor.test.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,9 @@ describe('class AggregationCursor', () => {
2929
});
3030

3131
context('clone()', () => {
32-
it('returns a new cursor with a different session', () => {
32+
it('returns a new cursor', () => {
3333
const cloned = cursor.clone();
3434
expect(cursor).to.not.equal(cloned);
35-
expect(cursor.session).to.not.equal(cloned.session);
3635
});
3736
});
3837

0 commit comments

Comments
 (0)