Skip to content

Commit d0acec0

Browse files
committed
fix: only use active consumers
1 parent efbb11f commit d0acec0

File tree

1 file changed

+7
-6
lines changed

1 file changed

+7
-6
lines changed

src/api.js

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -385,12 +385,13 @@ export class Worker {
385385
}
386386

387387
async checkAndRecoverOrphanStreams () {
388-
const consumers = (
389-
await this.client.redis.xInfoConsumers(
390-
this.client.redisWorkerStreamName,
391-
this.client.redisWorkerGroupName
392-
)
393-
).map(({ name }) => name)
388+
const rawConsumers = await this.client.redis.xInfoConsumers(
389+
this.client.redisWorkerStreamName,
390+
this.client.redisWorkerGroupName
391+
)
392+
const consumers = rawConsumers
393+
.filter((c) => c.pending > 0 || c.inactive < this.client.redisWorkerTimeout)
394+
.map(({ name }) => name)
394395
const leader = consumers.sort()[0]
395396
if (this.client.consumername !== leader) return
396397

0 commit comments

Comments
 (0)