diff --git a/src/api.js b/src/api.js index a3db515..2ca13c9 100644 --- a/src/api.js +++ b/src/api.js @@ -120,6 +120,41 @@ export class Api { url, // scripting: https://github.com/redis/node-redis/#lua-scripts scripts: { + checkAndRecoverWorkerStream: redis.defineScript({ + NUMBER_OF_KEYS: 1, + SCRIPT: ` + local found = false + local messages = redis.call("XREAD", "COUNT", 0, "STREAMS", "${this.redisWorkerStreamName}", "0-0") + + if messages and #messages > 0 then + local entries = messages[1][2] + for _, entry in ipairs(entries) do + -- Each entry is an array where entry[2] is the message fields + if entry[2][2] == KEYS[1] then + found = true + break + end + end + end + + -- If stream not found in y:worker and the stream exists, add it + if not found and redis.call("TYPE", KEYS[1]).ok == "stream" then + redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1]) + end + `, + /** + * @param {string} key + */ + transformArguments (key) { + return [key] + }, + /** + * @param {null} x + */ + transformReply (x) { + return x + } + }), addMessage: redis.defineScript({ NUMBER_OF_KEYS: 1, SCRIPT: ` @@ -211,6 +246,16 @@ export class Api { return this.redis.addMessage(computeRedisRoomStreamName(room, docid, this.prefix), m) } + /** + * @param {string} room + * @param {string} docid + */ + async checkAndRecoveryWorkerStream (room, docid) { + await this.redis.checkAndRecoverWorkerStream( + computeRedisRoomStreamName(room, docid, this.prefix) + ) + } + /** * @param {string} room * @param {string} docid diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 2993fd0..78779c0 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -157,6 +157,7 @@ export class YSocketIO { this.initAwarenessListeners(socket) this.initSocketListeners(socket) + await this.client.checkAndRecoveryWorkerStream(namespace, 'index') const doc = await this.client.getDoc(namespace, 'index') if (