This project implements a scalable document processing pipeline using AWS S3, Apache Kafka (Confluent Cloud), MongoDB, and LlamaParse.
The overall architecture is shown below:

```mermaid
flowchart TB
subgraph Storage ["Data Storage"]
direction TB
S3[("AWS S3 Bucket")]
MDB[("MongoDB")]
end
subgraph Ingestion ["Data Ingestion & Chunking"]
direction LR
PY("Python Script")
LP["LlamaParse"]
PYP("Python Processing")
end
subgraph Confluent ["Confluent Cloud"]
direction TB
subgraph Topics ["Kafka Topics"]
direction LR
KR["raw Topic"]
KE["embedded_data Topic"]
end
KB(["Kafka Brokers"])
end
subgraph Processing ["Data Processing"]
direction LR
FLINK("Apache Flink Job")
EMB["Embedding Generation"]
end
subgraph Consumption ["Data Consumption"]
direction LR
CONS("Consumer Application")
AVRO["Avro Deserializer"]
end
S3 -- "Raw Files" --> PY
PY --> LP
LP --> PYP
PYP -- "Publish" --> KR
KR -- "Stream" --> FLINK
FLINK --> EMB
EMB -- "Transformed Data" --> KE
KB -. "Manage Messages" .-> Topics
KE -- "Consume" --> CONS
CONS --> AVRO
AVRO -- "Structured Data" --> MDB
%% Styling
classDef storageStyle fill:#FF9900,stroke:#232F3E,color:black,stroke-width:2px
classDef processStyle fill:#3776AB,stroke:#2A5D8C,color:white,stroke-width:2px
classDef kafkaStyle fill:#231F20,stroke:#000000,color:white,stroke-width:2px
classDef flinkStyle fill:#E6526F,stroke:#C6324C,color:white,stroke-width:2px
classDef brokerStyle fill:#767676,stroke:#4D4D4D,color:white,stroke-width:2px
classDef llamaStyle fill:#7F00FF,stroke:#5D00BA,color:white,stroke-width:2px
classDef avroStyle fill:#C7131F,stroke:#A11119,color:white,stroke-width:2px
classDef subgraphStyle fill:#F5F5F5,stroke:#CCCCCC,color:#333333,stroke-width:1px
classDef confluentStyle fill:#F8FBFE,stroke:#2481D7,color:#333333,stroke-width:1px
class S3,MDB storageStyle
class PY,PYP,CONS processStyle
class KR,KE kafkaStyle
class FLINK flinkStyle
class KB brokerStyle
class LP llamaStyle
class EMB processStyle
class AVRO avroStyle
class Storage,Ingestion,Processing,Consumption,Topics subgraphStyle
class Confluent confluentStyle
style MDB stroke:#000000,fill:#00C853
```
The solution is a real-time data processing pipeline that combines cloud object storage, stream processing, machine-learning enrichment, and a NoSQL document database into a single system for data enrichment and analysis.
At its core, the architecture follows a streaming data pattern where information flows continuously from source to destination, being transformed and enriched along the way. The pipeline begins with raw data files stored in an AWS S3 bucket, which serves as the primary data lake for the solution.
A Python script acts as the ingestion layer: it reads these raw files, parses them with LlamaParse, and publishes structured chunks to a raw-data Kafka topic (created as raw_v1 in the setup below). Kafka serves as the central nervous system of this architecture, providing a resilient, high-throughput messaging backbone that decouples data producers from consumers.
The heart of the processing layer is an Apache Flink job that consumes data from the raw Kafka topic. Flink, a powerful stream processing framework, applies transformations to the incoming data and uses Flink SQL's ML_PREDICT function to generate embeddings. These embeddings are vector representations that capture the semantic meaning of the raw data, turning unstructured or semi-structured content into a machine-learning-ready format.
The enriched data, now containing the embeddings, is published to a second Kafka topic for embedded data (created as summary_embedding_v2 in the setup below). Kafka brokers manage these messages, ensuring they are properly replicated, partitioned, and delivered with the appropriate guarantees.
Finally, a consumer application reads from the embeddings topic and stores the processed documents in MongoDB, a document-oriented database well suited to complex, nested data structures, including vector embeddings.
This architecture suits scenarios that require real-time data processing with machine-learning enrichment, such as recommendation systems, semantic search, content classification, or anomaly detection. It combines the strengths of stream processing (low latency, high throughput) with machine learning in a single pipeline.
Key features:
- PDF document processing from S3 bucket
- Intelligent document parsing with LlamaParse
- Document chunking for better processing
- Avro serialization/deserialization with Schema Registry
- Scalable message processing with Kafka
- Persistent storage in MongoDB
- Multi-threaded consumer implementation
Prerequisites:
- Python 3.9+
- AWS Account with S3 access
- Confluent Cloud account
- MongoDB instance
- LlamaParse API key
Setup:
- Create a .env file with the following variables:
```
# AWS Configuration
AWS_ACCESS_KEY_ID=your_aws_access_key
AWS_SECRET_ACCESS_KEY=your_aws_secret_key
AWS_SESSION_TOKEN=your_session_token
S3_BUCKET_NAME=your_bucket_name
S3_PREFIX=your_prefix
# MongoDB Configuration
MONGODB_URI=your_mongodb_uri
MONGODB_DB=your_database_name
MONGODB_COLLECTION=your_collection_name
# LlamaParse Configuration
LLAMA_PARSE_API_KEY=your_llamaparse_api_key
# Confluent Cloud Configuration
SCHEMA_REGISTRY_URL=https://your-sr-endpoint.confluent.cloud
SCHEMA_REGISTRY_API_KEY=your_schema_registry_api_key
SCHEMA_REGISTRY_API_SECRET=your_schema_registry_api_secret
```
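As a minimal sketch of how these variables might be loaded at startup (assuming python-dotenv, boto3, and pymongo are among the project's dependencies; the repository's client.py may wire this up differently):

```python
import os

import boto3
from dotenv import load_dotenv
from pymongo import MongoClient

# Read the .env file into the process environment
load_dotenv()

# boto3 picks the AWS_* credentials up from the environment automatically
s3 = boto3.client("s3")
bucket = os.environ["S3_BUCKET_NAME"]
prefix = os.environ.get("S3_PREFIX", "")

# Handle to the destination MongoDB collection
mongo = MongoClient(os.environ["MONGODB_URI"])
collection = mongo[os.environ["MONGODB_DB"]][os.environ["MONGODB_COLLECTION"]]
```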
- Create a client.properties file with your Confluent Cloud configuration:
```
bootstrap.servers=your-kafka-broker
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=your_api_key
sasl.password=your_api_secret
```
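Since the file is plain key=value pairs with no INI section header, it is usually read line by line rather than with configparser. A hedged sketch of turning it into a confluent-kafka configuration dict (the helper name is illustrative, not taken from the repository):

```python
from confluent_kafka import Producer

def read_client_properties(path: str = "client.properties") -> dict:
    """Parse key=value pairs, skipping blank lines and # comments."""
    conf = {}
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if line and not line.startswith("#"):
                key, _, value = line.partition("=")
                conf[key.strip()] = value.strip()
    return conf

# The resulting dict can be passed straight to a Kafka client
producer = Producer(read_client_properties())
```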
- Clone the repository:
```
git clone https://github.com/mongodb-partners/doc-embedding-stream.git
cd doc-embedding-stream
```
- Create and activate a virtual environment:
```
python -m venv doc-embedding-stream
source doc-embedding-stream/bin/activate  # On Windows: doc-embedding-stream\Scripts\activate
```
- Install dependencies:
```
pip install -r requirements.txt
```
The pipeline processes documents in four stages (a sketch of the ingestion and chunking code follows this list):
- Document Ingestion
  - PDFs are stored in AWS S3
  - The producer reads PDFs from the specified S3 bucket/prefix
  - Documents are parsed using LlamaParse
- Document Processing
  - Each PDF is split into pages
  - Pages are processed as individual chunks
  - Metadata is preserved for each chunk
- Message Production
  - Chunks are serialized using Avro
  - Messages are produced to the Kafka topic
  - Schema Registry ensures message compatibility
- Message Consumption
  - The consumer reads messages from Kafka
  - Messages are deserialized using the Avro schema
  - Processed data is stored in MongoDB
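A sketch of the ingestion and chunking stages, assuming boto3 and the llama_parse package; the function and field names here are illustrative rather than the repository's actual implementation:

```python
import os
import tempfile

import boto3
from llama_parse import LlamaParse

s3 = boto3.client("s3")
parser = LlamaParse(api_key=os.environ["LLAMA_PARSE_API_KEY"], result_type="markdown")

def iter_pdf_chunks(bucket: str, prefix: str):
    """List PDFs under the prefix, parse each with LlamaParse, and yield one chunk per parsed document."""
    for obj in s3.list_objects_v2(Bucket=bucket, Prefix=prefix).get("Contents", []):
        key = obj["Key"]
        if not key.lower().endswith(".pdf"):
            continue
        with tempfile.NamedTemporaryFile(suffix=".pdf") as tmp:
            s3.download_fileobj(bucket, key, tmp)
            tmp.flush()
            # Depending on parser settings, LlamaParse returns one or more documents per file
            for idx, doc in enumerate(parser.load_data(tmp.name)):
                yield {
                    "content": doc.text,
                    "metadata": {"source": key, "chunk": idx},  # metadata preserved per chunk
                }
```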
The Avro schema for messages:
```json
{
"type": "record",
"name": "summary_embedding_v2_value",
"namespace": "org.apache.flink.avro.generated.record",
"fields": [
{
"name": "content",
"type": ["null", "string"],
"default": null
},
{
"name": "embeddings",
"type": ["null", {"type": "array", "items": ["null", "float"]}],
"default": null
}
]
}
```
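A minimal sketch of wiring this schema to confluent-kafka's Avro deserializer, using the Schema Registry credentials from the .env file (the repository may instead fetch the schema from the registry by subject name):

```python
import os

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

# The record schema shown above, embedded as a string for illustration
EMBEDDING_SCHEMA = """
{
  "type": "record",
  "name": "summary_embedding_v2_value",
  "namespace": "org.apache.flink.avro.generated.record",
  "fields": [
    {"name": "content", "type": ["null", "string"], "default": null},
    {"name": "embeddings", "type": ["null", {"type": "array", "items": ["null", "float"]}], "default": null}
  ]
}
"""

schema_registry = SchemaRegistryClient({
    "url": os.environ["SCHEMA_REGISTRY_URL"],
    "basic.auth.user.info": f"{os.environ['SCHEMA_REGISTRY_API_KEY']}:{os.environ['SCHEMA_REGISTRY_API_SECRET']}",
})

# Turns Avro-encoded Kafka message values into plain Python dicts
avro_deserializer = AvroDeserializer(schema_registry, EMBEDDING_SCHEMA)
```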
Run the application:
```
python client.py
```
The application will:
- Start a consumer thread for processing messages
- List PDF files from S3
- Process and chunk each PDF
- Send chunks to Kafka
- The consumer processes each message and stores it in MongoDB (see the consumer sketch below)
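A hedged sketch of the consumer side, reusing read_client_properties and avro_deserializer from the earlier sketches; the topic name, group id, and threading model are assumptions, and client.py may organize this differently:

```python
import os
import threading

from confluent_kafka import Consumer
from confluent_kafka.serialization import MessageField, SerializationContext
from pymongo import MongoClient

collection = MongoClient(os.environ["MONGODB_URI"])[os.environ["MONGODB_DB"]][os.environ["MONGODB_COLLECTION"]]

def consume_embeddings(topic: str = "summary_embedding_v2") -> None:
    """Poll Kafka, Avro-decode each message value, and insert the record into MongoDB."""
    conf = read_client_properties()  # sketched in the setup section
    conf.update({"group.id": "doc-embedding-consumer", "auto.offset.reset": "earliest"})
    consumer = Consumer(conf)
    consumer.subscribe([topic])
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        record = avro_deserializer(msg.value(), SerializationContext(topic, MessageField.VALUE))
        if record is not None:
            collection.insert_one(record)

# Run the consumer in the background while the producer processes PDFs
threading.Thread(target=consume_embeddings, daemon=True).start()
```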
To set up the Confluent Cloud side of the pipeline:
- Create a Confluent Cloud Account
  - Go to Confluent Cloud
  - Sign up for a new account or log in
  - Create a new environment and cluster
- Create Required Topics
```
# Create the source topic for raw data
confluent kafka topic create raw_v1 --partitions 6

# Create the destination topic for embeddings
confluent kafka topic create summary_embedding_v2 --partitions 6
```
- Create and Run Flink SQL Statements
```sql
-- Create the embedding model
CREATE MODEL AWSBedrockEmbedding
INPUT (text STRING)
OUTPUT (embeddings ARRAY<FLOAT>)
WITH (
  'bedrock.connection' = 'bedrock-connection-0',
  'task' = 'embedding',
  'provider' = 'BEDROCK'
);

-- Create the destination table
CREATE TABLE summary_embedding_v2 (
  content STRING,
  embeddings ARRAY<FLOAT>
);

-- Insert transformed data
INSERT INTO summary_embedding_v2
SELECT CAST(val as STRING), embeddings
FROM raw_v1,
LATERAL TABLE (ML_PREDICT('AWSBedrockEmbedding', CAST(val as STRING)));
```
- Configure AWS Bedrock Connection
  - In Confluent Cloud, go to Data Integration → Connections
  - Click "Add Connection" and select AWS Bedrock
  - Configure with your AWS credentials
  - Name the connection 'bedrock-connection-0'
- Deploy the Pipeline
  - Review the pipeline configuration
  - Click "Deploy" to start processing
  - Monitor the pipeline in the Confluent Cloud dashboard
Error handling is built in at each stage:
- S3 access errors are caught and reported
- PDF parsing errors are handled gracefully
- Kafka production/consumption errors are logged
- MongoDB connection issues are handled with retries (a retry sketch follows the logging list below)
- Schema Registry connection issues are reported
- Console logging provides visibility into:
  - PDF processing status
  - Kafka message production/consumption
  - MongoDB document insertion
  - Error conditions and exceptions
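As an illustration of the retry-and-log behavior described above (a sketch only; the actual retry policy and log format in the repository may differ):

```python
import logging
import time

from pymongo.errors import PyMongoError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("doc-embedding-stream")

def insert_with_retry(collection, document: dict, attempts: int = 3, backoff: float = 2.0) -> None:
    """Insert a document into MongoDB, retrying transient failures with a simple linear backoff."""
    for attempt in range(1, attempts + 1):
        try:
            collection.insert_one(document)
            logger.info("Inserted document into MongoDB")
            return
        except PyMongoError as exc:
            logger.warning("MongoDB insert failed (attempt %d/%d): %s", attempt, attempts, exc)
            time.sleep(backoff * attempt)
    logger.error("Giving up after %d failed attempts", attempts)
```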
To contribute:
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.