@@ -263,12 +263,6 @@ export default class RedisClusterSlots<
263263 }
264264 this . smigratedSeqIdsSeen . add ( event . seqId ) ;
265265
266- // slots = new Array<Shard<M, F, S, RESP, TYPE_MAPPING>>(RedisClusterSlots.#SLOTS);
267- // masters = new Array<MasterNode<M, F, S, RESP, TYPE_MAPPING>>();
268- // replicas = new Array<ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
269- // readonly nodeByAddress = new Map<string, MasterNode<M, F, S, RESP, TYPE_MAPPING> | ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
270- // pubSubNode?: PubSubNode<M, F, S, RESP, TYPE_MAPPING>;
271-
272266 const sourceAddress = `${ event . source . host } :${ event . source . port } ` ;
273267 const sourceNode = this . nodeByAddress . get ( sourceAddress ) ;
274268 if ( ! sourceNode ) {
@@ -277,56 +271,120 @@ export default class RedisClusterSlots<
277271 }
278272
279273 // 1. Pausing
280- //TODO - check the single pubsubnode
274+ // 1.1 Normal
281275 sourceNode . client ?. _pause ( ) ;
276+ // 1.2 Sharded pubsub
282277 if ( 'pubSub' in sourceNode ) {
283278 sourceNode . pubSub ?. client . _pause ( ) ;
284279 }
285-
286- const destinationAddress = `${ event . destination . host } :${ event . destination . port } ` ;
287- let destinationNode = this . nodeByAddress . get ( destinationAddress ) ;
288- let destinationShard : Shard < M , F , S , RESP , TYPE_MAPPING > ;
289-
290- // 2. Create new Master
291- if ( ! destinationNode ) {
292- const promises : Promise < unknown > [ ] = [ ] ;
293- destinationNode = this . #initiateSlotNode( { host : event . destination . host , port : event . destination . port , id : 'asdff' } , false , true , new Set ( ) , promises ) ;
294- await Promise . all ( promises ) ;
295- // 2.1 Pause
296- destinationNode . client ?. _pause ( ) ;
297- // In case destination node didnt exist, this means Shard didnt exist as well, so creating a new Shard is completely fine
298- destinationShard = {
299- master : destinationNode
300- } ;
301- } else {
302- // In case destination node existed, this means there was a Shard already, so its best if we can find it.
303- const existingShard = this . slots . find ( shard => shard . master . host === event . destination . host && shard . master . port === event . destination . port ) ;
304- if ( ! existingShard ) {
305- dbgMaintenance ( "Could not find shard" ) ;
306- throw new Error ( 'Could not find shard' ) ;
307- }
308- destinationShard = existingShard ;
309- }
310-
311- // 3. Soft update shards
312- for ( const range of event . ranges ) {
313- if ( typeof range === 'number' ) {
314- this . slots [ range ] = destinationShard ;
315- } else {
316- for ( let slot = range [ 0 ] ; slot <= range [ 1 ] ; slot ++ ) {
317- this . slots [ slot ] = destinationShard ;
318- }
319- }
320- }
321-
322- // 4. For all affected clients (normal, pubsub, spubsub):
323- // 4.1 Wait for inflight commands to complete
324- // 4.2 Extract commands, channels, sharded channels
325- // 4.3 Kill if no slots are pointing to it
326- //
327-
328- // 5. Prepend extracted commands, chans
329- // 5.1 Unpause
280+ // 1.3 Regular pubsub
281+ if ( this . pubSubNode ?. address === sourceAddress ) {
282+ this . pubSubNode ?. client . _pause ( ) ;
283+ }
284+
285+ // const destinationAddress = `${event.destination.host}:${event.destination.port}`;
286+ // let destinationNode = this.nodeByAddress.get(destinationAddress);
287+ // let destinationShard: Shard<M, F, S, RESP, TYPE_MAPPING>;
288+
289+ // // 2. Create new Master
290+ // // TODO create new pubsubnode if needed
291+ // if(!destinationNode) {
292+ // const promises: Promise<unknown>[] = [];
293+ // destinationNode = this.#initiateSlotNode({ host: event.destination.host, port: event.destination.port, id: 'asdff' }, false, true, new Set(), promises);
294+ // await Promise.all(promises);
295+ // // 2.1 Pause
296+ // destinationNode.client?._pause();
297+ // // In case destination node didnt exist, this means Shard didnt exist as well, so creating a new Shard is completely fine
298+ // destinationShard = {
299+ // master: destinationNode
300+ // };
301+ // } else {
302+ // // In case destination node existed, this means there was a Shard already, so its best if we can find it.
303+ // const existingShard = this.slots.find(shard => shard.master.host === event.destination.host && shard.master.port === event.destination.port);
304+ // if(!existingShard) {
305+ // dbgMaintenance("Could not find shard");
306+ // throw new Error('Could not find shard');
307+ // }
308+ // destinationShard = existingShard;
309+ // }
310+
311+ // // 3. Soft update shards.
312+ // // After this step we are expecting any new commands that hash to the same slots to be routed to the destinationShard
313+ // const movingSlots = new Set<number>();
314+ // for(const range of event.ranges) {
315+ // if(typeof range === 'number') {
316+ // this.slots[range] = destinationShard;
317+ // movingSlots.add(range)
318+ // } else {
319+ // for (let slot = range[0]; slot <= range[1]; slot++) {
320+ // this.slots[slot] = destinationShard;
321+ // movingSlots.add(slot)
322+ // }
323+ // }
324+ // }
325+
326+ // // 4. For all affected clients (normal, pubsub, spubsub):
327+ // // 4.1 Wait for inflight commands to complete
328+ // const inflightPromises: Promise<void>[] = [];
329+ // //Normal
330+ // inflightPromises.push(sourceNode.client!._getQueue().waitForInflightCommandsToComplete());
331+ // //Sharded pubsub
332+ // if('pubSub' in sourceNode) {
333+ // inflightPromises.push(sourceNode.pubSub!.client._getQueue().waitForInflightCommandsToComplete());
334+ // }
335+ // //Regular pubsub
336+ // if(this.pubSubNode?.address === sourceAddress) {
337+ // inflightPromises.push(this.pubSubNode?.client._getQueue().waitForInflightCommandsToComplete());
338+ // }
339+ // await Promise.all(inflightPromises);
340+
341+
342+ // // 4.2 Extract commands, channels, sharded channels
343+ // // TODO dont forget to extract channels and resubscribe
344+ // const sourceStillHasSlots = this.slots.find(slot => slot.master.address === sourceAddress) !== undefined;
345+ // if(sourceStillHasSlots) {
346+ // const normalCommandsToMove = sourceNode.client!._getQueue().extractCommandsForSlots(movingSlots);
347+ // // 5. Prepend extracted commands, chans
348+ // //TODO pubsub, spubsub
349+ // destinationNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove);
350+
351+ // //unpause source node clients
352+ // sourceNode.client?._unpause();
353+ // if('pubSub' in sourceNode) {
354+ // sourceNode.pubSub?.client._unpause();
355+ // }
356+ // //TODO pubSubNode?
357+ // } else {
358+
359+ // const normalCommandsToMove = sourceNode.client!._getQueue().getAllCommands();
360+ // // 5. Prepend extracted commands, chans
361+ // destinationNode.client?._getQueue().prependCommandsToWrite(normalCommandsToMove);
362+ // if('pubSub' in destinationNode) {
363+ // // const pubsubListeners = destinationNode.pubSub?.client._getQueue().removePubSubListenersForSlots(movingSlots);
364+ // //TODO resubscribe. Might need to throw an event for cluster to do the job
365+ // }
366+ // //TODO pubSubNode?
367+
368+ // //Cleanup
369+ // this.masters = this.masters.filter(master => master.address !== sourceAddress);
370+ // //not sure if needed, since there should be no replicas in RE
371+ // this.replicas = this.replicas.filter(replica => replica.address !== sourceAddress);
372+ // this.nodeByAddress.delete(sourceAddress);
373+ // //TODO pubSubNode?
374+
375+ // // 4.3 Kill because no slots are pointing to it anymore
376+ // await sourceNode.client?.close()
377+ // if('pubSub' in sourceNode) {
378+ // await sourceNode.pubSub?.client.close();
379+ // }
380+ // //TODO pubSubNode?
381+ // }
382+
383+ // // 5.1 Unpause
384+ // destinationNode.client?._unpause();
385+ // if('pubSub' in destinationNode) {
386+ // destinationNode.pubSub?.client._unpause();
387+ // }
330388
331389 }
332390
0 commit comments