Skip to content

Commit efbb11f

Browse files
committed
fix: move orphan check from connection to worker
1 parent fbc53ea commit efbb11f

File tree

2 files changed

+49
-49
lines changed

2 files changed

+49
-49
lines changed

src/api.js

Lines changed: 46 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -115,51 +115,19 @@ export class Api {
115115
this.redisMinMessageLifetime = number.parseInt(env.getConf('redis-min-message-lifetime') || '60000')
116116
this.redisWorkerStreamName = this.prefix + ':worker'
117117
this.redisWorkerGroupName = this.prefix + ':worker'
118+
this.workerSetName = `${this.prefix}:worker:${this.consumername}:idset`
118119
this._destroyed = false
119120
this.redis = redis.createClient({
120121
url,
121122
// scripting: https://github.com/redis/node-redis/#lua-scripts
122123
scripts: {
123-
checkAndRecoverWorkerStream: redis.defineScript({
124-
NUMBER_OF_KEYS: 1,
125-
SCRIPT: `
126-
local found = false
127-
local messages = redis.call("XREAD", "COUNT", 0, "STREAMS", "${this.redisWorkerStreamName}", "0-0")
128-
129-
if messages and #messages > 0 then
130-
local entries = messages[1][2]
131-
for _, entry in ipairs(entries) do
132-
-- Each entry is an array where entry[2] is the message fields
133-
if entry[2][2] == KEYS[1] then
134-
found = true
135-
break
136-
end
137-
end
138-
end
139-
140-
-- If stream not found in y:worker and the stream exists, add it
141-
if not found and redis.call("TYPE", KEYS[1]).ok == "stream" then
142-
redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1])
143-
end
144-
`,
145-
/**
146-
* @param {string} key
147-
*/
148-
transformArguments (key) {
149-
return [key]
150-
},
151-
/**
152-
* @param {null} x
153-
*/
154-
transformReply (x) {
155-
return x
156-
}
157-
}),
158124
addMessage: redis.defineScript({
159125
NUMBER_OF_KEYS: 1,
160126
SCRIPT: `
161127
if redis.call("EXISTS", KEYS[1]) == 0 then
162128
redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1])
129+
elseif redis.call("XLEN", KEYS[1]) > 100 then
130+
redis.call("SADD", "${this.prefix}:worker:checklist", KEYS[1])
163131
end
164132
redis.call("XADD", KEYS[1], "*", "m", ARGV[1])
165133
`,
@@ -246,16 +214,6 @@ export class Api {
246214
return this.redis.addMessage(computeRedisRoomStreamName(room, docid, this.prefix), m)
247215
}
248216

249-
/**
250-
* @param {string} room
251-
* @param {string} docid
252-
*/
253-
async checkAndRecoveryWorkerStream (room, docid) {
254-
await this.redis.checkAndRecoverWorkerStream(
255-
computeRedisRoomStreamName(room, docid, this.prefix)
256-
)
257-
}
258-
259217
/**
260218
* @param {string} room
261219
* @param {string} docid
@@ -327,13 +285,15 @@ export class Api {
327285
})
328286
}
329287
tasks.length > 0 && logWorker('Accepted tasks ', { tasks })
288+
await this.redis.expire(this.workerSetName, 60 * 5)
330289
let reclaimCounts = 0
331290
await promise.all(tasks.map(async task => {
332291
const streamlen = await this.redis.xLen(task.stream)
333292
if (streamlen === 0) {
334293
await this.redis.multi()
335294
.xDelIfEmpty(task.stream)
336295
.xDel(this.redisWorkerStreamName, task.id)
296+
.sRem(this.workerSetName, task.stream)
337297
.exec()
338298
logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream })
339299
} else {
@@ -348,6 +308,7 @@ export class Api {
348308
.xTrim(task.stream, 'MINID', lastId - this.redisMinMessageLifetime)
349309
.xAdd(this.redisWorkerStreamName, '*', { compact: task.stream })
350310
.xDel(this.redisWorkerStreamName, task.id)
311+
.sAdd(this.workerSetName, task.stream)
351312
.exec()
352313
])
353314
logWorker('Compacted stream ', { stream: task.stream, taskId: task.id, newLastId: lastId - this.redisMinMessageLifetime })
@@ -413,11 +374,51 @@ export class Worker {
413374
await promise.wait(client.redisWorkerTimeout - (now - prev))
414375
}
415376
prev = now
377+
378+
await this.checkAndRecoverOrphanStreams()
416379
} catch (e) {
417380
console.error(e)
418381
}
419382
}
420383
logWorker('Ended worker process ', { id: client.consumername })
421384
})()
422385
}
386+
387+
async checkAndRecoverOrphanStreams () {
388+
const consumers = (
389+
await this.client.redis.xInfoConsumers(
390+
this.client.redisWorkerStreamName,
391+
this.client.redisWorkerGroupName
392+
)
393+
).map(({ name }) => name)
394+
const leader = consumers.sort()[0]
395+
if (this.client.consumername !== leader) return
396+
397+
/** @type {Set<string>} */
398+
const processingSet = new Set()
399+
for (const consumer of consumers) {
400+
const ids = await this.client.redis.sMembers(
401+
`${this.client.prefix}:worker:${consumer}:idset`
402+
)
403+
for (const id of ids) processingSet.add(id)
404+
}
405+
406+
const checkListKey = `${this.client.prefix}:worker:checklist`
407+
const checklist = await this.client.redis.sMembers(checkListKey)
408+
409+
const orphans = checklist.filter((room) => !processingSet.has(room))
410+
if (!orphans) return
411+
logWorker(`adding ${orphans.length} orphans back to worker`)
412+
413+
await Promise.all(
414+
orphans.map((room) =>
415+
this.client.redis.xAdd(
416+
this.client.redisWorkerStreamName,
417+
'*',
418+
{ compact: room }
419+
)
420+
)
421+
)
422+
await this.client.redis.del(checkListKey)
423+
}
423424
}

src/y-socket-io/y-socket-io.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,6 @@ export class YSocketIO {
157157
this.initAwarenessListeners(socket)
158158
this.initSocketListeners(socket)
159159

160-
await this.client.checkAndRecoveryWorkerStream(namespace, 'index')
161160
const doc = await this.client.getDoc(namespace, 'index')
162161

163162
if (
@@ -216,7 +215,7 @@ export class YSocketIO {
216215
this.getNamespaceString(socket.nsp),
217216
'index',
218217
Buffer.from(this.toRedis('sync-update', message))
219-
)
218+
).catch(console.error)
220219
})
221220
}
222221

@@ -240,7 +239,7 @@ export class YSocketIO {
240239
this.getNamespaceString(socket.nsp),
241240
'index',
242241
Buffer.from(this.toRedis('awareness-update', new Uint8Array(message)))
243-
)
242+
).catch(console.error)
244243
})
245244
}
246245

@@ -285,7 +284,7 @@ export class YSocketIO {
285284
this.getNamespaceString(socket.nsp),
286285
'index',
287286
Buffer.from(this.toRedis('sync-step-2', message))
288-
)
287+
).catch(console.error)
289288
}
290289
)
291290
if (doc.awareness.states.size > 0) {

0 commit comments

Comments
 (0)