Skip to content

Commit f21a271

Browse files
committed
feat: helper fn for redis stream
1 parent 4bf07e1 commit f21a271

File tree

1 file changed

+29
-0
lines changed

1 file changed

+29
-0
lines changed

src/api.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,35 @@ export class Api {
265265
}
266266
}
267267

268+
/**
269+
* @param {string} room
270+
* @param {string} docid
271+
*/
272+
async getRedisLastId (room, docid) {
273+
const ms = extractMessagesFromStreamReply(await this.redis.xRead(redis.commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix)
274+
const docMessages = ms.get(room)?.get(docid) || null
275+
return docMessages?.lastId.toString() || '0'
276+
}
277+
278+
/**
279+
* @param {string} room
280+
* @param {string} docid
281+
* @param {boolean} [remove=false]
282+
*/
283+
async trimRoomStream (room, docid, remove = false) {
284+
const roomName = computeRedisRoomStreamName(room, docid, this.prefix)
285+
const redisLastId = await this.getRedisLastId(room, docid)
286+
const lastId = number.parseInt(redisLastId.split('-')[0])
287+
if (remove) {
288+
await this.redis.del(roomName)
289+
} else {
290+
await this.redis.multi()
291+
.xTrim(roomName, 'MINID', lastId - this.redisMinMessageLifetime)
292+
.xDelIfEmpty(roomName)
293+
.exec()
294+
}
295+
}
296+
268297
/**
269298
* @param {Object} opts
270299
* @param {number} [opts.blockTime]

0 commit comments

Comments
 (0)