Feature Packet — GraphRAG Integration Pipeline (Dev A)
Parent Spec: .kiro/specs/neo4j-graphrag/ (R5, R13, R14)
Phase: 1 — Parallel Foundation Work
Owner: Dev A (Integration Glue)
Parallel with: Dev B (Sidecar Endpoints: R3, R4), Dev C (Neo4j Direct Writer: R5, R6)
1. Feature Overview
Dev A wires the GraphRAG pipeline into the existing document ingestion flow at src/lib/tools/doc-ingestion/index.ts. The pipeline gains three modified/new steps — all gated by NEO4J_URI:
- Step F (enhanced): BERT NER entity extraction with optional CLS embeddings when Neo4j is enabled
- Step F2 (new): LLM-based relationship extraction via the sidecar's /extract-relationships endpoint
- Step G (rewritten): Direct Neo4j write using Dev C's Neo4jDirectWriter — entities, relationships, mentions, and document graph written from in-memory data, NOT from Postgres kg_* tables
When NEO4J_URI is not set, the pipeline is completely unchanged: Steps F, F2, and G don't run and no graph data is stored anywhere. The legacy Postgres kg_* code remains in the repo but is no longer invoked by the pipeline.
2. Shared Contracts
All contracts are already committed. Dev A codes against these — no new schemas needed.
Integration Boundaries
Dev A → Dev B (Sidecar):
GET /extract-entities?include_embeddings=true → ExtractEntitiesEnhancedResponseSchema
GET /extract-entities → ExtractEntitiesResponseSchema
POST /extract-relationships → ExtractRelationshipsResponseSchema
Dev A → Dev C (Neo4j Writer):
Neo4jDirectWriter.writeEntities(entities, companyId) → number
Neo4jDirectWriter.writeRelationships(relationships, companyId) → string[]
Neo4jDirectWriter.writeMentions(mentions, companyId) → number
Neo4jDirectWriter.writeDocumentGraph(doc, companyId) → void
Neo4jDirectWriter.ensureIndexes() → void
Schema Locations
| Schema | File |
| --- | --- |
| ExtractEntitiesResponseSchema | __tests__/api/graphrag/contracts/graphrag-schemas.ts |
| ExtractEntitiesEnhancedResponseSchema | __tests__/api/graphrag/contracts/graphrag-schemas.ts |
| ExtractRelationshipsResponseSchema | __tests__/api/graphrag/contracts/graphrag-schemas.ts |
| Neo4jWriteResultSchema | __tests__/api/graphrag/contracts/graphrag-schemas.ts |
| Neo4jDirectWriter interface | src/lib/graph/neo4j-direct-writer.ts |
| Neo4jEntityInput, Neo4jRelationshipInput, Neo4jMentionInput | src/lib/graph/neo4j-direct-writer.ts |
Gating Logic
- NEO4J_URI unset → Steps F, F2, and G do NOT run; the pipeline ends at Step E. The existing Postgres kg_* path via entity-extraction.ts does NOT run either — no graph data anywhere.
- NEO4J_URI set → Steps F, F2, and G run:
  - SIDECAR_URL unset → Step F skipped (no entity extraction possible).
  - SIDECAR_URL set → Step F runs with include_embeddings=true, so entities include 768-dim CLS vectors.
  - EXTRACTION_LLM_BASE_URL unset → Step F2 skipped entirely (no error).
  - EXTRACTION_LLM_BASE_URL set → Step F2 runs.
  - Neo4j unreachable → Step G logs a warning and the pipeline continues; the document is still marked as ingested.
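The gating rules above can be condensed into a single resolver. The env-var names are from the spec; the function and type names here are illustrative, not the pipeline's actual identifiers:

```typescript
// Illustrative gating resolver for Steps F, F2, and G.
// Mirrors the rules in the "Gating Logic" section; names are assumptions.
type GraphRagGates = {
  runStepF: boolean;         // BERT entity extraction
  runStepF2: boolean;        // LLM relationship extraction
  runStepG: boolean;         // Neo4j direct write
  includeEmbeddings: boolean; // request 768-dim CLS vectors from the sidecar
};

function resolveGates(env: Record<string, string | undefined>): GraphRagGates {
  const neo4jEnabled = Boolean(env.NEO4J_URI);
  return {
    runStepF: neo4jEnabled && Boolean(env.SIDECAR_URL),
    runStepF2: neo4jEnabled && Boolean(env.EXTRACTION_LLM_BASE_URL),
    runStepG: neo4jEnabled,
    includeEmbeddings: neo4jEnabled,
  };
}
```

With NEO4J_URI unset, everything is off; with only NEO4J_URI set, Step G still runs (and logs a warning if Neo4j is unreachable) while F and F2 are skipped.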
3. Workstream Assignment — Dev A: Integration Pipeline
Contract
Modify maybeExtractEntities() (Step F):
- When NEO4J_URI is set: call /extract-entities?include_embeddings=true to get CLS embeddings alongside entity data
- When NEO4J_URI is NOT set: skip entirely — do not call /extract-entities at all (the old Postgres kg_* path is legacy and not used when Neo4j is disabled)
- Parse the response against ExtractEntitiesEnhancedResponseSchema (when Neo4j enabled) or ExtractEntitiesResponseSchema (when not)
- Store the entity data in a format suitable for passing to Step G — this means transforming sidecar entities into Neo4jEntityInput[] and Neo4jMentionInput[] shapes
- Also generate CO_OCCURS relationships between entities that appear in the same chunk, formatted as Neo4jRelationshipInput[]
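The CO_OCCURS generation can be sketched as follows. The entity and relationship shapes here are simplified stand-ins for the committed input types, and the dedup key (label + lowercased text) is an assumption about how entities are identified:

```typescript
// Hypothetical simplified shapes; the real ones live in
// src/lib/graph/neo4j-direct-writer.ts.
interface SidecarEntity { text: string; label: string; chunkIndex: number }
interface CoOccursInput { sourceKey: string; targetKey: string; type: "CO_OCCURS" }

// One CO_OCCURS edge per unordered pair of distinct entities sharing a
// chunk, deduplicated across chunks so repeated co-occurrence yields one edge.
function buildCoOccurs(entities: SidecarEntity[]): CoOccursInput[] {
  const byChunk = new Map<number, string[]>();
  for (const e of entities) {
    const key = `${e.label}:${e.text.toLowerCase()}`;
    const list = byChunk.get(e.chunkIndex) ?? [];
    if (!list.includes(key)) list.push(key);
    byChunk.set(e.chunkIndex, list);
  }
  const seen = new Set<string>();
  const out: CoOccursInput[] = [];
  for (const keys of byChunk.values()) {
    for (let i = 0; i < keys.length; i++) {
      for (let j = i + 1; j < keys.length; j++) {
        const [a, b] = [keys[i], keys[j]].sort(); // order-independent pair key
        if (seen.has(`${a}|${b}`)) continue;
        seen.add(`${a}|${b}`);
        out.push({ sourceKey: a, targetKey: b, type: "CO_OCCURS" });
      }
    }
  }
  return out;
}
```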
Add maybeExtractRelationships() (Step F2):
- Runs only when NEO4J_URI is set AND EXTRACTION_LLM_BASE_URL is set
- Calls sidecar POST /extract-relationships with the same chunk texts used in Step F
- Parses the response against ExtractRelationshipsResponseSchema
- Transforms the LLM-extracted relationships into Neo4jRelationshipInput[] format
- If the sidecar call fails, logs a warning and continues — Step G will write BERT entities + CO_OCCURS only
- If the response contains zero relationships, that's fine — not an error
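The fail-open behavior is the core of Step F2: any sidecar error degrades to an empty relationship list instead of failing the pipeline. A minimal sketch, where the real fetch-and-parse logic is passed in as a callback:

```typescript
// "Fail open" wrapper for Step F2. The extract callback stands in for the
// real POST /extract-relationships call plus schema parse; the wrapper
// only encodes the error policy described above.
async function safeExtractRelationships<T>(
  extract: () => Promise<T[]>,
  warn: (msg: string) => void,
): Promise<T[]> {
  try {
    // Zero relationships is a valid result, not an error.
    return await extract();
  } catch (err) {
    warn(`Step F2 relationship extraction failed, continuing: ${err}`);
    return []; // Step G will write BERT entities + CO_OCCURS only
  }
}
```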
Rewrite maybeSyncToNeo4j() (Step G):
- When NEO4J_URI is set: use Dev C's Neo4jDirectWriter to write all graph data directly to Neo4j
- Call ensureIndexes() before any writes (idempotent, safe to call every time)
- Call writeEntities() with the Neo4jEntityInput[] from Step F
- Call writeRelationships() with the combined Neo4jRelationshipInput[] from Steps F and F2 (CO_OCCURS from F + typed relationships from F2)
- Call writeMentions() with the Neo4jMentionInput[] from Step F
- Call writeDocumentGraph() with document metadata and section IDs
- After writes, trigger entity resolution (Dev C's implementation handles this)
- Pass in-memory data from Steps F and F2 — do NOT read from Postgres kg_* tables
- If Neo4j is unreachable, log a warning and continue — document is still marked as ingested
- Return a result conforming to Neo4jWriteResultSchema
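The Step G call sequence and its failure policy can be sketched against the committed interface. The writer methods are assumed async here, and the data/result shapes are simplified placeholders; check the real signatures in src/lib/graph/neo4j-direct-writer.ts:

```typescript
// Sketch of the Step G write ordering and "warn and continue" policy.
// WriterLike mirrors the Neo4jDirectWriter contract; any implementation
// (or a test mock) can be passed in.
interface WriterLike {
  ensureIndexes(): Promise<void>;
  writeEntities(entities: unknown[], companyId: string): Promise<number>;
  writeRelationships(rels: unknown[], companyId: string): Promise<string[]>;
  writeMentions(mentions: unknown[], companyId: string): Promise<number>;
  writeDocumentGraph(doc: unknown, companyId: string): Promise<void>;
}

async function writeGraph(
  writer: WriterLike,
  companyId: string,
  data: { entities: unknown[]; coOccurs: unknown[]; typed: unknown[]; mentions: unknown[]; doc: unknown },
  warn: (msg: string) => void,
): Promise<{ entitiesWritten: number; relationshipIds: string[] } | null> {
  try {
    await writer.ensureIndexes(); // idempotent, safe on every run
    const entitiesWritten = await writer.writeEntities(data.entities, companyId);
    // CO_OCCURS from Step F plus typed relationships from Step F2
    const relationshipIds = await writer.writeRelationships(
      [...data.coOccurs, ...data.typed], companyId);
    await writer.writeMentions(data.mentions, companyId);
    await writer.writeDocumentGraph(data.doc, companyId);
    return { entitiesWritten, relationshipIds };
  } catch (err) {
    warn(`Neo4j write failed, document still ingested: ${err}`);
    return null; // pipeline continues; caller marks the document as ingested
  }
}
```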
Backward compatibility:
- When NEO4J_URI is NOT set, the pipeline skips Steps F, F2, and G entirely
- The old neo4j-sync.ts module is no longer called from the pipeline (replaced by Neo4jDirectWriter), but the file stays in the repo
- Postgres Steps A–E are completely unchanged regardless of NEO4J_URI
Constraints
- Inngest step isolation is non-negotiable. Steps F, F2, and G must be separate runStep() calls. If you combine them into a single step, a failure in relationship extraction (F2) would lose the entity extraction results (F) on retry. The pipeline already uses savePipelineState() / loadPipelineState() for inter-step data — use the same pattern.
- Dev B's sidecar endpoints may not be deployed yet. Your code must handle HTTP errors and empty responses from /extract-entities?include_embeddings=true and /extract-relationships gracefully. The existing health check pattern (a fetch against `${sidecarUrl}/health`) is already in maybeExtractEntities() — extend it.
- Dev C's Neo4jDirectWriter implementation may not be ready yet. The interface is committed at src/lib/graph/neo4j-direct-writer.ts. Code against the interface and test with mocks. When Dev C's implementation lands, it should be a drop-in.
- The entity-extraction.ts module currently stores entities in Postgres kg_* tables. When Neo4j is enabled, you need entity data in Neo4jEntityInput format instead. You'll need to decide whether to refactor entity-extraction.ts to return data without storing it, or build a parallel transformation path. The current module does the sidecar call + Postgres upsert in one function — those concerns may need separating.
- Serialization size matters. Entity embeddings are 768 floats each. If a document has 500 entities, that's ~1.5MB of raw embedding data, and more once JSON-serialized. The pipeline uses savePipelineState(), which writes to ocrJobs.ocrResult (a JSONB column). Verify this doesn't hit Postgres JSONB size limits or Inngest's ~4MB step output limit. The existing pipeline already handles this concern for page data and chunks — follow the same pattern.
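A quick back-of-envelope check on that last constraint: the ~1.5MB figure is raw float32 (500 × 768 × 4 bytes), but JSON.stringify renders each double as roughly 19 characters, so the JSONB payload is several times larger. A sketch of the estimate:

```typescript
// Rough size check for embedding payloads stored via savePipelineState().
// Raw float32 is 4 bytes per value; JSON text is ~19 chars per double.
function embeddingPayloadBytes(entityCount: number, dims = 768) {
  // Measure one representative JSON-serialized vector of random doubles.
  const vec = Array.from({ length: dims }, () => Math.random() * 2 - 1);
  const jsonPerEntity = JSON.stringify(vec).length;
  return {
    rawFloat32: entityCount * dims * 4,      // binary size
    jsonEstimate: entityCount * jsonPerEntity, // serialized size
  };
}
// For 500 entities: raw is ~1.5 MB, but the JSON form is roughly 5x that,
// which is over Inngest's ~4MB step output limit. Embeddings may need to
// be stored out-of-band (or quantized/truncated) rather than inlined.
```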
Acceptance Criteria
- When NEO4J_URI is set and sidecar is healthy, Step F calls /extract-entities?include_embeddings=true and entities include 768-dim embeddings
- When NEO4J_URI is NOT set, Step F does not run at all (no sidecar call, no entity extraction)
- When NEO4J_URI is set and EXTRACTION_LLM_BASE_URL is set, Step F2 calls /extract-relationships and stores relationship data
- When EXTRACTION_LLM_BASE_URL is NOT set, Step F2 is skipped entirely (no error, no warning beyond debug log)
- When NEO4J_URI is set, Step G uses Neo4jDirectWriter to write entities, relationships, mentions, and document graph directly to Neo4j
- Step G does NOT read from Postgres kg_* tables — it uses in-memory data passed from Steps F and F2 via savePipelineState() / loadPipelineState()
- When NEO4J_URI is NOT set, the pipeline skips Steps F, F2, and G entirely — Postgres Steps A–E run unchanged
- When Step F2 fails (sidecar error), Step G still writes BERT entities + CO_OCCURS relationships to Neo4j
- When Neo4j is unreachable during Step G, the pipeline continues and the document is marked as successfully ingested
- Postgres Steps A–E are completely unchanged — no modifications to upload, OCR, chunking, embedding, or section storage
- The Neo4jWriteResult from Step G passes Neo4jWriteResultSchema.safeParse() validation
Design Challenges
1. Data Flow Between Inngest Steps
The current pipeline stores entity data in Postgres kg_* tables (via entity-extraction.ts) and then neo4j-sync.ts reads it back. The new architecture writes directly to Neo4j from in-memory data. But the pipeline uses Inngest steps which are independently retryable — if Step G fails and retries, the in-memory data from Step F is gone.
You have at least three options:
- (a) Use savePipelineState() to serialize the Neo4jEntityInput[], Neo4jRelationshipInput[], and Neo4jMentionInput[] into the ocrJobs.ocrResult JSONB column. This is the existing pattern for pages and chunks. But entity embeddings are large (768 floats × N entities) — will this hit size limits?
- (b) Keep writing to Postgres kg_* tables as a side effect and have Step G read back from them. This preserves retryability but defeats the "direct write" architecture — Neo4j is supposed to be the only graph store.
- (c) Make Steps F+F2+G a single Inngest step. This eliminates the inter-step data problem but loses independent retryability — if the Neo4j write fails, you re-run entity extraction too.
What's the right trade-off? Consider: how large is the entity data for a typical document? How often does Step G actually fail? Is the complexity of (a) worth it for a failure mode that rarely happens?
2. Dual Path Elimination vs. Safety Net
When NEO4J_URI IS set, the new path extracts entities with embeddings and writes directly to Neo4j. The parent spec says Neo4j is the ONLY place graph data lives when enabled — kg_* tables are legacy.
But should you also write to Postgres kg_* tables as a fallback when Neo4j is enabled? Arguments:
- No fallback (parent spec intent): Neo4j is the single source of truth for graph data. Writing to both creates data consistency headaches — which one is canonical? If Neo4j goes down mid-ingestion, the graph data for that document is simply missing until re-ingestion. This is acceptable because Postgres Steps A–E still succeed and the document is searchable via BM25+Vector.
- Fallback to Postgres: If Neo4j goes down, you still have graph data in Postgres. But then you need reconciliation logic — when Neo4j comes back, do you sync from Postgres? This is the old neo4j-sync.ts pattern you're replacing.
The parent spec is clear (Neo4j-only when enabled), but you should understand why and be prepared to defend the decision. What's your strategy for a document that was ingested while Neo4j was temporarily down?
4. Testing Strategy
4A. Contract Compliance Tests (Already Committed)
The following contract tests already exist and validate schema shapes. Dev A's integration code must produce data that passes these validations:
| Test File | Validates |
| --- | --- |
| contract.extract-entities.test.ts | ExtractEntitiesResponseSchema, ExtractEntitiesEnhancedResponseSchema |
| contract.extract-relationships.test.ts | ExtractRelationshipsResponseSchema |
| contract.neo4j-write-result.test.ts | Neo4jWriteResultSchema |
| contract.neo4j-node-shapes.test.ts | Neo4jEntityNodeSchema, Neo4jSectionNodeSchema |
| contract.env-vars.test.ts | Environment variable gating logic |
4B. Integration Boundary Tests (Skeletons)
The skeletons use it.todo() so they register as pending tests rather than erroring under runners that require a callback:

```ts
// __tests__/api/graphrag/integration.pipeline.test.ts
describe("Integration: Ingestion Pipeline GraphRAG Steps", () => {
  // ── Step F: Entity Extraction Routing ──────────────────────
  it.todo("Step F calls /extract-entities?include_embeddings=true when NEO4J_URI is set");
  it.todo("Step F is skipped entirely (no sidecar call) when NEO4J_URI is not set");
  it.todo("Step F transforms sidecar response into Neo4jEntityInput[] shape");
  it.todo("Step F generates CO_OCCURS relationships for entities in the same chunk");
  it.todo("Step F persists entity data via savePipelineState() for Step G consumption");

  // ── Step F2: Relationship Extraction ───────────────────────
  it.todo("Step F2 calls POST /extract-relationships when NEO4J_URI and EXTRACTION_LLM_BASE_URL are set");
  it.todo("Step F2 is skipped entirely when EXTRACTION_LLM_BASE_URL is not set");
  it.todo("Step F2 transforms sidecar response into Neo4jRelationshipInput[] shape");
  it.todo("Step F2 persists relationship data via savePipelineState() for Step G consumption");
  it.todo("Step F2 failure does not prevent Step G from running");

  // ── Step G: Neo4j Direct Write ─────────────────────────────
  it.todo("Step G calls Neo4jDirectWriter.ensureIndexes() before writes");
  it.todo("Step G calls Neo4jDirectWriter.writeEntities() with data from Step F");
  it.todo("Step G calls Neo4jDirectWriter.writeRelationships() with combined data from Steps F and F2");
  it.todo("Step G calls Neo4jDirectWriter.writeMentions() with mention data from Step F");
  it.todo("Step G calls Neo4jDirectWriter.writeDocumentGraph() with document metadata");
  it.todo("Step G does NOT import or call neo4j-sync.ts");
  it.todo("Step G does NOT read from Postgres kg_* tables");
  it.todo("Step G returns a result that passes Neo4jWriteResultSchema.safeParse()");

  // ── Gating & Backward Compatibility ────────────────────────
  it.todo("When NEO4J_URI is not set, Steps F, F2, and G do not run");
  it.todo("Postgres Steps A-E produce identical output regardless of NEO4J_URI setting");
  it.todo("Neo4j unreachable during Step G — pipeline continues, document marked as ingested");

  // ── Failure Isolation ──────────────────────────────────────
  it.todo("Step F2 failure → Step G still writes BERT entities + CO_OCCURS to Neo4j");
  it.todo("Step F failure → Step F2 still attempts (independent steps)");
  it.todo("Step G failure → document still marked as successfully ingested");
});
```
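Since Dev C's implementation may not be ready, the Step G tests can run against a minimal recording mock of the writer interface. Method names follow the contract above; the return values are placeholders:

```typescript
// Minimal recording mock for a Neo4jDirectWriter-shaped dependency.
// Records every call so tests can assert ordering and arguments.
function makeRecordingWriter() {
  const calls: Array<{ method: string; args: unknown[] }> = [];
  return {
    calls,
    ensureIndexes: () => { calls.push({ method: "ensureIndexes", args: [] }); },
    writeEntities: (...args: unknown[]) => { calls.push({ method: "writeEntities", args }); return 0; },
    writeRelationships: (...args: unknown[]) => { calls.push({ method: "writeRelationships", args }); return [] as string[]; },
    writeMentions: (...args: unknown[]) => { calls.push({ method: "writeMentions", args }); return 0; },
    writeDocumentGraph: (...args: unknown[]) => { calls.push({ method: "writeDocumentGraph", args }); },
  };
}
```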
4C. Dev-Proposed Edge Case Tests
Think about these — propose your own test cases in your Design Brief:
- What happens when the sidecar returns entities but /extract-relationships returns empty results (LLM unconfigured or no relationships found)? Does Step G correctly write BERT-only entities with CO_OCCURS?
- What happens when Step F succeeds but Step G fails on first attempt and retries? Is the intermediate data (entities, relationships, mentions) available for the retry via loadPipelineState()?
- What happens when a document has zero entities extracted? Does the pipeline skip Steps F2 and G gracefully, or does it attempt to write empty arrays to Neo4j?
- What happens when the sidecar returns entities with embeddings that are NOT exactly 768 dimensions? Does the schema validation catch this before it reaches Neo4j?
- What happens when two documents are being ingested concurrently and both write to Neo4j? Are the MERGE queries truly idempotent, or can you get race conditions on entity creation?
Quick Reference
| Item | Location |
| --- | --- |
| Ingestion pipeline | src/lib/tools/doc-ingestion/index.ts |
| Pipeline types | src/lib/tools/doc-ingestion/types.ts |
| Current entity extraction | src/lib/ingestion/entity-extraction.ts |
| Current Neo4j sync (old, being replaced) | src/lib/graph/neo4j-sync.ts |
| Neo4j client (health check, driver) | src/lib/graph/neo4j-client.ts |
| Direct writer interface (Dev C implements) | src/lib/graph/neo4j-direct-writer.ts |
| Embedding provider | src/lib/embeddings/provider.ts |
| Shared Zod schemas | __tests__/api/graphrag/contracts/graphrag-schemas.ts |
| Env var validation | src/env.ts |
| Parent requirements | .kiro/specs/neo4j-graphrag/requirements.md (R5, R13, R14) |
| Parent design | .kiro/specs/neo4j-graphrag/design.md (Ingestion Pipeline Contract, Failure Isolation) |