Skip to content

Commit 542b52e

Browse files
committed
add support for extracting commands from queue
1 parent cd996ca commit 542b52e

File tree

2 files changed

+72
-0
lines changed

2 files changed

+72
-0
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,10 @@ export default class RedisCommandsQueue {
349349
return this.#pubSub.removeAllListeners();
350350
}
351351

352+
removePubSubListenersForSlots(slots: Set<number>) {
353+
return this.#pubSub.removePubSubListenersForSlots(slots);
354+
}
355+
352356
resubscribe(chainId?: symbol) {
353357
const commands = this.#pubSub.resubscribe();
354358
if (!commands.length) return;
@@ -548,4 +552,46 @@ export default class RedisCommandsQueue {
548552
this.#waitingForReply.length === 0
549553
);
550554
}
555+
556+
/**
557+
*
558+
* Extracts commands for the given slots from the toWrite queue.
559+
* Some commands dont have "slotNumber", which means they are not designated to particular slot/node.
560+
* We ignore those.
561+
*/
562+
extractCommandsForSlots(slots: Set<number>): CommandToWrite[] {
563+
const result: CommandToWrite[] = [];
564+
let current = this.#toWrite.head;
565+
while(current !== undefined) {
566+
if(current.value.slotNumber !== undefined && slots.has(current.value.slotNumber)) {
567+
result.push(current.value);
568+
const toRemove = current;
569+
current = current.next;
570+
this.#toWrite.remove(toRemove);
571+
}
572+
}
573+
return result;
574+
}
575+
576+
/**
577+
* Gets all commands from the write queue without removing them.
578+
*/
579+
getAllCommands(): CommandToWrite[] {
580+
const result: CommandToWrite[] = [];
581+
let current = this.#toWrite.head;
582+
while(current) {
583+
result.push(current.value);
584+
current = current.next;
585+
}
586+
return result;
587+
}
588+
589+
/**
590+
* Prepends commands to the write queue in reverse.
591+
*/
592+
prependCommandsToWrite(commands: CommandToWrite[]) {
593+
for (let i = commands.length - 1; i <= 0; i--) {
594+
this.#toWrite.unshift(commands[i]);
595+
}
596+
}
551597
}

packages/client/lib/client/pub-sub.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { RedisArgument } from '../RESP/types';
22
import { CommandToWrite } from './commands-queue';
3+
import calculateSlot from 'cluster-key-slot';
34

45
export const PUBSUB_TYPE = {
56
CHANNELS: 'CHANNELS',
@@ -420,6 +421,31 @@ export class PubSub {
420421
return result;
421422
}
422423

424+
removePubSubListenersForSlots(slots: Set<number>) {
425+
const channels = new Map<string, ChannelListeners>();
426+
for (const [channel, value] of this.listeners[PUBSUB_TYPE.CHANNELS]) {
427+
if (slots.has(calculateSlot(channel))) {
428+
channels.set(channel, value);
429+
this.listeners[PUBSUB_TYPE.CHANNELS].delete(channel);
430+
}
431+
}
432+
433+
const sharded = new Map<string, ChannelListeners>();
434+
for (const [chanel, value] of this.listeners[PUBSUB_TYPE.SHARDED]) {
435+
if (slots.has(calculateSlot(chanel))) {
436+
sharded.set(chanel, value);
437+
this.listeners[PUBSUB_TYPE.SHARDED].delete(chanel);
438+
}
439+
}
440+
441+
this.#updateIsActive();
442+
443+
return {
444+
[PUBSUB_TYPE.CHANNELS]: channels,
445+
[PUBSUB_TYPE.SHARDED]: sharded
446+
};
447+
}
448+
423449
#emitPubSubMessage(
424450
type: PubSubType,
425451
message: Buffer,

0 commit comments

Comments
 (0)