Skip to content

Commit c88dd84

Browse files
committed
parse notification
1 parent 7f7e005 commit c88dd84

File tree

3 files changed

+62
-17
lines changed

3 files changed

+62
-17
lines changed

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ export const SMIGRATED_EVENT = "__SMIGRATED";
1414
export interface SMigratedEvent {
1515
seqId: number,
1616
source: { host: string, port: number };
17-
destination: { host: string, port: number };
18-
ranges: (number | [number, number])[]
17+
destinations: {
18+
host: string, port: number
19+
slots: (number | [number, number])[]
20+
}[]
1921
}
2022

2123
export const MAINTENANCE_EVENTS = {
@@ -165,11 +167,8 @@ export default class EnterpriseMaintenanceManager {
165167
return true;
166168
}
167169
case PN.SMIGRATED: {
168-
// [ 'SMIGRATED', '123', '54.78.247.156:12075' 'slot1,rangediff1' ]
169-
// ^seq ^new ip ^slots info
170-
const sequenceId = Number(push[1]);
171170
dbgMaintenance("Received SMIGRATED");
172-
this.#client._handleSmigrated(sequenceId);
171+
this.#onSMigrated(push);
173172
return true;
174173
}
175174
}
@@ -205,12 +204,9 @@ export default class EnterpriseMaintenanceManager {
205204
// period that was communicated by the server is over.
206205
if (url === null) {
207206
assert(this.#options.maintEndpointType === "none");
208-
assert(this.#options.socket !== undefined);
209-
assert("host" in this.#options.socket);
210-
assert(typeof this.#options.socket.host === "string");
211-
host = this.#options.socket.host;
212-
assert(typeof this.#options.socket.port === "number");
213-
port = this.#options.socket.port;
207+
const { host: h, port: p } = this.#getAddress()
208+
host = h;
209+
port = p;
214210
const waitTime = (afterSeconds * 1000) / 2;
215211
dbgMaintenance(`Wait for ${waitTime}ms`);
216212
await setTimeout(waitTime);
@@ -312,8 +308,56 @@ export default class EnterpriseMaintenanceManager {
312308

313309
this.#client._maintenanceUpdate(update);
314310
};
311+
312+
#onSMigrated = (push: any[]) => {
313+
// [ 'SMIGRATED', '15', [ '127.0.0.1:6379 123,456,789-1000', '127.0.0.1:6380 124,457,300-500' ] ]
314+
// ^seq ^new endpoint1 ^slots ^new endpoint2 ^slots
315+
const sequenceId = Number(push[1]);
316+
const smigratedEvent: SMigratedEvent = {
317+
seqId: sequenceId,
318+
source: {
319+
...this.#getAddress()
320+
},
321+
destinations: []
322+
}
323+
for(const endpointInfo of push[2]) {
324+
const [endpoint, slots] = String(endpointInfo).split(' ');
325+
//TODO not sure if we need to handle fqdn/ip.. cluster manages clients by host:port. If `cluster slots` returns ip,
326+
// but this notification returns fqdn, then we need to unify somehow ( maybe lookup )
327+
const [ host, port ] = endpoint.split(':');
328+
// `slots` could be mix of single slots and ranges, for example: 123,456,789-1000
329+
const parsedSlots = slots.split(',').map((singleOrRange): number | [number, number] => {
330+
const separatorIndex = singleOrRange.indexOf('-');
331+
if(separatorIndex === -1) {
332+
// Its single slot
333+
return Number(singleOrRange);
334+
}
335+
// Its range
336+
return [Number(singleOrRange.substring(0, separatorIndex)), Number(singleOrRange.substring(separatorIndex + 1))];
337+
});
338+
339+
smigratedEvent.destinations.push({
340+
host,
341+
port: Number(port),
342+
slots: parsedSlots
343+
})
344+
}
345+
this.#client._handleSmigrated(smigratedEvent);
346+
}
347+
348+
349+
#getAddress(): { host: string, port: number } {
350+
assert(this.#options.socket !== undefined);
351+
assert("host" in this.#options.socket);
352+
assert(typeof this.#options.socket.host === "string");
353+
const host = this.#options.socket.host;
354+
assert(typeof this.#options.socket.port === "number");
355+
const port = this.#options.socket.port;
356+
return { host, port };
357+
}
315358
}
316359

360+
317361
export type MovingEndpointType =
318362
| "auto"
319363
| "internal-ip"

packages/client/lib/client/index.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider }
2020
import { BasicCommandParser, CommandParser } from './parser';
2121
import SingleEntryCache from '../single-entry-cache';
2222
import { version } from '../../package.json'
23-
import EnterpriseMaintenanceManager, { MaintenanceUpdate, MovingEndpointType, SMIGRATED_EVENT } from './enterprise-maintenance-manager';
23+
import EnterpriseMaintenanceManager, { MaintenanceUpdate, MovingEndpointType, SMIGRATED_EVENT, SMigratedEvent } from './enterprise-maintenance-manager';
2424

2525
export interface RedisClientOptions<
2626
M extends RedisModules = RedisModules,
@@ -1010,9 +1010,10 @@ export default class RedisClient<
10101010
/**
10111011
* @internal
10121012
*/
1013-
_handleSmigrated(sequenceId: number) {
1014-
this._self.emit(SMIGRATED_EVENT, sequenceId);
1015-
}
1013+
_handleSmigrated(smigratedEvent: SMigratedEvent) {
1014+
this._self.emit(SMIGRATED_EVENT, smigratedEvent);
1015+
}
1016+
10161017

10171018
/**
10181019
* @internal

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ export default class RedisClusterSlots<
255255
}
256256

257257
#handleSmigrated = async (event: SMigratedEvent) => {
258-
dbgMaintenance(`[CSlots]: handle smigrated`, event);
258+
dbgMaintenance(`[CSlots]: handle smigrated`, JSON.stringify(event, null, 2));
259259

260260
if(this.smigratedSeqIdsSeen.has(event.seqId)) {
261261
dbgMaintenance(`[CSlots]: sequence id ${event.seqId} already seen, abort`)

0 commit comments

Comments
 (0)