A Python pipeline for ingesting tweets from Twitter, performing sentiment analysis using state-of-the-art NLP models, and storing results in MongoDB for downstream analytics and visualization.
- Twitter Integration: Fetches tweets in real time using the Twitter API.
- Batch Processing: Processes tweets in configurable batch sizes.
- Sentiment Analysis: Uses Hugging Face Transformers (DistilBERT) for robust sentiment classification.
- MongoDB Storage: Stores enriched results for further analysis or dashboarding.
- Configurable: All settings managed via YAML config file.
- Logging & Error Handling: Informative logs for monitoring and debugging.
[Twitter API]
      │
      ▼
[Ingestion] ──► [Sentiment Analysis (Transformers)] ──► [MongoDB Storage]
pip install tweepy transformers torch pymongo pandas pyyaml
Edit `config/config.yml` with your Twitter API credentials and MongoDB connection string. The config file is auto-generated on first run.
Example:
twitter:
  api_key: your-api-key
  api_secret: your-api-secret
  bearer_token: your-bearer-token
  access_token: your-access-token
  access_token_secret: your-access-token-secret
mongodb:
  connection_string: mongodb://localhost:27017/
  database: sentiment_analysis
  collection: tweets
processing:
  batch_size: 10
  max_tweets: 100
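Since the config is auto-generated on first run, a minimal sketch of that behaviour might look like this, assuming PyYAML's safe_dump/safe_load (the helper name and default values are illustrative):

```python
# Hypothetical helper: write a default config on first run, then load it.
import os
import yaml

CONFIG_PATH = "config/config.yml"

DEFAULT_CONFIG = {
    "twitter": {"api_key": "", "api_secret": "", "bearer_token": "",
                "access_token": "", "access_token_secret": ""},
    "mongodb": {"connection_string": "mongodb://localhost:27017/",
                "database": "sentiment_analysis", "collection": "tweets"},
    "processing": {"batch_size": 10, "max_tweets": 100},
}

def load_config(path: str = CONFIG_PATH) -> dict:
    """Create a default config file if none exists, then return the parsed YAML."""
    if not os.path.exists(path):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w") as f:
            yaml.safe_dump(DEFAULT_CONFIG, f, sort_keys=False)
    with open(path) as f:
        return yaml.safe_load(f)
```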
Start MongoDB (for example with Docker), then run the pipeline:
docker run -d -p 27017:27017 --name mongodb mongo:latest
python Social_Media_Sentiment_Pipeline.py
- Ingest: Fetches tweets matching a query using Tweepy.
- Analyze: Runs sentiment analysis on tweet batches using DistilBERT.
- Store: Saves results (including sentiment and score) in MongoDB.
- Configurable: Easily adjust batch size, query, and credentials.
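A condensed sketch of that ingest → analyze → store flow, assuming a Twitter API v2 bearer token and a local MongoDB instance (the query string and document fields are illustrative, and error handling is omitted):

```python
# Sketch only: fetch recent tweets, classify sentiment, and store enriched documents.
import yaml
import tweepy
from transformers import pipeline
from pymongo import MongoClient

config = yaml.safe_load(open("config/config.yml"))

client = tweepy.Client(bearer_token=config["twitter"]["bearer_token"])
sentiment = pipeline("sentiment-analysis",
                     model="distilbert-base-uncased-finetuned-sst-2-english")
collection = (MongoClient(config["mongodb"]["connection_string"])
              [config["mongodb"]["database"]][config["mongodb"]["collection"]])

# Ingest: the recent-search endpoint requires max_results between 10 and 100.
response = client.search_recent_tweets(query="data engineering -is:retweet",
                                       max_results=config["processing"]["batch_size"])
tweets = response.data or []

# Analyze and store: classify the batch and persist sentiment label and score.
if tweets:
    results = sentiment([t.text for t in tweets])
    collection.insert_many([
        {"tweet_id": t.id, "text": t.text,
         "sentiment": r["label"], "score": r["score"]}
        for t, r in zip(tweets, results)
    ])
```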
- Add support for more social platforms (Reddit, Facebook, etc.)
- Integrate with dashboards (e.g., Power BI, Tableau)
- Add advanced NLP features (topic modeling, entity recognition)
- Schedule with Airflow or Prefect
MIT License
Contributions and feedback are welcome!
For questions or suggestions, please open an issue or submit a pull request.
A robust Python pipeline for batch ingestion of large CSV/JSON files from AWS S3 or Google Cloud Storage, distributed processing with Dask, and efficient storage in Parquet format for analytics.
- Cloud Storage Integration: Supports AWS S3 and Google Cloud Storage.
- Distributed Processing: Uses Dask for scalable, parallel data processing.
- Flexible Input: Handles both CSV and JSON files.
- Efficient Output: Stores processed data as Parquet for downstream analytics.
- Configurable: All settings managed via YAML config file.
- Logging & Error Handling: Informative logs for monitoring and debugging.
[Cloud Storage (S3/GCS)]
          │
          ▼
[Batch Ingestion Pipeline]
          │
          ▼
[Distributed Processing (Dask)]
          │
          ▼
[Parquet Data Lake]
pip install dask pandas boto3 google-cloud-storage pyyaml
Edit `config/config.yml` with your cloud credentials, bucket, and prefix.
Example:
cloud:
  provider: aws  # or 'gcp'
  bucket: your-bucket
  prefix: raw-data/
  aws_access_key: your-access-key
  aws_secret_key: your-secret-key
  gcp_credentials_path: path/to/credentials.json
local:
  staging_dir: ./staging
  output_dir: ./processed
processing:
  batch_size: 1000
  num_workers: 4
Run the pipeline:
python Batch_Data_Lake.py
- Ingest: Lists and downloads files from S3 or GCS to a local staging directory.
- Process: Uses Dask to process each file (CSV/JSON), apply transformations, and add metadata.
- Store: Saves the processed data as Parquet files in the output directory.
- Cleanup: Removes temporary files after processing.
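A compressed sketch of those four steps for the AWS case, assuming boto3 picks up the configured credentials from the environment and a Parquet engine such as pyarrow is installed (bucket, prefix, and directory names follow the example config; the transformation itself is illustrative):

```python
# Sketch only: download one batch of CSVs from S3, process with Dask, write Parquet.
import os
import boto3
import dask.dataframe as dd

staging_dir, output_dir = "./staging", "./processed"
os.makedirs(staging_dir, exist_ok=True)

# Ingest: list objects under the configured prefix and download them to staging.
s3 = boto3.client("s3")
keys = [obj["Key"] for obj in
        s3.list_objects_v2(Bucket="your-bucket", Prefix="raw-data/").get("Contents", [])
        if obj["Key"].endswith(".csv")]
for key in keys:
    s3.download_file("your-bucket", key, os.path.join(staging_dir, os.path.basename(key)))

# Process: read the staged CSVs with Dask and add simple lineage metadata.
ddf = dd.read_csv(os.path.join(staging_dir, "*.csv"), assume_missing=True)
ddf["source_prefix"] = "raw-data/"

# Store: write the processed batch as Parquet for downstream analytics
# (requires a Parquet engine such as pyarrow).
ddf.to_parquet(output_dir, write_index=False)

# Cleanup: remove the staged files once the Parquet output is written.
for name in os.listdir(staging_dir):
    os.remove(os.path.join(staging_dir, name))
```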
- Add new file formats or custom transformations in `transform_data`.
- Integrate with data catalogs or workflow orchestrators (e.g., Airflow).
- Add data validation or quality checks before storage.
MIT License
Contributions and feedback are welcome!
For questions or suggestions, please open an issue or submit a pull request.
A Python-based real-time log processing pipeline that ingests web server logs, parses and enriches them, streams them through Kafka, and indexes them in Elasticsearch for search, alerting, and analytics.
- Log Parsing: Extracts structured fields from raw Apache log lines.
- Streaming: Publishes parsed logs to Kafka topics in real time.
- Storage & Search: Consumes logs from Kafka and stores them in Elasticsearch with schema mapping.
- Configurable: Easily adjust Kafka and Elasticsearch settings via YAML config.
- Extensible: Modular design for adding monitoring, alerting, or downstream analytics.
[Web Server Logs]
        │
        ▼
[LogParser] ──► [KafkaProducer] ──► [Kafka Topic] ──► [KafkaConsumer] ──► [Elasticsearch]
pip install apache-log-parser kafka-python elasticsearch pyyaml
You can use Docker Compose for local development:
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports: ["2181:2181"]
  kafka:
    image: wurstmeister/kafka
    ports: ["9092:9092"]
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.9.3
    ports: ["9200:9200"]
    environment:
      - discovery.type=single-node
docker-compose up -d
Then run the pipeline:
python Real-Time_Log_Processing.py
The pipeline uses a YAML config file (`config/config.yml`) for Kafka and Elasticsearch settings. It is auto-generated on first run.
Example:
kafka:
  bootstrap_servers:
    - localhost:9092
  topic: web_logs
elasticsearch:
  host: http://localhost:9200
  index: web_logs
monitoring:
  alert_threshold: 100
  window_seconds: 300
- LogParser: Parses Apache log lines into structured dictionaries.
- LogProducer: Sends parsed logs to a Kafka topic.
- LogConsumer: Reads logs from Kafka and writes them to Elasticsearch.
- ElasticsearchHandler: Ensures index exists and stores logs for search and analytics.
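A condensed sketch of how these components might fit together, using the combined Apache log format and the example config values (the sample log line is illustrative, and error handling is omitted):

```python
# Sketch only: parse a log line, publish it to Kafka, then consume and index it.
import json
import apache_log_parser
from kafka import KafkaProducer, KafkaConsumer
from elasticsearch import Elasticsearch

# LogParser: combined Apache log format.
parse = apache_log_parser.make_parser(
    '%h %l %u %t "%r" %>s %b "%{Referer}i" "%{User-Agent}i"')

# LogProducer: publish parsed logs to the configured topic as JSON.
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda d: json.dumps(d, default=str).encode("utf-8"))
line = ('127.0.0.1 - - [10/Oct/2024:13:55:36 +0000] '
        '"GET /index.html HTTP/1.1" 200 2326 "-" "curl/7.68.0"')
producer.send("web_logs", parse(line))
producer.flush()

# LogConsumer + ElasticsearchHandler: read from Kafka and index into Elasticsearch.
# (The document= keyword needs elasticsearch-py 7.15+; older clients use body=.)
es = Elasticsearch("http://localhost:9200")
consumer = KafkaConsumer("web_logs",
                         bootstrap_servers=["localhost:9092"],
                         value_deserializer=lambda b: json.loads(b.decode("utf-8")),
                         auto_offset_reset="earliest")
for message in consumer:  # runs until interrupted
    es.index(index="web_logs", document=message.value)
```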
- Add alerting or monitoring logic in the consumer (see the sketch after this list).
- Integrate with dashboards (e.g., Kibana) for visualization.
- Support additional log formats or enrichments.
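As a starting point for the alerting idea above, a simple sliding-window check on HTTP 5xx responses could be wired into the consumer loop, driven by the monitoring settings in the config (a sketch only; the alert action here is just a log message):

```python
# Sketch only: sliding-window alert on HTTP 5xx responses inside the consumer loop.
import time
import logging
from collections import deque

ALERT_THRESHOLD = 100  # monitoring.alert_threshold from config/config.yml
WINDOW_SECONDS = 300   # monitoring.window_seconds from config/config.yml
error_times = deque()

def check_for_alert(log_record: dict) -> None:
    """Track 5xx responses (the parsed 'status' field) and warn when the threshold is hit."""
    now = time.time()
    if str(log_record.get("status", "")).startswith("5"):
        error_times.append(now)
    while error_times and now - error_times[0] > WINDOW_SECONDS:
        error_times.popleft()
    if len(error_times) >= ALERT_THRESHOLD:
        logging.warning("ALERT: %d server errors in the last %d seconds",
                        len(error_times), WINDOW_SECONDS)
```

Calling `check_for_alert(message.value)` inside the consumer loop would then flag error bursts without blocking indexing.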
MIT License
Contributions and feedback are welcome!
For questions or suggestions, please open an issue or submit a pull request.
A modular Python ETL pipeline for extracting, transforming, and loading e-commerce sales data into a data warehouse. This project demonstrates best practices in configuration management, logging, and extensibility for real-world data engineering.
- Extract sales data from CSV or database sources
- Transform data with derived metrics (total amount, profit) and time dimensions
- Load processed data into a SQL data warehouse (SQLite by default)
- Configurable via YAML file
- Robust logging and error handling
- Easily extensible for new data sources or transformations
Project structure:
E-Comm_Pipeline.py
config/
  config.yml
data/
  sales.csv
  customers.csv
  products.csv
pip install pandas numpy sqlalchemy pyyaml
- Place your sales data as `data/sales.csv` (with columns: `date`, `quantity`, `unit_price`, `unit_cost`, etc.)
- Optionally, add `data/customers.csv` and `data/products.csv` for future extensions.
Run the pipeline:
python E-Comm_Pipeline.py
The pipeline uses a YAML config (`config/config.yml`) for warehouse connection and data source paths. If not present, it will be created automatically with sensible defaults.
Example:
warehouse:
  connection_string: sqlite:///ecommerce.db
source_paths:
  sales: data/sales.csv
  customers: data/customers.csv
  products: data/products.csv
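A compressed sketch of the transform and load steps under that config (the column names follow the expected `data/sales.csv` layout; the table name and derived-metric formulas are illustrative and should be adapted to your schema):

```python
# Sketch only: derive metrics and time dimensions, then load into the warehouse.
import pandas as pd
from sqlalchemy import create_engine

sales = pd.read_csv("data/sales.csv", parse_dates=["date"])

# Derived metrics.
sales["total_amount"] = sales["quantity"] * sales["unit_price"]
sales["profit"] = sales["quantity"] * (sales["unit_price"] - sales["unit_cost"])

# Time dimensions for warehouse-style reporting.
sales["year"] = sales["date"].dt.year
sales["month"] = sales["date"].dt.month
sales["day_of_week"] = sales["date"].dt.day_name()

# Load into the configured warehouse (SQLite by default).
engine = create_engine("sqlite:///ecommerce.db")
sales.to_sql("fact_sales", engine, if_exists="replace", index=False)
```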
All ETL steps and errors are logged to the console for easy monitoring and debugging.
- Add new extract/transform/load methods for customers, products, or other entities
- Integrate with cloud data warehouses (Redshift, BigQuery, etc.) by updating the connection string
- Schedule with cron, Airflow, or Prefect for production use
MIT License
Contributions and feedback are welcome!
For questions or suggestions, please open an issue or submit a pull request.
A scalable, modular data pipeline for ingesting, processing, and serving video game telemetry and user action data. This project is designed for modern gaming platforms and supports analytics, leaderboards, and integration with orchestration tools like Apache Airflow.
- Data Ingestion: FastAPI endpoints for game telemetry and user actions.
- Raw Event Logging: Events are staged to a local or cloud data lake in partitioned JSONL files.
- ETL/ELT Processing: Uses Dask for scalable transformation and aggregation of raw logs.
- Aggregations: Computes hourly and per-player engagement metrics.
- Leaderboard Serving: FastAPI endpoint for real-time leaderboards with in-memory caching.
- Data Storage: Stores processed data in Parquet (data lake) and aggregates in a relational database.
- Orchestration: Conceptual Apache Airflow DAG for daily ETL and data warehouse loading.
- Structured Logging: JSON logs for observability and debugging.
- Local Testing: Main block simulates ingestion, processing, and serving for rapid prototyping.
- Python 3.8+
- FastAPI (API)
- Dask (Distributed Data Processing)
- Pandas (Data Analysis)
- PyArrow (Parquet I/O)
- SQLAlchemy (Database ORM)
- Uvicorn (ASGI Server)
- Apache Airflow (Orchestration, optional)
- Cachetools (In-memory caching)
- Pydantic (Data validation)
- dotenv (Environment config)
pip install fastapi uvicorn dask[complete] pandas pyarrow sqlalchemy cachetools python-dotenv pydantic
Create a `.env` file or set environment variables as needed:
DATA_LAKE_PATH=./data_lake
DB_CONNECTION_STRING=sqlite:///videogame.db
LOG_LEVEL=INFO
Start the ingestion and serving APIs:
uvicorn Video_Game_Pipeline:ingestion_app --reload --port 8000
uvicorn Video_Game_Pipeline:serving_app --reload --port 8001
You can run the script directly to simulate ingestion, processing, and serving:
python Video_Game_Pipeline.py
- The script includes a conceptual Airflow DAG for daily ETL and data warehouse loading.
- To use with Airflow, place the DAG definition in your Airflow DAGs folder and configure as needed.
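In outline, such a DAG could look like the following, assuming Apache Airflow 2.x; the task names and callables here are illustrative placeholders, not the DAG defined in the script:

```python
# Sketch only: a daily ETL DAG wiring the pipeline's processing steps together.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def run_etl(**context):
    """Placeholder: run the Dask-based processing of the previous day's raw events."""
    ...

def load_warehouse(**context):
    """Placeholder: load the resulting aggregates into the relational database."""
    ...

with DAG(
    dag_id="video_game_daily_etl",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    etl = PythonOperator(task_id="process_raw_events", python_callable=run_etl)
    load = PythonOperator(task_id="load_data_warehouse", python_callable=load_warehouse)
    etl >> load
```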
- `POST /v2/telemetry/game_event`: Upload game telemetry events (player actions, scores, etc.)
- `POST /v2/telemetry/user_action`: Record user actions (menu navigation, settings changes, etc.)
- `GET /v2/leaderboard/{game_id}?limit=10`: Get the top players for a game
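A minimal sketch of how an ingestion endpoint of this shape can stage raw events to partitioned JSONL, assuming Pydantic v2; the model fields and partitioning scheme are illustrative:

```python
# Sketch only: a telemetry ingestion endpoint that appends events to partitioned JSONL.
import json
import os
from datetime import datetime, timezone
from fastapi import FastAPI
from pydantic import BaseModel

DATA_LAKE_PATH = os.getenv("DATA_LAKE_PATH", "./data_lake")
ingestion_app = FastAPI()

class GameEvent(BaseModel):
    player_id: str
    game_id: str
    event_type: str  # e.g. "match_start", "score", "match_end"
    score: int = 0
    timestamp: datetime

@ingestion_app.post("/v2/telemetry/game_event")
def ingest_game_event(event: GameEvent):
    # Partition raw events by date so downstream Dask jobs can scan incrementally.
    partition = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    path = os.path.join(DATA_LAKE_PATH, "raw", "game_events", f"dt={partition}")
    os.makedirs(path, exist_ok=True)
    with open(os.path.join(path, "events.jsonl"), "a") as f:
        # model_dump_json is the Pydantic v2 API; use .json() on Pydantic v1.
        f.write(event.model_dump_json() + "\n")
    return {"status": "accepted"}
```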
Project structure:
Video_Game_Pipeline.py
.env
data_lake/
  raw/
  processed/
- Add new event types by extending the Pydantic models and ingestion endpoints.
- Integrate real ML models for player behavior prediction or matchmaking.
- Connect to cloud storage for the data lake (e.g., S3, GCS).
- Deploy with Docker/Kubernetes for production scalability.
A robust, modular data pipeline for ingesting, processing, and serving social media video and interaction data. This project is designed for scalable, production-grade analytics and machine learning on platforms like TikTok, YouTube Shorts, or Instagram Reels.
- Data Ingestion: FastAPI endpoints for uploading video metadata, user interactions, and moderation events.
- Raw Event Logging: Events are staged to a local or cloud data lake in partitioned JSONL files.
- ETL/ELT Processing: Uses Dask for scalable transformation and aggregation of raw logs.
- Engagement Metrics: Computes daily video engagement (views, likes, comments, shares, watch time).
- User Feature Generation: Extracts user activity features for downstream ML models.
- Data Storage: Stores processed data in Parquet (data lake) and aggregates in a relational database.
- Machine Learning Hooks: Conceptual feature store and model training integration for recommendations.
- Serving APIs: FastAPI endpoints for personalized feeds and trending videos, with in-memory caching.
- Orchestration: Apache Airflow DAGs for daily ETL, feature generation, and model retraining.
- Structured Logging: JSON logs for observability and debugging.
- Local Testing: Main block simulates ingestion, processing, and serving for rapid prototyping.
- Python 3.8+
- FastAPI (API)
- Dask (Distributed Data Processing)
- Pandas (Data Analysis)
- PyArrow (Parquet I/O)
- SQLAlchemy (Database ORM)
- Uvicorn (ASGI Server)
- Apache Airflow (Orchestration, optional)
- Cachetools (In-memory caching)
- Pydantic (Data validation)
- dotenv (Environment config)
pip install fastapi uvicorn dask[complete] pandas pyarrow sqlalchemy cachetools python-dotenv pydantic
Create a `.env` file or set environment variables as needed:
DATA_LAKE_PATH=./tiktok_data_lake
DB_CONNECTION_STRING=sqlite:///tiktok.db
LOG_LEVEL=INFO
Start the ingestion and serving APIs:
uvicorn Social_Media_Video_Pipeline:ingestion_app --reload --port 8080
uvicorn Social_Media_Video_Pipeline:serving_app --reload --port 8081
You can run the script directly to simulate ingestion, processing, and serving:
python Social_Media_Video_Pipeline.py
- The script includes conceptual Airflow DAGs for daily ETL and model retraining.
- To use with Airflow, place the DAG definition in your Airflow DAGs folder and configure as needed.
- `POST /v1/video/upload_metadata`: Upload video metadata (user, title, tags, etc.)
- `POST /v1/event/interaction`: Record user interaction events (view, like, comment, share, follow)
- `POST /v1/event/moderation`: Record moderation events (flag, approve, reject)
- `GET /v1/feed/for_you?user_id=...&count=...`: Get personalized video recommendations
- `GET /v1/feed/trending?region=...&count=...`: Get trending videos for a region
Project structure:
Social_Media_Video_Pipeline.py
.env
data_lake/
  raw/
  processed/
- Add new event types by extending the Pydantic models and ingestion endpoints.
- Integrate real ML models for recommendations and trending feeds.
- Connect to cloud storage for the data lake (e.g., S3, GCS).
- Deploy with Docker/Kubernetes for production scalability.