Skip to content

Commit 81f6f97

Browse files
committed
feat: do persist in socketio connection layer
1 parent f21a271 commit 81f6f97

File tree

1 file changed

+150
-39
lines changed

1 file changed

+150
-39
lines changed

src/y-socket-io/y-socket-io.js

Lines changed: 150 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ import * as promise from 'lib0/promise'
44
import * as encoding from 'lib0/encoding'
55
import * as decoding from 'lib0/decoding'
66
import { assert } from 'lib0/testing'
7-
import { User } from './user.js'
87
import * as api from '../api.js'
98
import * as protocol from '../protocol.js'
109
import { createSubscriber } from '../subscriber.js'
10+
import { isDeepStrictEqual } from 'util'
11+
import { User } from './user.js'
12+
13+
const PERSIST_INTERVAL = 5000
1114

1215
/**
1316
* @typedef {import('socket.io').Namespace} Namespace
@@ -89,6 +92,18 @@ export class YSocketIO {
8992
* @readonly
9093
*/
9194
namespaceMap = new Map()
95+
/**
96+
* @type {Map<string, RedisDoc>}
97+
* @private
98+
* @readonly
99+
*/
100+
namespaceDocMap = new Map()
101+
/**
102+
* @type {Map<Socket, { user: UserLike, validatedAt: number }>}
103+
* @private
104+
* @readonly
105+
*/
106+
socketUserCache = new Map()
92107

93108
/**
94109
* YSocketIO constructor.
@@ -123,12 +138,21 @@ export class YSocketIO {
123138
this.nsp = this.io.of(/^\/yjs\|.*$/)
124139

125140
this.nsp.use(async (socket, next) => {
126-
if (this.configuration.authenticate == null) return next()
127-
const user = await this.configuration.authenticate(socket)
128-
if (user) {
129-
socket.user = new User(this.getNamespaceString(socket.nsp), user.userid)
130-
return next()
131-
} else return next(new Error('Unauthorized'))
141+
if (this.configuration.authenticate === null) return next()
142+
const userCache = this.socketUserCache.get(socket)
143+
const namespace = this.getNamespaceString(socket.nsp)
144+
if (!userCache || Date.now() - userCache.validatedAt > 60_000) {
145+
this.socketUserCache.delete(socket)
146+
const user = await this.configuration.authenticate(socket)
147+
if (!user) return next(new Error('Unauthorized'))
148+
this.socketUserCache.set(socket, { user, validatedAt: Date.now() })
149+
socket.user = new User(namespace, user.userid)
150+
} else {
151+
socket.user = new User(namespace, userCache.user.userid)
152+
}
153+
154+
if (socket.user) return next()
155+
else return next(new Error('Unauthorized'))
132156
})
133157

134158
this.nsp.on('connection', async (socket) => {
@@ -156,17 +180,23 @@ export class YSocketIO {
156180
this.initSyncListeners(socket)
157181
this.initAwarenessListeners(socket)
158182
this.initSocketListeners(socket)
183+
;(async () => {
184+
assert(this.client)
185+
assert(socket.user)
186+
const doc =
187+
this.namespaceDocMap.get(namespace) ||
188+
(await this.client.getDoc(namespace, 'index'))
189+
this.namespaceDocMap.set(namespace, doc)
159190

160-
const doc = await this.client.getDoc(namespace, 'index')
161-
162-
if (
163-
api.isSmallerRedisId(doc.redisLastId, socket.user.initialRedisSubId)
164-
) {
165-
// our subscription is newer than the content that we received from the api
166-
// need to renew subscription id and make sure that we catch the latest content.
167-
this.subscriber.ensureSubId(stream, doc.redisLastId)
168-
}
169-
this.startSynchronization(socket, doc)
191+
if (
192+
api.isSmallerRedisId(doc.redisLastId, socket.user.initialRedisSubId)
193+
) {
194+
// our subscription is newer than the content that we received from the api
195+
// need to renew subscription id and make sure that we catch the latest content.
196+
this.subscriber?.ensureSubId(stream, doc.redisLastId)
197+
}
198+
this.startSynchronization(socket, doc)
199+
})()
170200
})
171201

172202
return { client, subscriber }
@@ -200,22 +230,31 @@ export class YSocketIO {
200230
syncStep2
201231
) => {
202232
assert(this.client)
203-
const doc = await this.client.getDoc(
204-
this.getNamespaceString(socket.nsp),
205-
'index'
206-
)
233+
const namespace = this.getNamespaceString(socket.nsp)
234+
const doc =
235+
this.namespaceDocMap.get(namespace) ||
236+
(await this.client.getDoc(namespace, 'index'))
237+
this.namespaceDocMap.set(namespace, doc)
238+
assert(doc)
207239
syncStep2(Y.encodeStateAsUpdate(doc.ydoc, stateVector))
208240
}
209241
)
210242

243+
/** @type {unknown} */
244+
let prevMsg = null
211245
socket.on('sync-update', (/** @type {ArrayBuffer} */ update) => {
246+
if (isDeepStrictEqual(update, prevMsg)) return
212247
assert(this.client)
248+
const namespace = this.getNamespaceString(socket.nsp)
213249
const message = Buffer.from(update.slice(0, update.byteLength))
214-
this.client.addMessage(
215-
this.getNamespaceString(socket.nsp),
216-
'index',
217-
Buffer.from(this.toRedis('sync-update', message))
218-
).catch(console.error)
250+
this.client
251+
.addMessage(
252+
namespace,
253+
'index',
254+
Buffer.from(this.toRedis('sync-update', message))
255+
)
256+
.catch(console.error)
257+
prevMsg = update
219258
})
220259
}
221260

