
neo4j-graphrag-p1-integration #277

@kien-ship-it

Description


Feature Packet — GraphRAG Integration Pipeline (Dev A)

Parent Spec: .kiro/specs/neo4j-graphrag/ (R5, R13, R14)
Phase: 1 — Parallel Foundation Work
Owner: Dev A (Integration Glue)
Parallel with: Dev B (Sidecar Endpoints: R3, R4), Dev C (Neo4j Direct Writer: R5, R6)


1. Feature Overview

Dev A wires the GraphRAG pipeline into the existing document ingestion flow at src/lib/tools/doc-ingestion/index.ts. The pipeline gains three modified/new steps — all gated by NEO4J_URI:

  • Step F (enhanced): BERT NER entity extraction with optional CLS embeddings when Neo4j is enabled
  • Step F2 (new): LLM-based relationship extraction via the sidecar's /extract-relationships endpoint
  • Step G (rewritten): Direct Neo4j write using Dev C's Neo4jDirectWriter — entities, relationships, mentions, and document graph written from in-memory data, NOT from Postgres kg_* tables

When NEO4J_URI is not set, the pipeline is completely unchanged: Steps F, F2, and G don't run, and no graph data is stored anywhere. The legacy Postgres kg_* path (entity-extraction.ts) is not invoked either; see the gating logic in Section 2.


2. Shared Contracts

All contracts are already committed. Dev A codes against these — no new schemas needed.

Integration Boundaries

Dev A → Dev B (Sidecar):
  GET  /extract-entities?include_embeddings=true   → ExtractEntitiesEnhancedResponseSchema
  GET  /extract-entities                           → ExtractEntitiesResponseSchema
  POST /extract-relationships                      → ExtractRelationshipsResponseSchema

Dev A → Dev C (Neo4j Writer):
  Neo4jDirectWriter.writeEntities(entities, companyId)        → number
  Neo4jDirectWriter.writeRelationships(relationships, companyId) → string[]
  Neo4jDirectWriter.writeMentions(mentions, companyId)        → number
  Neo4jDirectWriter.writeDocumentGraph(doc, companyId)        → void
  Neo4jDirectWriter.ensureIndexes()                           → void
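The committed interface at src/lib/graph/neo4j-direct-writer.ts is authoritative; the TypeScript sketch below only illustrates the shape implied by the signatures above. The field names on the input types and the Promise-wrapped return types are assumptions, not the committed definitions, and the in-memory mock is the kind of stand-in Dev A can test against before Dev C's implementation lands.

```typescript
// Illustrative sketch only. The committed interface in
// src/lib/graph/neo4j-direct-writer.ts is authoritative; field names
// on the *Input types and async return types here are assumptions.

interface Neo4jEntityInput {
  id: string;
  name: string;
  type: string;
  embedding?: number[]; // 768-dim CLS vector when include_embeddings=true
}

interface Neo4jRelationshipInput {
  sourceId: string;
  targetId: string;
  type: string; // e.g. "CO_OCCURS" or an LLM-extracted type
}

interface Neo4jMentionInput {
  entityId: string;
  sectionId: string;
}

interface Neo4jDirectWriter {
  writeEntities(entities: Neo4jEntityInput[], companyId: string): Promise<number>;
  writeRelationships(relationships: Neo4jRelationshipInput[], companyId: string): Promise<string[]>;
  writeMentions(mentions: Neo4jMentionInput[], companyId: string): Promise<number>;
  writeDocumentGraph(doc: { documentId: string; sectionIds: string[] }, companyId: string): Promise<void>;
  ensureIndexes(): Promise<void>;
}

// A trivial in-memory mock for testing Dev A's glue code before
// Dev C's real implementation is available:
const mockWriter: Neo4jDirectWriter = {
  async writeEntities(entities) { return entities.length; },
  async writeRelationships(relationships) { return relationships.map((_, i) => `rel-${i}`); },
  async writeMentions(mentions) { return mentions.length; },
  async writeDocumentGraph() {},
  async ensureIndexes() {},
};
```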

Schema Locations

Schema                                     File
ExtractEntitiesResponseSchema              __tests__/api/graphrag/contracts/graphrag-schemas.ts
ExtractEntitiesEnhancedResponseSchema      __tests__/api/graphrag/contracts/graphrag-schemas.ts
ExtractRelationshipsResponseSchema         __tests__/api/graphrag/contracts/graphrag-schemas.ts
Neo4jWriteResultSchema                     __tests__/api/graphrag/contracts/graphrag-schemas.ts
Neo4jDirectWriter interface                src/lib/graph/neo4j-direct-writer.ts
Neo4jEntityInput, Neo4jRelationshipInput,
  Neo4jMentionInput                        src/lib/graph/neo4j-direct-writer.ts

Gating Logic

NEO4J_URI unset           → Steps F, F2, G do NOT run. Pipeline ends at Step E.
                            Existing Postgres kg_* path via entity-extraction.ts
                            does NOT run either — no graph data anywhere.

NEO4J_URI set             → Steps F, F2, G run.
  + SIDECAR_URL unset     → Step F skipped (no entity extraction possible).
  + SIDECAR_URL set       → Step F runs.
    + include_embeddings  → Entities include 768-dim CLS vectors.

  + EXTRACTION_LLM_BASE_URL unset → Step F2 skipped entirely (no error).
  + EXTRACTION_LLM_BASE_URL set   → Step F2 runs.

  + Neo4j unreachable     → Step G logs warning, pipeline continues.
                            Document still marked as ingested.

3. Workstream Assignment — Dev A: Integration Pipeline

Contract

Modify maybeExtractEntities() (Step F):

  • When NEO4J_URI is set: call /extract-entities?include_embeddings=true to get CLS embeddings alongside entity data
  • When NEO4J_URI is NOT set: skip entirely — do not call /extract-entities at all (the old Postgres kg_* path is legacy and not used when Neo4j is disabled)
  • Parse the response against ExtractEntitiesEnhancedResponseSchema (since Step F only runs when Neo4j is enabled, the enhanced schema with embeddings always applies; ExtractEntitiesResponseSchema remains for the plain endpoint contract)
  • Store the entity data in a format suitable for passing to Step G — this means transforming sidecar entities into Neo4jEntityInput[] and Neo4jMentionInput[] shapes
  • Also generate CO_OCCURS relationships between entities that appear in the same chunk, formatted as Neo4jRelationshipInput[]
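CO_OCCURS generation is a within-chunk pairwise cross-product. A minimal sketch under assumed types (the real Neo4jRelationshipInput field names live in the committed interface file and may differ):

```typescript
// Sketch of CO_OCCURS generation. Field names are assumptions; the
// committed Neo4jRelationshipInput in src/lib/graph/neo4j-direct-writer.ts
// is authoritative.
interface ChunkEntities {
  chunkId: string;
  entityIds: string[]; // entity ids extracted from this chunk
}

interface CoOccursRelationship {
  sourceId: string;
  targetId: string;
  type: "CO_OCCURS";
  chunkId: string;
}

function buildCoOccurs(chunks: ChunkEntities[]): CoOccursRelationship[] {
  const out: CoOccursRelationship[] = [];
  for (const { chunkId, entityIds } of chunks) {
    // Deduplicate so an entity mentioned twice in one chunk
    // does not pair with itself.
    const unique = [...new Set(entityIds)];
    for (let i = 0; i < unique.length; i++) {
      for (let j = i + 1; j < unique.length; j++) {
        out.push({ sourceId: unique[i], targetId: unique[j], type: "CO_OCCURS", chunkId });
      }
    }
  }
  return out;
}
```

Note the pair count grows quadratically per chunk (n entities produce n·(n−1)/2 relationships), which feeds into the serialization-size constraint below.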

Add maybeExtractRelationships() (Step F2):

  • Only runs when NEO4J_URI is set AND EXTRACTION_LLM_BASE_URL is set
  • Calls sidecar POST /extract-relationships with the same chunk texts used in Step F
  • Parse the response against ExtractRelationshipsResponseSchema
  • Transform the LLM-extracted relationships into Neo4jRelationshipInput[] format
  • If the sidecar call fails, log a warning and continue — Step G will write BERT entities + CO_OCCURS only
  • If the response contains zero relationships, that's fine — not an error
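The warn-and-continue behavior for Step F2 can be sketched as below. The endpoint path comes from the contract; the request body shape, response field names, and helper name are assumptions, and the HTTP client is injected so the failure path is testable without a live sidecar.

```typescript
// Sketch of Step F2's warn-and-continue posture. Request/response body
// shapes are placeholders; the committed ExtractRelationshipsResponseSchema
// is authoritative.
type Fetcher = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string },
) => Promise<{ ok: boolean; json(): Promise<unknown> }>;

interface ExtractedRelationship {
  sourceId: string;
  targetId: string;
  type: string;
}

async function maybeExtractRelationshipsSketch(
  sidecarUrl: string,
  chunkTexts: string[],
  fetcher: Fetcher,
): Promise<ExtractedRelationship[]> {
  try {
    const res = await fetcher(`${sidecarUrl}/extract-relationships`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({ chunks: chunkTexts }), // body shape is an assumption
    });
    if (!res.ok) {
      console.warn("extract-relationships failed; Step G will write BERT entities + CO_OCCURS only");
      return [];
    }
    const parsed = (await res.json()) as { relationships?: ExtractedRelationship[] };
    // Zero relationships is a valid outcome, not an error.
    return parsed.relationships ?? [];
  } catch (err) {
    console.warn("extract-relationships unreachable; continuing", err);
    return [];
  }
}
```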

Rewrite maybeSyncToNeo4j() (Step G):

  • When NEO4J_URI is set: use Dev C's Neo4jDirectWriter to write all graph data directly to Neo4j
  • Call ensureIndexes() before any writes (idempotent, safe to call every time)
  • Call writeEntities() with the Neo4jEntityInput[] from Step F
  • Call writeRelationships() with the combined Neo4jRelationshipInput[] from Steps F and F2 (CO_OCCURS from F + typed relationships from F2)
  • Call writeMentions() with the Neo4jMentionInput[] from Step F
  • Call writeDocumentGraph() with document metadata and section IDs
  • After writes, trigger entity resolution (Dev C's implementation handles this)
  • Pass in-memory data from Steps F and F2 — do NOT read from Postgres kg_* tables
  • If Neo4j is unreachable, log a warning and continue — document is still marked as ingested
  • Return a result conforming to Neo4jWriteResultSchema
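The write ordering above, with the warn-and-continue posture, might look like this sketch. The writer method names match the committed interface; the payload types, result shape, and function name are assumptions.

```typescript
// Sketch of Step G ordering: ensureIndexes first, then entity /
// relationship / mention / document writes, all inside a try/catch so an
// unreachable Neo4j warns instead of failing the pipeline. Only the
// writer method names come from the contract; the rest is assumed.
interface WriterLike {
  ensureIndexes(): Promise<void>;
  writeEntities(e: unknown[], companyId: string): Promise<number>;
  writeRelationships(r: unknown[], companyId: string): Promise<string[]>;
  writeMentions(m: unknown[], companyId: string): Promise<number>;
  writeDocumentGraph(doc: unknown, companyId: string): Promise<void>;
}

async function syncToNeo4jSketch(
  writer: WriterLike,
  companyId: string,
  data: { entities: unknown[]; relationships: unknown[]; mentions: unknown[]; doc: unknown },
): Promise<{ ok: boolean; entitiesWritten: number }> {
  try {
    await writer.ensureIndexes(); // idempotent, safe on every run
    const entitiesWritten = await writer.writeEntities(data.entities, companyId);
    await writer.writeRelationships(data.relationships, companyId);
    await writer.writeMentions(data.mentions, companyId);
    await writer.writeDocumentGraph(data.doc, companyId);
    return { ok: true, entitiesWritten };
  } catch (err) {
    // Neo4j unreachable: warn and continue; the caller still marks
    // the document as ingested.
    console.warn("Neo4j write failed; document remains ingested", err);
    return { ok: false, entitiesWritten: 0 };
  }
}
```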

Backward compatibility:

  • When NEO4J_URI is NOT set, the pipeline skips Steps F, F2, and G entirely
  • The old neo4j-sync.ts module is no longer called from the pipeline (replaced by Neo4jDirectWriter), but the file stays in the repo
  • Postgres Steps A–E are completely unchanged regardless of NEO4J_URI

Constraints

  1. Inngest step isolation is non-negotiable. Steps F, F2, and G must be separate runStep() calls. If you combine them into a single step, a failure in relationship extraction (F2) would lose the entity extraction results (F) on retry. The pipeline already uses savePipelineState() / loadPipelineState() for inter-step data — use the same pattern.
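The isolation pattern in constraint 1 can be sketched with stubbed helpers. runStep, savePipelineState, and loadPipelineState are stand-ins for the pipeline's real helpers (only the names come from this packet); the point is that each step persists its output so a retry of a later step re-loads rather than re-computes it.

```typescript
// Shape sketch of the three-separate-steps pattern. runStep,
// savePipelineState, and loadPipelineState below are local stand-ins
// for the pipeline's real helpers, stubbed so the sketch runs.
type StepFn<T> = () => Promise<T>;
const runStep = async <T>(name: string, fn: StepFn<T>): Promise<T> => fn();

const stateStore = new Map<string, unknown>();
const savePipelineState = async (key: string, value: unknown) => { stateStore.set(key, value); };
const loadPipelineState = async <T>(key: string): Promise<T | undefined> =>
  stateStore.get(key) as T | undefined;

async function pipelineTail(jobId: string) {
  await runStep("step-f-extract-entities", async () => {
    const entities = [{ id: "e1" }]; // stand-in for sidecar extraction
    await savePipelineState(`${jobId}:entities`, entities);
  });
  await runStep("step-f2-extract-relationships", async () => {
    await savePipelineState(`${jobId}:relationships`, []);
  });
  await runStep("step-g-neo4j-write", async () => {
    // A retry of Step G re-loads Step F's persisted output instead of
    // re-running entity extraction.
    const entities = await loadPipelineState<{ id: string }[]>(`${jobId}:entities`);
    return entities?.length ?? 0;
  });
}
```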

  2. Dev B's sidecar endpoints may not be deployed yet. Your code must handle HTTP errors and empty responses from /extract-entities?include_embeddings=true and /extract-relationships gracefully. The existing health check pattern (fetch(sidecarUrl/health)) is already in maybeExtractEntities() — extend it.

  3. Dev C's Neo4jDirectWriter implementation may not be ready yet. The interface is committed at src/lib/graph/neo4j-direct-writer.ts. Code against the interface and test with mocks. When Dev C's implementation lands, it should be a drop-in.

  4. The entity-extraction.ts module currently stores entities in Postgres kg_* tables. When Neo4j is enabled, you need entity data in Neo4jEntityInput format instead. You'll need to decide whether to refactor entity-extraction.ts to return data without storing it, or build a parallel transformation path. The current module does sidecar call + Postgres upsert in one function — those concerns may need separating.

  5. Serialization size matters. Entity embeddings are 768 floats each. If a document has 500 entities, that is 384,000 floats: roughly 1.5MB as raw Float32, and noticeably more once serialized as JSON text. The pipeline uses savePipelineState(), which writes to ocrJobs.ocrResult (a JSONB column). Verify this doesn't hit Postgres JSONB size limits or Inngest's ~4MB step output limit. The existing pipeline already handles this concern for page data and chunks; follow the same pattern.
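A quick back-of-envelope check of the embedding payload, using the numbers from constraint 5 (the JSON bytes-per-float factor is a rough assumption, since JSON size depends on float precision):

```typescript
// Back-of-envelope payload sizing for constraint 5.
const entityCount = 500;
const dims = 768;
const floats = entityCount * dims;      // 384,000 floats
const float32Bytes = floats * 4;        // 1,536,000 bytes ≈ 1.5 MB as raw Float32
// JSON text is larger: a float like -0.123456789 serializes to roughly
// 10-12 characters plus a separator (rough assumption).
const jsonBytesEstimate = floats * 12;  // ≈ 4.6 MB as JSON text
const inngestLimit = 4 * 1024 * 1024;   // the ~4MB step output limit cited above
const fitsInStepOutput = jsonBytesEstimate < inngestLimit;
```

If the estimate holds, 500 full-precision embeddings serialized as JSON would not fit in one step output, which argues for truncating precision, compressing, or persisting embeddings outside the step payload.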

Acceptance Criteria

  1. When NEO4J_URI is set and sidecar is healthy, Step F calls /extract-entities?include_embeddings=true and entities include 768-dim embeddings
  2. When NEO4J_URI is NOT set, Step F does not run at all (no sidecar call, no entity extraction)
  3. When NEO4J_URI is set and EXTRACTION_LLM_BASE_URL is set, Step F2 calls /extract-relationships and stores relationship data
  4. When EXTRACTION_LLM_BASE_URL is NOT set, Step F2 is skipped entirely (no error, no warning beyond debug log)
  5. When NEO4J_URI is set, Step G uses Neo4jDirectWriter to write entities, relationships, mentions, and document graph directly to Neo4j
  6. Step G does NOT read from Postgres kg_* tables — it uses in-memory data passed from Steps F and F2 via savePipelineState() / loadPipelineState()
  7. When NEO4J_URI is NOT set, the pipeline skips Steps F, F2, and G entirely — Postgres Steps A–E run unchanged
  8. When Step F2 fails (sidecar error), Step G still writes BERT entities + CO_OCCURS relationships to Neo4j
  9. When Neo4j is unreachable during Step G, the pipeline continues and the document is marked as successfully ingested
  10. Postgres Steps A–E are completely unchanged — no modifications to upload, OCR, chunking, embedding, or section storage
  11. The Neo4jWriteResult from Step G passes Neo4jWriteResultSchema.safeParse() validation

Design Challenges

1. Data Flow Between Inngest Steps

The current pipeline stores entity data in Postgres kg_* tables (via entity-extraction.ts) and then neo4j-sync.ts reads it back. The new architecture writes directly to Neo4j from in-memory data. But the pipeline uses Inngest steps which are independently retryable — if Step G fails and retries, the in-memory data from Step F is gone.

You have at least three options:

  • (a) Use savePipelineState() to serialize the Neo4jEntityInput[], Neo4jRelationshipInput[], and Neo4jMentionInput[] into the ocrJobs.ocrResult JSONB column. This is the existing pattern for pages and chunks. But entity embeddings are large (768 floats × N entities) — will this hit size limits?
  • (b) Keep writing to Postgres kg_* tables as a side effect and have Step G read back from them. This preserves retryability but defeats the "direct write" architecture — Neo4j is supposed to be the only graph store.
  • (c) Make Steps F+F2+G a single Inngest step. This eliminates the inter-step data problem but loses independent retryability — if the Neo4j write fails, you re-run entity extraction too.

What's the right trade-off? Consider: how large is the entity data for a typical document? How often does Step G actually fail? Is the complexity of (a) worth it for a failure mode that rarely happens?

2. Dual Path Elimination vs. Safety Net

When NEO4J_URI IS set, the new path extracts entities with embeddings and writes directly to Neo4j. The parent spec says Neo4j is the ONLY place graph data lives when enabled — kg_* tables are legacy.

But should you also write to Postgres kg_* tables as a fallback when Neo4j is enabled? Arguments:

  • No fallback (parent spec intent): Neo4j is the single source of truth for graph data. Writing to both creates data consistency headaches — which one is canonical? If Neo4j goes down mid-ingestion, the graph data for that document is simply missing until re-ingestion. This is acceptable because Postgres Steps A–E still succeed and the document is searchable via BM25+Vector.
  • Fallback to Postgres: If Neo4j goes down, you still have graph data in Postgres. But then you need reconciliation logic — when Neo4j comes back, do you sync from Postgres? This is the old neo4j-sync.ts pattern you're replacing.

The parent spec is clear (Neo4j-only when enabled), but you should understand why and be prepared to defend the decision. What's your strategy for a document that was ingested while Neo4j was temporarily down?


4. Testing Strategy

4A. Contract Compliance Tests (Already Committed)

The following contract tests already exist and validate schema shapes. Dev A's integration code must produce data that passes these validations:

Test File                                   Validates
contract.extract-entities.test.ts           ExtractEntitiesResponseSchema, ExtractEntitiesEnhancedResponseSchema
contract.extract-relationships.test.ts      ExtractRelationshipsResponseSchema
contract.neo4j-write-result.test.ts         Neo4jWriteResultSchema
contract.neo4j-node-shapes.test.ts          Neo4jEntityNodeSchema, Neo4jSectionNodeSchema
contract.env-vars.test.ts                   Environment variable gating logic

4B. Integration Boundary Tests (Skeletons)

// __tests__/api/graphrag/integration.pipeline.test.ts

describe("Integration: Ingestion Pipeline GraphRAG Steps", () => {
  // ── Step F: Entity Extraction Routing ──────────────────────
  it("Step F calls /extract-entities?include_embeddings=true when NEO4J_URI is set");
  it("Step F does not run at all when NEO4J_URI is not set (no sidecar call)");
  it("Step F transforms sidecar response into Neo4jEntityInput[] shape");
  it("Step F generates CO_OCCURS relationships for entities in the same chunk");
  it("Step F persists entity data via savePipelineState() for Step G consumption");

  // ── Step F2: Relationship Extraction ───────────────────────
  it("Step F2 calls POST /extract-relationships when NEO4J_URI and EXTRACTION_LLM_BASE_URL are set");
  it("Step F2 is skipped entirely when EXTRACTION_LLM_BASE_URL is not set");
  it("Step F2 transforms sidecar response into Neo4jRelationshipInput[] shape");
  it("Step F2 persists relationship data via savePipelineState() for Step G consumption");
  it("Step F2 failure does not prevent Step G from running");

  // ── Step G: Neo4j Direct Write ─────────────────────────────
  it("Step G calls Neo4jDirectWriter.ensureIndexes() before writes");
  it("Step G calls Neo4jDirectWriter.writeEntities() with data from Step F");
  it("Step G calls Neo4jDirectWriter.writeRelationships() with combined data from Steps F and F2");
  it("Step G calls Neo4jDirectWriter.writeMentions() with mention data from Step F");
  it("Step G calls Neo4jDirectWriter.writeDocumentGraph() with document metadata");
  it("Step G does NOT import or call neo4j-sync.ts");
  it("Step G does NOT read from Postgres kg_* tables");
  it("Step G returns a result that passes Neo4jWriteResultSchema.safeParse()");

  // ── Gating & Backward Compatibility ────────────────────────
  it("When NEO4J_URI is not set, Steps F, F2, and G do not run");
  it("Postgres Steps A-E produce identical output regardless of NEO4J_URI setting");
  it("Neo4j unreachable during Step G — pipeline continues, document marked as ingested");

  // ── Failure Isolation ──────────────────────────────────────
  it("Step F2 failure → Step G still writes BERT entities + CO_OCCURS to Neo4j");
  it("Step F failure → Step F2 still attempts (independent steps)");
  it("Step G failure → document still marked as successfully ingested");
});

4C. Dev-Proposed Edge Case Tests

Think about these — propose your own test cases in your Design Brief:

  • What happens when the sidecar returns entities but /extract-relationships returns empty results (LLM unconfigured or no relationships found)? Does Step G correctly write BERT-only entities with CO_OCCURS?
  • What happens when Step F succeeds but Step G fails on first attempt and retries? Is the intermediate data (entities, relationships, mentions) available for the retry via loadPipelineState()?
  • What happens when a document has zero entities extracted? Does the pipeline skip Steps F2 and G gracefully, or does it attempt to write empty arrays to Neo4j?
  • What happens when the sidecar returns entities with embeddings that are NOT exactly 768 dimensions? Does the schema validation catch this before it reaches Neo4j?
  • What happens when two documents are being ingested concurrently and both write to Neo4j? Are the MERGE queries truly idempotent, or can you get race conditions on entity creation?

Quick Reference

Item                                        Location
Ingestion pipeline                          src/lib/tools/doc-ingestion/index.ts
Pipeline types                              src/lib/tools/doc-ingestion/types.ts
Current entity extraction                   src/lib/ingestion/entity-extraction.ts
Current Neo4j sync (old, being replaced)    src/lib/graph/neo4j-sync.ts
Neo4j client (health check, driver)         src/lib/graph/neo4j-client.ts
Direct writer interface (Dev C implements)  src/lib/graph/neo4j-direct-writer.ts
Embedding provider                          src/lib/embeddings/provider.ts
Shared Zod schemas                          __tests__/api/graphrag/contracts/graphrag-schemas.ts
Env var validation                          src/env.ts
Parent requirements                         .kiro/specs/neo4j-graphrag/requirements.md (R5, R13, R14)
Parent design                               .kiro/specs/neo4j-graphrag/design.md (Ingestion Pipeline Contract, Failure Isolation)
