diff --git a/src/api.js b/src/api.js index 2ca13c9..e772493 100644 --- a/src/api.js +++ b/src/api.js @@ -115,51 +115,19 @@ export class Api { this.redisMinMessageLifetime = number.parseInt(env.getConf('redis-min-message-lifetime') || '60000') this.redisWorkerStreamName = this.prefix + ':worker' this.redisWorkerGroupName = this.prefix + ':worker' + this.workerSetName = `${this.prefix}:worker:${this.consumername}:idset` this._destroyed = false this.redis = redis.createClient({ url, // scripting: https://github.com/redis/node-redis/#lua-scripts scripts: { - checkAndRecoverWorkerStream: redis.defineScript({ - NUMBER_OF_KEYS: 1, - SCRIPT: ` - local found = false - local messages = redis.call("XREAD", "COUNT", 0, "STREAMS", "${this.redisWorkerStreamName}", "0-0") - - if messages and #messages > 0 then - local entries = messages[1][2] - for _, entry in ipairs(entries) do - -- Each entry is an array where entry[2] is the message fields - if entry[2][2] == KEYS[1] then - found = true - break - end - end - end - - -- If stream not found in y:worker and the stream exists, add it - if not found and redis.call("TYPE", KEYS[1]).ok == "stream" then - redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1]) - end - `, - /** - * @param {string} key - */ - transformArguments (key) { - return [key] - }, - /** - * @param {null} x - */ - transformReply (x) { - return x - } - }), addMessage: redis.defineScript({ NUMBER_OF_KEYS: 1, SCRIPT: ` if redis.call("EXISTS", KEYS[1]) == 0 then redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1]) + elseif redis.call("XLEN", KEYS[1]) > 100 then + redis.call("SADD", "${this.prefix}:worker:checklist", KEYS[1]) end redis.call("XADD", KEYS[1], "*", "m", ARGV[1]) `, @@ -246,16 +214,6 @@ export class Api { return this.redis.addMessage(computeRedisRoomStreamName(room, docid, this.prefix), m) } - /** - * @param {string} room - * @param {string} docid - */ - async checkAndRecoveryWorkerStream (room, docid) { - await this.redis.checkAndRecoverWorkerStream( - computeRedisRoomStreamName(room, docid, this.prefix) - ) - } - /** * @param {string} room * @param {string} docid @@ -327,6 +285,7 @@ export class Api { }) } tasks.length > 0 && logWorker('Accepted tasks ', { tasks }) + if (this.redis.isOpen) await this.redis.expire(this.workerSetName, 60 * 5) let reclaimCounts = 0 await promise.all(tasks.map(async task => { const streamlen = await this.redis.xLen(task.stream) @@ -334,6 +293,7 @@ export class Api { await this.redis.multi() .xDelIfEmpty(task.stream) .xDel(this.redisWorkerStreamName, task.id) + .sRem(this.workerSetName, task.stream) .exec() logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream }) } else { @@ -348,6 +308,7 @@ export class Api { .xTrim(task.stream, 'MINID', lastId - this.redisMinMessageLifetime) .xAdd(this.redisWorkerStreamName, '*', { compact: task.stream }) .xDel(this.redisWorkerStreamName, task.id) + .sAdd(this.workerSetName, task.stream) .exec() ]) logWorker('Compacted stream ', { stream: task.stream, taskId: task.id, newLastId: lastId - this.redisMinMessageLifetime }) @@ -413,6 +374,8 @@ export class Worker { await promise.wait(client.redisWorkerTimeout - (now - prev)) } prev = now + + await this.checkAndRecoverOrphanStreams() } catch (e) { console.error(e) } @@ -420,4 +383,44 @@ export class Worker { logWorker('Ended worker process ', { id: client.consumername }) })() } + + async checkAndRecoverOrphanStreams () { + if (!this.client.redis.isOpen) return + const rawConsumers = await this.client.redis.xInfoConsumers( + this.client.redisWorkerStreamName, + this.client.redisWorkerGroupName + ) + const consumers = rawConsumers + .filter((c) => c.pending > 0 || c.inactive < this.client.redisWorkerTimeout) + .map(({ name }) => name) + const leader = consumers.sort()[0] + if (this.client.consumername !== leader) return + + /** @type {Set} */ + const processingSet = new Set() + for (const consumer of consumers) { + const ids = await this.client.redis.sMembers( + `${this.client.prefix}:worker:${consumer}:idset` + ) + for (const id of ids) processingSet.add(id) + } + + const checkListKey = `${this.client.prefix}:worker:checklist` + const checklist = await this.client.redis.sMembers(checkListKey) + + const orphans = checklist.filter((room) => !processingSet.has(room)) + if (!orphans) return + logWorker(`adding ${orphans.length} orphans back to worker`) + + await Promise.all( + orphans.map((room) => + this.client.redis.xAdd( + this.client.redisWorkerStreamName, + '*', + { compact: room } + ) + ) + ) + await this.client.redis.del(checkListKey) + } } diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 78779c0..590facf 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -157,7 +157,6 @@ export class YSocketIO { this.initAwarenessListeners(socket) this.initSocketListeners(socket) - await this.client.checkAndRecoveryWorkerStream(namespace, 'index') const doc = await this.client.getDoc(namespace, 'index') if ( @@ -216,7 +215,7 @@ export class YSocketIO { this.getNamespaceString(socket.nsp), 'index', Buffer.from(this.toRedis('sync-update', message)) - ) + ).catch(console.error) }) } @@ -240,7 +239,7 @@ export class YSocketIO { this.getNamespaceString(socket.nsp), 'index', Buffer.from(this.toRedis('awareness-update', new Uint8Array(message))) - ) + ).catch(console.error) }) } @@ -285,7 +284,7 @@ export class YSocketIO { this.getNamespaceString(socket.nsp), 'index', Buffer.from(this.toRedis('sync-step-2', message)) - ) + ).catch(console.error) } ) if (doc.awareness.states.size > 0) {