diff --git a/src/api.js b/src/api.js index 2ca13c9..077e827 100644 --- a/src/api.js +++ b/src/api.js @@ -277,8 +277,10 @@ export class Api { const awareness = new awarenessProtocol.Awareness(ydoc) awareness.setLocalState(null) // we don't want to propagate awareness state const now = performance.now() + if (docstate) { Y.applyUpdateV2(ydoc, docstate.doc) } + let changed = false + ydoc.once('afterTransaction', (tr) => { changed = tr.changed.size > 0 }) ydoc.transact(() => { - if (docstate) { Y.applyUpdateV2(ydoc, docstate.doc) } docMessages?.messages.forEach(m => { const decoder = decoding.createDecoder(m) switch (decoding.readVarUint(decoder)) { @@ -296,7 +298,13 @@ export class Api { }) }) logApi(`took ${performance.now() - now}ms to process messages for room: ${room}`) - return { ydoc, awareness, redisLastId: docMessages?.lastId.toString() || '0', storeReferences: docstate?.references || null } + return { + ydoc, + awareness, + redisLastId: docMessages?.lastId.toString() || '0', + storeReferences: docstate?.references || null, + changed + } } /** @@ -339,11 +347,14 @@ export class Api { } else { reclaimCounts++ const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix) - const { ydoc, storeReferences, redisLastId } = await this.getDoc(room, docid) + const { ydoc, storeReferences, redisLastId, changed } = await this.getDoc(room, docid) const lastId = math.max(number.parseInt(redisLastId.split('-')[0]), number.parseInt(task.id.split('-')[0])) - await this.store.persistDoc(room, docid, ydoc) + if (changed) { + logWorker(`persisting changes in room: ${room}`) + await this.store.persistDoc(room, docid, ydoc) + } else logWorker(`skip persisting room: ${room} due to no changes`) await promise.all([ - storeReferences ? this.store.deleteReferences(room, docid, storeReferences) : promise.resolve(), + storeReferences && changed ? this.store.deleteReferences(room, docid, storeReferences) : promise.resolve(), this.redis.multi() .xTrim(task.stream, 'MINID', lastId - this.redisMinMessageLifetime) .xAdd(this.redisWorkerStreamName, '*', { compact: task.stream }) diff --git a/tests/api.tests.js b/tests/api.tests.js index 51ecfef..0e0ea71 100644 --- a/tests/api.tests.js +++ b/tests/api.tests.js @@ -75,6 +75,7 @@ export const testWorker = async tc => { let streamexists = true while (streamexists) { streamexists = (await client.redis.exists(stream)) === 1 + await promise.wait(10) } const { ydoc: loadedDoc } = await client.getDoc(room, docid) t.assert(loadedDoc.getMap().get('key1') === 'val1')