Skip to content

Commit 8dd306b

Browse files
done
1 parent ae2e037 commit 8dd306b

File tree

7 files changed

+206
-70
lines changed

7 files changed

+206
-70
lines changed

src/mongo_client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -789,7 +789,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
789789
// to avoid the server selection timeout.
790790
const selector = readPreferenceServerSelector(ReadPreference.primaryPreferred);
791791
const serverDescriptions = Array.from(topologyDescription.servers.values());
792-
const servers = selector(topologyDescription, serverDescriptions);
792+
const servers = selector(topologyDescription, serverDescriptions, []);
793793
if (servers.length !== 0) {
794794
const endSessions = Array.from(client.s.sessionPool.sessions, ({ id }) => id);
795795
if (endSessions.length !== 0) {

src/operations/execute_operation.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,8 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
207207
session,
208208
operationName: operation.commandName,
209209
timeoutContext,
210-
signal: operation.options.signal
210+
signal: operation.options.signal,
211+
deprioritizedServers: []
211212
});
212213

213214
const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
@@ -234,7 +235,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
234235

235236
const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
236237
let previousOperationError: MongoError | undefined;
237-
let previousServer: ServerDescription | undefined;
238+
const deprioritizedServers: ServerDescription[] = [];
238239

239240
for (let tries = 0; tries < maxTries; tries++) {
240241
if (previousOperationError) {
@@ -270,7 +271,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
270271
server = await topology.selectServer(selector, {
271272
session,
272273
operationName: operation.commandName,
273-
previousServer,
274+
deprioritizedServers,
274275
signal: operation.options.signal
275276
});
276277

@@ -303,7 +304,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
303304
) {
304305
throw previousOperationError;
305306
}
306-
previousServer = server.description;
307+
deprioritizedServers.push(server.description);
307308
previousOperationError = operationError;
308309

309310
// Reset timeouts

src/sdam/server_selection.ts

Lines changed: 98 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,38 @@ export const MIN_SECONDARY_WRITE_WIRE_VERSION = 13;
1515
export type ServerSelector = (
1616
topologyDescription: TopologyDescription,
1717
servers: ServerDescription[],
18-
deprioritized?: ServerDescription[]
18+
deprioritized: ServerDescription[]
1919
) => ServerDescription[];
2020

21+
function sdEquals(a: ServerDescription, b: ServerDescription) {
22+
return a.address === b.address && a.equals(b);
23+
}
24+
function filterDeprioritized(
25+
candidates: ServerDescription[],
26+
deprioritized: ServerDescription[]
27+
): ServerDescription[] {
28+
const filtered = candidates.filter(
29+
candidate => !deprioritized.some(serverDescription => sdEquals(serverDescription, candidate))
30+
);
31+
32+
return filtered.length ? filtered : candidates;
33+
}
34+
2135
/**
2236
* Returns a server selector that selects for writable servers
2337
*/
2438
export function writableServerSelector(): ServerSelector {
2539
return function writableServer(
2640
topologyDescription: TopologyDescription,
27-
servers: ServerDescription[]
41+
servers: ServerDescription[],
42+
deprioritized: ServerDescription[]
2843
): ServerDescription[] {
29-
return latencyWindowReducer(
30-
topologyDescription,
31-
servers.filter((s: ServerDescription) => s.isWritable)
44+
const eligibleServers = filterDeprioritized(
45+
servers.filter(({ isWritable }) => isWritable),
46+
deprioritized
3247
);
48+
49+
return latencyWindowReducer(topologyDescription, eligibleServers);
3350
};
3451
}
3552

@@ -39,8 +56,9 @@ export function writableServerSelector(): ServerSelector {
3956
*/
4057
export function sameServerSelector(description?: ServerDescription): ServerSelector {
4158
return function sameServerSelector(
42-
topologyDescription: TopologyDescription,
43-
servers: ServerDescription[]
59+
_topologyDescription: TopologyDescription,
60+
servers: ServerDescription[],
61+
_deprioritized: ServerDescription[]
4462
): ServerDescription[] {
4563
if (!description) return [];
4664
// Filter the servers to match the provided description only if
@@ -231,10 +249,7 @@ function latencyWindowReducer(
231249
);
232250

233251
const high = low + topologyDescription.localThresholdMS;
234-
return servers.reduce((result: ServerDescription[], server: ServerDescription) => {
235-
if (server.roundTripTime <= high && server.roundTripTime >= low) result.push(server);
236-
return result;
237-
}, []);
252+
return servers.filter(server => server.roundTripTime <= high && server.roundTripTime >= low);
238253
}
239254

240255
// filters
@@ -258,6 +273,18 @@ function loadBalancerFilter(server: ServerDescription): boolean {
258273
return server.type === ServerType.LoadBalancer;
259274
}
260275

276+
function isDeprioritizedFactory(
277+
deprioritized: ServerDescription[]
278+
): (server: ServerDescription) => boolean {
279+
return server =>
280+
// if any deprioritized servers equal the server, here we are.
281+
!deprioritized.some(deprioritizedServer => {
282+
const result = sdEquals(deprioritizedServer, server);
283+
// console.error(result);
284+
return result;
285+
});
286+
}
287+
261288
/**
262289
* Returns a function which selects servers based on a provided read preference
263290
*
@@ -271,7 +298,7 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se
271298
return function readPreferenceServers(
272299
topologyDescription: TopologyDescription,
273300
servers: ServerDescription[],
274-
deprioritized: ServerDescription[] = []
301+
deprioritized: ServerDescription[]
275302
): ServerDescription[] {
276303
if (topologyDescription.type === TopologyType.LoadBalanced) {
277304
return servers.filter(loadBalancerFilter);
@@ -286,38 +313,79 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se
286313
}
287314

288315
if (topologyDescription.type === TopologyType.Sharded) {
289-
const filtered = servers.filter(server => {
290-
return !deprioritized.includes(server);
291-
});
292-
const selectable = filtered.length > 0 ? filtered : deprioritized;
316+
const selectable = filterDeprioritized(servers, deprioritized);
293317
return latencyWindowReducer(topologyDescription, selectable.filter(knownFilter));
294318
}
295319

296320
const mode = readPreference.mode;
297321
if (mode === ReadPreference.PRIMARY) {
298-
return servers.filter(primaryFilter);
322+
return filterDeprioritized(servers.filter(primaryFilter), deprioritized);
299323
}
300324

301325
if (mode === ReadPreference.PRIMARY_PREFERRED) {
302-
const result = servers.filter(primaryFilter);
303-
if (result.length) {
304-
return result;
326+
const primary = servers.filter(primaryFilter);
327+
328+
// If there is a primary and it is not deprioritized, use the primary. Otherwise,
329+
// check for secondaries.
330+
const eligiblePrimary = primary.filter(isDeprioritizedFactory(deprioritized));
331+
if (eligiblePrimary.length) {
332+
return eligiblePrimary;
305333
}
306-
}
307334

308-
const filter = mode === ReadPreference.NEAREST ? nearestFilter : secondaryFilter;
309-
const selectedServers = latencyWindowReducer(
310-
topologyDescription,
311-
tagSetReducer(
335+
const secondaries = tagSetReducer(
312336
readPreference,
313-
maxStalenessReducer(readPreference, topologyDescription, servers.filter(filter))
314-
)
337+
maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter))
338+
);
339+
const deprioritizedSecondaries = secondaries.filter(isDeprioritizedFactory(deprioritized));
340+
341+
// console.error({ deprioritizedSecondaries, secondaries, deprioritized });
342+
if (deprioritizedSecondaries.length)
343+
return latencyWindowReducer(topologyDescription, deprioritizedSecondaries);
344+
345+
// if we make it here, we have no primaries or secondaries that not deprioritized.
346+
// prefer the primary (which may not exist, if the topology has no primary).
347+
// otherwise, return the secondaries (which also may not exist, but there is nothing else to check here).
348+
return primary.length ? primary : latencyWindowReducer(topologyDescription, secondaries);
349+
}
350+
351+
// TODO: should we be applying the latency window to nearest servers?
352+
if (mode === 'nearest') {
353+
// if read preference is nearest
354+
return latencyWindowReducer(
355+
topologyDescription,
356+
filterDeprioritized(
357+
tagSetReducer(
358+
readPreference,
359+
maxStalenessReducer(readPreference, topologyDescription, servers.filter(nearestFilter))
360+
),
361+
deprioritized
362+
)
363+
);
364+
}
365+
366+
const filter = secondaryFilter;
367+
368+
const secondaries = tagSetReducer(
369+
readPreference,
370+
maxStalenessReducer(readPreference, topologyDescription, servers.filter(filter))
315371
);
372+
const eligibleSecondaries = secondaries.filter(isDeprioritizedFactory(deprioritized));
373+
374+
if (eligibleSecondaries.length)
375+
return latencyWindowReducer(topologyDescription, eligibleSecondaries);
376+
377+
// we have no eligible secondaries, try for a primary.
378+
if (mode === ReadPreference.SECONDARY_PREFERRED) {
379+
const primary = servers.filter(primaryFilter);
380+
const eligiblePrimary = primary.filter(isDeprioritizedFactory(deprioritized));
381+
382+
if (eligiblePrimary.length) return eligiblePrimary;
316383

317-
if (mode === ReadPreference.SECONDARY_PREFERRED && selectedServers.length === 0) {
318-
return servers.filter(primaryFilter);
384+
// we have no eligible primary nor secondaries that have not been deprioritized
385+
return secondaries.length ? latencyWindowReducer(topologyDescription, secondaries) : primary;
319386
}
320387

321-
return selectedServers;
388+
// return all secondaries in the latency window.
389+
return latencyWindowReducer(topologyDescription, secondaries);
322390
};
323391
}

src/sdam/topology.ts

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ export interface ServerSelectionRequest {
105105
cancelled: boolean;
106106
operationName: string;
107107
waitingLogged: boolean;
108-
previousServer?: ServerDescription;
108+
deprioritizedServers: ServerDescription[];
109109
}
110110

111111
/** @internal */
@@ -169,7 +169,7 @@ export interface SelectServerOptions {
169169
serverSelectionTimeoutMS?: number;
170170
session?: ClientSession;
171171
operationName: string;
172-
previousServer?: ServerDescription;
172+
deprioritizedServers: ServerDescription[];
173173
/**
174174
* @internal
175175
* TODO(NODE-6496): Make this required by making ChangeStream use LegacyTimeoutContext
@@ -455,7 +455,8 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
455455
const selectServerOptions = {
456456
operationName: 'handshake',
457457
...options,
458-
timeoutContext
458+
timeoutContext,
459+
deprioritizedServers: []
459460
};
460461

461462
try {
@@ -605,7 +606,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
605606
startTime: processTimeMS(),
606607
operationName: options.operationName,
607608
waitingLogged: false,
608-
previousServer: options.previousServer
609+
deprioritizedServers: options.deprioritizedServers
609610
};
610611

611612
const abortListener = addAbortListener(options.signal, function () {
@@ -957,13 +958,9 @@ function processWaitQueue(topology: Topology) {
957958
let selectedDescriptions;
958959
try {
959960
const serverSelector = waitQueueMember.serverSelector;
960-
const previousServer = waitQueueMember.previousServer;
961+
const deprioritizedServers = waitQueueMember.deprioritizedServers;
961962
selectedDescriptions = serverSelector
962-
? serverSelector(
963-
topology.description,
964-
serverDescriptions,
965-
previousServer ? [previousServer] : []
966-
)
963+
? serverSelector(topology.description, serverDescriptions, deprioritizedServers)
967964
: serverDescriptions;
968965
} catch (selectorError) {
969966
if (

test/unit/assorted/server_selection.spec.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
runServerSelectionLogicTest
1212
} from './server_selection_logic_spec_utils';
1313

14-
describe('Server Selection Logic (spec)', function () {
14+
describe.only('Server Selection Logic (spec)', function () {
1515
beforeEach(function () {
1616
if (this.currentTest.title.match(/Possible/)) {
1717
this.currentTest.skipReason = 'Nodejs driver does not support PossiblePrimary';

test/unit/assorted/server_selection_logic_spec_utils.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { type ServerType, type TopologyType } from '../../../src/sdam/common';
1212
import { type ServerDescription, type TagSet } from '../../../src/sdam/server_description';
1313
import {
1414
readPreferenceServerSelector,
15+
type ServerSelector,
1516
writableServerSelector
1617
} from '../../../src/sdam/server_selection';
1718
import { TopologyDescription } from '../../../src/sdam/topology_description';
@@ -41,6 +42,7 @@ interface ServerSelectionLogicTest {
4142
*/
4243
suitable_servers: never;
4344
in_latency_window: ServerSelectionLogicTestServer[];
45+
deprioritized_servers?: ServerSelectionLogicTestServer[];
4446
}
4547

4648
function readPreferenceFromDefinition(definition) {
@@ -96,8 +98,11 @@ export function runServerSelectionLogicTest(testDefinition: ServerSelectionLogic
9698
const expectedServers = serverDescriptionsToMap(
9799
testDefinition.in_latency_window.map(s => serverDescriptionFromDefinition(s))
98100
);
101+
const deprioritized =
102+
testDefinition.deprioritized_servers?.map(s => serverDescriptionFromDefinition(s, allHosts)) ??
103+
[];
99104

100-
let selector;
105+
let selector: ServerSelector;
101106
if (testDefinition.operation === 'write') {
102107
selector = writableServerSelector();
103108
} else if (testDefinition.operation === 'read' || testDefinition.read_preference) {
@@ -107,10 +112,11 @@ export function runServerSelectionLogicTest(testDefinition: ServerSelectionLogic
107112
expect.fail('test operation was neither read nor write, and no read preference was provided.');
108113
}
109114

110-
const result = selector(topologyDescription, serversInTopology);
115+
const result = selector(topologyDescription, serversInTopology, deprioritized);
111116

112117
expect(result.length).to.equal(expectedServers.size);
113118

119+
// console.error({ result, expectedServers });
114120
for (const server of result) {
115121
const expectedServer = expectedServers.get(server.address);
116122
expect(expectedServer).to.exist;

0 commit comments

Comments
 (0)