diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 12f6bfc..6cf4ac8 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -128,12 +128,6 @@ export class YSocketIO { * @readonly */ debouncedPersistMap = new Map() - /** - * @type {Map} - * @private - * @readonly - */ - debouncedPersistDocMap = new Map() /** * @type {Map} * @private @@ -373,8 +367,7 @@ export class YSocketIO { const nsp = this.namespaceMap.get(ns) if (nsp?.sockets.size === 0 && stream) { this.cleanupNamespace(ns, stream, DEFAULT_CLEAR_TIMEOUT) - const doc = this.namespaceDocMap.get(ns) - if (doc) this.debouncedPersist(ns, doc.ydoc, true) + if (this.namespaceDocMap.has(ns)) this.debouncedPersist(ns, true) } } }) @@ -492,7 +485,7 @@ export class YSocketIO { const shouldPersist = now - lastPersistCalledAt > MAX_PERSIST_INTERVAL if (changed || shouldPersist || nsp.sockets.size === 0) { this.namespacePersistentMap.set(namespace, now) - this.debouncedPersist(namespace, doc.ydoc, nsp.sockets.size === 0) + this.debouncedPersist(namespace, nsp.sockets.size === 0) } this.namespaceDocMap.get(namespace)?.ydoc.destroy() this.namespaceDocMap.set(namespace, doc) @@ -500,11 +493,9 @@ export class YSocketIO { /** * @param {string} namespace - * @param {Y.Doc} doc * @param {boolean=} immediate */ - debouncedPersist (namespace, doc, immediate = false) { - this.debouncedPersistDocMap.set(namespace, doc) + debouncedPersist (namespace, immediate = false) { if (this.debouncedPersistMap.has(namespace)) { if (!immediate) return clearTimeout(this.debouncedPersistMap.get(namespace) || undefined) @@ -514,9 +505,17 @@ export class YSocketIO { : PERSIST_INTERVAL + (Math.random() - 0.5) * PERSIST_INTERVAL const timeout = setTimeout( async () => { + // wait for previous persisting operation if exists + const prev = this.awaitingPersistMap.get(namespace) + if (prev?.promise) await prev.promise + // delete persist entry to allow queueing next persisting operation + // we can delete it here because the following until awaiting persist + // are all synchronize operations + this.debouncedPersistMap.delete(namespace) + try { assert(this.client) - const doc = this.debouncedPersistDocMap.get(namespace) + const doc = this.namespaceDocMap.get(namespace)?.ydoc logSocketIO(`trying to persist ${namespace}`) if (!doc) return if (this.client.persistWorker) { @@ -534,15 +533,14 @@ export class YSocketIO { }) await promise } else { - await this.client.store.persistDoc(namespace, 'index', doc) + const promise = this.client.store.persistDoc(namespace, 'index', doc) + this.awaitingPersistMap.set(namespace, { promise, resolve: () => {} }) + await promise } await this.client.trimRoomStream(namespace, 'index') } catch (e) { console.error(e) - } finally { - this.debouncedPersistDocMap.delete(namespace) - this.debouncedPersistMap.delete(namespace) } }, timeoutInterval