
neo4j-graphrag-p1-writer #279

@kien-ship-it

Description


Feature Packet — Neo4j Direct Write Pipeline + Entity Resolution

Parent Spec: .kiro/specs/neo4j-graphrag/
Workstream Owner: Dev C
Scope: R5 (Direct Neo4j Write Pipeline) + R6 (Vector Index + Entity Resolution)
Integration Surface: TypeScript + Cypher — writes directly to Neo4j, no Postgres intermediary for graph data


1. Feature Overview

The GraphRAG system needs a way to write extracted entities, relationships, and document structure directly into Neo4j as a graph. Today, neo4j-sync.ts reads from Postgres kg_* tables and mirrors them into Neo4j — a two-hop pattern that adds latency and couples graph writes to the relational schema. Dev C's workstream replaces this pattern with a direct write pipeline: when Dev B's sidecar extracts entities and relationships from a document, Dev C's code writes them straight to Neo4j.

This workstream also creates the vector index on entity embeddings and implements entity resolution — deduplicating entities across documents using cosine similarity. When "Microsoft" appears in Document A and "Microsoft Corp" appears in Document B, entity resolution detects the near-duplicate via embedding similarity and merges them, creating an ALIAS_OF relationship for provenance.

The Neo4jDirectWriter interface is already defined and committed (types only). Your job is to implement it.


2. Shared Contracts (Reference — Do Not Duplicate)

All shared schemas live in __tests__/api/graphrag/contracts/graphrag-schemas.ts (117 passing tests). The interface you're implementing is defined in src/lib/graph/neo4j-direct-writer.ts.

Interface you implement:

  • Neo4jDirectWriter — writeEntities(), writeRelationships(), writeMentions(), writeDocumentGraph(), ensureIndexes()

Input types (already defined in neo4j-direct-writer.ts):

  • Neo4jEntityInput — entity with name, displayName, label, confidence, mentionCount, companyId, optional embedding
  • Neo4jRelationshipInput — relationship with sourceName, sourceLabel, targetName, targetLabel, relationType (SCREAMING_SNAKE_CASE), weight, evidenceCount, optional detail, documentId, companyId
  • Neo4jMentionInput — mention linking entity to section
  • Neo4jDocumentGraphInput — document with sections and optional topics
  • Neo4jWriteResult — combined result shape

Zod schemas your output must satisfy:

  • Neo4jEntityNodeSchema — entity node shape in Neo4j
  • Neo4jSectionNodeSchema — section node shape
  • Neo4jDocumentNodeSchema — document node shape
  • Neo4jWriteResultSchema — write result shape ({ entities, mentions, relationships, dynamicRelTypes, durationMs })
  • Neo4jDynamicRelPropertiesSchema — relationship properties ({ weight, evidenceCount, detail, documentId })

Read the actual Zod definitions and the interface file before you start. The contract tests are your ground truth.


3. Workstream Assignment — Dev C

Contract (R5 — Direct Neo4j Write Pipeline)

Implement the Neo4jDirectWriter interface. Create your implementation file (e.g., src/lib/graph/neo4j-direct-writer-impl.ts — you choose the file name and internal structure).

writeEntities(entities, companyId) → Promise<number>

  • MERGE Entity nodes into Neo4j with all properties from Neo4jEntityInput
  • MERGE key: (name, label, companyId) — this triple uniquely identifies an entity
  • On create: set displayName, confidence, mentionCount, embedding
  • On match: update confidence, mentionCount; update embedding only when the new value is non-null (don't overwrite an existing embedding with null)
  • Return the count of entities processed
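As a sketch of the MERGE described above, one UNWIND-batched query could cover a whole `writeEntities()` call. This is not the required implementation — the batching style is yours to choose — but it shows the key property: `coalesce()` preserves an existing embedding when the incoming one is null.

```typescript
// Hedged sketch — one UNWIND-batched MERGE per writeEntities() call.
// Property names come from Neo4jEntityInput; `label` is a node property
// in the MERGE key, not a Cypher node label.
const MERGE_ENTITIES_CYPHER = `
UNWIND $entities AS entity
MERGE (e:Entity { name: entity.name, label: entity.label, companyId: $companyId })
ON CREATE SET
  e.displayName = entity.displayName,
  e.confidence = entity.confidence,
  e.mentionCount = entity.mentionCount,
  e.embedding = entity.embedding
ON MATCH SET
  e.confidence = entity.confidence,
  e.mentionCount = entity.mentionCount,
  e.embedding = coalesce(entity.embedding, e.embedding)
`;
```

The caller would run `session.run(MERGE_ENTITIES_CYPHER, { entities, companyId })` and return `entities.length`.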

writeRelationships(relationships, companyId) → Promise<string[]>

  • MERGE relationships using DYNAMIC Cypher relationship types — the actual relationType string becomes the Cypher type (e.g., MERGE (a)-[r:CEO_OF]->(b))
  • Do NOT use a generic :RELATES_TO with a type property
  • Match source and target entities by (name, label, companyId)
  • Set relationship properties: weight, evidenceCount, detail, documentId
  • Return an array of distinct relationship type strings that were written
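A minimal sketch of the group-by-type pattern (mirroring what neo4j-sync.ts does today): group relationships by relationType, then run one MERGE query per type with the type interpolated into the Cypher string, since Neo4j cannot parameterize relationship types.

```typescript
// Hedged sketch: group relationships by type for one-query-per-type writes.
function groupByRelType<T extends { relationType: string }>(rels: T[]): Map<string, T[]> {
  const groups = new Map<string, T[]>();
  for (const rel of rels) {
    const bucket = groups.get(rel.relationType) ?? [];
    bucket.push(rel);
    groups.set(rel.relationType, bucket);
  }
  return groups;
}

// Per-type Cypher; relType MUST be validated (SCREAMING_SNAKE_CASE) before
// being interpolated — it originates from LLM extraction.
const relCypherFor = (relType: string) => `
UNWIND $rels AS rel
MATCH (a:Entity { name: rel.sourceName, label: rel.sourceLabel, companyId: $companyId })
MATCH (b:Entity { name: rel.targetName, label: rel.targetLabel, companyId: $companyId })
MERGE (a)-[r:${relType}]->(b)
SET r.weight = rel.weight, r.evidenceCount = rel.evidenceCount,
    r.detail = rel.detail, r.documentId = rel.documentId
`;
```

`writeRelationships()` would then return `[...groupByRelType(rels).keys()]` as the distinct type list.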

writeMentions(mentions, companyId) → Promise<number>

  • MERGE Section nodes with properties id (sectionId) and documentId
  • MERGE MENTIONED_IN edges from Entity to Section with confidence property
  • Return the count of mentions processed

writeDocumentGraph(doc, companyId) → Promise<void>

  • MERGE a Document node with properties: id, name, companyId, uploadedAt
  • MERGE CONTAINS edges from Document to each Section (by sectionId + documentId)
  • Topics are optional (R9 scope). If doc.topics is provided, create Topic nodes and DISCUSSES edges from Section to Topic. If not provided, skip — don't error.
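One way to keep the optional-topics branch simple is a second query that runs against an empty list when `doc.topics` is absent. A sketch, with the caveat that the topic input shape used here (`sectionId`, `name`) is an assumption — check Neo4jDocumentGraphInput for the real fields:

```typescript
// Hedged sketch: Document node + CONTAINS edges in one query.
const MERGE_DOCUMENT_CYPHER = `
MERGE (d:Document { id: $docId, companyId: $companyId })
SET d.name = $name, d.uploadedAt = $uploadedAt
WITH d
UNWIND $sections AS section
MERGE (s:Section { id: section.sectionId, documentId: $docId })
MERGE (d)-[:CONTAINS]->(s)
`;

// Topics query — passing `topics: doc.topics ?? []` makes the "not provided"
// case a no-op UNWIND instead of a branch in calling code.
const MERGE_TOPICS_CYPHER = `
UNWIND $topics AS topic
MATCH (s:Section { id: topic.sectionId, documentId: $docId })
MERGE (t:Topic { name: topic.name, companyId: $companyId })
MERGE (s)-[:DISCUSSES]->(t)
`;
```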

ensureIndexes() → Promise<void>

  • Create vector index entity-embeddings on Entity.embedding (768-dim, cosine similarity)
  • Use CREATE VECTOR INDEX ... IF NOT EXISTS
  • Non-fatal: if index creation fails (e.g., Neo4j Community without required plugin), log a warning and return — don't throw
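For reference, the index DDL could look like the following (Neo4j 5.13+ syntax; the backticks around the index name are required because `entity-embeddings` contains a hyphen):

```typescript
// Hedged sketch of the vector index DDL. IF NOT EXISTS makes the call
// idempotent; a failure here should be caught, warned, and swallowed.
const CREATE_VECTOR_INDEX_CYPHER = `
CREATE VECTOR INDEX \`entity-embeddings\` IF NOT EXISTS
FOR (e:Entity) ON (e.embedding)
OPTIONS {
  indexConfig: {
    \`vector.dimensions\`: 768,
    \`vector.similarity_function\`: 'cosine'
  }
}
`;
```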

All methods:

  • Use getNeo4jSession() from src/lib/graph/neo4j-client.ts to get a session
  • Close the session in a finally block after each operation
  • All writes are scoped by companyId
  • If Neo4j is unreachable during a write, catch the error, log a warning with console.warn(), and return gracefully (return 0, [], or undefined as appropriate — don't throw)

Contract (R6 — Vector Index + Entity Resolution)

After writing entities with embeddings, check for near-duplicates using the vector index.

Resolution query:

CALL db.index.vector.queryNodes('entity-embeddings', $topK, $embedding) — note the Neo4j 5 procedure takes topK before the query vector
  • Scope results to the same companyId
  • topK: a reasonable value (you decide — consider that most entities won't have many near-duplicates)
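A sketch of the full resolution lookup. The `$selfElementId` parameter name is hypothetical — the point is that the vector index returns the query entity itself as its own top match, so it must be excluded:

```typescript
// Hedged sketch of the resolution query. Neo4j 5 signature is
// (indexName, topK, embedding). WHERE scopes to companyId, drops the
// self-match, and applies the configured threshold.
const RESOLVE_ENTITY_CYPHER = `
CALL db.index.vector.queryNodes('entity-embeddings', $topK, $embedding)
YIELD node, score
WHERE node.companyId = $companyId
  AND elementId(node) <> $selfElementId
  AND score >= $threshold
RETURN node, score
ORDER BY score DESC
`;
```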

Threshold:

  • Configurable via ENTITY_RESOLUTION_THRESHOLD env var (default: 0.85)
  • Similarity score from the vector query is cosine similarity (0 to 1)

Merge logic:

  • When two entities within the same companyId exceed the threshold: merge into a canonical entity
  • Canonical selection: the entity with higher mentionCount is canonical. If tied, the entity that was written first (already existed in Neo4j) is canonical.
  • Create an ALIAS_OF relationship from the alias node to the canonical node
  • The alias node stays in the graph (for provenance) — don't delete it
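The canonical-selection rule above is a pure decision and easy to isolate. A sketch — `writtenAtOrder` is a hypothetical stand-in for "already existed in Neo4j first"; the real implementation would use whatever ordering signal the graph provides:

```typescript
// Hedged sketch of canonical selection: higher mentionCount wins;
// on a tie, the entity written first is canonical.
interface ResolvedEntity {
  name: string;
  mentionCount: number;
  writtenAtOrder: number; // lower = written earlier (hypothetical field)
}

function pickCanonical(
  a: ResolvedEntity,
  b: ResolvedEntity,
): { canonical: ResolvedEntity; alias: ResolvedEntity } {
  if (a.mentionCount !== b.mentionCount) {
    return a.mentionCount > b.mentionCount
      ? { canonical: a, alias: b }
      : { canonical: b, alias: a };
  }
  return a.writtenAtOrder <= b.writtenAtOrder
    ? { canonical: a, alias: b }
    : { canonical: b, alias: a };
}
// The alias node then gets: MERGE (alias)-[:ALIAS_OF]->(canonical)
```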

Graceful degradation:

  • If the vector index doesn't exist, skip resolution with a console.warn() — don't error
  • If the vector query fails for any reason, skip resolution with a warning log

Constraints

  • Interface is frozen. The Neo4jDirectWriter interface in neo4j-direct-writer.ts is already committed and other workstreams depend on it. Implement it — don't change the method signatures.

  • Dynamic relationship types in Cypher. Neo4j doesn't support parameterized relationship types. You can't write MERGE (a)-[r:$type]->(b) — the $type part isn't a valid parameter. Look at how neo4j-sync.ts solves this (lines ~200-230: it groups relationships by type and runs one MERGE query per type, interpolating the type into the Cypher string). This is the known pattern in this codebase. If you choose a different approach, document why.

  • MERGE key precision matters. The parent design specifies Entity MERGE key as (name, label, companyId). Over-specifying (e.g., including confidence) creates duplicates when confidence changes. Under-specifying (e.g., just name) merges entities that shouldn't be merged (e.g., "Apple" the company vs "Apple" the fruit, if they had different labels).

  • Session lifecycle. getNeo4jSession() creates a new session each time. You must close it when done. The pattern in neo4j-sync.ts uses try/finally with await session?.close(). Follow this pattern.

  • neo4j-driver is already installed. Import Session type from neo4j-driver. Use session.run(cypher, params) for queries. The driver handles connection pooling internally.

  • Entity resolution runs AFTER writes. You need entities to exist in Neo4j with embeddings before you can query the vector index. Don't try to resolve during the write — resolve after.

  • New env var needed. ENTITY_RESOLUTION_THRESHOLD is not yet in src/env.ts. Add it following the existing pattern. Look at how EMBEDDING_DIMENSIONS is defined — it uses z.coerce.number() with .optional(). Your threshold is a float between 0 and 1. Also add it to parseServerEnv() and to .env.example.

  • Don't modify neo4j-sync.ts. Your code replaces the pattern, not the file. The old sync path stays for backward compatibility until the migration is complete.
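The ENTITY_RESOLUTION_THRESHOLD semantics can be sketched as a plain function. In the actual codebase this belongs in src/env.ts using the `z.coerce.number()` pattern noted above; the sketch below just makes the fallback behavior explicit:

```typescript
// Hedged sketch of threshold parsing: default 0.85, reject out-of-range
// or non-numeric values with a warning rather than an error.
function parseResolutionThreshold(raw: string | undefined): number {
  const DEFAULT_THRESHOLD = 0.85;
  if (raw === undefined || raw === "") return DEFAULT_THRESHOLD;
  const n = Number(raw);
  if (Number.isNaN(n) || n < 0 || n > 1) {
    console.warn(`Invalid ENTITY_RESOLUTION_THRESHOLD "${raw}"; using ${DEFAULT_THRESHOLD}`);
    return DEFAULT_THRESHOLD;
  }
  return n;
}
```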

Acceptance Criteria (R5)

  1. Given an array of Neo4jEntityInput objects, writeEntities() creates Entity nodes in Neo4j with all required properties: name, displayName, label, confidence, mentionCount, companyId, and embedding when present.
  2. Calling writeEntities() twice with the same input data produces the same graph state — no duplicate Entity nodes are created (idempotent via MERGE on (name, label, companyId)).
  3. writeRelationships() creates relationships using the actual relationType string as the Cypher relationship type (e.g., :CEO_OF, :ACQUIRED). No relationships of type :RELATES_TO exist in the graph after a write.
  4. writeRelationships() returns an array of distinct relationship type strings that were written (e.g., ["CEO_OF", "ACQUIRED", "HEADQUARTERED_IN"]).
  5. writeMentions() creates Section nodes (with id and documentId properties) and MENTIONED_IN edges from Entity to Section (with confidence property).
  6. writeDocumentGraph() creates a Document node (with id, name, companyId, uploadedAt) and CONTAINS edges to Section nodes.
  7. ensureIndexes() creates the entity-embeddings vector index (768-dim, cosine). Running it twice does not produce an error.
  8. When Neo4j is unreachable, all write methods log a warning via console.warn() and return gracefully — writeEntities() returns 0, writeRelationships() returns [], writeMentions() returns 0, writeDocumentGraph() returns without throwing, ensureIndexes() returns without throwing.
  9. The Neo4jWriteResult object (assembled by the caller from the return values of all write methods) passes Neo4jWriteResultSchema.safeParse().

Acceptance Criteria (R6)

  1. After writing entities with embeddings, entity resolution finds entities within the same companyId whose embedding cosine similarity exceeds the configured threshold.
  2. When two entities are resolved as duplicates, the canonical entity (higher mentionCount) keeps its node. The alias entity gets an ALIAS_OF relationship pointing to the canonical.
  3. The ENTITY_RESOLUTION_THRESHOLD env var controls the similarity threshold. Default is 0.85 when the env var is not set.
  4. When the entity-embeddings vector index doesn't exist, entity resolution is skipped entirely with a console.warn() — no error thrown.
  5. db.index.vector.queryNodes('entity-embeddings', $topK, $embedding) returns results when the index exists and entities have embeddings stored.

Design Challenges

These are real decisions you need to make. There isn't one right answer — each approach has trade-offs. Pick one, implement it, and be ready to explain why.

Challenge 1: Dynamic Cypher Relationship Types

Neo4j doesn't support parameterized relationship types. MERGE (a)-[r:$type]->(b) is invalid Cypher — the relationship type must be a literal in the query string.

You have at least three approaches:

  • (a) Group-by-type, one query per type — This is what neo4j-sync.ts does today. Group all relationships by relationType, then for each type, run a separate UNWIND ... MERGE query with the type interpolated into the Cypher string. Simple and proven, but means N round-trips to Neo4j for N distinct relationship types. For a document with 15 different relationship types, that's 15 queries.

  • (b) APOC apoc.merge.relationship() — The APOC library provides apoc.merge.relationship(startNode, relType, props, endNode) which accepts the relationship type as a parameter. This lets you write all relationships in a single query regardless of type count. But it adds a dependency on APOC being installed in the Neo4j instance, and the MERGE semantics are slightly different from native Cypher MERGE.

  • (c) Dynamic Cypher string construction — Build the Cypher query string dynamically, interpolating the relationship type. This is essentially what option (a) does, but you could batch more aggressively (e.g., one large query with multiple MERGE clauses). The risk is Cypher injection if the type string isn't sanitized — the relationType comes from LLM extraction and should be SCREAMING_SNAKE_CASE, but you'd want to validate that.

Think about: What's the realistic number of distinct relationship types per document? (Hint: the LLM typically extracts 5-20 types per document.) Is the round-trip overhead of option (a) actually a problem at that scale? What happens if APOC isn't installed? How do you sanitize dynamic Cypher safely?
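On the sanitization question: whatever option you pick, the type string should be checked against the extraction contract's pattern before it ever reaches a Cypher string. One possible policy (reject rather than sanitize, so malformed LLM output fails loudly at the write boundary):

```typescript
// Hedged sketch: gate dynamic Cypher interpolation behind the contract's
// SCREAMING_SNAKE_CASE pattern (^[A-Z][A-Z0-9_]*$). Rejecting invalid
// types here prevents Cypher injection via relationType.
const REL_TYPE_PATTERN = /^[A-Z][A-Z0-9_]*$/;

function assertSafeRelType(relType: string): string {
  if (!REL_TYPE_PATTERN.test(relType)) {
    throw new Error(`Unsafe relationship type for dynamic Cypher: "${relType}"`);
  }
  return relType;
}
```

Whether the caller then throws, skips the relationship, or logs and continues is a separate policy choice worth noting in your Design Brief.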

Challenge 2: Entity Resolution Timing and Batching

Entity resolution needs entities to exist in Neo4j with embeddings before the vector index can find them. But when and how often should resolution run?

  • (a) Per-entity, inline — After writing each entity, immediately query the vector index for near-duplicates. Most accurate (catches duplicates as soon as they appear), but means one vector query per entity. For a document with 50 entities, that's 50 vector queries during ingestion.

  • (b) Per-document, batch — Write all entities for a document first, then run a single resolution pass over all newly-written entities. One pass per document, but you might miss cross-document duplicates if Document B's entities haven't been written yet when Document A's resolution runs.

  • (c) Deferred/scheduled — Don't resolve during ingestion at all. Run entity resolution as a separate process (e.g., a periodic job or a post-ingestion step) that scans all entities. Keeps ingestion fast, but duplicates exist in the graph until the next resolution run.

Think about: What happens when Document A writes "Microsoft" and Document B (ingested 5 minutes later) writes "Microsoft Corp"? In option (b), Document A's resolution pass won't see "Microsoft Corp" because it doesn't exist yet. In option (c), both exist as separate nodes until the scheduled job runs. Does this matter for query quality in the interim? What's the performance impact of option (a) on ingestion latency? Is there a hybrid approach?


4. Testing Strategy

4A. Contract Compliance Tests (Already Committed — Do Not Modify)

These TypeScript tests validate that your Neo4j write results match the shared Zod schemas. They already exist and pass:

  • __tests__/api/graphrag/contract.neo4j-write-result.test.ts — Neo4jWriteResultSchema: write result shape with entities, mentions, relationships, dynamicRelTypes, durationMs
  • __tests__/api/graphrag/contract.neo4j-node-shapes.test.ts — Neo4jEntityNodeSchema, Neo4jSectionNodeSchema, Neo4jDocumentNodeSchema, Neo4jTopicNodeSchema: node property shapes

Your implementation must produce data that conforms to these schemas. The contract tests validate the schema definitions — your job is to produce Neo4j nodes and results that match.

4B. Integration Boundary Tests (Skeletons — You Implement)

These test the actual Neo4j writes. They require a running Neo4j instance. Use the Docker setup from R1: docker compose --profile dev up → Neo4j at bolt://localhost:7687.

__tests__/api/graphrag/integration.neo4j-writer.test.ts

/**
 * Integration tests for Neo4jDirectWriter implementation.
 * Requires a running Neo4j instance (docker compose --profile dev up).
 *
 * Setup: connect to test Neo4j, clear test data before each test.
 * Use a unique companyId per test to avoid cross-test contamination.
 */
import type { Neo4jDirectWriter, Neo4jEntityInput, Neo4jRelationshipInput, Neo4jMentionInput, Neo4jDocumentGraphInput } from "~/lib/graph/neo4j-direct-writer";

describe("Integration: Neo4jDirectWriter", () => {
  // Setup: instantiate your implementation, connect to Neo4j
  // Teardown: clear test nodes after each test (MATCH (n) WHERE n.companyId = $testCompanyId DETACH DELETE n)

  it("writeEntities creates Entity nodes with all required properties", async () => {
    // Write entities, then query Neo4j to verify node properties match Neo4jEntityNodeSchema
  });

  it("writeEntities is idempotent — calling twice produces same node count", async () => {
    // Write same entities twice, count Entity nodes — should be same as writing once
  });

  it("writeRelationships creates dynamic Cypher types, not RELATES_TO", async () => {
    // Write relationships with type "CEO_OF", query Neo4j for :CEO_OF relationships
    // Also verify no :RELATES_TO relationships exist
  });

  it("writeRelationships returns distinct type strings", async () => {
    // Write relationships with types ["CEO_OF", "ACQUIRED", "CEO_OF"]
    // Expect return value to be ["CEO_OF", "ACQUIRED"] (distinct, order doesn't matter)
  });

  it("writeMentions creates Section nodes and MENTIONED_IN edges", async () => {
    // Write mentions, query Neo4j for Section nodes and MENTIONED_IN edges
  });

  it("writeDocumentGraph creates Document node with CONTAINS edges", async () => {
    // Write document graph, query Neo4j for Document node and CONTAINS edges to Sections
  });

  it("ensureIndexes creates entity-embeddings vector index", async () => {
    // Call ensureIndexes(), then SHOW INDEXES to verify entity-embeddings exists
  });

  it("ensureIndexes is idempotent — calling twice does not error", async () => {
    // Call ensureIndexes() twice — second call should not throw
  });

  it("all methods return gracefully when Neo4j is unreachable", async () => {
    // Use a writer configured with a bad Neo4j URI
    // Verify writeEntities returns 0, writeRelationships returns [], etc.
    // Verify no exceptions are thrown
  });
});

__tests__/api/graphrag/integration.entity-resolution.test.ts

/**
 * Integration tests for entity resolution via vector index.
 * Requires a running Neo4j instance with vector index support.
 */

describe("Integration: Entity Resolution", () => {
  // Setup: create entity-embeddings vector index, write test entities with embeddings

  it("merges entities with cosine similarity above threshold", async () => {
    // Write two entities with very similar embeddings (e.g., cosine sim > 0.95)
    // Run entity resolution
    // Verify one ALIAS_OF relationship exists
  });

  it("creates ALIAS_OF relationship from alias to canonical", async () => {
    // Write entity A (mentionCount: 5) and entity B (mentionCount: 2) with similar embeddings
    // Run resolution — B should get ALIAS_OF pointing to A (A is canonical)
  });

  it("canonical entity has higher mentionCount", async () => {
    // Write entities with different mentionCounts and similar embeddings
    // Verify the one with higher mentionCount is the canonical (no ALIAS_OF from it)
  });

  it("skips resolution when vector index doesn't exist", async () => {
    // Drop the vector index, run resolution
    // Verify no error thrown, warning logged
  });

  it("respects ENTITY_RESOLUTION_THRESHOLD env var", async () => {
    // Set threshold to 0.99, write entities with 0.90 similarity
    // Verify no ALIAS_OF created (below threshold)
    // Set threshold to 0.80, run again — ALIAS_OF should be created
  });
});

4C. Dev-Proposed Edge Case Tests

Propose 2-3 additional test cases in your Design Brief. Think about:

  • Identical embeddings (similarity = 1.0): What happens when two entities have the exact same embedding but different names? (e.g., "Microsoft" and "MSFT" both embedded identically.) Does your resolution handle the self-match case? (The vector query will return the entity itself as the top match — you need to exclude it.)

  • Special characters in relationship types: What happens when writeRelationships() receives a relationType that doesn't match SCREAMING_SNAKE_CASE? (e.g., "ceo of", "CEO-OF", "123_INVALID".) The LLM extraction contract says types must match ^[A-Z][A-Z0-9_]*$, but what if invalid data slips through? Do you validate and reject, sanitize, or pass through and let Cypher fail?

  • Missing vs null vs undefined embeddings: What happens when writeEntities() receives entities where some have embedding: [...], some have embedding: undefined, and some have no embedding property at all? Does the MERGE query handle all three cases correctly? Does entity resolution correctly skip entities without embeddings?
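For the third case, one simple tactic is to normalize all three embedding shapes to `number[] | null` at the boundary, so a single `coalesce()` branch in Cypher and a single null check in resolution handle everything. A sketch:

```typescript
// Hedged sketch: collapse present / undefined / missing-property embeddings
// into number[] | null before building query parameters.
function normalizeEmbedding(entity: { embedding?: number[] | null }): number[] | null {
  return entity.embedding ?? null;
}
```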


Quick Reference

  • Interface (types only) — src/lib/graph/neo4j-direct-writer.ts
  • Old sync pattern (reference) — src/lib/graph/neo4j-sync.ts
  • Neo4j client — src/lib/graph/neo4j-client.ts
  • Embedding provider — src/lib/embeddings/provider.ts
  • Shared Zod schemas — __tests__/api/graphrag/contracts/graphrag-schemas.ts
  • Contract tests — __tests__/api/graphrag/contract.neo4j-write-result.test.ts, contract.neo4j-node-shapes.test.ts
  • Env vars — src/env.ts: add ENTITY_RESOLUTION_THRESHOLD (follow the EMBEDDING_DIMENSIONS pattern)
  • Env example — .env.example: add ENTITY_RESOLUTION_THRESHOLD=
  • Docker Neo4j — docker compose --profile dev up → bolt://localhost:7687
  • Parent requirements — .kiro/specs/neo4j-graphrag/requirements.md (R5, R6)
  • Parent design — .kiro/specs/neo4j-graphrag/design.md (Neo4j Graph Schema, Direct Writer Interface)
