From e07cef51fe9455f1896b6b60b713eaa7a2ed1550 Mon Sep 17 00:00:00 2001 From: Michael Wang Date: Wed, 22 Jan 2025 00:54:02 +0800 Subject: [PATCH 1/4] fix: acknowledge tasks in Redis worker stream after processing --- src/api.js | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/api.js b/src/api.js index 4f29cad..f56615a 100644 --- a/src/api.js +++ b/src/api.js @@ -304,6 +304,12 @@ export class Api { .sRem(this.workerSetName, task.stream) .exec() logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream }) + + await this.redis.xAck( + this.redisWorkerStreamName, + this.redisWorkerGroupName, + task.id + ); } else { reclaimCounts++ const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix) @@ -322,6 +328,13 @@ export class Api { .sAdd(this.workerSetName, task.stream) .exec() ]) + + await this.redis.xAck( + this.redisWorkerStreamName, + this.redisWorkerGroupName, + task.id + ); + logWorker('Compacted stream ', { stream: task.stream, taskId: task.id, newLastId: lastId - this.redisMinMessageLifetime }) try { if (ydocUpdateCallback != null) { From a87d566f221454b7f7e551544989871ced693756 Mon Sep 17 00:00:00 2001 From: Michael Wang Date: Wed, 22 Jan 2025 01:03:22 +0800 Subject: [PATCH 2/4] chore: lint fix --- src/api.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api.js b/src/api.js index f56615a..312b718 100644 --- a/src/api.js +++ b/src/api.js @@ -309,7 +309,7 @@ export class Api { this.redisWorkerStreamName, this.redisWorkerGroupName, task.id - ); + ) } else { reclaimCounts++ const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix) @@ -333,7 +333,7 @@ export class Api { this.redisWorkerStreamName, this.redisWorkerGroupName, task.id - ); + ) logWorker('Compacted stream ', { stream: task.stream, taskId: task.id, newLastId: lastId - this.redisMinMessageLifetime }) try { From d18651e978254584897bffca2fdc39eaa9d20613 Mon Sep 17 00:00:00 2001 From: Michael Wang Date: Wed, 22 Jan 2025 13:30:44 +0800 Subject: [PATCH 3/4] fix: move ack to redis transactions --- src/api.js | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/src/api.js b/src/api.js index 312b718..9eee994 100644 --- a/src/api.js +++ b/src/api.js @@ -301,15 +301,10 @@ export class Api { await this.redis.multi() .xDelIfEmpty(task.stream) .xDel(this.redisWorkerStreamName, task.id) + .xAck(this.redisWorkerStreamName, this.redisWorkerGroupName, task.id) .sRem(this.workerSetName, task.stream) .exec() logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream }) - - await this.redis.xAck( - this.redisWorkerStreamName, - this.redisWorkerGroupName, - task.id - ) } else { reclaimCounts++ const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix) @@ -325,16 +320,10 @@ export class Api { .xTrim(task.stream, 'MINID', lastId - this.redisMinMessageLifetime) .xAdd(this.redisWorkerStreamName, '*', { compact: task.stream }) .xDel(this.redisWorkerStreamName, task.id) + .xAck(this.redisWorkerStreamName, this.redisWorkerGroupName, task.id) .sAdd(this.workerSetName, task.stream) .exec() ]) - - await this.redis.xAck( - this.redisWorkerStreamName, - this.redisWorkerGroupName, - task.id - ) - logWorker('Compacted stream ', { stream: task.stream, taskId: task.id, newLastId: lastId - this.redisMinMessageLifetime }) try { if (ydocUpdateCallback != null) { From 65361c63a2104cc6a3f12dfa4c5206bc88782e4c Mon Sep 17 00:00:00 2001 From: Michael Wang Date: Wed, 22 Jan 2025 14:43:14 +0800 Subject: [PATCH 4/4] fix: ensure task deletion occurs after acknowledgment in Redis worker stream --- src/api.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api.js b/src/api.js index 9eee994..693793c 100644 --- a/src/api.js +++ b/src/api.js @@ -300,8 +300,8 @@ export class Api { if (streamlen === 0) { await this.redis.multi() .xDelIfEmpty(task.stream) - .xDel(this.redisWorkerStreamName, task.id) .xAck(this.redisWorkerStreamName, this.redisWorkerGroupName, task.id) + .xDel(this.redisWorkerStreamName, task.id) .sRem(this.workerSetName, task.stream) .exec() logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream }) @@ -319,8 +319,8 @@ export class Api { this.redis.multi() .xTrim(task.stream, 'MINID', lastId - this.redisMinMessageLifetime) .xAdd(this.redisWorkerStreamName, '*', { compact: task.stream }) - .xDel(this.redisWorkerStreamName, task.id) .xAck(this.redisWorkerStreamName, this.redisWorkerGroupName, task.id) + .xDel(this.redisWorkerStreamName, task.id) .sAdd(this.workerSetName, task.stream) .exec() ])