Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-out-of-window-sse-filter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

fix(db): filter SSE inserts outside the current window before they reach D2
17 changes: 14 additions & 3 deletions packages/db/src/query/live/collection-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,26 @@ export class CollectionSubscriber<
// Use a holder to forward-reference subscription in the callback
const subscriptionHolder: { current?: CollectionSubscription } = {}

const windowSize = limit + offset
const sendChangesInRange = (
changes: Iterable<ChangeMessage<any, string | number>>,
) => {
const changesArray = Array.isArray(changes) ? changes : [...changes]

this.trackSentValues(changesArray, orderByInfo.comparator)
const windowBoundary = this.biggest
const relevant =
windowBoundary === undefined || windowSize === Infinity
? changesArray
: changesArray.filter((change) => {
if (change.type !== `insert`) return true
if (this.sentToD2Keys.has(change.key)) return true
if (this.sentToD2Keys.size < windowSize) return true
return orderByInfo.comparator(windowBoundary, change.value) >= 0
})

this.trackSentValues(relevant, orderByInfo.comparator)

// Split live updates into a delete of the old value and an insert of the new value
const splittedChanges = splitUpdates(changesArray)
const splittedChanges = splitUpdates(relevant)
this.sendChangesToPipelineWithTracking(
splittedChanges,
subscriptionHolder.current!,
Expand Down
87 changes: 87 additions & 0 deletions packages/db/tests/collection-subscriber-out-of-window-sse.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { describe, expect, it } from 'vitest'
import { createCollection } from '../src/collection/index.js'
import { BTreeIndex } from '../src/indexes/btree-index.js'
import { createLiveQueryCollection } from '../src/query/index.js'
import { mockSyncCollectionOptions } from './utils.js'

type TestItem = {
id: string
value: number
}

describe(`CollectionSubscriber out-of-window SSE filter`, () => {
it(`should not promote an out-of-window SSE insert when an in-window item is deleted`, async () => {
const initialData: Array<TestItem> = [
{ id: `1`, value: 100 },
{ id: `2`, value: 90 },
{ id: `3`, value: 80 },
{ id: `4`, value: 70 },
]

const sourceCollection = createCollection(
mockSyncCollectionOptions({
id: `sse-window-filter`,
getKey: (item: TestItem) => item.id,
initialData,
autoIndex: `eager`,
defaultIndexType: BTreeIndex,
}),
)

await sourceCollection.preload()

const liveQueryCollection = createLiveQueryCollection((q) =>
q
.from({ items: sourceCollection })
.orderBy(({ items }) => items.value, `desc`)
.limit(3)
.select(({ items }) => ({
id: items.id,
value: items.value,
})),
)

await liveQueryCollection.preload()

const initialResults = Array.from(liveQueryCollection.values())
expect(initialResults).toHaveLength(3)
expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`, `3`])

// SSE delivers an insert for an item that sorts BELOW the current window
// (value 10 is lower than all top-3 values: 100, 90, 80)
sourceCollection.utils.begin()
sourceCollection.utils.write({
type: `insert`,
value: { id: `out-of-window`, value: 10 },
})
sourceCollection.utils.commit()

// Window should be unchanged
let results = Array.from(liveQueryCollection.values())
expect(results).toHaveLength(3)
expect(results.map((r) => r.id)).toEqual([`1`, `2`, `3`])

// Now delete one of the top-3 items
sourceCollection.utils.begin()
sourceCollection.utils.write({
type: `delete`,
value: { id: `2`, value: 90 },
})
sourceCollection.utils.commit()

// Wait for loadNextItems to fetch the replacement
await new Promise((resolve) => setTimeout(resolve, 50))

Comment thread
coderabbitai[bot] marked this conversation as resolved.
results = Array.from(liveQueryCollection.values())
expect(results).toHaveLength(3)

// The replacement should be item 4 (value 70) — the next item from the
// BTree — NOT the out-of-window SSE insert (value 10).
expect(
results.map((r) => r.id),
`Expected item 4 (value 70) to replace deleted item 2, ` +
`not the out-of-window SSE insert (value 10). ` +
`Got: ${JSON.stringify(results.map((r) => ({ id: r.id, value: r.value })))}`,
).toEqual([`1`, `3`, `4`])
})
Comment on lines +13 to +89

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Add explicit edge-case tests for limit/offset and async ordering.

This test reproduces the core bug path well, but it still misses dedicated corner-case coverage for limit/offset boundaries (for example limit: 0 and non-zero offset) and async event ordering/race scenarios, which are high-value for this fix area.

As per coding guidelines, “Test corner cases including: … async race conditions, and limit/offset edge cases”.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/db/tests/collection-subscriber-out-of-window-sse.test.ts` around
lines 13 - 86, Add explicit tests that cover limit/offset edge cases and async
event-ordering races: create new test cases using the same helpers
(createCollection, createLiveQueryCollection,
sourceCollection.utils.write/commit, liveQueryCollection.preload) to assert
behavior when limit is 0 and when offset is non-zero (verify window is empty or
starts at correct item), and add race-condition tests that fire concurrent SSE
inserts and deletes with controlled timing (use setTimeout/promises to vary
ordering) to ensure replacements come from the storage BTree (e.g., expect item
with value 70) not an out-of-window SSE insert; include clear assertions on
liveQueryCollection.values() after each sequence and small waits (e.g.,
setTimeout) to let loadNextItems run.

})