Skip to content

Commit b456b3b

Browse files
Initial POC - all tests passing but using full server descriptions for change tracking.
1 parent 76664e4 commit b456b3b

File tree

62 files changed

+2396
-167
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+2396
-167
lines changed

src/change_stream.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1073,7 +1073,8 @@ export class ChangeStream<
10731073
try {
10741074
await topology.selectServer(this.cursor.readPreference, {
10751075
operationName: 'reconnect topology in change stream',
1076-
timeoutContext: this.timeoutContext
1076+
timeoutContext: this.timeoutContext,
1077+
deprioritizedServers: []
10771078
});
10781079
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
10791080
} catch {

src/mongo_client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -789,7 +789,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
789789
// to avoid the server selection timeout.
790790
const selector = readPreferenceServerSelector(ReadPreference.primaryPreferred);
791791
const serverDescriptions = Array.from(topologyDescription.servers.values());
792-
const servers = selector(topologyDescription, serverDescriptions);
792+
const servers = selector(topologyDescription, serverDescriptions, []);
793793
if (servers.length !== 0) {
794794
const endSessions = Array.from(client.s.sessionPool.sessions, ({ id }) => id);
795795
if (endSessions.length !== 0) {

src/operations/execute_operation.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,8 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
207207
session,
208208
operationName: operation.commandName,
209209
timeoutContext,
210-
signal: operation.options.signal
210+
signal: operation.options.signal,
211+
deprioritizedServers: []
211212
});
212213

213214
const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
@@ -234,7 +235,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
234235

235236
const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
236237
let previousOperationError: MongoError | undefined;
237-
let previousServer: ServerDescription | undefined;
238+
const deprioritizedServers: ServerDescription[] = [];
238239

239240
for (let tries = 0; tries < maxTries; tries++) {
240241
if (previousOperationError) {
@@ -270,7 +271,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
270271
server = await topology.selectServer(selector, {
271272
session,
272273
operationName: operation.commandName,
273-
previousServer,
274+
deprioritizedServers,
274275
signal: operation.options.signal
275276
});
276277

@@ -303,7 +304,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
303304
) {
304305
throw previousOperationError;
305306
}
306-
previousServer = server.description;
307+
deprioritizedServers.push(server.description);
307308
previousOperationError = operationError;
308309

309310
// Reset timeouts

src/sdam/server_selection.ts

Lines changed: 98 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,38 @@ export const MIN_SECONDARY_WRITE_WIRE_VERSION = 13;
1515
export type ServerSelector = (
1616
topologyDescription: TopologyDescription,
1717
servers: ServerDescription[],
18-
deprioritized?: ServerDescription[]
18+
deprioritized: ServerDescription[]
1919
) => ServerDescription[];
2020

21+
function sdEquals(a: ServerDescription, b: ServerDescription) {
22+
return a.address === b.address && a.equals(b);
23+
}
24+
function filterDeprioritized(
25+
candidates: ServerDescription[],
26+
deprioritized: ServerDescription[]
27+
): ServerDescription[] {
28+
const filtered = candidates.filter(
29+
candidate => !deprioritized.some(serverDescription => sdEquals(serverDescription, candidate))
30+
);
31+
32+
return filtered.length ? filtered : candidates;
33+
}
34+
2135
/**
2236
* Returns a server selector that selects for writable servers
2337
*/
2438
export function writableServerSelector(): ServerSelector {
2539
return function writableServer(
2640
topologyDescription: TopologyDescription,
27-
servers: ServerDescription[]
41+
servers: ServerDescription[],
42+
deprioritized: ServerDescription[]
2843
): ServerDescription[] {
29-
return latencyWindowReducer(
30-
topologyDescription,
31-
servers.filter((s: ServerDescription) => s.isWritable)
44+
const eligibleServers = filterDeprioritized(
45+
servers.filter(({ isWritable }) => isWritable),
46+
deprioritized
3247
);
48+
49+
return latencyWindowReducer(topologyDescription, eligibleServers);
3350
};
3451
}
3552

@@ -39,8 +56,9 @@ export function writableServerSelector(): ServerSelector {
3956
*/
4057
export function sameServerSelector(description?: ServerDescription): ServerSelector {
4158
return function sameServerSelector(
42-
topologyDescription: TopologyDescription,
43-
servers: ServerDescription[]
59+
_topologyDescription: TopologyDescription,
60+
servers: ServerDescription[],
61+
_deprioritized: ServerDescription[]
4462
): ServerDescription[] {
4563
if (!description) return [];
4664
// Filter the servers to match the provided description only if
@@ -218,10 +236,7 @@ function latencyWindowReducer(
218236
);
219237

220238
const high = low + topologyDescription.localThresholdMS;
221-
return servers.reduce((result: ServerDescription[], server: ServerDescription) => {
222-
if (server.roundTripTime <= high && server.roundTripTime >= low) result.push(server);
223-
return result;
224-
}, []);
239+
return servers.filter(server => server.roundTripTime <= high && server.roundTripTime >= low);
225240
}
226241

227242
// filters
@@ -245,6 +260,18 @@ function loadBalancerFilter(server: ServerDescription): boolean {
245260
return server.type === ServerType.LoadBalancer;
246261
}
247262

263+
function isDeprioritizedFactory(
264+
deprioritized: ServerDescription[]
265+
): (server: ServerDescription) => boolean {
266+
return server =>
267+
// if any deprioritized servers equal the server, here we are.
268+
!deprioritized.some(deprioritizedServer => {
269+
const result = sdEquals(deprioritizedServer, server);
270+
// console.error(result);
271+
return result;
272+
});
273+
}
274+
248275
/**
249276
* Returns a function which selects servers based on a provided read preference
250277
*
@@ -258,7 +285,7 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se
258285
return function readPreferenceServers(
259286
topologyDescription: TopologyDescription,
260287
servers: ServerDescription[],
261-
deprioritized: ServerDescription[] = []
288+
deprioritized: ServerDescription[]
262289
): ServerDescription[] {
263290
if (topologyDescription.type === TopologyType.LoadBalanced) {
264291
return servers.filter(loadBalancerFilter);
@@ -273,38 +300,79 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se
273300
}
274301

275302
if (topologyDescription.type === TopologyType.Sharded) {
276-
const filtered = servers.filter(server => {
277-
return !deprioritized.includes(server);
278-
});
279-
const selectable = filtered.length > 0 ? filtered : deprioritized;
303+
const selectable = filterDeprioritized(servers, deprioritized);
280304
return latencyWindowReducer(topologyDescription, selectable.filter(knownFilter));
281305
}
282306

283307
const mode = readPreference.mode;
284308
if (mode === ReadPreference.PRIMARY) {
285-
return servers.filter(primaryFilter);
309+
return filterDeprioritized(servers.filter(primaryFilter), deprioritized);
286310
}
287311

288312
if (mode === ReadPreference.PRIMARY_PREFERRED) {
289-
const result = servers.filter(primaryFilter);
290-
if (result.length) {
291-
return result;
313+
const primary = servers.filter(primaryFilter);
314+
315+
// If there is a primary and it is not deprioritized, use the primary. Otherwise,
316+
// check for secondaries.
317+
const eligiblePrimary = primary.filter(isDeprioritizedFactory(deprioritized));
318+
if (eligiblePrimary.length) {
319+
return eligiblePrimary;
292320
}
293-
}
294321

295-
const filter = mode === ReadPreference.NEAREST ? nearestFilter : secondaryFilter;
296-
const selectedServers = latencyWindowReducer(
297-
topologyDescription,
298-
tagSetReducer(
322+
const secondaries = tagSetReducer(
299323
readPreference,
300-
maxStalenessReducer(readPreference, topologyDescription, servers.filter(filter))
301-
)
324+
maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter))
325+
);
326+
const deprioritizedSecondaries = secondaries.filter(isDeprioritizedFactory(deprioritized));
327+
328+
// console.error({ deprioritizedSecondaries, secondaries, deprioritized });
329+
if (deprioritizedSecondaries.length)
330+
return latencyWindowReducer(topologyDescription, deprioritizedSecondaries);
331+
332+
// if we make it here, we have no primaries or secondaries that not deprioritized.
333+
// prefer the primary (which may not exist, if the topology has no primary).
334+
// otherwise, return the secondaries (which also may not exist, but there is nothing else to check here).
335+
return primary.length ? primary : latencyWindowReducer(topologyDescription, secondaries);
336+
}
337+
338+
// TODO: should we be applying the latency window to nearest servers?
339+
if (mode === 'nearest') {
340+
// if read preference is nearest
341+
return latencyWindowReducer(
342+
topologyDescription,
343+
filterDeprioritized(
344+
tagSetReducer(
345+
readPreference,
346+
maxStalenessReducer(readPreference, topologyDescription, servers.filter(nearestFilter))
347+
),
348+
deprioritized
349+
)
350+
);
351+
}
352+
353+
const filter = secondaryFilter;
354+
355+
const secondaries = tagSetReducer(
356+
readPreference,
357+
maxStalenessReducer(readPreference, topologyDescription, servers.filter(filter))
302358
);
359+
const eligibleSecondaries = secondaries.filter(isDeprioritizedFactory(deprioritized));
360+
361+
if (eligibleSecondaries.length)
362+
return latencyWindowReducer(topologyDescription, eligibleSecondaries);
363+
364+
// we have no eligible secondaries, try for a primary.
365+
if (mode === ReadPreference.SECONDARY_PREFERRED) {
366+
const primary = servers.filter(primaryFilter);
367+
const eligiblePrimary = primary.filter(isDeprioritizedFactory(deprioritized));
368+
369+
if (eligiblePrimary.length) return eligiblePrimary;
303370

304-
if (mode === ReadPreference.SECONDARY_PREFERRED && selectedServers.length === 0) {
305-
return servers.filter(primaryFilter);
371+
// we have no eligible primary nor secondaries that have not been deprioritized
372+
return secondaries.length ? latencyWindowReducer(topologyDescription, secondaries) : primary;
306373
}
307374

308-
return selectedServers;
375+
// return all secondaries in the latency window.
376+
return latencyWindowReducer(topologyDescription, secondaries);
309377
};
310378
}

src/sdam/topology.ts

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ export interface ServerSelectionRequest {
105105
cancelled: boolean;
106106
operationName: string;
107107
waitingLogged: boolean;
108-
previousServer?: ServerDescription;
108+
deprioritizedServers: ServerDescription[];
109109
}
110110

111111
/** @internal */
@@ -169,7 +169,7 @@ export interface SelectServerOptions {
169169
serverSelectionTimeoutMS?: number;
170170
session?: ClientSession;
171171
operationName: string;
172-
previousServer?: ServerDescription;
172+
deprioritizedServers: ServerDescription[];
173173
/**
174174
* @internal
175175
* TODO(NODE-6496): Make this required by making ChangeStream use LegacyTimeoutContext
@@ -455,7 +455,8 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
455455
const selectServerOptions = {
456456
operationName: 'handshake',
457457
...options,
458-
timeoutContext
458+
timeoutContext,
459+
deprioritizedServers: []
459460
};
460461

461462
try {
@@ -605,7 +606,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
605606
startTime: processTimeMS(),
606607
operationName: options.operationName,
607608
waitingLogged: false,
608-
previousServer: options.previousServer
609+
deprioritizedServers: options.deprioritizedServers
609610
};
610611

611612
const abortListener = addAbortListener(options.signal, function () {
@@ -957,13 +958,9 @@ function processWaitQueue(topology: Topology) {
957958
let selectedDescriptions;
958959
try {
959960
const serverSelector = waitQueueMember.serverSelector;
960-
const previousServer = waitQueueMember.previousServer;
961+
const deprioritizedServers = waitQueueMember.deprioritizedServers;
961962
selectedDescriptions = serverSelector
962-
? serverSelector(
963-
topology.description,
964-
serverDescriptions,
965-
previousServer ? [previousServer] : []
966-
)
963+
? serverSelector(topology.description, serverDescriptions, deprioritizedServers)
967964
: serverDescriptions;
968965
} catch (selectorError) {
969966
if (
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
{
2+
"topology_description": {
3+
"type": "ReplicaSetNoPrimary",
4+
"servers": [
5+
{
6+
"address": "b:27017",
7+
"avg_rtt_ms": 5,
8+
"type": "RSSecondary",
9+
"tags": {
10+
"data_center": "nyc"
11+
}
12+
},
13+
{
14+
"address": "c:27017",
15+
"avg_rtt_ms": 100,
16+
"type": "RSSecondary",
17+
"tags": {
18+
"data_center": "nyc"
19+
}
20+
}
21+
]
22+
},
23+
"operation": "read",
24+
"read_preference": {
25+
"mode": "Nearest",
26+
"tag_sets": [
27+
{
28+
"data_center": "nyc"
29+
}
30+
]
31+
},
32+
"deprioritized_servers": [
33+
{
34+
"address": "b:27017",
35+
"avg_rtt_ms": 5,
36+
"type": "RSSecondary",
37+
"tags": {
38+
"data_center": "nyc"
39+
}
40+
}
41+
],
42+
"suitable_servers": [
43+
{
44+
"address": "c:27017",
45+
"avg_rtt_ms": 100,
46+
"type": "RSSecondary",
47+
"tags": {
48+
"data_center": "nyc"
49+
}
50+
}
51+
],
52+
"in_latency_window": [
53+
{
54+
"address": "c:27017",
55+
"avg_rtt_ms": 100,
56+
"type": "RSSecondary",
57+
"tags": {
58+
"data_center": "nyc"
59+
}
60+
}
61+
]
62+
}
63+
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
topology_description:
2+
type: ReplicaSetNoPrimary
3+
servers:
4+
- &1
5+
address: b:27017
6+
avg_rtt_ms: 5
7+
type: RSSecondary
8+
tags:
9+
data_center: nyc
10+
- &2
11+
address: c:27017
12+
avg_rtt_ms: 100
13+
type: RSSecondary
14+
tags:
15+
data_center: nyc
16+
operation: read
17+
read_preference:
18+
mode: Nearest
19+
tag_sets:
20+
- data_center: nyc
21+
deprioritized_servers:
22+
- *1
23+
suitable_servers:
24+
- *2
25+
in_latency_window:
26+
- *2

0 commit comments

Comments
 (0)