@@ -14,8 +14,10 @@ export const SMIGRATED_EVENT = "__SMIGRATED";
1414export interface SMigratedEvent {
1515 seqId : number ,
1616 source : { host : string , port : number } ;
17- destination : { host : string , port : number } ;
18- ranges : ( number | [ number , number ] ) [ ]
17+ destinations : {
18+ host : string , port : number
19+ slots : ( number | [ number , number ] ) [ ]
20+ } [ ]
1921}
2022
2123export const MAINTENANCE_EVENTS = {
@@ -165,11 +167,8 @@ export default class EnterpriseMaintenanceManager {
165167 return true ;
166168 }
167169 case PN . SMIGRATED : {
168- // [ 'SMIGRATED', '123', '54.78.247.156:12075' 'slot1,rangediff1' ]
169- // ^seq ^new ip ^slots info
170- const sequenceId = Number ( push [ 1 ] ) ;
171170 dbgMaintenance ( "Received SMIGRATED" ) ;
172- this . #client . _handleSmigrated ( sequenceId ) ;
171+ this . #onSMigrated ( push ) ;
173172 return true ;
174173 }
175174 }
@@ -205,12 +204,9 @@ export default class EnterpriseMaintenanceManager {
205204 // period that was communicated by the server is over.
206205 if ( url === null ) {
207206 assert ( this . #options. maintEndpointType === "none" ) ;
208- assert ( this . #options. socket !== undefined ) ;
209- assert ( "host" in this . #options. socket ) ;
210- assert ( typeof this . #options. socket . host === "string" ) ;
211- host = this . #options. socket . host ;
212- assert ( typeof this . #options. socket . port === "number" ) ;
213- port = this . #options. socket . port ;
207+ const { host : h , port : p } = this . #getAddress( )
208+ host = h ;
209+ port = p ;
214210 const waitTime = ( afterSeconds * 1000 ) / 2 ;
215211 dbgMaintenance ( `Wait for ${ waitTime } ms` ) ;
216212 await setTimeout ( waitTime ) ;
@@ -312,8 +308,56 @@ export default class EnterpriseMaintenanceManager {
312308
313309 this . #client. _maintenanceUpdate ( update ) ;
314310 } ;
311+
312+ #onSMigrated = ( push : any [ ] ) => {
313+ // [ 'SMIGRATED', '15', [ '127.0.0.1:6379 123,456,789-1000', '127.0.0.1:6380 124,457,300-500' ] ]
314+ // ^seq ^new endpoint1 ^slots ^new endpoint2 ^slots
315+ const sequenceId = Number ( push [ 1 ] ) ;
316+ const smigratedEvent : SMigratedEvent = {
317+ seqId : sequenceId ,
318+ source : {
319+ ...this . #getAddress( )
320+ } ,
321+ destinations : [ ]
322+ }
323+ for ( const endpointInfo of push [ 2 ] ) {
324+ const [ endpoint , slots ] = String ( endpointInfo ) . split ( ' ' ) ;
325+ //TODO not sure if we need to handle fqdn/ip.. cluster manages clients by host:port. If `cluster slots` returns ip,
326+ // but this notification returns fqdn, then we need to unify somehow ( maybe lookup )
327+ const [ host , port ] = endpoint . split ( ':' ) ;
328+ // `slots` could be mix of single slots and ranges, for example: 123,456,789-1000
329+ const parsedSlots = slots . split ( ',' ) . map ( ( singleOrRange ) : number | [ number , number ] => {
330+ const separatorIndex = singleOrRange . indexOf ( '-' ) ;
331+ if ( separatorIndex === - 1 ) {
332+ // Its single slot
333+ return Number ( singleOrRange ) ;
334+ }
335+ // Its range
336+ return [ Number ( singleOrRange . substring ( 0 , separatorIndex ) ) , Number ( singleOrRange . substring ( separatorIndex + 1 ) ) ] ;
337+ } ) ;
338+
339+ smigratedEvent . destinations . push ( {
340+ host,
341+ port : Number ( port ) ,
342+ slots : parsedSlots
343+ } )
344+ }
345+ this . #client. _handleSmigrated ( smigratedEvent ) ;
346+ }
347+
348+
349+ #getAddress( ) : { host : string , port : number } {
350+ assert ( this . #options. socket !== undefined ) ;
351+ assert ( "host" in this . #options. socket ) ;
352+ assert ( typeof this . #options. socket . host === "string" ) ;
353+ const host = this . #options. socket . host ;
354+ assert ( typeof this . #options. socket . port === "number" ) ;
355+ const port = this . #options. socket . port ;
356+ return { host, port } ;
357+ }
315358}
316359
360+
317361export type MovingEndpointType =
318362 | "auto"
319363 | "internal-ip"
0 commit comments