@@ -18,6 +18,8 @@ export type NodeAddressMap = {
1818 [ address : string ] : NodeAddress ;
1919} | ( ( address : string ) => NodeAddress | undefined ) ;
2020
21+ export const RESUBSCRIBE_LISTENERS_EVENT = '__resubscribeListeners'
22+
2123export interface Node <
2224 M extends RedisModules ,
2325 F extends RedisFunctions ,
@@ -284,19 +286,20 @@ export default class RedisClusterSlots<
284286
285287 for ( const { host, port, slots} of event . destinations ) {
286288 const destinationAddress = `${ host } :${ port } ` ;
287- let destinationNode = this . nodeByAddress . get ( destinationAddress ) ;
288- let destinationShard : Shard < M , F , S , RESP , TYPE_MAPPING > ;
289+ let destMasterNode : MasterNode < M , F , S , RESP , TYPE_MAPPING > | undefined = this . nodeByAddress . get ( destinationAddress ) ;
290+ let destShard : Shard < M , F , S , RESP , TYPE_MAPPING > ;
289291 // 2. Create new Master
290292 // TODO create new pubsubnode if needed
291- if ( ! destinationNode ) {
293+ if ( ! destMasterNode ) {
292294 const promises : Promise < unknown > [ ] = [ ] ;
293- destinationNode = this . #initiateSlotNode( { host : host , port : port , id : 'asdff' } , false , true , new Set ( ) , promises ) ;
294- await Promise . all ( promises ) ;
295+ destMasterNode = this . #initiateSlotNode( { host : host , port : port , id : 'asdff' } , false , true , new Set ( ) , promises ) ;
296+ await Promise . all ( [ ... promises , this . #initiateShardedPubSubClient ( destMasterNode ) ] ) ;
295297 // 2.1 Pause
296- destinationNode . client ?. _pause ( ) ;
298+ destMasterNode . client ?. _pause ( ) ;
299+ destMasterNode . pubSub ?. client . _pause ( ) ;
297300 // In case destination node didnt exist, this means Shard didnt exist as well, so creating a new Shard is completely fine
298- destinationShard = {
299- master : destinationNode
301+ destShard = {
302+ master : destMasterNode
300303 } ;
301304 } else {
302305 // In case destination node existed, this means there was a Shard already, so its best if we can find it.
@@ -305,18 +308,18 @@ export default class RedisClusterSlots<
305308 dbgMaintenance ( "Could not find shard" ) ;
306309 throw new Error ( 'Could not find shard' ) ;
307310 }
308- destinationShard = existingShard ;
311+ destShard = existingShard ;
309312 }
310313 // 3. Soft update shards.
311314 // After this step we are expecting any new commands that hash to the same slots to be routed to the destinationShard
312315 const movingSlots = new Set < number > ( ) ;
313316 for ( const slot of slots ) {
314317 if ( typeof slot === 'number' ) {
315- this . slots [ slot ] = destinationShard ;
318+ this . slots [ slot ] = destShard ;
316319 movingSlots . add ( slot )
317320 } else {
318321 for ( let s = slot [ 0 ] ; s <= slot [ 1 ] ; s ++ ) {
319- this . slots [ s ] = destinationShard ;
322+ this . slots [ s ] = destShard ;
320323 movingSlots . add ( s )
321324 }
322325 }
@@ -341,48 +344,48 @@ export default class RedisClusterSlots<
341344 // 4.2 Extract commands, channels, sharded channels
342345 // TODO dont forget to extract channels and resubscribe
343346 const sourceStillHasSlots = this . slots . find ( slot => slot . master . address === sourceAddress ) !== undefined ;
347+ // If source shard still has slots, this means we have to only extract commands for the moving slots.
348+ // Commands that are for different slots or have no slots should stay in the source shard.
349+ // Same goes for sharded pub sub listeners
344350 if ( sourceStillHasSlots ) {
345351 const normalCommandsToMove = sourceNode . client ! . _getQueue ( ) . extractCommandsForSlots ( movingSlots ) ;
346352 // 5. Prepend extracted commands, chans
347353 //TODO pubsub, spubsub
348- destinationNode . client ?. _getQueue ( ) . prependCommandsToWrite ( normalCommandsToMove ) ;
349-
350- //unpause source node clients
354+ destMasterNode . client ?. _getQueue ( ) . prependCommandsToWrite ( normalCommandsToMove ) ;
351355 sourceNode . client ?. _unpause ( ) ;
352356 if ( 'pubSub' in sourceNode ) {
357+ const listeners = sourceNode . pubSub ?. client . _getQueue ( ) . removeShardedPubSubListenersForSlots ( movingSlots ) ;
358+ this . #emit( RESUBSCRIBE_LISTENERS_EVENT , listeners ) ;
353359 sourceNode . pubSub ?. client . _unpause ( ) ;
354360 }
355- //TODO pubSubNode?
356361 } else {
357-
362+ // If source shard doesnt have any slots left, this means we can safely move all commands to the new shard.
363+ // Same goes for sharded pub sub listeners
358364 const normalCommandsToMove = sourceNode . client ! . _getQueue ( ) . getAllCommands ( ) ;
359365 // 5. Prepend extracted commands, chans
360- destinationNode . client ?. _getQueue ( ) . prependCommandsToWrite ( normalCommandsToMove ) ;
361- if ( 'pubSub' in destinationNode ) {
362- // const pubsubListeners = destinationNode .pubSub?.client._getQueue().removePubSubListenersForSlots(movingSlots );
363- //TODO resubscribe. Might need to throw an event for cluster to do the job
366+ destMasterNode . client ?. _getQueue ( ) . prependCommandsToWrite ( normalCommandsToMove ) ;
367+ if ( 'pubSub' in sourceNode ) {
368+ const listeners = sourceNode . pubSub ?. client . _getQueue ( ) . removeAllPubSubListeners ( ) ;
369+ this . #emit ( RESUBSCRIBE_LISTENERS_EVENT , listeners ) ;
364370 }
365- //TODO pubSubNode?
366371
367- //Cleanup
372+ //Remove all local references to the dying shard's clients
368373 this . masters = this . masters . filter ( master => master . address !== sourceAddress ) ;
369374 //not sure if needed, since there should be no replicas in RE
370375 this . replicas = this . replicas . filter ( replica => replica . address !== sourceAddress ) ;
371376 this . nodeByAddress . delete ( sourceAddress ) ;
372- //TODO pubSubNode?
373377
374378 // 4.3 Kill because no slots are pointing to it anymore
375379 await sourceNode . client ?. close ( )
376380 if ( 'pubSub' in sourceNode ) {
377381 await sourceNode . pubSub ?. client . close ( ) ;
378382 }
379- //TODO pubSubNode?
380383 }
381384
382385 // 5.1 Unpause
383- destinationNode . client ?. _unpause ( ) ;
384- if ( 'pubSub' in destinationNode ) {
385- destinationNode . pubSub ?. client . _unpause ( ) ;
386+ destMasterNode . client ?. _unpause ( ) ;
387+ if ( 'pubSub' in destMasterNode ) {
388+ destMasterNode . pubSub ?. client . _unpause ( ) ;
386389 }
387390 }
388391
@@ -496,7 +499,7 @@ export default class RedisClusterSlots<
496499 . on ( SMIGRATED_EVENT , this . #handleSmigrated)
497500 . on ( '__MOVED' , async ( allPubSubListeners : PubSubListeners ) => {
498501 await this . rediscover ( client ) ;
499- this . #emit( '__resubscribeAllPubSubListeners' , allPubSubListeners ) ;
502+ this . #emit( RESUBSCRIBE_LISTENERS_EVENT , allPubSubListeners ) ;
500503 } ) ;
501504
502505 return client ;
0 commit comments