Skip to content

Commit e07cef5

Browse files
committed
fix: acknowledge tasks in Redis worker stream after processing
1 parent 0be69a9 commit e07cef5

File tree

1 file changed

+13
-0
lines changed

1 file changed

+13
-0
lines changed

src/api.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,12 @@ export class Api {
304304
.sRem(this.workerSetName, task.stream)
305305
.exec()
306306
logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream })
307+
308+
await this.redis.xAck(
309+
this.redisWorkerStreamName,
310+
this.redisWorkerGroupName,
311+
task.id
312+
);
307313
} else {
308314
reclaimCounts++
309315
const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix)
@@ -322,6 +328,13 @@ export class Api {
322328
.sAdd(this.workerSetName, task.stream)
323329
.exec()
324330
])
331+
332+
await this.redis.xAck(
333+
this.redisWorkerStreamName,
334+
this.redisWorkerGroupName,
335+
task.id
336+
);
337+
325338
logWorker('Compacted stream ', { stream: task.stream, taskId: task.id, newLastId: lastId - this.redisMinMessageLifetime })
326339
try {
327340
if (ydocUpdateCallback != null) {

0 commit comments

Comments
 (0)