Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/quick-tables-glow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@tanstack/electric-db-collection': patch
---

Fix duplicate key error when overlapping subset queries return the same row with different values.

When multiple subset queries return the same row (e.g., different WHERE clauses that both match the same record), the server sends `insert` operations for each response. If the row's data changed between requests (e.g., a timestamp field was updated), this caused a `DuplicateKeySyncError`. The adapter now tracks synced keys and converts subsequent inserts to updates.
30 changes: 28 additions & 2 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,12 @@ function createElectricSync<T extends Row<unknown>>(
syncMode === `progressive` && !hasReceivedUpToDate
const bufferedMessages: Array<Message<T>> = [] // Buffer change messages during initial sync

// Track keys that have been synced to handle overlapping subset queries.
// When multiple subset queries return the same row, the server sends `insert`
// for each response. We convert subsequent inserts to updates to avoid
// duplicate key errors when the row's data has changed between requests.
const syncedKeys = new Set<string | number>()

/**
* Process a change message: handle tags and write the mutation
*/
Expand All @@ -1249,14 +1255,28 @@ function createElectricSync<T extends Row<unknown>>(
const rowId = collection.getKeyFromItem(changeMessage.value)
const operation = changeMessage.headers.operation

if (operation === `delete`) {
// Track synced keys and handle overlapping subset queries.
// When multiple subset queries return the same row, the server sends
// `insert` for each response. We convert subsequent inserts to updates
// to avoid duplicate key errors when the row's data has changed.
const isDelete = operation === `delete`
const isDuplicateInsert =
operation === `insert` && syncedKeys.has(rowId)

if (isDelete) {
syncedKeys.delete(rowId)
} else {
syncedKeys.add(rowId)
}

if (isDelete) {
clearTagsForRow(rowId)
} else if (hasTags) {
processTagsForChangeMessage(tags, removedTags, rowId)
}

write({
type: changeMessage.headers.operation,
type: isDuplicateInsert ? `update` : operation,
value: changeMessage.value,
// Include the primary key and relation info in the metadata
metadata: {
Expand Down Expand Up @@ -1392,6 +1412,9 @@ function createElectricSync<T extends Row<unknown>>(
// Clear tag tracking state
clearTagTrackingState()

// Clear synced keys tracking since we're starting fresh
syncedKeys.clear()

// Reset the loadSubset deduplication state since we're starting fresh
// This ensures that previously loaded predicates don't prevent refetching after truncate
loadSubsetDedupe?.reset()
Expand Down Expand Up @@ -1419,6 +1442,9 @@ function createElectricSync<T extends Row<unknown>>(
// Clear tag tracking state for atomic swap
clearTagTrackingState()

// Clear synced keys tracking for atomic swap
syncedKeys.clear()

// Apply all buffered change messages and extract txids/snapshots
for (const bufferedMsg of bufferedMessages) {
if (isChangeMessage(bufferedMsg)) {
Expand Down
Loading