From 5d80d0760ffca9a1f1109d4b48e5f924886edee2 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 11 Nov 2025 16:53:36 +0200 Subject: [PATCH 01/15] handle smigrating smigrating notification should effect in increased command and socket timeout for the given connection --- .../client/lib/client/enterprise-maintenance-manager.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index 9892a5be8a4..0dde6d6c52a 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -20,6 +20,7 @@ const PN = { MIGRATED: "MIGRATED", FAILING_OVER: "FAILING_OVER", FAILED_OVER: "FAILED_OVER", + SMIGRATING: "SMIGRATING", }; export type DiagnosticsEvent = { @@ -128,7 +129,7 @@ export default class EnterpriseMaintenanceManager { #onPush = (push: Array): boolean => { dbgMaintenance("ONPUSH:", push.map(String)); - if (!Array.isArray(push) || !["MOVING", "MIGRATING", "MIGRATED", "FAILING_OVER", "FAILED_OVER"].includes(String(push[0]))) { + if (!Array.isArray(push) || !Object.values(PN).includes(String(push[0]))) { return false; } @@ -152,8 +153,9 @@ export default class EnterpriseMaintenanceManager { return true; } case PN.MIGRATING: + case PN.SMIGRATING: case PN.FAILING_OVER: { - dbgMaintenance("Received MIGRATING|FAILING_OVER"); + dbgMaintenance("Received MIGRATING|SMIGRATING|FAILING_OVER"); this.#onMigrating(); return true; } From 4ec677142d322b60138d8484beee8674a1b479e3 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Thu, 13 Nov 2025 13:37:49 +0200 Subject: [PATCH 02/15] first approximation to handling smigrated --- CLUSTER-ARCHITECTURE.txt | 175 ++++++++++++++++++ CLUSTER-CLIENT-ARCHITECTURE.txt | 157 ++++++++++++++++ .../client/enterprise-maintenance-manager.ts | 76 ++++---- packages/client/lib/client/index.ts | 9 +- packages/client/lib/cluster/cluster-slots.ts | 77 +++++++- 5 files changed, 457 insertions(+), 37 deletions(-) create mode 100644 CLUSTER-ARCHITECTURE.txt create mode 100644 CLUSTER-CLIENT-ARCHITECTURE.txt diff --git a/CLUSTER-ARCHITECTURE.txt b/CLUSTER-ARCHITECTURE.txt new file mode 100644 index 00000000000..bcbc5d2553e --- /dev/null +++ b/CLUSTER-ARCHITECTURE.txt @@ -0,0 +1,175 @@ +Redis Cluster Architecture Diagram +=================================== + + ┌─────────────────────────────────────────┐ + │ RedisCluster (Root) │ + │ │ + │ - _options: RedisClusterOptions │ + │ - _slots: RedisClusterSlots │ + │ - _commandOptions │ + │ │ + │ Methods: │ + │ + connect() │ + │ + sendCommand() │ + │ + MULTI() │ + │ + SUBSCRIBE() / UNSUBSCRIBE() │ + │ + SSUBSCRIBE() / SUNSUBSCRIBE() │ + │ + close() / destroy() │ + └──────────────┬──────────────────────────┘ + │ + │ contains + │ + ┌──────────────────────────┴──────────────────────────┐ + │ │ + │ RedisClusterSlots │ + │ │ + │ - slots: Array[16384] │ + │ - masters: Array │ + │ - replicas: Array │ + │ - nodeByAddress: Map │ + │ - pubSubNode?: PubSubNode │ + │ - clientSideCache?: PooledClientSideCacheProvider │ + │ │ + │ Methods: │ + │ + connect() │ + │ + getClient() │ + │ + rediscover() │ + │ + getPubSubClient() │ + │ + getShardedPubSubClient() │ + │ + getRandomNode() │ + │ + getSlotRandomNode() │ + └───────┬─────────────────┬─────────────────┬─────────┘ + │ │ │ + ┌──────────┘ │ └─────────────┐ + │ │ │ + │ has many │ optionally has │ has many + ▼ ▼ ▼ + ┌────────────────────────┐ ┌────────────────────────┐ 
┌────────────────────────┐ + │ Shard │ │ PubSubNode │ │ RedisClient │ + │ │ │ │ │ (per node) │ + │ - master: MasterNode │ │ - address: string │ │ │ + │ - replicas?: Array │ │ - client: RedisClient │ │ Socket, Queue, etc. │ + │ - nodesIterator │ │ - connectPromise │ │ │ + └──────────┬─────────────┘ └────────────────────────┘ └────────────────────────┘ + │ + │ contains + │ + ┌────────────┴────────────┐ + │ │ + ▼ ▼ +┌──────────────────┐ ┌──────────────────┐ +│ MasterNode │ │ ShardNode │ +│ │ │ (replica) │ +│ - id: string │ │ │ +│ - host: string │ │ - id: string │ +│ - port: number │ │ - host: string │ +│ - address │ │ - port: number │ +│ - readonly: no │ │ - address │ +│ - client? │ │ - readonly: yes │ +│ - pubSub? │ │ - client? │ +│ └─> client │ │ │ +│ └─> promise │ │ │ +└──────────────────┘ └──────────────────┘ + + +Additional Components: +───────────────────── + +┌────────────────────────────────────┐ +│ RedisClusterMultiCommand │ +│ │ +│ Used for MULTI/PIPELINE: │ +│ - Batches commands │ +│ - Routes to single node │ +│ - Returns typed results │ +│ │ +│ Methods: │ +│ + addCommand() │ +│ + exec() │ +│ + execAsPipeline() │ +└────────────────────────────────────┘ + +┌────────────────────────────────────┐ +│ PooledClientSideCacheProvider │ +│ (BasicPooledClientSideCache) │ +│ │ +│ RESP3 Client-Side Caching: │ +│ - Shared across all nodes │ +│ - Invalidation tracking │ +│ - TTL & eviction policies │ +│ │ +│ Methods: │ +│ + get() / set() │ +│ + invalidate() │ +│ + clear() / enable() / disable() │ +└────────────────────────────────────┘ + + +Key Relationships: +───────────────── + +1. RedisCluster + └─> RedisClusterSlots (manages topology) + └─> Shard[] (16,384 hash slots) + ├─> MasterNode (read/write) + │ └─> RedisClient + │ └─> PubSub RedisClient (sharded pub/sub) + └─> ShardNode[] (replicas, read-only if useReplicas=true) + └─> RedisClient + +2. RedisCluster + └─> RedisClusterMultiCommand (for transactions) + +3. RedisClusterSlots + └─> PubSubNode (global pub/sub) + └─> RedisClient + +4. RedisClusterSlots + └─> PooledClientSideCacheProvider (shared cache, RESP3 only) + + +Command Flow: +──────────── + +Single Command: + Client.sendCommand() + → Cluster._execute() + → Slots.getClient(key, isReadonly) + → Calculate slot from key + → Get Shard for slot + → Return master or replica client + → Client.sendCommand() + → [If MOVED/ASK error] + → Slots.rediscover() + → Retry with new node + +Transaction (MULTI): + Client.MULTI(routing) + → RedisClusterMultiCommand + → Accumulate commands + → All commands must route to same node + → client.exec() + +Pub/Sub: + Global: Uses single PubSubNode + Sharded: Uses per-master pubSub client based on channel hash + + +Discovery & Failover: +───────────────────── + +1. Initial Connect: + - Try rootNodes in random order + - Execute CLUSTER SLOTS command + - Build slot → shard mapping + - Create client connections + +2. Rediscovery (on MOVED error): + - Clear cache + - Re-fetch CLUSTER SLOTS + - Update topology + - Reconnect clients to new nodes + +3. 
Node Address Mapping: + - nodeAddressMap translates cluster IPs + - Useful for NAT/Docker scenarios diff --git a/CLUSTER-CLIENT-ARCHITECTURE.txt b/CLUSTER-CLIENT-ARCHITECTURE.txt new file mode 100644 index 00000000000..a2e85640492 --- /dev/null +++ b/CLUSTER-CLIENT-ARCHITECTURE.txt @@ -0,0 +1,157 @@ +═══════════════════════════════════════════════════════════════════════════════ + Redis Cluster Client Architecture + (Example: 3 Masters + 3 Replicas) +═══════════════════════════════════════════════════════════════════════════════ + +Legend: + 📦 = Actual object/instance in memory + 👉 = Reference/pointer to object + ⚡ = TCP connection to Redis server + +═══════════════════════════════════════════════════════════════════════════════ + +RedisClusterSlots 📦 +│ +├─── slots[16384] (Array) +│ │ +│ ├─── slots[0-5460] 👉─────┐ +│ ├─── slots[5461-10922] 👉─┤ +│ └─── slots[10923-16383] 👉┤ +│ │ +│ ▼ +│ Shard 📦 (references master/replica below) +│ +├─── masters[3] (Array) +│ │ +│ ├─── [0] MasterNode1 📦 +│ │ ├─── address: "host1:6379" +│ │ ├─── client: Client1 📦⚡ → Redis Master 1 +│ │ └─── pubSub?: 📦 +│ │ └─── client: PubSubClient1 📦⚡ → Redis Master 1 +│ │ +│ ├─── [1] MasterNode2 📦 +│ │ ├─── address: "host2:6379" +│ │ ├─── client: Client2 📦⚡ → Redis Master 2 +│ │ └─── pubSub?: 📦 +│ │ └─── client: PubSubClient2 📦⚡ → Redis Master 2 +│ │ +│ └─── [2] MasterNode3 📦 +│ ├─── address: "host3:6379" +│ ├─── client: Client3 📦⚡ → Redis Master 3 +│ └─── pubSub?: 📦 +│ └─── client: PubSubClient3 📦⚡ → Redis Master 3 +│ +├─── replicas[3] (Array) +│ │ +│ ├─── [0] ReplicaNode1 📦 +│ │ ├─── address: "host4:6379" +│ │ └─── client: Client4 📦⚡ → Redis Replica 1 +│ │ +│ ├─── [1] ReplicaNode2 📦 +│ │ ├─── address: "host5:6379" +│ │ └─── client: Client5 📦⚡ → Redis Replica 2 +│ │ +│ └─── [2] ReplicaNode3 📦 +│ ├─── address: "host6:6379" +│ └─── client: Client6 📦⚡ → Redis Replica 3 +│ +├─── nodeByAddress (Map) +│ │ +│ ├─── "host1:6379" 👉 MasterNode1 (same object as masters[0]) +│ ├─── "host2:6379" 👉 MasterNode2 (same object as masters[1]) +│ ├─── "host3:6379" 👉 MasterNode3 (same object as masters[2]) +│ ├─── "host4:6379" 👉 ReplicaNode1 (same object as replicas[0]) +│ ├─── "host5:6379" 👉 ReplicaNode2 (same object as replicas[1]) +│ └─── "host6:6379" 👉 ReplicaNode3 (same object as replicas[2]) +│ +└─── pubSubNode? 
(optional) 📦 + ├─── address: "host2:6379" (randomly selected) + └─── client: PubSubClient7 📦⚡ → Redis Master/Replica (any) + + +═══════════════════════════════════════════════════════════════════════════════ + CLIENT COUNT +═══════════════════════════════════════════════════════════════════════════════ + +Regular Clients (always created at connect): + ┌─────────────────────────────────────────┐ + │ Client1 📦⚡ → Master 1 (host1:6379) │ + │ Client2 📦⚡ → Master 2 (host2:6379) │ + │ Client3 📦⚡ → Master 3 (host3:6379) │ + │ Client4 📦⚡ → Replica 1 (host4:6379) │ + │ Client5 📦⚡ → Replica 2 (host5:6379) │ + │ Client6 📦⚡ → Replica 3 (host6:6379) │ + └─────────────────────────────────────────┘ + Total: 6 TCP connections ⚡ + +PubSub Clients (created on-demand): + ┌─────────────────────────────────────────────────────────────────┐ + │ PubSubClient1 📦⚡ → Master 1 (sharded pub/sub, SSUBSCRIBE) │ + │ PubSubClient2 📦⚡ → Master 2 (sharded pub/sub, SSUBSCRIBE) │ + │ PubSubClient3 📦⚡ → Master 3 (sharded pub/sub, SSUBSCRIBE) │ + │ PubSubClient7 📦⚡ → Any node (regular pub/sub, SUBSCRIBE) │ + └─────────────────────────────────────────────────────────────────┘ + Total: up to 4 TCP connections ⚡ + +═══════════════════════════════════════════════════════════════════════════════ +GRAND TOTAL: 6-10 TCP connections depending on pub/sub usage +═══════════════════════════════════════════════════════════════════════════════ + + +═══════════════════════════════════════════════════════════════════════════════ + HOW CLIENTS ARE USED +═══════════════════════════════════════════════════════════════════════════════ + +1. Regular Commands (GET, SET, HGET, etc.) + ┌─────────────────────────────────────────────────────────────┐ + │ • Calculate slot: slot = hash(key) % 16384 │ + │ • Lookup: shard = slots[slot] │ + │ • For writes: use shard.master.client │ + │ • For reads: use shard.master.client OR shard.replicas[].client │ + └─────────────────────────────────────────────────────────────┘ + +2. Sharded Pub/Sub (SSUBSCRIBE, SPUBLISH) - Redis 7.0+ + ┌─────────────────────────────────────────────────────────────┐ + │ • Calculate slot: slot = hash(channel) % 16384 │ + │ • Lookup: shard = slots[slot] │ + │ • Use: shard.master.pubSub.client │ + │ • Each master has its own dedicated sharded pub/sub client │ + └─────────────────────────────────────────────────────────────┘ + +3. Regular Pub/Sub (SUBSCRIBE, PSUBSCRIBE) + ┌─────────────────────────────────────────────────────────────┐ + │ • Use: pubSubNode.client │ + │ • Single shared client for all non-sharded pub/sub │ + │ • Can be any master or replica (randomly selected) │ + └─────────────────────────────────────────────────────────────┘ + +4. Node Lookup by Address + ┌─────────────────────────────────────────────────────────────┐ + │ • Use: nodeByAddress.get("host:port") │ + │ • Returns reference to master/replica node │ + │ • Useful for handling redirects (MOVED/ASK responses) │ + └─────────────────────────────────────────────────────────────┘ + + +═══════════════════════════════════════════════════════════════════════════════ + KEY INSIGHTS +═══════════════════════════════════════════════════════════════════════════════ + +✓ NO DUPLICATE NODE OBJECTS + All arrays and maps reference the same Node 📦 objects. There are exactly + 6 node objects (3 masters + 3 replicas), not 18 or 24. + +✓ SEPARATE CLIENTS FOR PUB/SUB + Pub/sub requires dedicated clients because once a Redis client enters + pub/sub mode, it can only execute pub/sub commands. 
Regular commands + cannot share the same connection. + +✓ LAZY CREATION + Pub/sub clients are only created when first needed (on-demand), not at + cluster connection time. + +✓ EFFICIENT ROUTING + - slots[] = Fast O(1) slot-to-shard lookup for commands + - masters[] = Iterate all masters + - replicas[] = Iterate all replicas + - nodeByAddress = Fast O(1) address-to-node lookup for redirects diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index 0dde6d6c52a..139a060a110 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -1,13 +1,22 @@ -import { RedisClientOptions } from "."; +import RedisClient, { RedisClientOptions } from "."; import RedisCommandsQueue from "./commands-queue"; import { RedisArgument } from "../.."; import { isIP } from "net"; import { lookup } from "dns/promises"; import assert from "node:assert"; import { setTimeout } from "node:timers/promises"; -import RedisSocket, { RedisTcpSocketOptions } from "./socket"; +import { RedisTcpSocketOptions } from "./socket"; import diagnostics_channel from "node:diagnostics_channel"; +type RedisType = RedisClient; + +export const SMIGRATED_EVENT = "__SMIGRATED"; +export interface SMigratedEvent { + source: { host: string, port: number }; + destination: { host: string, port: number }; + ranges: (number | [number, number])[] +} + export const MAINTENANCE_EVENTS = { PAUSE_WRITING: "pause-writing", RESUME_WRITING: "resume-writing", @@ -21,6 +30,7 @@ const PN = { FAILING_OVER: "FAILING_OVER", FAILED_OVER: "FAILED_OVER", SMIGRATING: "SMIGRATING", + SMIGRATED: "SMIGRATED", }; export type DiagnosticsEvent = { @@ -46,23 +56,11 @@ export interface MaintenanceUpdate { relaxedSocketTimeout?: number; } -interface Client { - _ejectSocket: () => RedisSocket; - _insertSocket: (socket: RedisSocket) => void; - _pause: () => void; - _unpause: () => void; - _maintenanceUpdate: (update: MaintenanceUpdate) => void; - duplicate: () => Client; - connect: () => Promise; - destroy: () => void; - on: (event: string, callback: (value: unknown) => void) => void; -} - export default class EnterpriseMaintenanceManager { #commandsQueue: RedisCommandsQueue; #options: RedisClientOptions; #isMaintenance = 0; - #client: Client; + #client: RedisType; static setupDefaultMaintOptions(options: RedisClientOptions) { if (options.maintNotifications === undefined) { @@ -94,7 +92,7 @@ export default class EnterpriseMaintenanceManager { if (!host) return; - const tls = options.socket?.tls ?? false + const tls = options.socket?.tls ?? 
false; const movingEndpointType = await determineEndpoint(tls, host, options); return { @@ -116,7 +114,7 @@ export default class EnterpriseMaintenanceManager { constructor( commandsQueue: RedisCommandsQueue, - client: Client, + client: RedisType, options: RedisClientOptions, ) { this.#commandsQueue = commandsQueue; @@ -136,12 +134,12 @@ export default class EnterpriseMaintenanceManager { const type = String(push[0]); emitDiagnostics({ - type, - timestamp: Date.now(), - data: { - push: push.map(String), - }, - }); + type, + timestamp: Date.now(), + data: { + push: push.map(String), + }, + }); switch (type) { case PN.MOVING: { // [ 'MOVING', '17', '15', '54.78.247.156:12075' ] @@ -165,6 +163,14 @@ export default class EnterpriseMaintenanceManager { this.#onMigrated(); return true; } + case PN.SMIGRATED: { + // [ 'SMIGRATED', '123', '54.78.247.156:12075' 'slot1,rangediff1' ] + // ^seq ^new ip ^slots info + const sequenceId = Number(push[1]); + dbgMaintenance("Received SMIGRATED"); + this.#client._handleSmigrated(sequenceId); + return true; + } } return false; }; @@ -222,7 +228,7 @@ export default class EnterpriseMaintenanceManager { // If the URL is provided, it takes precedense // the options object could just be mutated - if(this.#options.url) { + if (this.#options.url) { const u = new URL(this.#options.url); u.hostname = host; u.port = String(port); @@ -231,15 +237,17 @@ export default class EnterpriseMaintenanceManager { this.#options.socket = { ...this.#options.socket, host, - port - } + port, + }; } const tmpClient = this.#client.duplicate(); - tmpClient.on('error', (error: unknown) => { + tmpClient.on("error", (error: unknown) => { //We dont know how to handle tmp client errors - dbgMaintenance(`[ERR]`, error) + dbgMaintenance(`[ERR]`, error); }); - dbgMaintenance(`Tmp client created in ${( performance.now() - start ).toFixed(2)}ms`); + dbgMaintenance( + `Tmp client created in ${(performance.now() - start).toFixed(2)}ms`, + ); dbgMaintenance( `Set timeout for tmp client to ${this.#options.maintRelaxedSocketTimeout}`, ); @@ -250,7 +258,9 @@ export default class EnterpriseMaintenanceManager { dbgMaintenance(`Connecting tmp client: ${host}:${port}`); start = performance.now(); await tmpClient.connect(); - dbgMaintenance(`Connected to tmp client in ${(performance.now() - start).toFixed(2)}ms`); + dbgMaintenance( + `Connected to tmp client in ${(performance.now() - start).toFixed(2)}ms`, + ); // 3 [EVENT] New socket connected dbgMaintenance(`Wait for all in-flight commands to complete`); @@ -296,7 +306,7 @@ export default class EnterpriseMaintenanceManager { const update: MaintenanceUpdate = { relaxedCommandTimeout: undefined, - relaxedSocketTimeout: undefined + relaxedSocketTimeout: undefined, }; this.#client._maintenanceUpdate(update); @@ -339,9 +349,7 @@ async function determineEndpoint( ): Promise { assert(options.maintEndpointType !== undefined); if (options.maintEndpointType !== "auto") { - dbgMaintenance( - `Determine endpoint type: ${options.maintEndpointType}`, - ); + dbgMaintenance(`Determine endpoint type: ${options.maintEndpointType}`); return options.maintEndpointType; } diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index ea2102c37fd..d8236b4d81c 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -20,7 +20,7 @@ import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } import { BasicCommandParser, CommandParser } from './parser'; import SingleEntryCache from 
'../single-entry-cache'; import { version } from '../../package.json' -import EnterpriseMaintenanceManager, { MaintenanceUpdate, MovingEndpointType } from './enterprise-maintenance-manager'; +import EnterpriseMaintenanceManager, { MaintenanceUpdate, MovingEndpointType, SMIGRATED_EVENT } from './enterprise-maintenance-manager'; export interface RedisClientOptions< M extends RedisModules = RedisModules, @@ -1007,6 +1007,13 @@ export default class RedisClient< this._self.#maybeScheduleWrite(); } + /** + * @internal + */ + _handleSmigrated(sequenceId: number) { + this._self.emit(SMIGRATED_EVENT, sequenceId); + } + /** * @internal */ diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index ae814958437..ec0bf0384d9 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -7,6 +7,7 @@ import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions import calculateSlot from 'cluster-key-slot'; import { RedisSocketOptions } from '../client/socket'; import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache'; +import { SMIGRATED_EVENT, SMigratedEvent, dbgMaintenance } from '../client/enterprise-maintenance-manager'; interface NodeAddress { host: string; @@ -192,6 +193,7 @@ export default class RedisClusterSlots< eagerConnect = this.#options.minimizeConnections !== true; const shards = await this.#getShards(rootNode); + dbgMaintenance(shards); this.#resetSlots(); // Reset slots AFTER shards have been fetched to prevent a race condition for (const { from, to, master, replicas } of shards) { const shard: Shard = { @@ -251,12 +253,83 @@ export default class RedisClusterSlots< } } + #handleSmigrated = async (event: SMigratedEvent) => { + dbgMaintenance(`[CSlots]: handle smigrated`, event); + + // slots = new Array>(RedisClusterSlots.#SLOTS); + // masters = new Array>(); + // replicas = new Array>(); + // readonly nodeByAddress = new Map | ShardNode>(); + // pubSubNode?: PubSubNode; + + const sourceAddress = `${event.source.host}:${event.source.port}`; + const sourceNode = this.nodeByAddress.get(sourceAddress); + if(!sourceNode) { + dbgMaintenance(`[CSlots]: address ${sourceAddress} not in 'nodeByAddress', abort SMIGRATED handling`); + return; + } + + // 1. Pausing + //TODO - check the single pubsubnode + sourceNode.client?._pause(); + if('pubSub' in sourceNode) { + sourceNode.pubSub?.client._pause(); + } + + const destinationAddress = `${event.destination.host}:${event.destination.port}`; + let destinationNode = this.nodeByAddress.get(destinationAddress); + let destinationShard: Shard; + + // 2. Create new Master + if(!destinationNode) { + const promises: Promise[] = []; + destinationNode = this.#initiateSlotNode({ host: event.destination.host, port: event.destination.port, id: 'asdff' }, false, true, new Set(), promises); + await Promise.all(promises); + // 2.1 Pause + destinationNode.client?._pause(); + // In case destination node didnt exist, this means Shard didnt exist as well, so creating a new Shard is completely fine + destinationShard = { + master: destinationNode + }; + } else { + // In case destination node existed, this means there was a Shard already, so its best if we can find it. 
+ const existingShard = this.slots.find(shard => shard.master.host === event.destination.host && shard.master.port === event.destination.port); + if(!existingShard) { + dbgMaintenance("Could not find shard"); + throw new Error('Could not find shard'); + } + destinationShard = existingShard; + } + + // 3. Soft update shards + for(const range of event.ranges) { + if(typeof range === 'number') { + this.slots[range] = destinationShard; + } else { + for (let slot = range[0]; slot <= range[1]; slot++) { + this.slots[slot] = destinationShard; + } + } + } + + // 4. For all affected clients (normal, pubsub, spubsub): + // 4.1 Wait for inflight commands to complete + // 4.2 Extract commands, channels, sharded channels + // 4.3 Kill if no slots are pointing to it + // + + // 5. Prepend extracted commands, chans + // 5.1 Unpause + + } + async #getShards(rootNode: RedisClusterClientOptions) { const options = this.#clientOptionsDefaults(rootNode)!; options.socket ??= {}; options.socket.reconnectStrategy = false; options.RESP = this.#options.RESP; options.commandOptions = undefined; + options.maintNotifications = 'disabled'; // TODO: find a way to avoid type casting const client = await this.#clientFactory(options as RedisClientOptions) @@ -344,8 +417,7 @@ export default class RedisClusterSlots< port: socket.port, }); const emit = this.#emit; - const client = this.#clientFactory( - this.#clientOptionsDefaults({ + const client = this.#clientFactory( this.#clientOptionsDefaults({ clientSideCache: this.clientSideCache, RESP: this.#options.RESP, socket, @@ -356,6 +428,7 @@ export default class RedisClusterSlots< .once('ready', () => emit('node-ready', clientInfo)) .once('connect', () => emit('node-connect', clientInfo)) .once('end', () => emit('node-disconnect', clientInfo)) + .on(SMIGRATED_EVENT, this.#handleSmigrated) .on('__MOVED', async (allPubSubListeners: PubSubListeners) => { await this.rediscover(client); this.#emit('__resubscribeAllPubSubListeners', allPubSubListeners); From e186a130e24f9bb6bd5b2524247ffae7692a051f Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 2 Dec 2025 14:56:50 +0200 Subject: [PATCH 03/15] deduplicate notifications based on sequence id --- .../client/lib/client/enterprise-maintenance-manager.ts | 1 + packages/client/lib/cluster/cluster-slots.ts | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index 139a060a110..953d7be673a 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -12,6 +12,7 @@ type RedisType = RedisClient; export const SMIGRATED_EVENT = "__SMIGRATED"; export interface SMigratedEvent { + seqId: number, source: { host: string, port: number }; destination: { host: string, port: number }; ranges: (number | [number, number])[] diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index ec0bf0384d9..78aa30d46aa 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -114,6 +114,7 @@ export default class RedisClusterSlots< readonly nodeByAddress = new Map | ShardNode>(); pubSubNode?: PubSubNode; clientSideCache?: PooledClientSideCacheProvider; + smigratedSeqIdsSeen = new Set; #isOpen = false; @@ -256,6 +257,12 @@ export default class RedisClusterSlots< #handleSmigrated = async (event: SMigratedEvent) => { dbgMaintenance(`[CSlots]: 
handle smigrated`, event); + if(this.smigratedSeqIdsSeen.has(event.seqId)) { + dbgMaintenance(`[CSlots]: sequence id ${event.seqId} already seen, abort`) + return + } + this.smigratedSeqIdsSeen.add(event.seqId); + // slots = new Array>(RedisClusterSlots.#SLOTS); // masters = new Array>(); // replicas = new Array>(); From 7648abbd41cf371bbc83c965faeaf63183c59068 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 2 Dec 2025 14:57:07 +0200 Subject: [PATCH 04/15] add slotnumber to commands --- packages/client/lib/client/commands-queue.ts | 7 +++++++ packages/client/lib/cluster/cluster-slots.ts | 22 ++++++++++++++------ packages/client/lib/cluster/index.ts | 13 ++++++++---- 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 9b7f737113b..2620bb4d767 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -19,6 +19,11 @@ export interface CommandOptions { * Timeout for the command in milliseconds */ timeout?: number; + /** + * @internal + * The slot the command is targeted to (if any) + */ + slotNumber?: number; } export interface CommandToWrite extends CommandWaitingForReply { @@ -33,6 +38,7 @@ export interface CommandToWrite extends CommandWaitingForReply { listener: () => unknown; originalTimeout: number | undefined; } | undefined; + slotNumber?: number } interface CommandWaitingForReply { @@ -219,6 +225,7 @@ export default class RedisCommandsQueue { channelsCounter: undefined, typeMapping: options?.typeMapping }; + value.slotNumber = options?.slotNumber // If #maintenanceCommandTimeout was explicitly set, we should // use it instead of the timeout provided by the command diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 78aa30d46aa..165b329c9d1 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -453,7 +453,7 @@ export default class RedisClusterSlots< nodeClient(node: ShardNode) { return ( node.connectPromise ?? // if the node is connecting - node.client ?? // if the node is connected + (node.client ? Promise.resolve(node.client) : undefined) ?? 
// if the node is connected this.#createNodeClient(node) // if the not is disconnected ); } @@ -547,20 +547,30 @@ export default class RedisClusterSlots< this.#emit('disconnect'); } - getClient( + async getClientAndSlotNumber( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined - ) { + ): Promise<{ + client: RedisClientType, + slotNumber?: number + }> { if (!firstKey) { - return this.nodeClient(this.getRandomNode()); + return { + client: await this.nodeClient(this.getRandomNode()) + }; } const slotNumber = calculateSlot(firstKey); if (!isReadonly) { - return this.nodeClient(this.slots[slotNumber].master); + return { + client: await this.nodeClient(this.slots[slotNumber].master), + slotNumber + }; } - return this.nodeClient(this.getSlotRandomNode(slotNumber)); + return { + client: await this.nodeClient(this.getSlotRandomNode(slotNumber)) + }; } *#iterateAllNodes() { diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index 238f3a59198..3a56e99ec97 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -416,13 +416,16 @@ export default class RedisCluster< fn: (client: RedisClientType, opts?: ClusterCommandOptions) => Promise ): Promise { const maxCommandRedirections = this._options.maxCommandRedirections ?? 16; - let client = await this._slots.getClient(firstKey, isReadonly); + let { client, slotNumber } = await this._slots.getClientAndSlotNumber(firstKey, isReadonly); let i = 0; let myFn = fn; while (true) { try { + if(options !== undefined) { + options.slotNumber = slotNumber; + } return await myFn(client, options); } catch (err) { myFn = fn; @@ -451,7 +454,9 @@ export default class RedisCluster< if (err.message.startsWith('MOVED')) { await this._slots.rediscover(client); - client = await this._slots.getClient(firstKey, isReadonly); + const clientAndSlot = await this._slots.getClientAndSlotNumber(firstKey, isReadonly); + client = clientAndSlot.client; + slotNumber = clientAndSlot.slotNumber; continue; } @@ -485,11 +490,11 @@ export default class RedisCluster< type Multi = new (...args: ConstructorParameters) => RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>; return new ((this as any).Multi as Multi)( async (firstKey, isReadonly, commands) => { - const client = await this._self._slots.getClient(firstKey, isReadonly); + const { client } = await this._self._slots.getClientAndSlotNumber(firstKey, isReadonly); return client._executeMulti(commands); }, async (firstKey, isReadonly, commands) => { - const client = await this._self._slots.getClient(firstKey, isReadonly); + const { client } = await this._self._slots.getClientAndSlotNumber(firstKey, isReadonly); return client._executePipeline(commands); }, routing, From 8abb22d243ff3482963f97d63a2ee9c95246c9e3 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Mon, 1 Dec 2025 15:37:20 +0200 Subject: [PATCH 05/15] add support for extracting commands from queue --- packages/client/lib/client/commands-queue.ts | 46 ++++++++++++++++++++ packages/client/lib/client/pub-sub.ts | 26 +++++++++++ 2 files changed, 72 insertions(+) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 2620bb4d767..f1d9e400d5c 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -349,6 +349,10 @@ export default class RedisCommandsQueue { return this.#pubSub.removeAllListeners(); } + removePubSubListenersForSlots(slots: Set) { + return 
this.#pubSub.removePubSubListenersForSlots(slots); + } + resubscribe(chainId?: symbol) { const commands = this.#pubSub.resubscribe(); if (!commands.length) return; @@ -548,4 +552,46 @@ export default class RedisCommandsQueue { this.#waitingForReply.length === 0 ); } + + /** + * + * Extracts commands for the given slots from the toWrite queue. + * Some commands dont have "slotNumber", which means they are not designated to particular slot/node. + * We ignore those. + */ + extractCommandsForSlots(slots: Set): CommandToWrite[] { + const result: CommandToWrite[] = []; + let current = this.#toWrite.head; + while(current !== undefined) { + if(current.value.slotNumber !== undefined && slots.has(current.value.slotNumber)) { + result.push(current.value); + const toRemove = current; + current = current.next; + this.#toWrite.remove(toRemove); + } + } + return result; + } + + /** + * Gets all commands from the write queue without removing them. + */ + getAllCommands(): CommandToWrite[] { + const result: CommandToWrite[] = []; + let current = this.#toWrite.head; + while(current) { + result.push(current.value); + current = current.next; + } + return result; + } + + /** + * Prepends commands to the write queue in reverse. + */ + prependCommandsToWrite(commands: CommandToWrite[]) { + for (let i = commands.length - 1; i <= 0; i--) { + this.#toWrite.unshift(commands[i]); + } + } } diff --git a/packages/client/lib/client/pub-sub.ts b/packages/client/lib/client/pub-sub.ts index 1895f96a883..a0dfe68cc15 100644 --- a/packages/client/lib/client/pub-sub.ts +++ b/packages/client/lib/client/pub-sub.ts @@ -1,5 +1,6 @@ import { RedisArgument } from '../RESP/types'; import { CommandToWrite } from './commands-queue'; +import calculateSlot from 'cluster-key-slot'; export const PUBSUB_TYPE = { CHANNELS: 'CHANNELS', @@ -420,6 +421,31 @@ export class PubSub { return result; } + removePubSubListenersForSlots(slots: Set) { + const channels = new Map(); + for (const [channel, value] of this.listeners[PUBSUB_TYPE.CHANNELS]) { + if (slots.has(calculateSlot(channel))) { + channels.set(channel, value); + this.listeners[PUBSUB_TYPE.CHANNELS].delete(channel); + } + } + + const sharded = new Map(); + for (const [chanel, value] of this.listeners[PUBSUB_TYPE.SHARDED]) { + if (slots.has(calculateSlot(chanel))) { + sharded.set(chanel, value); + this.listeners[PUBSUB_TYPE.SHARDED].delete(chanel); + } + } + + this.#updateIsActive(); + + return { + [PUBSUB_TYPE.CHANNELS]: channels, + [PUBSUB_TYPE.SHARDED]: sharded + }; + } + #emitPubSubMessage( type: PubSubType, message: Buffer, From 7b3a1fdfb0b9c24ef4f2c98331a53888c5d69791 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Mon, 1 Dec 2025 16:16:04 +0200 Subject: [PATCH 06/15] parse notification --- .../client/enterprise-maintenance-manager.ts | 68 +++++++++++++++---- packages/client/lib/client/index.ts | 9 +-- packages/client/lib/cluster/cluster-slots.ts | 2 +- 3 files changed, 62 insertions(+), 17 deletions(-) diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index 953d7be673a..e3eec0b0172 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -14,8 +14,10 @@ export const SMIGRATED_EVENT = "__SMIGRATED"; export interface SMigratedEvent { seqId: number, source: { host: string, port: number }; - destination: { host: string, port: number }; - ranges: (number | [number, number])[] + destinations: { + host: string, 
port: number + slots: (number | [number, number])[] + }[] } export const MAINTENANCE_EVENTS = { @@ -165,11 +167,8 @@ export default class EnterpriseMaintenanceManager { return true; } case PN.SMIGRATED: { - // [ 'SMIGRATED', '123', '54.78.247.156:12075' 'slot1,rangediff1' ] - // ^seq ^new ip ^slots info - const sequenceId = Number(push[1]); dbgMaintenance("Received SMIGRATED"); - this.#client._handleSmigrated(sequenceId); + this.#onSMigrated(push); return true; } } @@ -205,12 +204,9 @@ export default class EnterpriseMaintenanceManager { // period that was communicated by the server is over. if (url === null) { assert(this.#options.maintEndpointType === "none"); - assert(this.#options.socket !== undefined); - assert("host" in this.#options.socket); - assert(typeof this.#options.socket.host === "string"); - host = this.#options.socket.host; - assert(typeof this.#options.socket.port === "number"); - port = this.#options.socket.port; + const { host: h, port: p } = this.#getAddress() + host = h; + port = p; const waitTime = (afterSeconds * 1000) / 2; dbgMaintenance(`Wait for ${waitTime}ms`); await setTimeout(waitTime); @@ -312,8 +308,56 @@ export default class EnterpriseMaintenanceManager { this.#client._maintenanceUpdate(update); }; + + #onSMigrated = (push: any[]) => { + // [ 'SMIGRATED', '15', [ '127.0.0.1:6379 123,456,789-1000', '127.0.0.1:6380 124,457,300-500' ] ] + // ^seq ^new endpoint1 ^slots ^new endpoint2 ^slots + const sequenceId = Number(push[1]); + const smigratedEvent: SMigratedEvent = { + seqId: sequenceId, + source: { + ...this.#getAddress() + }, + destinations: [] + } + for(const endpointInfo of push[2]) { + const [endpoint, slots] = String(endpointInfo).split(' '); + //TODO not sure if we need to handle fqdn/ip.. cluster manages clients by host:port. 
If `cluster slots` returns ip, + // but this notification returns fqdn, then we need to unify somehow ( maybe lookup ) + const [ host, port ] = endpoint.split(':'); + // `slots` could be mix of single slots and ranges, for example: 123,456,789-1000 + const parsedSlots = slots.split(',').map((singleOrRange): number | [number, number] => { + const separatorIndex = singleOrRange.indexOf('-'); + if(separatorIndex === -1) { + // Its single slot + return Number(singleOrRange); + } + // Its range + return [Number(singleOrRange.substring(0, separatorIndex)), Number(singleOrRange.substring(separatorIndex + 1))]; + }); + + smigratedEvent.destinations.push({ + host, + port: Number(port), + slots: parsedSlots + }) + } + this.#client._handleSmigrated(smigratedEvent); + } + + + #getAddress(): { host: string, port: number } { + assert(this.#options.socket !== undefined); + assert("host" in this.#options.socket); + assert(typeof this.#options.socket.host === "string"); + const host = this.#options.socket.host; + assert(typeof this.#options.socket.port === "number"); + const port = this.#options.socket.port; + return { host, port }; + } } + export type MovingEndpointType = | "auto" | "internal-ip" diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index d8236b4d81c..a97dfa02044 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -20,7 +20,7 @@ import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } import { BasicCommandParser, CommandParser } from './parser'; import SingleEntryCache from '../single-entry-cache'; import { version } from '../../package.json' -import EnterpriseMaintenanceManager, { MaintenanceUpdate, MovingEndpointType, SMIGRATED_EVENT } from './enterprise-maintenance-manager'; +import EnterpriseMaintenanceManager, { MaintenanceUpdate, MovingEndpointType, SMIGRATED_EVENT, SMigratedEvent } from './enterprise-maintenance-manager'; export interface RedisClientOptions< M extends RedisModules = RedisModules, @@ -1010,9 +1010,10 @@ export default class RedisClient< /** * @internal */ - _handleSmigrated(sequenceId: number) { - this._self.emit(SMIGRATED_EVENT, sequenceId); - } + _handleSmigrated(smigratedEvent: SMigratedEvent) { + this._self.emit(SMIGRATED_EVENT, smigratedEvent); + } + /** * @internal diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 165b329c9d1..2e42e2b9286 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -255,7 +255,7 @@ export default class RedisClusterSlots< } #handleSmigrated = async (event: SMigratedEvent) => { - dbgMaintenance(`[CSlots]: handle smigrated`, event); + dbgMaintenance(`[CSlots]: handle smigrated`, JSON.stringify(event, null, 2)); if(this.smigratedSeqIdsSeen.has(event.seqId)) { dbgMaintenance(`[CSlots]: sequence id ${event.seqId} already seen, abort`) From 9b8177eba344daa583ddc24f59d4f242e3968773 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 2 Dec 2025 14:46:59 +0200 Subject: [PATCH 07/15] work on main algo --- packages/client/lib/client/index.ts | 8 +- packages/client/lib/cluster/cluster-slots.ts | 162 +++++++++++++------ 2 files changed, 117 insertions(+), 53 deletions(-) diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index a97dfa02044..9ad044d0a15 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -1014,6 +1014,12 @@ export default class 
RedisClient< this._self.emit(SMIGRATED_EVENT, smigratedEvent); } + /** + * @internal + */ + _getQueue(): RedisCommandsQueue { + return this.#queue; + } /** * @internal @@ -1081,7 +1087,7 @@ export default class RedisClient< // Merge global options with provided options const opts = { ...this._self._commandOptions, - ...options + ...options, } const promise = this._self.#queue.addCommand(args, opts); diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 2e42e2b9286..07196a65b4a 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -263,12 +263,6 @@ export default class RedisClusterSlots< } this.smigratedSeqIdsSeen.add(event.seqId); - // slots = new Array>(RedisClusterSlots.#SLOTS); - // masters = new Array>(); - // replicas = new Array>(); - // readonly nodeByAddress = new Map | ShardNode>(); - // pubSubNode?: PubSubNode; - const sourceAddress = `${event.source.host}:${event.source.port}`; const sourceNode = this.nodeByAddress.get(sourceAddress); if(!sourceNode) { @@ -277,56 +271,120 @@ export default class RedisClusterSlots< } // 1. Pausing - //TODO - check the single pubsubnode + // 1.1 Normal sourceNode.client?._pause(); + // 1.2 Sharded pubsub if('pubSub' in sourceNode) { sourceNode.pubSub?.client._pause(); } - - const destinationAddress = `${event.destination.host}:${event.destination.port}`; - let destinationNode = this.nodeByAddress.get(destinationAddress); - let destinationShard: Shard; - - // 2. Create new Master - if(!destinationNode) { - const promises: Promise[] = []; - destinationNode = this.#initiateSlotNode({ host: event.destination.host, port: event.destination.port, id: 'asdff' }, false, true, new Set(), promises); - await Promise.all(promises); - // 2.1 Pause - destinationNode.client?._pause(); - // In case destination node didnt exist, this means Shard didnt exist as well, so creating a new Shard is completely fine - destinationShard = { - master: destinationNode - }; - } else { - // In case destination node existed, this means there was a Shard already, so its best if we can find it. - const existingShard = this.slots.find(shard => shard.master.host === event.destination.host && shard.master.port === event.destination.port); - if(!existingShard) { - dbgMaintenance("Could not find shard"); - throw new Error('Could not find shard'); - } - destinationShard = existingShard; - } - - // 3. Soft update shards - for(const range of event.ranges) { - if(typeof range === 'number') { - this.slots[range] = destinationShard; - } else { - for (let slot = range[0]; slot <= range[1]; slot++) { - this.slots[slot] = destinationShard; - } - } - } - - // 4. For all affected clients (normal, pubsub, spubsub): - // 4.1 Wait for inflight commands to complete - // 4.2 Extract commands, channels, sharded channels - // 4.3 Kill if no slots are pointing to it - // - - // 5. Prepend extracted commands, chans - // 5.1 Unpause + // 1.3 Regular pubsub + if(this.pubSubNode?.address === sourceAddress) { + this.pubSubNode?.client._pause(); + } + + // const destinationAddress = `${event.destination.host}:${event.destination.port}`; + // let destinationNode = this.nodeByAddress.get(destinationAddress); + // let destinationShard: Shard; + + // // 2. 
Create new Master + // // TODO create new pubsubnode if needed + // if(!destinationNode) { + // const promises: Promise[] = []; + // destinationNode = this.#initiateSlotNode({ host: event.destination.host, port: event.destination.port, id: 'asdff' }, false, true, new Set(), promises); + // await Promise.all(promises); + // // 2.1 Pause + // destinationNode.client?._pause(); + // // In case destination node didnt exist, this means Shard didnt exist as well, so creating a new Shard is completely fine + // destinationShard = { + // master: destinationNode + // }; + // } else { + // // In case destination node existed, this means there was a Shard already, so its best if we can find it. + // const existingShard = this.slots.find(shard => shard.master.host === event.destination.host && shard.master.port === event.destination.port); + // if(!existingShard) { + // dbgMaintenance("Could not find shard"); + // throw new Error('Could not find shard'); + // } + // destinationShard = existingShard; + // } + + // // 3. Soft update shards. + // // After this step we are expecting any new commands that hash to the same slots to be routed to the destinationShard + // const movingSlots = new Set(); + // for(const range of event.ranges) { + // if(typeof range === 'number') { + // this.slots[range] = destinationShard; + // movingSlots.add(range) + // } else { + // for (let slot = range[0]; slot <= range[1]; slot++) { + // this.slots[slot] = destinationShard; + // movingSlots.add(slot) + // } + // } + // } + + // // 4. For all affected clients (normal, pubsub, spubsub): + // // 4.1 Wait for inflight commands to complete + // const inflightPromises: Promise[] = []; + // //Normal + // inflightPromises.push(sourceNode.client!._getQueue().waitForInflightCommandsToComplete()); + // //Sharded pubsub + // if('pubSub' in sourceNode) { + // inflightPromises.push(sourceNode.pubSub!.client._getQueue().waitForInflightCommandsToComplete()); + // } + // //Regular pubsub + // if(this.pubSubNode?.address === sourceAddress) { + // inflightPromises.push(this.pubSubNode?.client._getQueue().waitForInflightCommandsToComplete()); + // } + // await Promise.all(inflightPromises); + + + // // 4.2 Extract commands, channels, sharded channels + // // TODO dont forget to extract channels and resubscribe + // const sourceStillHasSlots = this.slots.find(slot => slot.master.address === sourceAddress) !== undefined; + // if(sourceStillHasSlots) { + // const normalCommandsToMove = sourceNode.client!._getQueue().extractCommandsForSlots(movingSlots); + // // 5. Prepend extracted commands, chans + // //TODO pubsub, spubsub + // destinationNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove); + + // //unpause source node clients + // sourceNode.client?._unpause(); + // if('pubSub' in sourceNode) { + // sourceNode.pubSub?.client._unpause(); + // } + // //TODO pubSubNode? + // } else { + + // const normalCommandsToMove = sourceNode.client!._getQueue().getAllCommands(); + // // 5. Prepend extracted commands, chans + // destinationNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove); + // if('pubSub' in destinationNode) { + // // const pubsubListeners = destinationNode.pubSub?.client._getQueue().removePubSubListenersForSlots(movingSlots); + // //TODO resubscribe. Might need to throw an event for cluster to do the job + // } + // //TODO pubSubNode? 
+ + // //Cleanup + // this.masters = this.masters.filter(master => master.address !== sourceAddress); + // //not sure if needed, since there should be no replicas in RE + // this.replicas = this.replicas.filter(replica => replica.address !== sourceAddress); + // this.nodeByAddress.delete(sourceAddress); + // //TODO pubSubNode? + + // // 4.3 Kill because no slots are pointing to it anymore + // await sourceNode.client?.close() + // if('pubSub' in sourceNode) { + // await sourceNode.pubSub?.client.close(); + // } + // //TODO pubSubNode? + // } + + // // 5.1 Unpause + // destinationNode.client?._unpause(); + // if('pubSub' in destinationNode) { + // destinationNode.pubSub?.client._unpause(); + // } } From b3fb5db53dc7ad017a425a937c0b2518cbab6114 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 2 Dec 2025 15:23:05 +0200 Subject: [PATCH 08/15] fix: handle string values in push message reply comparison Buffer.equals() was failing when reply[0] was a string instead of a Buffer, causing hangs on push notifications. Now converts strings to Buffers before comparison in PubSub and commands-queue handlers. Changes: - PubSub.isStatusReply: convert reply[0] to Buffer if string - PubSub.isShardedUnsubscribe: convert reply[0] to Buffer if string - PubSub.handleMessageReply: convert reply[0] to Buffer if string - commands-queue PONG handler: convert reply[0] to Buffer if string --- packages/client/lib/client/commands-queue.ts | 3 ++- packages/client/lib/client/pub-sub.ts | 21 +++++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index f1d9e400d5c..99abcb3e6f8 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -290,7 +290,8 @@ export default class RedisCommandsQueue { if (Array.isArray(reply)) { if (this.#onPush(reply)) return; - if (PONG.equals(reply[0] as Buffer)) { + const firstElement = typeof reply[0] === 'string' ? Buffer.from(reply[0]) : reply[0]; + if (PONG.equals(firstElement as Buffer)) { const { resolve, typeMapping } = this.#waitingForReply.shift()!, buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer; resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString()); diff --git a/packages/client/lib/client/pub-sub.ts b/packages/client/lib/client/pub-sub.ts index a0dfe68cc15..1bad2fa9bec 100644 --- a/packages/client/lib/client/pub-sub.ts +++ b/packages/client/lib/client/pub-sub.ts @@ -52,17 +52,19 @@ export type PubSubCommand = ( export class PubSub { static isStatusReply(reply: Array): boolean { + const firstElement = typeof reply[0] === 'string' ? 
Buffer.from(reply[0]) : reply[0]; return ( - COMMANDS[PUBSUB_TYPE.CHANNELS].subscribe.equals(reply[0]) || - COMMANDS[PUBSUB_TYPE.CHANNELS].unsubscribe.equals(reply[0]) || - COMMANDS[PUBSUB_TYPE.PATTERNS].subscribe.equals(reply[0]) || - COMMANDS[PUBSUB_TYPE.PATTERNS].unsubscribe.equals(reply[0]) || - COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.equals(reply[0]) + COMMANDS[PUBSUB_TYPE.CHANNELS].subscribe.equals(firstElement) || + COMMANDS[PUBSUB_TYPE.CHANNELS].unsubscribe.equals(firstElement) || + COMMANDS[PUBSUB_TYPE.PATTERNS].subscribe.equals(firstElement) || + COMMANDS[PUBSUB_TYPE.PATTERNS].unsubscribe.equals(firstElement) || + COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.equals(firstElement) ); } static isShardedUnsubscribe(reply: Array): boolean { - return COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.equals(reply[0]); + const firstElement = typeof reply[0] === 'string' ? Buffer.from(reply[0]) : reply[0]; + return COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.equals(firstElement); } static #channelsArray(channels: string | Array) { @@ -371,14 +373,15 @@ export class PubSub { } handleMessageReply(reply: Array): boolean { - if (COMMANDS[PUBSUB_TYPE.CHANNELS].message.equals(reply[0])) { + const firstElement = typeof reply[0] === 'string' ? Buffer.from(reply[0]) : reply[0]; + if (COMMANDS[PUBSUB_TYPE.CHANNELS].message.equals(firstElement)) { this.#emitPubSubMessage( PUBSUB_TYPE.CHANNELS, reply[2], reply[1] ); return true; - } else if (COMMANDS[PUBSUB_TYPE.PATTERNS].message.equals(reply[0])) { + } else if (COMMANDS[PUBSUB_TYPE.PATTERNS].message.equals(firstElement)) { this.#emitPubSubMessage( PUBSUB_TYPE.PATTERNS, reply[3], @@ -386,7 +389,7 @@ export class PubSub { reply[1] ); return true; - } else if (COMMANDS[PUBSUB_TYPE.SHARDED].message.equals(reply[0])) { + } else if (COMMANDS[PUBSUB_TYPE.SHARDED].message.equals(firstElement)) { this.#emitPubSubMessage( PUBSUB_TYPE.SHARDED, reply[2], From a8a45256017b51b0fdb2c2944009b51ebb857b8a Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 2 Dec 2025 15:32:40 +0200 Subject: [PATCH 09/15] parse SMIGRATED according to new format --- .../client/lib/client/enterprise-maintenance-manager.ts | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index e3eec0b0172..a05adfdc5cc 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -310,9 +310,9 @@ export default class EnterpriseMaintenanceManager { }; #onSMigrated = (push: any[]) => { - // [ 'SMIGRATED', '15', [ '127.0.0.1:6379 123,456,789-1000', '127.0.0.1:6380 124,457,300-500' ] ] - // ^seq ^new endpoint1 ^slots ^new endpoint2 ^slots - const sequenceId = Number(push[1]); + // [ 'SMIGRATED', 15, [ [ '127.0.0.1:6379', '123,456,789-1000' ], [ '127.0.0.1:6380', '124,457,300-500' ] ] ] + // ^seq ^new endpoint1 ^slots ^new endpoint2 ^slots + const sequenceId: number = push[1]; const smigratedEvent: SMigratedEvent = { seqId: sequenceId, source: { @@ -320,8 +320,7 @@ export default class EnterpriseMaintenanceManager { }, destinations: [] } - for(const endpointInfo of push[2]) { - const [endpoint, slots] = String(endpointInfo).split(' '); + for(const [endpoint, slots] of push[2] as string[]) { //TODO not sure if we need to handle fqdn/ip.. cluster manages clients by host:port. 
If `cluster slots` returns ip, // but this notification returns fqdn, then we need to unify somehow ( maybe lookup ) const [ host, port ] = endpoint.split(':'); From dbe604a17080c5b901a04c6fe76c22a99d309695 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 2 Dec 2025 20:36:55 +0200 Subject: [PATCH 10/15] comply with the new notification structure --- packages/client/lib/cluster/cluster-slots.ts | 206 +++++++++---------- 1 file changed, 103 insertions(+), 103 deletions(-) diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 07196a65b4a..9f21ac8c131 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -282,109 +282,109 @@ export default class RedisClusterSlots< this.pubSubNode?.client._pause(); } - // const destinationAddress = `${event.destination.host}:${event.destination.port}`; - // let destinationNode = this.nodeByAddress.get(destinationAddress); - // let destinationShard: Shard; - - // // 2. Create new Master - // // TODO create new pubsubnode if needed - // if(!destinationNode) { - // const promises: Promise[] = []; - // destinationNode = this.#initiateSlotNode({ host: event.destination.host, port: event.destination.port, id: 'asdff' }, false, true, new Set(), promises); - // await Promise.all(promises); - // // 2.1 Pause - // destinationNode.client?._pause(); - // // In case destination node didnt exist, this means Shard didnt exist as well, so creating a new Shard is completely fine - // destinationShard = { - // master: destinationNode - // }; - // } else { - // // In case destination node existed, this means there was a Shard already, so its best if we can find it. - // const existingShard = this.slots.find(shard => shard.master.host === event.destination.host && shard.master.port === event.destination.port); - // if(!existingShard) { - // dbgMaintenance("Could not find shard"); - // throw new Error('Could not find shard'); - // } - // destinationShard = existingShard; - // } - - // // 3. Soft update shards. - // // After this step we are expecting any new commands that hash to the same slots to be routed to the destinationShard - // const movingSlots = new Set(); - // for(const range of event.ranges) { - // if(typeof range === 'number') { - // this.slots[range] = destinationShard; - // movingSlots.add(range) - // } else { - // for (let slot = range[0]; slot <= range[1]; slot++) { - // this.slots[slot] = destinationShard; - // movingSlots.add(slot) - // } - // } - // } - - // // 4. 
For all affected clients (normal, pubsub, spubsub): - // // 4.1 Wait for inflight commands to complete - // const inflightPromises: Promise[] = []; - // //Normal - // inflightPromises.push(sourceNode.client!._getQueue().waitForInflightCommandsToComplete()); - // //Sharded pubsub - // if('pubSub' in sourceNode) { - // inflightPromises.push(sourceNode.pubSub!.client._getQueue().waitForInflightCommandsToComplete()); - // } - // //Regular pubsub - // if(this.pubSubNode?.address === sourceAddress) { - // inflightPromises.push(this.pubSubNode?.client._getQueue().waitForInflightCommandsToComplete()); - // } - // await Promise.all(inflightPromises); - - - // // 4.2 Extract commands, channels, sharded channels - // // TODO dont forget to extract channels and resubscribe - // const sourceStillHasSlots = this.slots.find(slot => slot.master.address === sourceAddress) !== undefined; - // if(sourceStillHasSlots) { - // const normalCommandsToMove = sourceNode.client!._getQueue().extractCommandsForSlots(movingSlots); - // // 5. Prepend extracted commands, chans - // //TODO pubsub, spubsub - // destinationNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove); - - // //unpause source node clients - // sourceNode.client?._unpause(); - // if('pubSub' in sourceNode) { - // sourceNode.pubSub?.client._unpause(); - // } - // //TODO pubSubNode? - // } else { - - // const normalCommandsToMove = sourceNode.client!._getQueue().getAllCommands(); - // // 5. Prepend extracted commands, chans - // destinationNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove); - // if('pubSub' in destinationNode) { - // // const pubsubListeners = destinationNode.pubSub?.client._getQueue().removePubSubListenersForSlots(movingSlots); - // //TODO resubscribe. Might need to throw an event for cluster to do the job - // } - // //TODO pubSubNode? - - // //Cleanup - // this.masters = this.masters.filter(master => master.address !== sourceAddress); - // //not sure if needed, since there should be no replicas in RE - // this.replicas = this.replicas.filter(replica => replica.address !== sourceAddress); - // this.nodeByAddress.delete(sourceAddress); - // //TODO pubSubNode? - - // // 4.3 Kill because no slots are pointing to it anymore - // await sourceNode.client?.close() - // if('pubSub' in sourceNode) { - // await sourceNode.pubSub?.client.close(); - // } - // //TODO pubSubNode? - // } - - // // 5.1 Unpause - // destinationNode.client?._unpause(); - // if('pubSub' in destinationNode) { - // destinationNode.pubSub?.client._unpause(); - // } + for(const {host, port, slots} of event.destinations) { + const destinationAddress = `${host}:${port}`; + let destinationNode = this.nodeByAddress.get(destinationAddress); + let destinationShard: Shard; + // 2. Create new Master + // TODO create new pubsubnode if needed + if(!destinationNode) { + const promises: Promise[] = []; + destinationNode = this.#initiateSlotNode({ host: host, port: port, id: 'asdff' }, false, true, new Set(), promises); + await Promise.all(promises); + // 2.1 Pause + destinationNode.client?._pause(); + // In case destination node didnt exist, this means Shard didnt exist as well, so creating a new Shard is completely fine + destinationShard = { + master: destinationNode + }; + } else { + // In case destination node existed, this means there was a Shard already, so its best if we can find it. 
+ const existingShard = this.slots.find(shard => shard.master.host === host && shard.master.port === port); + if(!existingShard) { + dbgMaintenance("Could not find shard"); + throw new Error('Could not find shard'); + } + destinationShard = existingShard; + } + // 3. Soft update shards. + // After this step we are expecting any new commands that hash to the same slots to be routed to the destinationShard + const movingSlots = new Set(); + for(const slot of slots) { + if(typeof slot === 'number') { + this.slots[slot] = destinationShard; + movingSlots.add(slot) + } else { + for (let s = slot[0]; s <= slot[1]; s++) { + this.slots[s] = destinationShard; + movingSlots.add(s) + } + } + } + + // 4. For all affected clients (normal, pubsub, spubsub): + // 4.1 Wait for inflight commands to complete + const inflightPromises: Promise[] = []; + //Normal + inflightPromises.push(sourceNode.client!._getQueue().waitForInflightCommandsToComplete()); + //Sharded pubsub + if('pubSub' in sourceNode) { + inflightPromises.push(sourceNode.pubSub!.client._getQueue().waitForInflightCommandsToComplete()); + } + //Regular pubsub + if(this.pubSubNode?.address === sourceAddress) { + inflightPromises.push(this.pubSubNode?.client._getQueue().waitForInflightCommandsToComplete()); + } + await Promise.all(inflightPromises); + + + // 4.2 Extract commands, channels, sharded channels + // TODO dont forget to extract channels and resubscribe + const sourceStillHasSlots = this.slots.find(slot => slot.master.address === sourceAddress) !== undefined; + if(sourceStillHasSlots) { + const normalCommandsToMove = sourceNode.client!._getQueue().extractCommandsForSlots(movingSlots); + // 5. Prepend extracted commands, chans + //TODO pubsub, spubsub + destinationNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove); + + //unpause source node clients + sourceNode.client?._unpause(); + if('pubSub' in sourceNode) { + sourceNode.pubSub?.client._unpause(); + } + //TODO pubSubNode? + } else { + + const normalCommandsToMove = sourceNode.client!._getQueue().getAllCommands(); + // 5. Prepend extracted commands, chans + destinationNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove); + if('pubSub' in destinationNode) { + // const pubsubListeners = destinationNode.pubSub?.client._getQueue().removePubSubListenersForSlots(movingSlots); + //TODO resubscribe. Might need to throw an event for cluster to do the job + } + //TODO pubSubNode? + + //Cleanup + this.masters = this.masters.filter(master => master.address !== sourceAddress); + //not sure if needed, since there should be no replicas in RE + this.replicas = this.replicas.filter(replica => replica.address !== sourceAddress); + this.nodeByAddress.delete(sourceAddress); + //TODO pubSubNode? + + // 4.3 Kill because no slots are pointing to it anymore + await sourceNode.client?.close() + if('pubSub' in sourceNode) { + await sourceNode.pubSub?.client.close(); + } + //TODO pubSubNode? 
+ } + + // 5.1 Unpause + destinationNode.client?._unpause(); + if('pubSub' in destinationNode) { + destinationNode.pubSub?.client._unpause(); + } + } } From 6da583ae886d96c74b0d7761504bfdc3124d5c3c Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 2 Dec 2025 21:08:46 +0200 Subject: [PATCH 11/15] refine algo --- packages/client/lib/client/commands-queue.ts | 4 +- packages/client/lib/client/pub-sub.ts | 11 +--- packages/client/lib/cluster/cluster-slots.ts | 59 ++++++++++---------- packages/client/lib/cluster/index.ts | 4 +- 4 files changed, 36 insertions(+), 42 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 99abcb3e6f8..9af4550691b 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -350,8 +350,8 @@ export default class RedisCommandsQueue { return this.#pubSub.removeAllListeners(); } - removePubSubListenersForSlots(slots: Set) { - return this.#pubSub.removePubSubListenersForSlots(slots); + removeShardedPubSubListenersForSlots(slots: Set) { + return this.#pubSub.removeShardedPubSubListenersForSlots(slots); } resubscribe(chainId?: symbol) { diff --git a/packages/client/lib/client/pub-sub.ts b/packages/client/lib/client/pub-sub.ts index 1bad2fa9bec..9c705f4952d 100644 --- a/packages/client/lib/client/pub-sub.ts +++ b/packages/client/lib/client/pub-sub.ts @@ -424,15 +424,7 @@ export class PubSub { return result; } - removePubSubListenersForSlots(slots: Set) { - const channels = new Map(); - for (const [channel, value] of this.listeners[PUBSUB_TYPE.CHANNELS]) { - if (slots.has(calculateSlot(channel))) { - channels.set(channel, value); - this.listeners[PUBSUB_TYPE.CHANNELS].delete(channel); - } - } - + removeShardedPubSubListenersForSlots(slots: Set) { const sharded = new Map(); for (const [chanel, value] of this.listeners[PUBSUB_TYPE.SHARDED]) { if (slots.has(calculateSlot(chanel))) { @@ -444,7 +436,6 @@ export class PubSub { this.#updateIsActive(); return { - [PUBSUB_TYPE.CHANNELS]: channels, [PUBSUB_TYPE.SHARDED]: sharded }; } diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 9f21ac8c131..81f488729d8 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -18,6 +18,8 @@ export type NodeAddressMap = { [address: string]: NodeAddress; } | ((address: string) => NodeAddress | undefined); +export const RESUBSCRIBE_LISTENERS_EVENT = '__resubscribeListeners' + export interface Node< M extends RedisModules, F extends RedisFunctions, @@ -284,19 +286,20 @@ export default class RedisClusterSlots< for(const {host, port, slots} of event.destinations) { const destinationAddress = `${host}:${port}`; - let destinationNode = this.nodeByAddress.get(destinationAddress); - let destinationShard: Shard; + let destMasterNode: MasterNode | undefined = this.nodeByAddress.get(destinationAddress); + let destShard: Shard; // 2. 
Create new Master // TODO create new pubsubnode if needed - if(!destinationNode) { + if(!destMasterNode) { const promises: Promise[] = []; - destinationNode = this.#initiateSlotNode({ host: host, port: port, id: 'asdff' }, false, true, new Set(), promises); - await Promise.all(promises); + destMasterNode = this.#initiateSlotNode({ host: host, port: port, id: 'asdff' }, false, true, new Set(), promises); + await Promise.all([...promises, this.#initiateShardedPubSubClient(destMasterNode)]); // 2.1 Pause - destinationNode.client?._pause(); + destMasterNode.client?._pause(); + destMasterNode.pubSub?.client._pause(); // In case destination node didnt exist, this means Shard didnt exist as well, so creating a new Shard is completely fine - destinationShard = { - master: destinationNode + destShard = { + master: destMasterNode }; } else { // In case destination node existed, this means there was a Shard already, so its best if we can find it. @@ -305,18 +308,18 @@ export default class RedisClusterSlots< dbgMaintenance("Could not find shard"); throw new Error('Could not find shard'); } - destinationShard = existingShard; + destShard = existingShard; } // 3. Soft update shards. // After this step we are expecting any new commands that hash to the same slots to be routed to the destinationShard const movingSlots = new Set(); for(const slot of slots) { if(typeof slot === 'number') { - this.slots[slot] = destinationShard; + this.slots[slot] = destShard; movingSlots.add(slot) } else { for (let s = slot[0]; s <= slot[1]; s++) { - this.slots[s] = destinationShard; + this.slots[s] = destShard; movingSlots.add(s) } } @@ -341,48 +344,48 @@ export default class RedisClusterSlots< // 4.2 Extract commands, channels, sharded channels // TODO dont forget to extract channels and resubscribe const sourceStillHasSlots = this.slots.find(slot => slot.master.address === sourceAddress) !== undefined; + // If source shard still has slots, this means we have to only extract commands for the moving slots. + // Commands that are for different slots or have no slots should stay in the source shard. + // Same goes for sharded pub sub listeners if(sourceStillHasSlots) { const normalCommandsToMove = sourceNode.client!._getQueue().extractCommandsForSlots(movingSlots); // 5. Prepend extracted commands, chans //TODO pubsub, spubsub - destinationNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove); - - //unpause source node clients + destMasterNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove); sourceNode.client?._unpause(); if('pubSub' in sourceNode) { + const listeners = sourceNode.pubSub?.client._getQueue().removeShardedPubSubListenersForSlots(movingSlots); + this.#emit(RESUBSCRIBE_LISTENERS_EVENT, listeners); sourceNode.pubSub?.client._unpause(); } - //TODO pubSubNode? } else { - + // If source shard doesnt have any slots left, this means we can safely move all commands to the new shard. + // Same goes for sharded pub sub listeners const normalCommandsToMove = sourceNode.client!._getQueue().getAllCommands(); // 5. Prepend extracted commands, chans - destinationNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove); - if('pubSub' in destinationNode) { - // const pubsubListeners = destinationNode.pubSub?.client._getQueue().removePubSubListenersForSlots(movingSlots); - //TODO resubscribe. 
Might need to throw an event for cluster to do the job + destMasterNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove); + if('pubSub' in sourceNode) { + const listeners = sourceNode.pubSub?.client._getQueue().removeAllPubSubListeners(); + this.#emit(RESUBSCRIBE_LISTENERS_EVENT, listeners); } - //TODO pubSubNode? - //Cleanup + //Remove all local references to the dying shard's clients this.masters = this.masters.filter(master => master.address !== sourceAddress); //not sure if needed, since there should be no replicas in RE this.replicas = this.replicas.filter(replica => replica.address !== sourceAddress); this.nodeByAddress.delete(sourceAddress); - //TODO pubSubNode? // 4.3 Kill because no slots are pointing to it anymore await sourceNode.client?.close() if('pubSub' in sourceNode) { await sourceNode.pubSub?.client.close(); } - //TODO pubSubNode? } // 5.1 Unpause - destinationNode.client?._unpause(); - if('pubSub' in destinationNode) { - destinationNode.pubSub?.client._unpause(); + destMasterNode.client?._unpause(); + if('pubSub' in destMasterNode) { + destMasterNode.pubSub?.client._unpause(); } } @@ -496,7 +499,7 @@ export default class RedisClusterSlots< .on(SMIGRATED_EVENT, this.#handleSmigrated) .on('__MOVED', async (allPubSubListeners: PubSubListeners) => { await this.rediscover(client); - this.#emit('__resubscribeAllPubSubListeners', allPubSubListeners); + this.#emit(RESUBSCRIBE_LISTENERS_EVENT, allPubSubListeners); }); return client; diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index 3a56e99ec97..5a1b4d5d4ca 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -4,7 +4,7 @@ import { Command, CommandArguments, CommanderConfig, TypeMapping, RedisArgument, import COMMANDS from '../commands'; import { EventEmitter } from 'node:events'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; -import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots'; +import RedisClusterSlots, { NodeAddressMap, RESUBSCRIBE_LISTENERS_EVENT, ShardNode } from './cluster-slots'; import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi-command'; import { PubSubListener, PubSubListeners } from '../client/pub-sub'; import { ErrorReply } from '../errors'; @@ -310,7 +310,7 @@ export default class RedisCluster< this._options = options; this._slots = new RedisClusterSlots(options, this.emit.bind(this)); - this.on('__resubscribeAllPubSubListeners', this.resubscribeAllPubSubListeners.bind(this)); + this.on(RESUBSCRIBE_LISTENERS_EVENT, this.resubscribeAllPubSubListeners.bind(this)); if (options?.commandOptions) { this._commandOptions = options.commandOptions; From c70dbb4a042b86129d6c275f18a92560df68c2f7 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Wed, 3 Dec 2025 13:42:04 +0200 Subject: [PATCH 12/15] handle pubSubNode replacement --- packages/client/lib/cluster/cluster-slots.ts | 24 +++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 81f488729d8..b803625c22c 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -279,17 +279,12 @@ export default class RedisClusterSlots< if('pubSub' in sourceNode) { sourceNode.pubSub?.client._pause(); } - // 1.3 Regular pubsub - if(this.pubSubNode?.address === sourceAddress) { - 
this.pubSubNode?.client._pause(); - } for(const {host, port, slots} of event.destinations) { const destinationAddress = `${host}:${port}`; let destMasterNode: MasterNode | undefined = this.nodeByAddress.get(destinationAddress); let destShard: Shard; // 2. Create new Master - // TODO create new pubsubnode if needed if(!destMasterNode) { const promises: Promise[] = []; destMasterNode = this.#initiateSlotNode({ host: host, port: port, id: 'asdff' }, false, true, new Set(), promises); @@ -387,8 +382,25 @@ export default class RedisClusterSlots< if('pubSub' in destMasterNode) { destMasterNode.pubSub?.client._unpause(); } - } + // We want to replace the pubSubNode ONLY if it is pointing to the affected node AND the affected + // node is actually dying ( designated by the fact that there are no remaining slots assigned to it) + if(this.pubSubNode?.address === sourceAddress && !sourceStillHasSlots) { + const channelsListeners = this.pubSubNode.client.getPubSubListeners(PUBSUB_TYPE.CHANNELS), + patternsListeners = this.pubSubNode.client.getPubSubListeners(PUBSUB_TYPE.PATTERNS); + + this.pubSubNode.client.destroy(); + + // Only create the new pubSubNode if there are actual subscriptions to make. + // It will be lazily created later if needed. + if (channelsListeners.size || patternsListeners.size) { + await this.#initiatePubSubClient({ + [PUBSUB_TYPE.CHANNELS]: channelsListeners, + [PUBSUB_TYPE.PATTERNS]: patternsListeners + }) + } + } + } } async #getShards(rootNode: RedisClusterClientOptions) { From a265d79c8ce7b406571a00f48ebe16dcd56271c6 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 5 Dec 2025 14:05:50 +0200 Subject: [PATCH 13/15] tests: merge all `after` functions into one --- packages/test-utils/lib/dockers.ts | 39 ++++++++++++------------------ 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/packages/test-utils/lib/dockers.ts b/packages/test-utils/lib/dockers.ts index 47257964f6a..8d4b98a00f8 100644 --- a/packages/test-utils/lib/dockers.ts +++ b/packages/test-utils/lib/dockers.ts @@ -134,13 +134,7 @@ async function dockerRemove(dockerId: string): Promise { } } -after(() => { - return Promise.all( - [...RUNNING_SERVERS.values()].map(async dockerPromise => - await dockerRemove((await dockerPromise).dockerId) - ) - ); -}); + export type RedisClusterDockersConfig = RedisServerDockerOptions & { numberOfMasters?: number; @@ -311,15 +305,7 @@ export function spawnRedisCluster( return dockersPromise; } -after(() => { - return Promise.all( - [...RUNNING_CLUSTERS.values()].map(async dockersPromise => { - return Promise.all( - (await dockersPromise).map(({ dockerId }) => dockerRemove(dockerId)) - ); - }) - ); -}); + const RUNNING_NODES = new Map, Array>(); @@ -397,13 +383,20 @@ export async function spawnRedisSentinel( } after(() => { - return Promise.all( - [...RUNNING_NODES.values(), ...RUNNING_SENTINELS.values()].map(async dockersPromise => { - return Promise.all( - dockersPromise.map(({ dockerId }) => dockerRemove(dockerId)) - ); - }) - ); + return Promise.all([ + ...Array.from(RUNNING_SERVERS.values()).map(async dockerPromise => + dockerRemove((await dockerPromise).dockerId) + ), + ...Array.from(RUNNING_CLUSTERS.values()).map(async dockersPromise => + Promise.all((await dockersPromise).map(({ dockerId }) => dockerRemove(dockerId))) + ), + ...Array.from(RUNNING_NODES.values()).map(dockersPromise => + Promise.all(dockersPromise.map(({ dockerId }) => dockerRemove(dockerId))) + ), + ...Array.from(RUNNING_SENTINELS.values()).map(dockersPromise => + 
Promise.all(dockersPromise.map(({ dockerId }) => dockerRemove(dockerId))) + ), + ]); }); From 2f39426e74a8bfd8a265c120c1cc6e40ffe23c52 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 5 Dec 2025 14:10:28 +0200 Subject: [PATCH 14/15] tests: add `testWithProxiedCluster()` function --- .../client/lib/cluster/cluster-slots.spec.ts | 3 +- packages/test-utils/lib/dockers.ts | 97 ++++++++++++++++--- packages/test-utils/lib/index.ts | 51 +++++++++- 3 files changed, 134 insertions(+), 17 deletions(-) diff --git a/packages/client/lib/cluster/cluster-slots.spec.ts b/packages/client/lib/cluster/cluster-slots.spec.ts index 76c4cb53fdf..62b823e7c8d 100644 --- a/packages/client/lib/cluster/cluster-slots.spec.ts +++ b/packages/client/lib/cluster/cluster-slots.spec.ts @@ -1,7 +1,8 @@ import { strict as assert } from 'node:assert'; import { EventEmitter } from 'node:events'; -import { RedisClusterOptions, RedisClusterClientOptions } from './index'; +import { RedisClusterClientOptions } from './index'; import RedisClusterSlots from './cluster-slots'; +import TestUtils, { GLOBAL } from '../test-utils' describe('RedisClusterSlots', () => { describe('initialization', () => { diff --git a/packages/test-utils/lib/dockers.ts b/packages/test-utils/lib/dockers.ts index 8d4b98a00f8..29fc7cccfeb 100644 --- a/packages/test-utils/lib/dockers.ts +++ b/packages/test-utils/lib/dockers.ts @@ -9,6 +9,7 @@ import { promisify } from 'node:util'; import * as fs from 'node:fs'; import * as os from 'node:os'; import * as path from 'node:path'; +import assert from 'node:assert'; const execAsync = promisify(execFileCallback); @@ -116,6 +117,69 @@ options: RedisServerDockerOptions, serverArguments: Array): Promise, ReturnType>(); +export interface ProxiedRedisServerDocker { + ports: number[], + dockerId: string +} + +export interface ProxiedRedisServerConfig { + nOfProxies: number, + defaultInterceptors: ('cluster'|'hitless'|'logger')[] +} + +const RUNNING_PROXIED_SERVERS = new Map>(); + +export async function spawnProxiedRedisServer(config: ProxiedRedisServerConfig): Promise { + const key = JSON.stringify(config); + const runningServer = RUNNING_PROXIED_SERVERS.get(key); + if (runningServer) { + return runningServer; + } + + const server = spawnProxiedRedisServerDocker(config); + RUNNING_PROXIED_SERVERS.set(key, server); + return server; +} + +export async function spawnProxiedRedisServerDocker( + config: ProxiedRedisServerConfig, +): Promise { + + assert(config.nOfProxies > 0, 'At least one proxy should be started'); + const ports: number[] = []; + for (let i = 0; i < config.nOfProxies; i++) { + ports.push((await portIterator.next()).value); + } + + const dockerArgs =[ + "run", + "-d", + "--network", "host", + "-e", `LISTEN_PORT=${ports.join(',')}`, + "-e", "TIEOUT=0", + "-e", `DEFAULT_INTERCEPTORS=${config.defaultInterceptors.join(',')}`, + "-e", "ENABLE_LOGGING=true", + "cae-resp-proxy-standalone" + ] + + console.log(`[Docker] Spawning Proxy container`, dockerArgs.join(' ')); + + const { stdout, stderr } = await execAsync("docker", dockerArgs); + + if (!stdout) { + throw new Error(`docker run error - ${stderr}`); + } + + while (await isPortAvailable(ports[0])) { + await setTimeout(50); + } + + return { + ports, + dockerId: stdout.trim(), + }; +} + export function spawnRedisServer(dockerConfig: RedisServerDockerOptions, serverArguments: Array): Promise { const runningServer = RUNNING_SERVERS.get(serverArguments); if (runningServer) { @@ -319,7 +383,7 @@ export async function spawnRedisSentinel( if (runningNodes) { 
return runningNodes; } - + const passIndex = serverArguments.indexOf('--requirepass')+1; let password: string | undefined = undefined; if (passIndex != 0) { @@ -329,7 +393,7 @@ export async function spawnRedisSentinel( const master = await spawnRedisServerDocker(dockerConfigs, serverArguments); const redisNodes: Array = [master]; const replicaPromises: Array> = []; - + const replicasCount = 2; for (let i = 0; i < replicasCount; i++) { replicaPromises.push((async () => { @@ -344,26 +408,26 @@ export async function spawnRedisSentinel( await client.connect(); await client.replicaOf("127.0.0.1", master.port); await client.close(); - + return replica; })()); } - + const replicas = await Promise.all(replicaPromises); redisNodes.push(...replicas); RUNNING_NODES.set(serverArguments, redisNodes); const sentinelPromises: Array> = []; const sentinelCount = 3; - + const appPrefix = 'sentinel-config-dir'; const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), appPrefix)); for (let i = 0; i < sentinelCount; i++) { sentinelPromises.push( spawnSentinelNode( - dockerConfigs, - serverArguments, + dockerConfigs, + serverArguments, master.port, "mymaster", path.join(tmpDir, i.toString()), @@ -371,10 +435,10 @@ export async function spawnRedisSentinel( ), ) } - + const sentinelNodes = await Promise.all(sentinelPromises); RUNNING_SENTINELS.set(serverArguments, sentinelNodes); - + if (tmpDir) { fs.rmSync(tmpDir, { recursive: true }); } @@ -396,6 +460,9 @@ after(() => { ...Array.from(RUNNING_SENTINELS.values()).map(dockersPromise => Promise.all(dockersPromise.map(({ dockerId }) => dockerRemove(dockerId))) ), + ...Array.from(RUNNING_PROXIED_SERVERS.values()).map(async dockerPromise => + dockerRemove((await dockerPromise).dockerId) + ) ]); }); @@ -403,7 +470,7 @@ after(() => { export async function spawnSentinelNode( dockerConfigs: RedisServerDockerOptions, serverArguments: Array, - masterPort: number, + masterPort: number, sentinelName: string, tmpDir: string, password?: string, @@ -429,12 +496,12 @@ sentinel failover-timeout ${sentinelName} 1000 return await spawnRedisServerDocker( { - image: dockerConfigs.image, - version: dockerConfigs.version, + image: dockerConfigs.image, + version: dockerConfigs.version, mode: "sentinel", - mounts: [`${dir}/redis.conf:/redis/config/node-sentinel-1/redis.conf`], + mounts: [`${dir}/redis.conf:/redis/config/node-sentinel-1/redis.conf`], port: port, - }, + }, serverArguments, ); -} \ No newline at end of file +} diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 1a9d1c9845a..baed543cb76 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -19,7 +19,7 @@ import { RedisClusterType } from '@redis/client/index'; import { RedisNode } from '@redis/client/lib/sentinel/types' -import { spawnRedisServer, spawnRedisCluster, spawnRedisSentinel, RedisServerDockerOptions, RedisServerDocker, spawnSentinelNode, spawnRedisServerDocker } from './dockers'; +import { spawnRedisServer, spawnRedisCluster, spawnRedisSentinel, RedisServerDockerOptions, RedisServerDocker, spawnSentinelNode, spawnRedisServerDocker, spawnProxiedRedisServer } from './dockers'; import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; @@ -28,6 +28,7 @@ import * as os from 'node:os'; import * as path from 'node:path'; import { RedisProxy, getFreePortNumber } from './proxy/redis-proxy'; + interface TestUtilsConfig { /** * The name of the Docker image to use for spawning Redis test instances. 
@@ -298,6 +299,54 @@ export default class TestUtils { } }); } + + testWithProxiedCluster< + M extends RedisModules = {}, + F extends RedisFunctions = {}, + S extends RedisScripts = {}, + RESP extends RespVersions = 2, + TYPE_MAPPING extends TypeMapping = {} + >( + title: string, + fn: (proxiedClusterClient: RedisClusterType, proxyUrl: string) => unknown, + options: Omit, 'numberOfReplicas' | 'minimumDockerVersion' | 'serverArguments'> + ) { + let spawnPromise: ReturnType; + before(function () { + this.timeout(30000); + spawnPromise = spawnProxiedRedisServer({ nOfProxies: options.numberOfMasters ?? 3, defaultInterceptors: ['cluster', 'hitless'] }); + }); + + it(title, async function () { + if (!spawnPromise) return this.skip(); + const { ports } = await spawnPromise; + + const cluster = createCluster({ + rootNodes: ports.map(port => ({ + socket: { + port + } + })), + minimizeConnections: options.clusterConfiguration?.minimizeConnections ?? true, + ...options.clusterConfiguration + }); + + if(options.disableClusterSetup) { + return fn(cluster, 'hh'); + } + + await cluster.connect(); + + try { + await TestUtils.#clusterFlushAll(cluster); + await fn(cluster, 'hi'); + } finally { + await TestUtils.#clusterFlushAll(cluster); + cluster.destroy(); + } + }); + } + testWithProxiedClient( title: string, fn: (proxiedClient: RedisClientType, proxy: RedisProxy) => unknown, From 9e052d3843755a9d78c1ef27c340a86b59b61182 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 5 Dec 2025 16:30:05 +0200 Subject: [PATCH 15/15] tests: add ProxyController for easier proxy comms --- packages/test-utils/lib/dockers.ts | 4 + packages/test-utils/lib/index.ts | 11 +- .../test-utils/lib/proxy/proxy-controller.ts | 103 ++++++++++++++++++ 3 files changed, 114 insertions(+), 4 deletions(-) create mode 100644 packages/test-utils/lib/proxy/proxy-controller.ts diff --git a/packages/test-utils/lib/dockers.ts b/packages/test-utils/lib/dockers.ts index 29fc7cccfeb..792e8792da1 100644 --- a/packages/test-utils/lib/dockers.ts +++ b/packages/test-utils/lib/dockers.ts @@ -119,6 +119,7 @@ const RUNNING_SERVERS = new Map, ReturnType( title: string, - fn: (proxiedClusterClient: RedisClusterType, proxyUrl: string) => unknown, + fn: (proxiedClusterClient: RedisClusterType, proxyController: ProxyController) => unknown, options: Omit, 'numberOfReplicas' | 'minimumDockerVersion' | 'serverArguments'> ) { let spawnPromise: ReturnType; @@ -319,7 +320,7 @@ export default class TestUtils { it(title, async function () { if (!spawnPromise) return this.skip(); - const { ports } = await spawnPromise; + const { ports, apiPort } = await spawnPromise; const cluster = createCluster({ rootNodes: ports.map(port => ({ @@ -331,15 +332,17 @@ export default class TestUtils { ...options.clusterConfiguration }); + const proxyController = new ProxyController(`http://localhost:${apiPort}`) + if(options.disableClusterSetup) { - return fn(cluster, 'hh'); + return fn(cluster, proxyController); } await cluster.connect(); try { await TestUtils.#clusterFlushAll(cluster); - await fn(cluster, 'hi'); + await fn(cluster, proxyController); } finally { await TestUtils.#clusterFlushAll(cluster); cluster.destroy(); diff --git a/packages/test-utils/lib/proxy/proxy-controller.ts b/packages/test-utils/lib/proxy/proxy-controller.ts new file mode 100644 index 00000000000..dbb38a641ee --- /dev/null +++ b/packages/test-utils/lib/proxy/proxy-controller.ts @@ -0,0 +1,103 @@ +export type ProxyStats = { + totalConnections: number; + activeConnections: number; + totalBytesReceived: 
number; + totalBytesSent: number; + totalCommandsReceived: number; + totalCommandsSent: number; +}; + +export type SendResult = { + success: boolean; + connectionId: string; + error?: string; +}; + +export type ProxyConfig = { + listenPort: number; + listenHost?: string; + targetHost: string; + targetPort: number; + timeout?: number; + enableLogging?: boolean; +}; + +export default class ProxyController { + constructor(private url: string) { } + + private fetchJson(path: string, options?: RequestInit): Promise { + return fetch(`${this.url}${path}`, options).then(res => res.json()); + } + + getStats(): Promise> { + return this.fetchJson('/stats') as Promise>; + } + + getConnections(): Promise> { + return this.fetchJson('/connections') as Promise>; + } + + getNodes(): Promise<{ ids: string[] }> { + return this.fetchJson('/nodes') as Promise<{ ids: string[] }>; + } + + createNode(config: Partial): Promise<{ success: boolean; cfg: ProxyConfig }> { + return this.fetchJson('/nodes', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(config) + }) as Promise<{ success: boolean; cfg: ProxyConfig }>; + } + + deleteNode(nodeId: string): Promise<{ success: boolean }> { + return this.fetchJson(`/nodes/${nodeId}`, { + method: 'DELETE' + }) as Promise<{ success: boolean }>; + } + + sendToClient(connectionId: string, data: string, encoding: 'base64' | 'raw' = 'base64'): Promise { + return this.fetchJson(`/send-to-client/${connectionId}?encoding=${encoding}`, { + method: 'POST', + headers: { 'Content-Type': 'text/plain' }, + body: data + }) as Promise; + } + + sendToClients(connectionIds: string[], data: string, encoding: 'base64' | 'raw' = 'base64'): Promise<{ results: SendResult[] }> { + return this.fetchJson(`/send-to-clients?connectionIds=${connectionIds.join(',')}&encoding=${encoding}`, { + method: 'POST', + headers: { 'Content-Type': 'text/plain' }, + body: data + }) as Promise<{ results: SendResult[] }>; + } + + sendToAllClients(data: string, encoding: 'base64' | 'raw' = 'base64'): Promise<{ results: SendResult[] }> { + return this.fetchJson(`/send-to-all-clients?encoding=${encoding}`, { + method: 'POST', + headers: { 'Content-Type': 'text/plain' }, + body: data + }) as Promise<{ results: SendResult[] }>; + } + + closeConnection(connectionId: string): Promise<{ success: boolean; connectionId: string }> { + return this.fetchJson(`/connections/${connectionId}`, { + method: 'DELETE' + }) as Promise<{ success: boolean; connectionId: string }>; + } + + createScenario(responses: string[], encoding: 'base64' | 'raw' = 'base64'): Promise<{ success: boolean; totalResponses: number }> { + return this.fetchJson('/scenarios', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ responses, encoding }) + }) as Promise<{ success: boolean; totalResponses: number }>; + } + + addInterceptor(name: string, match: string, response: string, encoding: 'base64' | 'raw' = 'base64'): Promise<{ success: boolean; name: string }> { + return this.fetchJson('/interceptors', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ name, match, response, encoding }) + }) as Promise<{ success: boolean; name: string }>; + } +}
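
Usage sketch (illustrative only, not part of the patch series): how a test could combine the new `testWithProxiedCluster()` helper with `ProxyController`. The `(cluster, proxyController)` callback signature and the controller methods are taken from the patches above; `testUtils` (assumed to be an already-configured TestUtils instance), the import path, `GLOBAL.CLUSTERS.OPEN`, and the pre-encoded `SMIGRATED_PUSH_B64` payload are assumptions made up for the example.

  import { strict as assert } from 'node:assert';
  // Assumed import path and exports; adjust to wherever TestUtils and GLOBAL live.
  import testUtils, { GLOBAL } from '../test-utils';

  // A pre-encoded RESP3 push message (base64). Building a valid SMIGRATED payload
  // is out of scope for this sketch and assumed to be done elsewhere.
  declare const SMIGRATED_PUSH_B64: string;

  testUtils.testWithProxiedCluster(
    'keeps serving commands around an SMIGRATED push',
    async (cluster, proxy) => {
      await cluster.set('key', 'value');

      // Inject the push into every proxied connection through the proxy's HTTP API.
      await proxy.sendToAllClients(SMIGRATED_PUSH_B64, 'base64');

      // Commands issued after the push should still be routed and answered.
      assert.equal(await cluster.get('key'), 'value');

      // The controller also exposes traffic counters that tests can assert on.
      const stats = await proxy.getStats();
      assert.ok(stats);
    },
    GLOBAL.CLUSTERS.OPEN
  );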