Skip to content

Commit c70dbb4

Browse files
committed
handle pubSubNode replacement
1 parent 6da583a commit c70dbb4

File tree

1 file changed

+18
-6
lines changed

1 file changed

+18
-6
lines changed

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -279,17 +279,12 @@ export default class RedisClusterSlots<
279279
if('pubSub' in sourceNode) {
280280
sourceNode.pubSub?.client._pause();
281281
}
282-
// 1.3 Regular pubsub
283-
if(this.pubSubNode?.address === sourceAddress) {
284-
this.pubSubNode?.client._pause();
285-
}
286282

287283
for(const {host, port, slots} of event.destinations) {
288284
const destinationAddress = `${host}:${port}`;
289285
let destMasterNode: MasterNode<M, F, S, RESP, TYPE_MAPPING> | undefined = this.nodeByAddress.get(destinationAddress);
290286
let destShard: Shard<M, F, S, RESP, TYPE_MAPPING>;
291287
// 2. Create new Master
292-
// TODO create new pubsubnode if needed
293288
if(!destMasterNode) {
294289
const promises: Promise<unknown>[] = [];
295290
destMasterNode = this.#initiateSlotNode({ host: host, port: port, id: 'asdff' }, false, true, new Set(), promises);
@@ -387,8 +382,25 @@ export default class RedisClusterSlots<
387382
if('pubSub' in destMasterNode) {
388383
destMasterNode.pubSub?.client._unpause();
389384
}
390-
}
391385

386+
// We want to replace the pubSubNode ONLY if it is pointing to the affected node AND the affected
387+
// node is actually dying ( designated by the fact that there are no remaining slots assigned to it)
388+
if(this.pubSubNode?.address === sourceAddress && !sourceStillHasSlots) {
389+
const channelsListeners = this.pubSubNode.client.getPubSubListeners(PUBSUB_TYPE.CHANNELS),
390+
patternsListeners = this.pubSubNode.client.getPubSubListeners(PUBSUB_TYPE.PATTERNS);
391+
392+
this.pubSubNode.client.destroy();
393+
394+
// Only create the new pubSubNode if there are actual subscriptions to make.
395+
// It will be lazily created later if needed.
396+
if (channelsListeners.size || patternsListeners.size) {
397+
await this.#initiatePubSubClient({
398+
[PUBSUB_TYPE.CHANNELS]: channelsListeners,
399+
[PUBSUB_TYPE.PATTERNS]: patternsListeners
400+
})
401+
}
402+
}
403+
}
392404
}
393405

394406
async #getShards(rootNode: RedisClusterClientOptions) {

0 commit comments

Comments
 (0)