diff --git a/CLUSTER-ARCHITECTURE.txt b/CLUSTER-ARCHITECTURE.txt new file mode 100644 index 00000000000..bcbc5d2553e --- /dev/null +++ b/CLUSTER-ARCHITECTURE.txt @@ -0,0 +1,175 @@ +Redis Cluster Architecture Diagram +=================================== + + ┌─────────────────────────────────────────┐ + │ RedisCluster (Root) │ + │ │ + │ - _options: RedisClusterOptions │ + │ - _slots: RedisClusterSlots │ + │ - _commandOptions │ + │ │ + │ Methods: │ + │ + connect() │ + │ + sendCommand() │ + │ + MULTI() │ + │ + SUBSCRIBE() / UNSUBSCRIBE() │ + │ + SSUBSCRIBE() / SUNSUBSCRIBE() │ + │ + close() / destroy() │ + └──────────────┬──────────────────────────┘ + │ + │ contains + │ + ┌──────────────────────────┴──────────────────────────┐ + │ │ + │ RedisClusterSlots │ + │ │ + │ - slots: Array[16384] │ + │ - masters: Array │ + │ - replicas: Array │ + │ - nodeByAddress: Map │ + │ - pubSubNode?: PubSubNode │ + │ - clientSideCache?: PooledClientSideCacheProvider │ + │ │ + │ Methods: │ + │ + connect() │ + │ + getClient() │ + │ + rediscover() │ + │ + getPubSubClient() │ + │ + getShardedPubSubClient() │ + │ + getRandomNode() │ + │ + getSlotRandomNode() │ + └───────┬─────────────────┬─────────────────┬─────────┘ + │ │ │ + ┌──────────┘ │ └─────────────┐ + │ │ │ + │ has many │ optionally has │ has many + ▼ ▼ ▼ + ┌────────────────────────┐ ┌────────────────────────┐ ┌────────────────────────┐ + │ Shard │ │ PubSubNode │ │ RedisClient │ + │ │ │ │ │ (per node) │ + │ - master: MasterNode │ │ - address: string │ │ │ + │ - replicas?: Array │ │ - client: RedisClient │ │ Socket, Queue, etc. │ + │ - nodesIterator │ │ - connectPromise │ │ │ + └──────────┬─────────────┘ └────────────────────────┘ └────────────────────────┘ + │ + │ contains + │ + ┌────────────┴────────────┐ + │ │ + ▼ ▼ +┌──────────────────┐ ┌──────────────────┐ +│ MasterNode │ │ ShardNode │ +│ │ │ (replica) │ +│ - id: string │ │ │ +│ - host: string │ │ - id: string │ +│ - port: number │ │ - host: string │ +│ - address │ │ - port: number │ +│ - readonly: no │ │ - address │ +│ - client? │ │ - readonly: yes │ +│ - pubSub? │ │ - client? │ +│ └─> client │ │ │ +│ └─> promise │ │ │ +└──────────────────┘ └──────────────────┘ + + +Additional Components: +───────────────────── + +┌────────────────────────────────────┐ +│ RedisClusterMultiCommand │ +│ │ +│ Used for MULTI/PIPELINE: │ +│ - Batches commands │ +│ - Routes to single node │ +│ - Returns typed results │ +│ │ +│ Methods: │ +│ + addCommand() │ +│ + exec() │ +│ + execAsPipeline() │ +└────────────────────────────────────┘ + +┌────────────────────────────────────┐ +│ PooledClientSideCacheProvider │ +│ (BasicPooledClientSideCache) │ +│ │ +│ RESP3 Client-Side Caching: │ +│ - Shared across all nodes │ +│ - Invalidation tracking │ +│ - TTL & eviction policies │ +│ │ +│ Methods: │ +│ + get() / set() │ +│ + invalidate() │ +│ + clear() / enable() / disable() │ +└────────────────────────────────────┘ + + +Key Relationships: +───────────────── + +1. RedisCluster + └─> RedisClusterSlots (manages topology) + └─> Shard[] (16,384 hash slots) + ├─> MasterNode (read/write) + │ └─> RedisClient + │ └─> PubSub RedisClient (sharded pub/sub) + └─> ShardNode[] (replicas, read-only if useReplicas=true) + └─> RedisClient + +2. RedisCluster + └─> RedisClusterMultiCommand (for transactions) + +3. RedisClusterSlots + └─> PubSubNode (global pub/sub) + └─> RedisClient + +4. 
RedisClusterSlots + └─> PooledClientSideCacheProvider (shared cache, RESP3 only) + + +Command Flow: +──────────── + +Single Command: + Client.sendCommand() + → Cluster._execute() + → Slots.getClient(key, isReadonly) + → Calculate slot from key + → Get Shard for slot + → Return master or replica client + → Client.sendCommand() + → [If MOVED/ASK error] + → Slots.rediscover() + → Retry with new node + +Transaction (MULTI): + Client.MULTI(routing) + → RedisClusterMultiCommand + → Accumulate commands + → All commands must route to same node + → client.exec() + +Pub/Sub: + Global: Uses single PubSubNode + Sharded: Uses per-master pubSub client based on channel hash + + +Discovery & Failover: +───────────────────── + +1. Initial Connect: + - Try rootNodes in random order + - Execute CLUSTER SLOTS command + - Build slot → shard mapping + - Create client connections + +2. Rediscovery (on MOVED error): + - Clear cache + - Re-fetch CLUSTER SLOTS + - Update topology + - Reconnect clients to new nodes + +3. Node Address Mapping: + - nodeAddressMap translates cluster IPs + - Useful for NAT/Docker scenarios diff --git a/CLUSTER-CLIENT-ARCHITECTURE.txt b/CLUSTER-CLIENT-ARCHITECTURE.txt new file mode 100644 index 00000000000..a2e85640492 --- /dev/null +++ b/CLUSTER-CLIENT-ARCHITECTURE.txt @@ -0,0 +1,157 @@ +═══════════════════════════════════════════════════════════════════════════════ + Redis Cluster Client Architecture + (Example: 3 Masters + 3 Replicas) +═══════════════════════════════════════════════════════════════════════════════ + +Legend: + 📦 = Actual object/instance in memory + 👉 = Reference/pointer to object + ⚡ = TCP connection to Redis server + +═══════════════════════════════════════════════════════════════════════════════ + +RedisClusterSlots 📦 +│ +├─── slots[16384] (Array) +│ │ +│ ├─── slots[0-5460] 👉─────┐ +│ ├─── slots[5461-10922] 👉─┤ +│ └─── slots[10923-16383] 👉┤ +│ │ +│ ▼ +│ Shard 📦 (references master/replica below) +│ +├─── masters[3] (Array) +│ │ +│ ├─── [0] MasterNode1 📦 +│ │ ├─── address: "host1:6379" +│ │ ├─── client: Client1 📦⚡ → Redis Master 1 +│ │ └─── pubSub?: 📦 +│ │ └─── client: PubSubClient1 📦⚡ → Redis Master 1 +│ │ +│ ├─── [1] MasterNode2 📦 +│ │ ├─── address: "host2:6379" +│ │ ├─── client: Client2 📦⚡ → Redis Master 2 +│ │ └─── pubSub?: 📦 +│ │ └─── client: PubSubClient2 📦⚡ → Redis Master 2 +│ │ +│ └─── [2] MasterNode3 📦 +│ ├─── address: "host3:6379" +│ ├─── client: Client3 📦⚡ → Redis Master 3 +│ └─── pubSub?: 📦 +│ └─── client: PubSubClient3 📦⚡ → Redis Master 3 +│ +├─── replicas[3] (Array) +│ │ +│ ├─── [0] ReplicaNode1 📦 +│ │ ├─── address: "host4:6379" +│ │ └─── client: Client4 📦⚡ → Redis Replica 1 +│ │ +│ ├─── [1] ReplicaNode2 📦 +│ │ ├─── address: "host5:6379" +│ │ └─── client: Client5 📦⚡ → Redis Replica 2 +│ │ +│ └─── [2] ReplicaNode3 📦 +│ ├─── address: "host6:6379" +│ └─── client: Client6 📦⚡ → Redis Replica 3 +│ +├─── nodeByAddress (Map) +│ │ +│ ├─── "host1:6379" 👉 MasterNode1 (same object as masters[0]) +│ ├─── "host2:6379" 👉 MasterNode2 (same object as masters[1]) +│ ├─── "host3:6379" 👉 MasterNode3 (same object as masters[2]) +│ ├─── "host4:6379" 👉 ReplicaNode1 (same object as replicas[0]) +│ ├─── "host5:6379" 👉 ReplicaNode2 (same object as replicas[1]) +│ └─── "host6:6379" 👉 ReplicaNode3 (same object as replicas[2]) +│ +└─── pubSubNode? 
(optional) 📦 + ├─── address: "host2:6379" (randomly selected) + └─── client: PubSubClient7 📦⚡ → Redis Master/Replica (any) + + +═══════════════════════════════════════════════════════════════════════════════ + CLIENT COUNT +═══════════════════════════════════════════════════════════════════════════════ + +Regular Clients (always created at connect): + ┌─────────────────────────────────────────┐ + │ Client1 📦⚡ → Master 1 (host1:6379) │ + │ Client2 📦⚡ → Master 2 (host2:6379) │ + │ Client3 📦⚡ → Master 3 (host3:6379) │ + │ Client4 📦⚡ → Replica 1 (host4:6379) │ + │ Client5 📦⚡ → Replica 2 (host5:6379) │ + │ Client6 📦⚡ → Replica 3 (host6:6379) │ + └─────────────────────────────────────────┘ + Total: 6 TCP connections ⚡ + +PubSub Clients (created on-demand): + ┌─────────────────────────────────────────────────────────────────┐ + │ PubSubClient1 📦⚡ → Master 1 (sharded pub/sub, SSUBSCRIBE) │ + │ PubSubClient2 📦⚡ → Master 2 (sharded pub/sub, SSUBSCRIBE) │ + │ PubSubClient3 📦⚡ → Master 3 (sharded pub/sub, SSUBSCRIBE) │ + │ PubSubClient7 📦⚡ → Any node (regular pub/sub, SUBSCRIBE) │ + └─────────────────────────────────────────────────────────────────┘ + Total: up to 4 TCP connections ⚡ + +═══════════════════════════════════════════════════════════════════════════════ +GRAND TOTAL: 6-10 TCP connections depending on pub/sub usage +═══════════════════════════════════════════════════════════════════════════════ + + +═══════════════════════════════════════════════════════════════════════════════ + HOW CLIENTS ARE USED +═══════════════════════════════════════════════════════════════════════════════ + +1. Regular Commands (GET, SET, HGET, etc.) + ┌─────────────────────────────────────────────────────────────┐ + │ • Calculate slot: slot = hash(key) % 16384 │ + │ • Lookup: shard = slots[slot] │ + │ • For writes: use shard.master.client │ + │ • For reads: use shard.master.client OR shard.replicas[].client │ + └─────────────────────────────────────────────────────────────┘ + +2. Sharded Pub/Sub (SSUBSCRIBE, SPUBLISH) - Redis 7.0+ + ┌─────────────────────────────────────────────────────────────┐ + │ • Calculate slot: slot = hash(channel) % 16384 │ + │ • Lookup: shard = slots[slot] │ + │ • Use: shard.master.pubSub.client │ + │ • Each master has its own dedicated sharded pub/sub client │ + └─────────────────────────────────────────────────────────────┘ + +3. Regular Pub/Sub (SUBSCRIBE, PSUBSCRIBE) + ┌─────────────────────────────────────────────────────────────┐ + │ • Use: pubSubNode.client │ + │ • Single shared client for all non-sharded pub/sub │ + │ • Can be any master or replica (randomly selected) │ + └─────────────────────────────────────────────────────────────┘ + +4. Node Lookup by Address + ┌─────────────────────────────────────────────────────────────┐ + │ • Use: nodeByAddress.get("host:port") │ + │ • Returns reference to master/replica node │ + │ • Useful for handling redirects (MOVED/ASK responses) │ + └─────────────────────────────────────────────────────────────┘ + + +═══════════════════════════════════════════════════════════════════════════════ + KEY INSIGHTS +═══════════════════════════════════════════════════════════════════════════════ + +✓ NO DUPLICATE NODE OBJECTS + All arrays and maps reference the same Node 📦 objects. There are exactly + 6 node objects (3 masters + 3 replicas), not 18 or 24. + +✓ SEPARATE CLIENTS FOR PUB/SUB + Pub/sub requires dedicated clients because once a Redis client enters + pub/sub mode, it can only execute pub/sub commands. 
Regular commands
+  cannot share the same connection.
+
+✓ LAZY CREATION
+  Pub/sub clients are only created when first needed (on-demand), not at
+  cluster connection time.
+
+✓ EFFICIENT ROUTING
+  - slots[]       = Fast O(1) slot-to-shard lookup for commands
+  - masters[]     = Iterate all masters
+  - replicas[]    = Iterate all replicas
+  - nodeByAddress = Fast O(1) address-to-node lookup for redirects
diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts
index 9b7f737113b..9af4550691b 100644
--- a/packages/client/lib/client/commands-queue.ts
+++ b/packages/client/lib/client/commands-queue.ts
@@ -19,6 +19,11 @@ export interface CommandOptions {
    * Timeout for the command in milliseconds
    */
   timeout?: number;
+  /**
+   * @internal
+   * The slot the command is targeted to (if any)
+   */
+  slotNumber?: number;
 }
 
 export interface CommandToWrite extends CommandWaitingForReply {
@@ -33,6 +38,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
     listener: () => unknown;
     originalTimeout: number | undefined;
   } | undefined;
+  slotNumber?: number;
 }
 
 interface CommandWaitingForReply {
@@ -219,6 +225,7 @@ export default class RedisCommandsQueue {
       channelsCounter: undefined,
       typeMapping: options?.typeMapping
     };
+    value.slotNumber = options?.slotNumber;
 
     // If #maintenanceCommandTimeout was explicitly set, we should
     // use it instead of the timeout provided by the command
@@ -283,7 +290,8 @@ export default class RedisCommandsQueue {
     if (Array.isArray(reply)) {
       if (this.#onPush(reply)) return;
 
-      if (PONG.equals(reply[0] as Buffer)) {
+      const firstElement = typeof reply[0] === 'string' ? Buffer.from(reply[0]) : reply[0];
+      if (PONG.equals(firstElement as Buffer)) {
         const { resolve, typeMapping } = this.#waitingForReply.shift()!,
           buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer;
         resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString());
@@ -342,6 +350,10 @@ export default class RedisCommandsQueue {
     return this.#pubSub.removeAllListeners();
   }
 
+  removeShardedPubSubListenersForSlots(slots: Set<number>) {
+    return this.#pubSub.removeShardedPubSubListenersForSlots(slots);
+  }
+
   resubscribe(chainId?: symbol) {
     const commands = this.#pubSub.resubscribe();
     if (!commands.length) return;
@@ -541,4 +553,46 @@ export default class RedisCommandsQueue {
       this.#waitingForReply.length === 0
     );
   }
+
+  /**
+   * Extracts commands for the given slots from the toWrite queue.
+   * Some commands don't have a "slotNumber", which means they are not targeted at a particular slot/node.
+   * We ignore those.
+   */
+  extractCommandsForSlots(slots: Set<number>): CommandToWrite[] {
+    const result: CommandToWrite[] = [];
+    let current = this.#toWrite.head;
+    while(current !== undefined) {
+      if(current.value.slotNumber !== undefined && slots.has(current.value.slotNumber)) {
+        result.push(current.value);
+        const toRemove = current;
+        current = current.next;
+        this.#toWrite.remove(toRemove);
+      } else {
+        // Advance past commands that stay on the source node, otherwise the loop never terminates
+        current = current.next;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Gets all commands from the write queue without removing them.
+   */
+  getAllCommands(): CommandToWrite[] {
+    const result: CommandToWrite[] = [];
+    let current = this.#toWrite.head;
+    while(current) {
+      result.push(current.value);
+      current = current.next;
+    }
+    return result;
+  }
+
+  /**
+   * Prepends commands to the write queue, preserving their original order
+   * (iterates the input in reverse and unshifts each command).
+   */
+  prependCommandsToWrite(commands: CommandToWrite[]) {
+    for (let i = commands.length - 1; i >= 0; i--) {
+      this.#toWrite.unshift(commands[i]);
+    }
+  }
+}
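Illustrative sketch (not part of the patch): the three queue helpers added above
(extractCommandsForSlots, getAllCommands, prependCommandsToWrite) are driven by the
cluster layer when an SMIGRATED push is handled. The following TypeScript sketch shows
the intended composition; `sourceQueue` and `destinationQueue` are hypothetical names
for the RedisCommandsQueue instances of the node losing the slots and the node gaining
them, and the slot values are made up:

    import calculateSlot from 'cluster-key-slot';
    import RedisCommandsQueue from './commands-queue';

    declare const sourceQueue: RedisCommandsQueue;       // queue of the node losing the slots
    declare const destinationQueue: RedisCommandsQueue;  // queue of the node gaining them

    // Slots announced as moving by the SMIGRATED push, e.g. "123,456,789-1000"
    // expanded into individual slot numbers.
    const movingSlots = new Set<number>([123, 456]);
    for (let slot = 789; slot <= 1000; slot++) movingSlots.add(slot);

    // A queued command carries the slot it targets; the cluster computes it up front
    // (options.slotNumber = calculateSlot(firstKey)) when the command is issued.
    console.log(calculateSlot('user:42')); // -> a slot in 0..16383

    // Commands already queued for the moving slots are pulled off the source queue
    // (commands without a slotNumber are left where they are) ...
    const pending = sourceQueue.extractCommandsForSlots(movingSlots);

    // ... and put back at the head of the destination queue in their original order.
    destinationQueue.prependCommandsToWrite(pending);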
diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts
index 9892a5be8a4..a05adfdc5cc 100644
--- a/packages/client/lib/client/enterprise-maintenance-manager.ts
+++ b/packages/client/lib/client/enterprise-maintenance-manager.ts
@@ -1,13 +1,25 @@
-import { RedisClientOptions } from ".";
+import RedisClient, { RedisClientOptions } from ".";
 import RedisCommandsQueue from "./commands-queue";
 import { RedisArgument } from "../..";
 import { isIP } from "net";
 import { lookup } from "dns/promises";
 import assert from "node:assert";
 import { setTimeout } from "node:timers/promises";
-import RedisSocket, { RedisTcpSocketOptions } from "./socket";
+import { RedisTcpSocketOptions } from "./socket";
 import diagnostics_channel from "node:diagnostics_channel";
 
+type RedisType = RedisClient;
+
+export const SMIGRATED_EVENT = "__SMIGRATED";
+export interface SMigratedEvent {
+  seqId: number,
+  source: { host: string, port: number };
+  destinations: {
+    host: string, port: number
+    slots: (number | [number, number])[]
+  }[]
+}
+
 export const MAINTENANCE_EVENTS = {
   PAUSE_WRITING: "pause-writing",
   RESUME_WRITING: "resume-writing",
@@ -20,6 +32,8 @@ const PN = {
   MIGRATED: "MIGRATED",
   FAILING_OVER: "FAILING_OVER",
   FAILED_OVER: "FAILED_OVER",
+  SMIGRATING: "SMIGRATING",
+  SMIGRATED: "SMIGRATED",
 };
 
 export type DiagnosticsEvent = {
@@ -45,23 +59,11 @@ export interface MaintenanceUpdate {
   relaxedSocketTimeout?: number;
 }
 
-interface Client {
-  _ejectSocket: () => RedisSocket;
-  _insertSocket: (socket: RedisSocket) => void;
-  _pause: () => void;
-  _unpause: () => void;
-  _maintenanceUpdate: (update: MaintenanceUpdate) => void;
-  duplicate: () => Client;
-  connect: () => Promise;
-  destroy: () => void;
-  on: (event: string, callback: (value: unknown) => void) => void;
-}
-
 export default class EnterpriseMaintenanceManager {
   #commandsQueue: RedisCommandsQueue;
   #options: RedisClientOptions;
   #isMaintenance = 0;
-  #client: Client;
+  #client: RedisType;
 
   static setupDefaultMaintOptions(options: RedisClientOptions) {
     if (options.maintNotifications === undefined) {
@@ -93,7 +95,7 @@ export default class EnterpriseMaintenanceManager {
 
     if (!host) return;
 
-    const tls = options.socket?.tls ?? false
+    const tls = options.socket?.tls ?? 
false; const movingEndpointType = await determineEndpoint(tls, host, options); return { @@ -115,7 +117,7 @@ export default class EnterpriseMaintenanceManager { constructor( commandsQueue: RedisCommandsQueue, - client: Client, + client: RedisType, options: RedisClientOptions, ) { this.#commandsQueue = commandsQueue; @@ -128,19 +130,19 @@ export default class EnterpriseMaintenanceManager { #onPush = (push: Array): boolean => { dbgMaintenance("ONPUSH:", push.map(String)); - if (!Array.isArray(push) || !["MOVING", "MIGRATING", "MIGRATED", "FAILING_OVER", "FAILED_OVER"].includes(String(push[0]))) { + if (!Array.isArray(push) || !Object.values(PN).includes(String(push[0]))) { return false; } const type = String(push[0]); emitDiagnostics({ - type, - timestamp: Date.now(), - data: { - push: push.map(String), - }, - }); + type, + timestamp: Date.now(), + data: { + push: push.map(String), + }, + }); switch (type) { case PN.MOVING: { // [ 'MOVING', '17', '15', '54.78.247.156:12075' ] @@ -152,8 +154,9 @@ export default class EnterpriseMaintenanceManager { return true; } case PN.MIGRATING: + case PN.SMIGRATING: case PN.FAILING_OVER: { - dbgMaintenance("Received MIGRATING|FAILING_OVER"); + dbgMaintenance("Received MIGRATING|SMIGRATING|FAILING_OVER"); this.#onMigrating(); return true; } @@ -163,6 +166,11 @@ export default class EnterpriseMaintenanceManager { this.#onMigrated(); return true; } + case PN.SMIGRATED: { + dbgMaintenance("Received SMIGRATED"); + this.#onSMigrated(push); + return true; + } } return false; }; @@ -196,12 +204,9 @@ export default class EnterpriseMaintenanceManager { // period that was communicated by the server is over. if (url === null) { assert(this.#options.maintEndpointType === "none"); - assert(this.#options.socket !== undefined); - assert("host" in this.#options.socket); - assert(typeof this.#options.socket.host === "string"); - host = this.#options.socket.host; - assert(typeof this.#options.socket.port === "number"); - port = this.#options.socket.port; + const { host: h, port: p } = this.#getAddress() + host = h; + port = p; const waitTime = (afterSeconds * 1000) / 2; dbgMaintenance(`Wait for ${waitTime}ms`); await setTimeout(waitTime); @@ -220,7 +225,7 @@ export default class EnterpriseMaintenanceManager { // If the URL is provided, it takes precedense // the options object could just be mutated - if(this.#options.url) { + if (this.#options.url) { const u = new URL(this.#options.url); u.hostname = host; u.port = String(port); @@ -229,15 +234,17 @@ export default class EnterpriseMaintenanceManager { this.#options.socket = { ...this.#options.socket, host, - port - } + port, + }; } const tmpClient = this.#client.duplicate(); - tmpClient.on('error', (error: unknown) => { + tmpClient.on("error", (error: unknown) => { //We dont know how to handle tmp client errors - dbgMaintenance(`[ERR]`, error) + dbgMaintenance(`[ERR]`, error); }); - dbgMaintenance(`Tmp client created in ${( performance.now() - start ).toFixed(2)}ms`); + dbgMaintenance( + `Tmp client created in ${(performance.now() - start).toFixed(2)}ms`, + ); dbgMaintenance( `Set timeout for tmp client to ${this.#options.maintRelaxedSocketTimeout}`, ); @@ -248,7 +255,9 @@ export default class EnterpriseMaintenanceManager { dbgMaintenance(`Connecting tmp client: ${host}:${port}`); start = performance.now(); await tmpClient.connect(); - dbgMaintenance(`Connected to tmp client in ${(performance.now() - start).toFixed(2)}ms`); + dbgMaintenance( + `Connected to tmp client in ${(performance.now() - start).toFixed(2)}ms`, + ); // 3 
[EVENT] New socket connected dbgMaintenance(`Wait for all in-flight commands to complete`); @@ -294,13 +303,60 @@ export default class EnterpriseMaintenanceManager { const update: MaintenanceUpdate = { relaxedCommandTimeout: undefined, - relaxedSocketTimeout: undefined + relaxedSocketTimeout: undefined, }; this.#client._maintenanceUpdate(update); }; + + #onSMigrated = (push: any[]) => { + // [ 'SMIGRATED', 15, [ [ '127.0.0.1:6379', '123,456,789-1000' ], [ '127.0.0.1:6380', '124,457,300-500' ] ] ] + // ^seq ^new endpoint1 ^slots ^new endpoint2 ^slots + const sequenceId: number = push[1]; + const smigratedEvent: SMigratedEvent = { + seqId: sequenceId, + source: { + ...this.#getAddress() + }, + destinations: [] + } + for(const [endpoint, slots] of push[2] as string[]) { + //TODO not sure if we need to handle fqdn/ip.. cluster manages clients by host:port. If `cluster slots` returns ip, + // but this notification returns fqdn, then we need to unify somehow ( maybe lookup ) + const [ host, port ] = endpoint.split(':'); + // `slots` could be mix of single slots and ranges, for example: 123,456,789-1000 + const parsedSlots = slots.split(',').map((singleOrRange): number | [number, number] => { + const separatorIndex = singleOrRange.indexOf('-'); + if(separatorIndex === -1) { + // Its single slot + return Number(singleOrRange); + } + // Its range + return [Number(singleOrRange.substring(0, separatorIndex)), Number(singleOrRange.substring(separatorIndex + 1))]; + }); + + smigratedEvent.destinations.push({ + host, + port: Number(port), + slots: parsedSlots + }) + } + this.#client._handleSmigrated(smigratedEvent); + } + + + #getAddress(): { host: string, port: number } { + assert(this.#options.socket !== undefined); + assert("host" in this.#options.socket); + assert(typeof this.#options.socket.host === "string"); + const host = this.#options.socket.host; + assert(typeof this.#options.socket.port === "number"); + const port = this.#options.socket.port; + return { host, port }; + } } + export type MovingEndpointType = | "auto" | "internal-ip" @@ -337,9 +393,7 @@ async function determineEndpoint( ): Promise { assert(options.maintEndpointType !== undefined); if (options.maintEndpointType !== "auto") { - dbgMaintenance( - `Determine endpoint type: ${options.maintEndpointType}`, - ); + dbgMaintenance(`Determine endpoint type: ${options.maintEndpointType}`); return options.maintEndpointType; } diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index ea2102c37fd..9ad044d0a15 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -20,7 +20,7 @@ import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } import { BasicCommandParser, CommandParser } from './parser'; import SingleEntryCache from '../single-entry-cache'; import { version } from '../../package.json' -import EnterpriseMaintenanceManager, { MaintenanceUpdate, MovingEndpointType } from './enterprise-maintenance-manager'; +import EnterpriseMaintenanceManager, { MaintenanceUpdate, MovingEndpointType, SMIGRATED_EVENT, SMigratedEvent } from './enterprise-maintenance-manager'; export interface RedisClientOptions< M extends RedisModules = RedisModules, @@ -1007,6 +1007,20 @@ export default class RedisClient< this._self.#maybeScheduleWrite(); } + /** + * @internal + */ + _handleSmigrated(smigratedEvent: SMigratedEvent) { + this._self.emit(SMIGRATED_EVENT, smigratedEvent); + } + + /** + * @internal + */ + _getQueue(): RedisCommandsQueue { + return this.#queue; + } + 
/** * @internal */ @@ -1073,7 +1087,7 @@ export default class RedisClient< // Merge global options with provided options const opts = { ...this._self._commandOptions, - ...options + ...options, } const promise = this._self.#queue.addCommand(args, opts); diff --git a/packages/client/lib/client/pub-sub.ts b/packages/client/lib/client/pub-sub.ts index 1895f96a883..9c705f4952d 100644 --- a/packages/client/lib/client/pub-sub.ts +++ b/packages/client/lib/client/pub-sub.ts @@ -1,5 +1,6 @@ import { RedisArgument } from '../RESP/types'; import { CommandToWrite } from './commands-queue'; +import calculateSlot from 'cluster-key-slot'; export const PUBSUB_TYPE = { CHANNELS: 'CHANNELS', @@ -51,17 +52,19 @@ export type PubSubCommand = ( export class PubSub { static isStatusReply(reply: Array): boolean { + const firstElement = typeof reply[0] === 'string' ? Buffer.from(reply[0]) : reply[0]; return ( - COMMANDS[PUBSUB_TYPE.CHANNELS].subscribe.equals(reply[0]) || - COMMANDS[PUBSUB_TYPE.CHANNELS].unsubscribe.equals(reply[0]) || - COMMANDS[PUBSUB_TYPE.PATTERNS].subscribe.equals(reply[0]) || - COMMANDS[PUBSUB_TYPE.PATTERNS].unsubscribe.equals(reply[0]) || - COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.equals(reply[0]) + COMMANDS[PUBSUB_TYPE.CHANNELS].subscribe.equals(firstElement) || + COMMANDS[PUBSUB_TYPE.CHANNELS].unsubscribe.equals(firstElement) || + COMMANDS[PUBSUB_TYPE.PATTERNS].subscribe.equals(firstElement) || + COMMANDS[PUBSUB_TYPE.PATTERNS].unsubscribe.equals(firstElement) || + COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.equals(firstElement) ); } static isShardedUnsubscribe(reply: Array): boolean { - return COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.equals(reply[0]); + const firstElement = typeof reply[0] === 'string' ? Buffer.from(reply[0]) : reply[0]; + return COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.equals(firstElement); } static #channelsArray(channels: string | Array) { @@ -370,14 +373,15 @@ export class PubSub { } handleMessageReply(reply: Array): boolean { - if (COMMANDS[PUBSUB_TYPE.CHANNELS].message.equals(reply[0])) { + const firstElement = typeof reply[0] === 'string' ? 
Buffer.from(reply[0]) : reply[0]; + if (COMMANDS[PUBSUB_TYPE.CHANNELS].message.equals(firstElement)) { this.#emitPubSubMessage( PUBSUB_TYPE.CHANNELS, reply[2], reply[1] ); return true; - } else if (COMMANDS[PUBSUB_TYPE.PATTERNS].message.equals(reply[0])) { + } else if (COMMANDS[PUBSUB_TYPE.PATTERNS].message.equals(firstElement)) { this.#emitPubSubMessage( PUBSUB_TYPE.PATTERNS, reply[3], @@ -385,7 +389,7 @@ export class PubSub { reply[1] ); return true; - } else if (COMMANDS[PUBSUB_TYPE.SHARDED].message.equals(reply[0])) { + } else if (COMMANDS[PUBSUB_TYPE.SHARDED].message.equals(firstElement)) { this.#emitPubSubMessage( PUBSUB_TYPE.SHARDED, reply[2], @@ -420,6 +424,22 @@ export class PubSub { return result; } + removeShardedPubSubListenersForSlots(slots: Set) { + const sharded = new Map(); + for (const [chanel, value] of this.listeners[PUBSUB_TYPE.SHARDED]) { + if (slots.has(calculateSlot(chanel))) { + sharded.set(chanel, value); + this.listeners[PUBSUB_TYPE.SHARDED].delete(chanel); + } + } + + this.#updateIsActive(); + + return { + [PUBSUB_TYPE.SHARDED]: sharded + }; + } + #emitPubSubMessage( type: PubSubType, message: Buffer, diff --git a/packages/client/lib/cluster/cluster-slots.spec.ts b/packages/client/lib/cluster/cluster-slots.spec.ts index 76c4cb53fdf..62b823e7c8d 100644 --- a/packages/client/lib/cluster/cluster-slots.spec.ts +++ b/packages/client/lib/cluster/cluster-slots.spec.ts @@ -1,7 +1,8 @@ import { strict as assert } from 'node:assert'; import { EventEmitter } from 'node:events'; -import { RedisClusterOptions, RedisClusterClientOptions } from './index'; +import { RedisClusterClientOptions } from './index'; import RedisClusterSlots from './cluster-slots'; +import TestUtils, { GLOBAL } from '../test-utils' describe('RedisClusterSlots', () => { describe('initialization', () => { diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index ae814958437..b803625c22c 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -7,6 +7,7 @@ import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions import calculateSlot from 'cluster-key-slot'; import { RedisSocketOptions } from '../client/socket'; import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache'; +import { SMIGRATED_EVENT, SMigratedEvent, dbgMaintenance } from '../client/enterprise-maintenance-manager'; interface NodeAddress { host: string; @@ -17,6 +18,8 @@ export type NodeAddressMap = { [address: string]: NodeAddress; } | ((address: string) => NodeAddress | undefined); +export const RESUBSCRIBE_LISTENERS_EVENT = '__resubscribeListeners' + export interface Node< M extends RedisModules, F extends RedisFunctions, @@ -113,6 +116,7 @@ export default class RedisClusterSlots< readonly nodeByAddress = new Map | ShardNode>(); pubSubNode?: PubSubNode; clientSideCache?: PooledClientSideCacheProvider; + smigratedSeqIdsSeen = new Set; #isOpen = false; @@ -192,6 +196,7 @@ export default class RedisClusterSlots< eagerConnect = this.#options.minimizeConnections !== true; const shards = await this.#getShards(rootNode); + dbgMaintenance(shards); this.#resetSlots(); // Reset slots AFTER shards have been fetched to prevent a race condition for (const { from, to, master, replicas } of shards) { const shard: Shard = { @@ -251,12 +256,160 @@ export default class RedisClusterSlots< } } + #handleSmigrated = async (event: SMigratedEvent) => { + dbgMaintenance(`[CSlots]: handle smigrated`, 
JSON.stringify(event, null, 2)); + + if(this.smigratedSeqIdsSeen.has(event.seqId)) { + dbgMaintenance(`[CSlots]: sequence id ${event.seqId} already seen, abort`) + return + } + this.smigratedSeqIdsSeen.add(event.seqId); + + const sourceAddress = `${event.source.host}:${event.source.port}`; + const sourceNode = this.nodeByAddress.get(sourceAddress); + if(!sourceNode) { + dbgMaintenance(`[CSlots]: address ${sourceAddress} not in 'nodeByAddress', abort SMIGRATED handling`); + return; + } + + // 1. Pausing + // 1.1 Normal + sourceNode.client?._pause(); + // 1.2 Sharded pubsub + if('pubSub' in sourceNode) { + sourceNode.pubSub?.client._pause(); + } + + for(const {host, port, slots} of event.destinations) { + const destinationAddress = `${host}:${port}`; + let destMasterNode: MasterNode | undefined = this.nodeByAddress.get(destinationAddress); + let destShard: Shard; + // 2. Create new Master + if(!destMasterNode) { + const promises: Promise[] = []; + destMasterNode = this.#initiateSlotNode({ host: host, port: port, id: 'asdff' }, false, true, new Set(), promises); + await Promise.all([...promises, this.#initiateShardedPubSubClient(destMasterNode)]); + // 2.1 Pause + destMasterNode.client?._pause(); + destMasterNode.pubSub?.client._pause(); + // In case destination node didnt exist, this means Shard didnt exist as well, so creating a new Shard is completely fine + destShard = { + master: destMasterNode + }; + } else { + // In case destination node existed, this means there was a Shard already, so its best if we can find it. + const existingShard = this.slots.find(shard => shard.master.host === host && shard.master.port === port); + if(!existingShard) { + dbgMaintenance("Could not find shard"); + throw new Error('Could not find shard'); + } + destShard = existingShard; + } + // 3. Soft update shards. + // After this step we are expecting any new commands that hash to the same slots to be routed to the destinationShard + const movingSlots = new Set(); + for(const slot of slots) { + if(typeof slot === 'number') { + this.slots[slot] = destShard; + movingSlots.add(slot) + } else { + for (let s = slot[0]; s <= slot[1]; s++) { + this.slots[s] = destShard; + movingSlots.add(s) + } + } + } + + // 4. For all affected clients (normal, pubsub, spubsub): + // 4.1 Wait for inflight commands to complete + const inflightPromises: Promise[] = []; + //Normal + inflightPromises.push(sourceNode.client!._getQueue().waitForInflightCommandsToComplete()); + //Sharded pubsub + if('pubSub' in sourceNode) { + inflightPromises.push(sourceNode.pubSub!.client._getQueue().waitForInflightCommandsToComplete()); + } + //Regular pubsub + if(this.pubSubNode?.address === sourceAddress) { + inflightPromises.push(this.pubSubNode?.client._getQueue().waitForInflightCommandsToComplete()); + } + await Promise.all(inflightPromises); + + + // 4.2 Extract commands, channels, sharded channels + // TODO dont forget to extract channels and resubscribe + const sourceStillHasSlots = this.slots.find(slot => slot.master.address === sourceAddress) !== undefined; + // If source shard still has slots, this means we have to only extract commands for the moving slots. + // Commands that are for different slots or have no slots should stay in the source shard. + // Same goes for sharded pub sub listeners + if(sourceStillHasSlots) { + const normalCommandsToMove = sourceNode.client!._getQueue().extractCommandsForSlots(movingSlots); + // 5. 
Prepend extracted commands, chans + //TODO pubsub, spubsub + destMasterNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove); + sourceNode.client?._unpause(); + if('pubSub' in sourceNode) { + const listeners = sourceNode.pubSub?.client._getQueue().removeShardedPubSubListenersForSlots(movingSlots); + this.#emit(RESUBSCRIBE_LISTENERS_EVENT, listeners); + sourceNode.pubSub?.client._unpause(); + } + } else { + // If source shard doesnt have any slots left, this means we can safely move all commands to the new shard. + // Same goes for sharded pub sub listeners + const normalCommandsToMove = sourceNode.client!._getQueue().getAllCommands(); + // 5. Prepend extracted commands, chans + destMasterNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove); + if('pubSub' in sourceNode) { + const listeners = sourceNode.pubSub?.client._getQueue().removeAllPubSubListeners(); + this.#emit(RESUBSCRIBE_LISTENERS_EVENT, listeners); + } + + //Remove all local references to the dying shard's clients + this.masters = this.masters.filter(master => master.address !== sourceAddress); + //not sure if needed, since there should be no replicas in RE + this.replicas = this.replicas.filter(replica => replica.address !== sourceAddress); + this.nodeByAddress.delete(sourceAddress); + + // 4.3 Kill because no slots are pointing to it anymore + await sourceNode.client?.close() + if('pubSub' in sourceNode) { + await sourceNode.pubSub?.client.close(); + } + } + + // 5.1 Unpause + destMasterNode.client?._unpause(); + if('pubSub' in destMasterNode) { + destMasterNode.pubSub?.client._unpause(); + } + + // We want to replace the pubSubNode ONLY if it is pointing to the affected node AND the affected + // node is actually dying ( designated by the fact that there are no remaining slots assigned to it) + if(this.pubSubNode?.address === sourceAddress && !sourceStillHasSlots) { + const channelsListeners = this.pubSubNode.client.getPubSubListeners(PUBSUB_TYPE.CHANNELS), + patternsListeners = this.pubSubNode.client.getPubSubListeners(PUBSUB_TYPE.PATTERNS); + + this.pubSubNode.client.destroy(); + + // Only create the new pubSubNode if there are actual subscriptions to make. + // It will be lazily created later if needed. 
+ if (channelsListeners.size || patternsListeners.size) { + await this.#initiatePubSubClient({ + [PUBSUB_TYPE.CHANNELS]: channelsListeners, + [PUBSUB_TYPE.PATTERNS]: patternsListeners + }) + } + } + } + } + async #getShards(rootNode: RedisClusterClientOptions) { const options = this.#clientOptionsDefaults(rootNode)!; options.socket ??= {}; options.socket.reconnectStrategy = false; options.RESP = this.#options.RESP; options.commandOptions = undefined; + options.maintNotifications = 'disabled'; // TODO: find a way to avoid type casting const client = await this.#clientFactory(options as RedisClientOptions) @@ -344,8 +497,7 @@ export default class RedisClusterSlots< port: socket.port, }); const emit = this.#emit; - const client = this.#clientFactory( - this.#clientOptionsDefaults({ + const client = this.#clientFactory( this.#clientOptionsDefaults({ clientSideCache: this.clientSideCache, RESP: this.#options.RESP, socket, @@ -356,9 +508,10 @@ export default class RedisClusterSlots< .once('ready', () => emit('node-ready', clientInfo)) .once('connect', () => emit('node-connect', clientInfo)) .once('end', () => emit('node-disconnect', clientInfo)) + .on(SMIGRATED_EVENT, this.#handleSmigrated) .on('__MOVED', async (allPubSubListeners: PubSubListeners) => { await this.rediscover(client); - this.#emit('__resubscribeAllPubSubListeners', allPubSubListeners); + this.#emit(RESUBSCRIBE_LISTENERS_EVENT, allPubSubListeners); }); return client; @@ -373,7 +526,7 @@ export default class RedisClusterSlots< nodeClient(node: ShardNode) { return ( node.connectPromise ?? // if the node is connecting - node.client ?? // if the node is connected + (node.client ? Promise.resolve(node.client) : undefined) ?? // if the node is connected this.#createNodeClient(node) // if the not is disconnected ); } @@ -467,20 +620,30 @@ export default class RedisClusterSlots< this.#emit('disconnect'); } - getClient( + async getClientAndSlotNumber( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined - ) { + ): Promise<{ + client: RedisClientType, + slotNumber?: number + }> { if (!firstKey) { - return this.nodeClient(this.getRandomNode()); + return { + client: await this.nodeClient(this.getRandomNode()) + }; } const slotNumber = calculateSlot(firstKey); if (!isReadonly) { - return this.nodeClient(this.slots[slotNumber].master); + return { + client: await this.nodeClient(this.slots[slotNumber].master), + slotNumber + }; } - return this.nodeClient(this.getSlotRandomNode(slotNumber)); + return { + client: await this.nodeClient(this.getSlotRandomNode(slotNumber)) + }; } *#iterateAllNodes() { diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index 238f3a59198..5a1b4d5d4ca 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -4,7 +4,7 @@ import { Command, CommandArguments, CommanderConfig, TypeMapping, RedisArgument, import COMMANDS from '../commands'; import { EventEmitter } from 'node:events'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; -import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots'; +import RedisClusterSlots, { NodeAddressMap, RESUBSCRIBE_LISTENERS_EVENT, ShardNode } from './cluster-slots'; import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi-command'; import { PubSubListener, PubSubListeners } from '../client/pub-sub'; import { ErrorReply } from '../errors'; @@ -310,7 +310,7 @@ export default class RedisCluster< 
this._options = options; this._slots = new RedisClusterSlots(options, this.emit.bind(this)); - this.on('__resubscribeAllPubSubListeners', this.resubscribeAllPubSubListeners.bind(this)); + this.on(RESUBSCRIBE_LISTENERS_EVENT, this.resubscribeAllPubSubListeners.bind(this)); if (options?.commandOptions) { this._commandOptions = options.commandOptions; @@ -416,13 +416,16 @@ export default class RedisCluster< fn: (client: RedisClientType, opts?: ClusterCommandOptions) => Promise ): Promise { const maxCommandRedirections = this._options.maxCommandRedirections ?? 16; - let client = await this._slots.getClient(firstKey, isReadonly); + let { client, slotNumber } = await this._slots.getClientAndSlotNumber(firstKey, isReadonly); let i = 0; let myFn = fn; while (true) { try { + if(options !== undefined) { + options.slotNumber = slotNumber; + } return await myFn(client, options); } catch (err) { myFn = fn; @@ -451,7 +454,9 @@ export default class RedisCluster< if (err.message.startsWith('MOVED')) { await this._slots.rediscover(client); - client = await this._slots.getClient(firstKey, isReadonly); + const clientAndSlot = await this._slots.getClientAndSlotNumber(firstKey, isReadonly); + client = clientAndSlot.client; + slotNumber = clientAndSlot.slotNumber; continue; } @@ -485,11 +490,11 @@ export default class RedisCluster< type Multi = new (...args: ConstructorParameters) => RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>; return new ((this as any).Multi as Multi)( async (firstKey, isReadonly, commands) => { - const client = await this._self._slots.getClient(firstKey, isReadonly); + const { client } = await this._self._slots.getClientAndSlotNumber(firstKey, isReadonly); return client._executeMulti(commands); }, async (firstKey, isReadonly, commands) => { - const client = await this._self._slots.getClient(firstKey, isReadonly); + const { client } = await this._self._slots.getClientAndSlotNumber(firstKey, isReadonly); return client._executePipeline(commands); }, routing, diff --git a/packages/test-utils/lib/dockers.ts b/packages/test-utils/lib/dockers.ts index 47257964f6a..792e8792da1 100644 --- a/packages/test-utils/lib/dockers.ts +++ b/packages/test-utils/lib/dockers.ts @@ -9,6 +9,7 @@ import { promisify } from 'node:util'; import * as fs from 'node:fs'; import * as os from 'node:os'; import * as path from 'node:path'; +import assert from 'node:assert'; const execAsync = promisify(execFileCallback); @@ -116,6 +117,73 @@ options: RedisServerDockerOptions, serverArguments: Array): Promise, ReturnType>(); +export interface ProxiedRedisServerDocker { + ports: number[], + apiPort: number, + dockerId: string +} + +export interface ProxiedRedisServerConfig { + nOfProxies: number, + defaultInterceptors: ('cluster'|'hitless'|'logger')[] +} + +const RUNNING_PROXIED_SERVERS = new Map>(); + +export async function spawnProxiedRedisServer(config: ProxiedRedisServerConfig): Promise { + const key = JSON.stringify(config); + const runningServer = RUNNING_PROXIED_SERVERS.get(key); + if (runningServer) { + return runningServer; + } + + const server = spawnProxiedRedisServerDocker(config); + RUNNING_PROXIED_SERVERS.set(key, server); + return server; +} + +export async function spawnProxiedRedisServerDocker( + config: ProxiedRedisServerConfig, +): Promise { + + assert(config.nOfProxies > 0, 'At least one proxy should be started'); + const ports: number[] = []; + for (let i = 0; i < config.nOfProxies; i++) { + ports.push((await portIterator.next()).value); + } + const apiPort = (await 
portIterator.next()).value; + + const dockerArgs =[ + "run", + "-d", + "--network", "host", + "-e", `LISTEN_PORT=${ports.join(',')}`, + "-e", `API_PORT=${apiPort}`, + "-e", "TIEOUT=0", + "-e", `DEFAULT_INTERCEPTORS=${config.defaultInterceptors.join(',')}`, + "-e", "ENABLE_LOGGING=true", + "cae-resp-proxy-standalone" + ] + + console.log(`[Docker] Spawning Proxy container`, dockerArgs.join(' ')); + + const { stdout, stderr } = await execAsync("docker", dockerArgs); + + if (!stdout) { + throw new Error(`docker run error - ${stderr}`); + } + + while (await isPortAvailable(ports[0])) { + await setTimeout(50); + } + + return { + ports, + apiPort, + dockerId: stdout.trim(), + }; +} + export function spawnRedisServer(dockerConfig: RedisServerDockerOptions, serverArguments: Array): Promise { const runningServer = RUNNING_SERVERS.get(serverArguments); if (runningServer) { @@ -134,13 +202,7 @@ async function dockerRemove(dockerId: string): Promise { } } -after(() => { - return Promise.all( - [...RUNNING_SERVERS.values()].map(async dockerPromise => - await dockerRemove((await dockerPromise).dockerId) - ) - ); -}); + export type RedisClusterDockersConfig = RedisServerDockerOptions & { numberOfMasters?: number; @@ -311,15 +373,7 @@ export function spawnRedisCluster( return dockersPromise; } -after(() => { - return Promise.all( - [...RUNNING_CLUSTERS.values()].map(async dockersPromise => { - return Promise.all( - (await dockersPromise).map(({ dockerId }) => dockerRemove(dockerId)) - ); - }) - ); -}); + const RUNNING_NODES = new Map, Array>(); @@ -333,7 +387,7 @@ export async function spawnRedisSentinel( if (runningNodes) { return runningNodes; } - + const passIndex = serverArguments.indexOf('--requirepass')+1; let password: string | undefined = undefined; if (passIndex != 0) { @@ -343,7 +397,7 @@ export async function spawnRedisSentinel( const master = await spawnRedisServerDocker(dockerConfigs, serverArguments); const redisNodes: Array = [master]; const replicaPromises: Array> = []; - + const replicasCount = 2; for (let i = 0; i < replicasCount; i++) { replicaPromises.push((async () => { @@ -358,26 +412,26 @@ export async function spawnRedisSentinel( await client.connect(); await client.replicaOf("127.0.0.1", master.port); await client.close(); - + return replica; })()); } - + const replicas = await Promise.all(replicaPromises); redisNodes.push(...replicas); RUNNING_NODES.set(serverArguments, redisNodes); const sentinelPromises: Array> = []; const sentinelCount = 3; - + const appPrefix = 'sentinel-config-dir'; const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), appPrefix)); for (let i = 0; i < sentinelCount; i++) { sentinelPromises.push( spawnSentinelNode( - dockerConfigs, - serverArguments, + dockerConfigs, + serverArguments, master.port, "mymaster", path.join(tmpDir, i.toString()), @@ -385,10 +439,10 @@ export async function spawnRedisSentinel( ), ) } - + const sentinelNodes = await Promise.all(sentinelPromises); RUNNING_SENTINELS.set(serverArguments, sentinelNodes); - + if (tmpDir) { fs.rmSync(tmpDir, { recursive: true }); } @@ -397,20 +451,30 @@ export async function spawnRedisSentinel( } after(() => { - return Promise.all( - [...RUNNING_NODES.values(), ...RUNNING_SENTINELS.values()].map(async dockersPromise => { - return Promise.all( - dockersPromise.map(({ dockerId }) => dockerRemove(dockerId)) - ); - }) - ); + return Promise.all([ + ...Array.from(RUNNING_SERVERS.values()).map(async dockerPromise => + dockerRemove((await dockerPromise).dockerId) + ), + 
...Array.from(RUNNING_CLUSTERS.values()).map(async dockersPromise => + Promise.all((await dockersPromise).map(({ dockerId }) => dockerRemove(dockerId))) + ), + ...Array.from(RUNNING_NODES.values()).map(dockersPromise => + Promise.all(dockersPromise.map(({ dockerId }) => dockerRemove(dockerId))) + ), + ...Array.from(RUNNING_SENTINELS.values()).map(dockersPromise => + Promise.all(dockersPromise.map(({ dockerId }) => dockerRemove(dockerId))) + ), + ...Array.from(RUNNING_PROXIED_SERVERS.values()).map(async dockerPromise => + dockerRemove((await dockerPromise).dockerId) + ) + ]); }); export async function spawnSentinelNode( dockerConfigs: RedisServerDockerOptions, serverArguments: Array, - masterPort: number, + masterPort: number, sentinelName: string, tmpDir: string, password?: string, @@ -436,12 +500,12 @@ sentinel failover-timeout ${sentinelName} 1000 return await spawnRedisServerDocker( { - image: dockerConfigs.image, - version: dockerConfigs.version, + image: dockerConfigs.image, + version: dockerConfigs.version, mode: "sentinel", - mounts: [`${dir}/redis.conf:/redis/config/node-sentinel-1/redis.conf`], + mounts: [`${dir}/redis.conf:/redis/config/node-sentinel-1/redis.conf`], port: port, - }, + }, serverArguments, ); -} \ No newline at end of file +} diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 1a9d1c9845a..8475c62d975 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -19,7 +19,7 @@ import { RedisClusterType } from '@redis/client/index'; import { RedisNode } from '@redis/client/lib/sentinel/types' -import { spawnRedisServer, spawnRedisCluster, spawnRedisSentinel, RedisServerDockerOptions, RedisServerDocker, spawnSentinelNode, spawnRedisServerDocker } from './dockers'; +import { spawnRedisServer, spawnRedisCluster, spawnRedisSentinel, RedisServerDockerOptions, RedisServerDocker, spawnSentinelNode, spawnRedisServerDocker, spawnProxiedRedisServer } from './dockers'; import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; @@ -27,6 +27,8 @@ import * as fs from 'node:fs'; import * as os from 'node:os'; import * as path from 'node:path'; import { RedisProxy, getFreePortNumber } from './proxy/redis-proxy'; +import ProxyController from './proxy/proxy-controller'; + interface TestUtilsConfig { /** @@ -298,6 +300,56 @@ export default class TestUtils { } }); } + + testWithProxiedCluster< + M extends RedisModules = {}, + F extends RedisFunctions = {}, + S extends RedisScripts = {}, + RESP extends RespVersions = 2, + TYPE_MAPPING extends TypeMapping = {} + >( + title: string, + fn: (proxiedClusterClient: RedisClusterType, proxyController: ProxyController) => unknown, + options: Omit, 'numberOfReplicas' | 'minimumDockerVersion' | 'serverArguments'> + ) { + let spawnPromise: ReturnType; + before(function () { + this.timeout(30000); + spawnPromise = spawnProxiedRedisServer({ nOfProxies: options.numberOfMasters ?? 3, defaultInterceptors: ['cluster', 'hitless'] }); + }); + + it(title, async function () { + if (!spawnPromise) return this.skip(); + const { ports, apiPort } = await spawnPromise; + + const cluster = createCluster({ + rootNodes: ports.map(port => ({ + socket: { + port + } + })), + minimizeConnections: options.clusterConfiguration?.minimizeConnections ?? 
true, + ...options.clusterConfiguration + }); + + const proxyController = new ProxyController(`http://localhost:${apiPort}`) + + if(options.disableClusterSetup) { + return fn(cluster, proxyController); + } + + await cluster.connect(); + + try { + await TestUtils.#clusterFlushAll(cluster); + await fn(cluster, proxyController); + } finally { + await TestUtils.#clusterFlushAll(cluster); + cluster.destroy(); + } + }); + } + testWithProxiedClient( title: string, fn: (proxiedClient: RedisClientType, proxy: RedisProxy) => unknown, diff --git a/packages/test-utils/lib/proxy/proxy-controller.ts b/packages/test-utils/lib/proxy/proxy-controller.ts new file mode 100644 index 00000000000..dbb38a641ee --- /dev/null +++ b/packages/test-utils/lib/proxy/proxy-controller.ts @@ -0,0 +1,103 @@ +export type ProxyStats = { + totalConnections: number; + activeConnections: number; + totalBytesReceived: number; + totalBytesSent: number; + totalCommandsReceived: number; + totalCommandsSent: number; +}; + +export type SendResult = { + success: boolean; + connectionId: string; + error?: string; +}; + +export type ProxyConfig = { + listenPort: number; + listenHost?: string; + targetHost: string; + targetPort: number; + timeout?: number; + enableLogging?: boolean; +}; + +export default class ProxyController { + constructor(private url: string) { } + + private fetchJson(path: string, options?: RequestInit): Promise { + return fetch(`${this.url}${path}`, options).then(res => res.json()); + } + + getStats(): Promise> { + return this.fetchJson('/stats') as Promise>; + } + + getConnections(): Promise> { + return this.fetchJson('/connections') as Promise>; + } + + getNodes(): Promise<{ ids: string[] }> { + return this.fetchJson('/nodes') as Promise<{ ids: string[] }>; + } + + createNode(config: Partial): Promise<{ success: boolean; cfg: ProxyConfig }> { + return this.fetchJson('/nodes', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(config) + }) as Promise<{ success: boolean; cfg: ProxyConfig }>; + } + + deleteNode(nodeId: string): Promise<{ success: boolean }> { + return this.fetchJson(`/nodes/${nodeId}`, { + method: 'DELETE' + }) as Promise<{ success: boolean }>; + } + + sendToClient(connectionId: string, data: string, encoding: 'base64' | 'raw' = 'base64'): Promise { + return this.fetchJson(`/send-to-client/${connectionId}?encoding=${encoding}`, { + method: 'POST', + headers: { 'Content-Type': 'text/plain' }, + body: data + }) as Promise; + } + + sendToClients(connectionIds: string[], data: string, encoding: 'base64' | 'raw' = 'base64'): Promise<{ results: SendResult[] }> { + return this.fetchJson(`/send-to-clients?connectionIds=${connectionIds.join(',')}&encoding=${encoding}`, { + method: 'POST', + headers: { 'Content-Type': 'text/plain' }, + body: data + }) as Promise<{ results: SendResult[] }>; + } + + sendToAllClients(data: string, encoding: 'base64' | 'raw' = 'base64'): Promise<{ results: SendResult[] }> { + return this.fetchJson(`/send-to-all-clients?encoding=${encoding}`, { + method: 'POST', + headers: { 'Content-Type': 'text/plain' }, + body: data + }) as Promise<{ results: SendResult[] }>; + } + + closeConnection(connectionId: string): Promise<{ success: boolean; connectionId: string }> { + return this.fetchJson(`/connections/${connectionId}`, { + method: 'DELETE' + }) as Promise<{ success: boolean; connectionId: string }>; + } + + createScenario(responses: string[], encoding: 'base64' | 'raw' = 'base64'): Promise<{ success: boolean; totalResponses: number }> { + 
return this.fetchJson('/scenarios', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ responses, encoding }) + }) as Promise<{ success: boolean; totalResponses: number }>; + } + + addInterceptor(name: string, match: string, response: string, encoding: 'base64' | 'raw' = 'base64'): Promise<{ success: boolean; name: string }> { + return this.fetchJson('/interceptors', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ name, match, response, encoding }) + }) as Promise<{ success: boolean; name: string }>; + } +}
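A minimal sketch of how the new proxied-cluster pieces are expected to be wired together
in a test, assuming a configured TestUtils instance named `testUtils` (as in the existing
specs); the test title, assertions and option values are illustrative only:

    import { strict as assert } from 'node:assert';

    testUtils.testWithProxiedCluster(
      'routes commands through the proxy layer',
      async (cluster, proxy) => {
        // The cluster client talks to the proxy listeners, which forward traffic to
        // the real nodes and expose an HTTP control API via ProxyController.
        await cluster.set('key', 'value');
        assert.equal(await cluster.get('key'), 'value');

        // Inspect the running proxies through the controller.
        const { ids } = await proxy.getNodes();
        assert.ok(ids.length > 0);

        const stats = await proxy.getStats();
        console.log('proxy stats', stats);

        // Connections can also be closed on purpose (proxy.closeConnection(id))
        // to exercise reconnect and maintenance-notification handling.
      },
      { numberOfMasters: 3 }
    );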