@@ -232,14 +271,19 @@ export class YSocketIO {
232271
* @readonly
233272
*/
234273
initAwarenessListeners = (socket) => {
274+
/** @type {unknown} */
275+
const prevMsg = null
235276
socket.on('awareness-update', (/** @type {ArrayBuffer} */ update) => {
277+
if (isDeepStrictEqual(update, prevMsg)) return
236278
assert(this.client)
237279
const message = Buffer.from(update.slice(0, update.byteLength))
238-
this.client.addMessage(
239-
this.getNamespaceString(socket.nsp),
240-
'index',
241-
Buffer.from(this.toRedis('awareness-update', new Uint8Array(message)))
242-
).catch(console.error)
280+
this.client
281+
.addMessage(
282+
this.getNamespaceString(socket.nsp),
283+
'index',
284+
Buffer.from(this.toRedis('awareness-update', new Uint8Array(message)))
285+
)
286+
.catch(console.error)
243287
})
244288
}
245289

@@ -253,14 +297,18 @@ export class YSocketIO {
253297
socket.on('disconnect', async () => {
254298
assert(this.subscriber)
255299
if (!socket.user) return
256-
for (const ns of socket.user.subs) {
257-
const stream = this.namespaceStreamMap.get(ns)
300+
this.socketUserCache.delete(socket)
301+
for (const stream of socket.user.subs) {
302+
const ns = this.streamNamespaceMap.get(stream)
303+
if (!ns) continue
258304
const nsp = this.namespaceMap.get(ns)
259305
if (nsp?.sockets.size === 0 && stream) {
260306
this.subscriber.unsubscribe(stream, this.redisMessageSubscriber)
261307
this.namespaceStreamMap.delete(ns)
262308
this.streamNamespaceMap.delete(stream)
263309
this.namespaceMap.delete(ns)
310+
this.namespaceDocMap.get(ns)?.ydoc.destroy()
311+
this.namespaceDocMap.delete(ns)
264312
}
265313
}
266314
})
@@ -280,11 +328,13 @@ export class YSocketIO {
280328
(/** @type {Uint8Array} */ update) => {
281329
assert(this.client)
282330
const message = Buffer.from(update.slice(0, update.byteLength))
283-
this.client.addMessage(
284-
this.getNamespaceString(socket.nsp),
285-
'index',
286-
Buffer.from(this.toRedis('sync-step-2', message))
287-
).catch(console.error)
331+
this.client
332+
.addMessage(
333+
this.getNamespaceString(socket.nsp),
334+
'index',
335+
Buffer.from(this.toRedis('sync-step-2', message))
336+
)
337+
.catch(console.error)
288338
}
289339
)
290340
if (doc.awareness.states.size > 0) {
@@ -303,7 +353,7 @@ export class YSocketIO {
303353
* @param {string} stream
304354
* @param {Array<Uint8Array>} messages
305355
*/
306-
redisMessageSubscriber = (stream, messages) => {
356+
redisMessageSubscriber = async (stream, messages) => {
307357
const namespace = this.streamNamespaceMap.get(stream)
308358
if (!namespace) return
309359
const nsp = this.namespaceMap.get(namespace)
@@ -313,6 +363,8 @@ export class YSocketIO {
313363
this.namespaceStreamMap.delete(namespace)
314364
this.streamNamespaceMap.delete(stream)
315365
this.namespaceMap.delete(namespace)
366+
this.namespaceDocMap.get(namespace)?.ydoc.destroy()
367+
this.namespaceDocMap.delete(namespace)
316368
}
317369

318370
/** @type {Uint8Array[]} */
@@ -334,6 +386,65 @@ export class YSocketIO {
334386
if (msg.length === 0) continue
335387
nsp.emit('awareness-update', msg)
336388
}
389+
390+
let changed = false
391+
const existDoc = this.namespaceDocMap.get(namespace)
392+
if (existDoc) {
393+
existDoc.ydoc.on('afterTransaction', (tr) => {
394+
changed = tr.changed.size > 0
395+
})
396+
Y.transact(existDoc.ydoc, () => {
397+
for (const msg of updates) Y.applyUpdate(existDoc.ydoc, msg)
398+
for (const msg of awareness) {
399+
AwarenessProtocol.applyAwarenessUpdate(existDoc.awareness, msg, null)
400+
}
401+
})
402+
}
403+
404+
assert(this.client)
405+
let doc = existDoc
406+
if (!existDoc) {
407+
const getDoc = await this.client.getDoc(namespace, 'index')
408+
doc = getDoc
409+
changed = getDoc.changed
410+
}
411+
assert(doc)
412+
if (changed) this.debouncedPersist(namespace, doc.ydoc)
413+
this.namespaceDocMap.get(namespace)?.ydoc.destroy()
414+
this.namespaceDocMap.set(namespace, doc)
415+
await this.client.trimRoomStream(namespace, 'index', nsp.sockets.size === 0)
416+
}
417+
418+
/**
419+
* @type {Map<string, NodeJS.Timeout | null>}
420+
*/
421+
debouncedPersistMap = new Map()
422+
/**
423+
* @type {Map<string, Y.Doc>}
424+
*/
425+
debouncedPersistDocMap = new Map()
426+
427+
/**
428+
* @param {string} namespace
429+
* @param {Y.Doc} doc
430+
*/
431+
async debouncedPersist (namespace, doc) {
432+
this.debouncedPersistDocMap.set(namespace, doc)
433+
if (this.debouncedPersistMap.has(namespace)) return
434+
this.debouncedPersistMap.set(
435+
namespace,
436+
setTimeout(
437+
async () => {
438+
assert(this.client)
439+
const doc = this.debouncedPersistDocMap.get(namespace)
440+
if (!doc) return
441+
await this.client.store.persistDoc(namespace, 'index', doc)
442+
this.debouncedPersistDocMap.delete(namespace)
443+
this.debouncedPersistMap.delete(namespace)
444+
},
445+
PERSIST_INTERVAL + (Math.random() - 0.5) * PERSIST_INTERVAL
446+
)
447+
)
337448
}
338449

339450
/**

0 commit comments

Comments
 (0)