Skip to content

Commit b361280

Browse files
authored
Merge pull request #18 from hackmdio/fix/recovery-mechanism-for-orphan-streams
fix/recovery mechanism for orphan streams
2 parents 23494c9 + 6f73173 commit b361280

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

src/api.js

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,41 @@ export class Api {
120120
url,
121121
// scripting: https://github.com/redis/node-redis/#lua-scripts
122122
scripts: {
123+
checkAndRecoverWorkerStream: redis.defineScript({
124+
NUMBER_OF_KEYS: 1,
125+
SCRIPT: `
126+
local found = false
127+
local messages = redis.call("XREAD", "COUNT", 0, "STREAMS", "${this.redisWorkerStreamName}", "0-0")
128+
129+
if messages and #messages > 0 then
130+
local entries = messages[1][2]
131+
for _, entry in ipairs(entries) do
132+
-- Each entry is an array where entry[2] is the message fields
133+
if entry[2][2] == KEYS[1] then
134+
found = true
135+
break
136+
end
137+
end
138+
end
139+
140+
-- If stream not found in y:worker and the stream exists, add it
141+
if not found and redis.call("TYPE", KEYS[1]).ok == "stream" then
142+
redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1])
143+
end
144+
`,
145+
/**
146+
* @param {string} key
147+
*/
148+
transformArguments (key) {
149+
return [key]
150+
},
151+
/**
152+
* @param {null} x
153+
*/
154+
transformReply (x) {
155+
return x
156+
}
157+
}),
123158
addMessage: redis.defineScript({
124159
NUMBER_OF_KEYS: 1,
125160
SCRIPT: `
@@ -211,6 +246,16 @@ export class Api {
211246
return this.redis.addMessage(computeRedisRoomStreamName(room, docid, this.prefix), m)
212247
}
213248

249+
/**
250+
* @param {string} room
251+
* @param {string} docid
252+
*/
253+
async checkAndRecoveryWorkerStream (room, docid) {
254+
await this.redis.checkAndRecoverWorkerStream(
255+
computeRedisRoomStreamName(room, docid, this.prefix)
256+
)
257+
}
258+
214259
/**
215260
* @param {string} room
216261
* @param {string} docid

src/y-socket-io/y-socket-io.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ export class YSocketIO {
157157
this.initAwarenessListeners(socket)
158158
this.initSocketListeners(socket)
159159

160+
await this.client.checkAndRecoveryWorkerStream(namespace, 'index')
160161
const doc = await this.client.getDoc(namespace, 'index')
161162

162163
if (

0 commit comments

Comments
 (0)