diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 52a07a7e3b..9cc454791d 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,10 +1,12 @@ -import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list'; +import { DoublyLinkedNode, DoublyLinkedList, EmptyAwareSinglyLinkedList } from './linked-list'; import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder'; import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types'; import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; -import { AbortError, ErrorReply, TimeoutError } from '../errors'; +import { AbortError, ErrorReply, TimeoutDuringMaintanance, TimeoutError } from '../errors'; import { MonitorCallback } from '.'; +import EventEmitter from 'events'; +import assert from 'assert'; export interface CommandOptions { chainId?: symbol; @@ -30,6 +32,7 @@ export interface CommandToWrite extends CommandWaitingForReply { timeout: { signal: AbortSignal; listener: () => unknown; + originalTimeout: number | undefined; } | undefined; } @@ -54,11 +57,56 @@ export default class RedisCommandsQueue { readonly #respVersion; readonly #maxLength; readonly #toWrite = new DoublyLinkedList(); - readonly #waitingForReply = new SinglyLinkedList(); + readonly #waitingForReply = new EmptyAwareSinglyLinkedList(); readonly #onShardedChannelMoved; #chainInExecution: symbol | undefined; readonly decoder; readonly #pubSub = new PubSub(); + readonly events = new EventEmitter(); + + // If this value is set, we are in a maintenance mode. + // This means any existing commands should have their timeout + // overwritten to the new timeout. And all new commands should + // have their timeout set as the new timeout. + #maintenanceCommandTimeout: number | undefined + + setMaintenanceCommandTimeout(ms: number | undefined) { + // Prevent possible api misuse + if (this.#maintenanceCommandTimeout === ms) return; + + this.#maintenanceCommandTimeout = ms; + + // Overwrite timeouts of all eligible toWrite commands + this.#toWrite.forEachNode(node => { + const command = node.value; + + // If the command didnt have a timeout, skip it + if (!command.timeout) return; + + // Remove existing timeout listener + RedisCommandsQueue.#removeTimeoutListener(command) + + //TODO see if this is needed + // // Keep a flag to know if we were in maintenance at this point in time. + // // To be used in the timeout listener, which needs to know which exact error to use. + // const wasMaintenance = !!this.#maintenanceCommandTimeout + + // Determine newTimeout + const newTimeout = this.#maintenanceCommandTimeout ?? command.timeout?.originalTimeout; + assert(newTimeout !== undefined, 'Trying to reset timeout to `undefined`') + + const signal = AbortSignal.timeout(newTimeout); + command.timeout = { + signal, + listener: () => { + this.#toWrite.remove(node); + command.reject(this.#maintenanceCommandTimeout ? new TimeoutDuringMaintanance(newTimeout) : new TimeoutError()); + }, + originalTimeout: command.timeout.originalTimeout + }; + signal.addEventListener('abort', command.timeout.listener, { once: true }); + }); + } get isPubSubActive() { return this.#pubSub.isActive; @@ -134,6 +182,21 @@ export default class RedisCommandsQueue { } break; } + case 'MOVING': { + const [_, afterMs, url] = push; + const [host, port] = url.toString().split(':'); + this.events.emit('moving', afterMs, host, Number(port)); + break; + } + case 'MIGRATING': { + console.log('GOT MIGRATING', push.map(p => p.toString())); + this.events.emit('migrating'); + break; + } + case 'MIGRATED': { + this.events.emit('migrated'); + break; + } } } }, @@ -145,6 +208,17 @@ export default class RedisCommandsQueue { this.#invalidateCallback = callback; } + async waitForInflightCommandsToComplete(): Promise { + // In-flight commands already completed + if(this.#waitingForReply.length === 0) { + return + }; + // Otherwise wait for in-flight commands to fire `empty` event + return new Promise(resolve => { + this.#waitingForReply.events.on('empty', resolve) + }); + } + addCommand( args: ReadonlyArray, options?: CommandOptions @@ -168,15 +242,25 @@ export default class RedisCommandsQueue { typeMapping: options?.typeMapping }; - const timeout = options?.timeout; + // If #commandTimeout was explicitly set, this + // means we are in maintenance mode and should + // use it instead of the timeout provided by the command + const timeout = this.#maintenanceCommandTimeout || options?.timeout if (timeout) { + + //TODO see if this is needed + // // Keep a flag to know if we were in maintenance at this point in time. + // // To be used in the timeout listener, which needs to know which exact error to use. + // const wasMaintenance = !!this.#maintenanceCommandTimeout + const signal = AbortSignal.timeout(timeout); value.timeout = { signal, listener: () => { this.#toWrite.remove(node); - value.reject(new TimeoutError()); - } + value.reject(this.#maintenanceCommandTimeout ? new TimeoutDuringMaintanance(timeout) : new TimeoutError()); + }, + originalTimeout: options?.timeout }; signal.addEventListener('abort', value.timeout.listener, { once: true }); } @@ -432,7 +516,7 @@ export default class RedisCommandsQueue { } static #removeTimeoutListener(command: CommandToWrite) { - command.timeout!.signal.removeEventListener('abort', command.timeout!.listener); + command.timeout?.signal.removeEventListener('abort', command.timeout!.listener); } static #flushToWrite(toBeSent: CommandToWrite, err: Error) { diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts new file mode 100644 index 0000000000..c2a9c46baa --- /dev/null +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -0,0 +1,78 @@ +import EventEmitter from "events"; +import { RedisClientOptions } from "."; +import RedisCommandsQueue from "./commands-queue"; +import RedisSocket from "./socket"; + +export default class EnterpriseMaintenanceManager extends EventEmitter { + #commandsQueue: RedisCommandsQueue; + #options: RedisClientOptions; + constructor(commandsQueue: RedisCommandsQueue, options: RedisClientOptions) { + super(); + this.#commandsQueue = commandsQueue; + this.#options = options; + + this.#commandsQueue.events.on("moving", this.#onMoving); + this.#commandsQueue.events.on("migrating", this.#onMigrating); + this.#commandsQueue.events.on("migrated", this.#onMigrated); + } + + // Queue: + // toWrite [ C D E ] + // waitingForReply [ A B ] - aka In-flight commands + // + // time: ---1-2---3-4-5-6--------------------------- + // + // 1. [EVENT] MOVING PN received + // 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete ) + // 3. [EVENT] New socket connected + // 4. [EVENT] In-flight commands completed + // 5. [ACTION] Destroy old socket + // 6. [ACTION] Resume writing -> we are going to write to the new socket from now on + #onMoving = async ( + _afterMs: number, + host: string, + port: number, + ): Promise => { + // 1 [EVENT] MOVING PN received + // 2 [ACTION] Pause writing + this.emit("pause"); + + const newSocket = new RedisSocket({ + ...this.#options.socket, + host, + port, + }); + //todo + newSocket.setMaintenanceTimeout(); + await newSocket.connect(); + // 3 [EVENT] New socket connected + + await this.#commandsQueue.waitForInflightCommandsToComplete(); + // 4 [EVENT] In-flight commands completed + + // 5 + 6 + this.emit("resume", newSocket); + }; + + #onMigrating = async () => { + this.#commandsQueue.setMaintenanceCommandTimeout(this.#getCommandTimeout()); + this.emit("maintenance", this.#getSocketTimeout()); + }; + + #onMigrated = async () => { + this.#commandsQueue.setMaintenanceCommandTimeout(undefined); + this.emit("maintenance", undefined); + } + + #getSocketTimeout(): number | undefined { + return this.#options.gracefulMaintenance?.handleTimeouts === "error" + ? this.#options.socket?.socketTimeout + : this.#options.gracefulMaintenance?.handleTimeouts; + } + + #getCommandTimeout(): number | undefined { + return this.#options.gracefulMaintenance?.handleTimeouts === "error" + ? this.#options.commandOptions?.timeout + : this.#options.gracefulMaintenance?.handleTimeouts; + } +} diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 128dc59967..58e77feb1a 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -20,6 +20,7 @@ import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } import { BasicCommandParser, CommandParser } from './parser'; import SingleEntryCache from '../single-entry-cache'; import { version } from '../../package.json' +import EnterpriseMaintenanceManager from './enterprise-maintenance-manager'; export interface RedisClientOptions< M extends RedisModules = RedisModules, @@ -144,6 +145,44 @@ export interface RedisClientOptions< * Tag to append to library name that is sent to the Redis server */ clientInfoTag?: string; + + /** + * Configuration for handling Redis Enterprise graceful maintenance scenarios. + * + * When Redis Enterprise performs maintenance operations, nodes will be replaced, resulting in disconnects. + * This configuration allows the client to handle these scenarios gracefully by automatically + * reconnecting and managing command execution during maintenance windows. + * + * @example Basic graceful maintenance configuration + * ``` + * const client = createClient({ + * gracefulMaintenance: { + * handleFailedCommands: 'retry', + * handleTimeouts: 'exception', + * } + * }); + * ``` + * + * @example Graceful maintenance with timeout smoothing + * ``` + * const client = createClient({ + * gracefulMaintenance: { + * handleFailedCommands: 'retry', + * handleTimeouts: 5000, // Extend timeouts to 5 seconds during maintenance + * } + * }); + * ``` + */ + gracefulMaintenance?: { + /** + * Designates how failed commands should be handled. A failed command is when the time isn’t sufficient to deal with the responses on the old connection before the server shuts it down + */ + handleFailedCommands: 'exception' | 'retry', + /** + * Specify whether we should throw a TimeoutDuringMaintanance exception or provide more relaxed timeout, in order to minimize command timeouts during maintenance. + */ + handleTimeouts: 'error' | number, + } } type WithCommands< @@ -366,7 +405,7 @@ export default class RedisClient< } readonly #options?: RedisClientOptions; - readonly #socket: RedisSocket; + #socket: RedisSocket; readonly #queue: RedisCommandsQueue; #selectedDB = 0; #monitorCallback?: MonitorCallback; @@ -379,11 +418,16 @@ export default class RedisClient< #watchEpoch?: number; #clientSideCache?: ClientSideCacheProvider; #credentialsSubscription: Disposable | null = null; + // Flag used to pause writing to the socket during maintenance windows. + // When true, prevents new commands from being written while waiting for: + // 1. New socket to be ready after maintenance redirect + // 2. In-flight commands on the old socket to complete + #pausedForMaintenance = false; + get clientSideCache() { return this._self.#clientSideCache; } - get options(): RedisClientOptions | undefined { return this._self.#options; } @@ -417,6 +461,15 @@ export default class RedisClient< return this._self.#dirtyWatch !== undefined } + #resumeFromMaintenance(newSocket: RedisSocket) { + this._self.#socket.removeAllListeners(); + this._self.#socket.destroy(); + this._self.#socket = newSocket; + this._self.#pausedForMaintenance = false; + this._self.#initiateSocket(); + this._self.#maybeScheduleWrite(); + } + /** * Marks the client's WATCH command as invalidated due to a topology change. * This will cause any subsequent EXEC in a transaction to fail with a WatchError. @@ -431,7 +484,14 @@ export default class RedisClient< this.#validateOptions(options) this.#options = this.#initiateOptions(options); this.#queue = this.#initiateQueue(); - this.#socket = this.#initiateSocket(); + this.#socket = this.#createSocket(this.#options); + + if(options?.gracefulMaintenance) { + new EnterpriseMaintenanceManager(this.#queue, this.#options!) + .on('pause', () => this._self.#pausedForMaintenance = true ) + .on('resume', this.#resumeFromMaintenance.bind(this)) + .on('maintenance', (mtm: number | undefined) => this._self.#socket.setMaintenanceTimeout(mtm)) + } if (options?.clientSideCache) { if (options.clientSideCache instanceof ClientSideCacheProvider) { @@ -449,7 +509,12 @@ export default class RedisClient< throw new Error('Client Side Caching is only supported with RESP3'); } + if (options?.gracefulMaintenance && options?.RESP !== 3) { + throw new Error('Graceful Maintenance is only supported with RESP3'); + } + } + #initiateOptions(options?: RedisClientOptions): RedisClientOptions | undefined { // Convert username/password to credentialsProvider if no credentialsProvider is already in place @@ -657,38 +722,10 @@ export default class RedisClient< return commands; } - #initiateSocket(): RedisSocket { - const socketInitiator = async () => { - const promises = [], - chainId = Symbol('Socket Initiator'); - - const resubscribePromise = this.#queue.resubscribe(chainId); - if (resubscribePromise) { - promises.push(resubscribePromise); - } - - if (this.#monitorCallback) { - promises.push( - this.#queue.monitor( - this.#monitorCallback, - { - typeMapping: this._commandOptions?.typeMapping, - chainId, - asap: true - } - ) - ); - } + async #initiateSocket(): Promise { + await this.#socket.waitForReady(); - promises.push(...(await this.#handshake(chainId, true))); - - if (promises.length) { - this.#write(); - return Promise.all(promises); - } - }; - - return new RedisSocket(socketInitiator, this.#options?.socket) + this.#socket .on('data', chunk => { try { this.#queue.decoder.write(chunk); @@ -706,15 +743,47 @@ export default class RedisClient< this.#queue.flushAll(err); } }) - .on('connect', () => this.emit('connect')) - .on('ready', () => { - this.emit('ready'); - this.#setPingTimer(); - this.#maybeScheduleWrite(); - }) .on('reconnecting', () => this.emit('reconnecting')) .on('drain', () => this.#maybeScheduleWrite()) .on('end', () => this.emit('end')); + + const promises = []; + const chainId = Symbol('Socket Initiator'); + + const resubscribePromise = this.#queue.resubscribe(chainId); + if (resubscribePromise) { + promises.push(resubscribePromise); + } + + if (this.#monitorCallback) { + promises.push( + this.#queue.monitor( + this.#monitorCallback, + { + typeMapping: this._commandOptions?.typeMapping, + chainId, + asap: true + } + ) + ); + } + + promises.push(...(await this.#handshake(chainId, true))); + + this.#setPingTimer(); + + if (promises.length) { + this.#write(); + await Promise.all(promises); + } + } + + #createSocket(options?: RedisClientOptions): RedisSocket { + return new RedisSocket(options?.socket) + .on('connect', () => this.emit('connect')) + .on('ready', () => { + this.emit('ready'); + }); } #pingTimer?: NodeJS.Timeout; @@ -823,6 +892,7 @@ export default class RedisClient< async connect() { await this._self.#socket.connect(); + await this._self.#initiateSocket(); return this as unknown as RedisClientType; } @@ -1055,6 +1125,9 @@ export default class RedisClient< } #write() { + if(this.#pausedForMaintenance) { + return + } this.#socket.write(this.#queue.commandsToWrite()); } diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index 29678f027b..1fc0458cfe 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -1,3 +1,5 @@ +import EventEmitter from "events"; + export interface DoublyLinkedNode { value: T; previous: DoublyLinkedNode | undefined; @@ -32,7 +34,7 @@ export class DoublyLinkedList { next: undefined, value }; - } + } return this.#tail = this.#tail.next = { previous: this.#tail, @@ -93,7 +95,7 @@ export class DoublyLinkedList { node.previous!.next = node.next; node.previous = undefined; } - + node.next = undefined; } @@ -109,6 +111,15 @@ export class DoublyLinkedList { node = node.next; } } + + forEachNode(fn: (node: DoublyLinkedNode) => void) { + let node = this.#head; + while(node) { + fn(node); + node = node.next; + } + } + } export interface SinglyLinkedNode { @@ -201,3 +212,26 @@ export class SinglyLinkedList { } } } + +export class EmptyAwareSinglyLinkedList extends SinglyLinkedList { + readonly events = new EventEmitter(); + reset() { + super.reset(); + this.events.emit('empty'); + } + shift(): T | undefined { + const old = this.length; + const ret = super.shift(); + if(old !== this.length && this.length === 0) { + this.events.emit('empty'); + } + return ret; + } + remove(node: SinglyLinkedNode, parent: SinglyLinkedNode | undefined) { + const old = this.length; + super.remove(node, parent); + if(old !== this.length && this.length === 0) { + this.events.emit('empty'); + } + } +} diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 5f0bcc4492..91d30abaa5 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -1,7 +1,7 @@ import { EventEmitter, once } from 'node:events'; import net from 'node:net'; import tls from 'node:tls'; -import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError, SocketTimeoutError } from '../errors'; +import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError, SocketTimeoutError, TimeoutDuringMaintanance } from '../errors'; import { setTimeout } from 'node:timers/promises'; import { RedisArgument } from '../RESP/types'; @@ -51,15 +51,14 @@ export type RedisTcpSocketOptions = RedisTcpOptions | RedisTlsOptions; export type RedisSocketOptions = RedisTcpSocketOptions | RedisIpcOptions; -export type RedisSocketInitiator = () => void | Promise; - export default class RedisSocket extends EventEmitter { - readonly #initiator; readonly #connectTimeout; readonly #reconnectStrategy; readonly #socketFactory; readonly #socketTimeout; + #maintenanceTimeout: number | undefined; + #socket?: net.Socket | tls.TLSSocket; #isOpen = false; @@ -82,16 +81,23 @@ export default class RedisSocket extends EventEmitter { return this.#socketEpoch; } - constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { + constructor(options?: RedisSocketOptions) { super(); - this.#initiator = initiator; this.#connectTimeout = options?.connectTimeout ?? 5000; this.#reconnectStrategy = this.#createReconnectStrategy(options); this.#socketFactory = this.#createSocketFactory(options); this.#socketTimeout = options?.socketTimeout; } + async waitForReady(): Promise { + if (this.#isReady) return + return new Promise((resolve, reject) => { + this.once('ready', resolve); + this.once('error', reject); + }); + } + #createReconnectStrategy(options?: RedisSocketOptions): ReconnectStrategyFunction { const strategy = options?.reconnectStrategy; if (strategy === false || typeof strategy === 'number') { @@ -214,14 +220,6 @@ export default class RedisSocket extends EventEmitter { try { this.#socket = await this.#createSocket(); this.emit('connect'); - - try { - await this.#initiator(); - } catch (err) { - this.#socket.destroy(); - this.#socket = undefined; - throw err; - } this.#isReady = true; this.#socketEpoch++; this.emit('ready'); @@ -238,6 +236,16 @@ export default class RedisSocket extends EventEmitter { } while (this.#isOpen && !this.#isReady); } + setMaintenanceTimeout(ms?: number) { + this.#maintenanceTimeout = ms; + + if(ms !== undefined) { + this.#socket?.setTimeout(ms); + } else if (this.#socketTimeout !== undefined) { + this.#socket?.setTimeout(this.#socketTimeout); + } + } + async #createSocket(): Promise { const socket = this.#socketFactory.create(); @@ -260,7 +268,10 @@ export default class RedisSocket extends EventEmitter { if (this.#socketTimeout) { socket.once('timeout', () => { - socket.destroy(new SocketTimeoutError(this.#socketTimeout!)); + const error = this.#maintenanceTimeout + ? new TimeoutDuringMaintanance(this.#socketTimeout!) + : new SocketTimeoutError(this.#socketTimeout!) + socket.destroy(error); }); socket.setTimeout(this.#socketTimeout); } diff --git a/packages/client/lib/errors.ts b/packages/client/lib/errors.ts index db37ec1a9b..21e748807d 100644 --- a/packages/client/lib/errors.ts +++ b/packages/client/lib/errors.ts @@ -76,6 +76,13 @@ export class BlobError extends ErrorReply {} export class TimeoutError extends Error {} +export class TimeoutDuringMaintanance extends Error { + constructor(timeout: number) { + super(`Socket timeout during maintenance. Expecting data, but didn't receive any in ${timeout}ms.`); + } +} + + export class MultiErrorReply extends ErrorReply { replies: Array; errorIndexes: Array;