Social Media Sentiment Analysis Pipeline

A Python pipeline for ingesting tweets from Twitter, performing sentiment analysis with a pretrained DistilBERT model (via Hugging Face Transformers), and storing the results in MongoDB for downstream analytics and visualization.


Features

  • Twitter Integration: Fetches tweets in real time using the Twitter API.
  • Batch Processing: Processes tweets in configurable batch sizes.
  • Sentiment Analysis: Uses Hugging Face Transformers (DistilBERT) for robust sentiment classification.
  • MongoDB Storage: Stores enriched results for further analysis or dashboarding.
  • Configurable: All settings managed via YAML config file.
  • Logging & Error Handling: Informative logs for monitoring and debugging.

Architecture

[Twitter API]
     │
     ▼
[Ingestion] ──► [Sentiment Analysis (Transformers)] ──► [MongoDB Storage]

Quick Start

1. Install Dependencies

pip install tweepy transformers torch pymongo pandas pyyaml

2. Configure Twitter and MongoDB

Edit config/config.yml with your Twitter API credentials and MongoDB connection string. If the file does not exist yet, it is auto-generated on the first run.

Example:

twitter:
  api_key: your-api-key
  api_secret: your-api-secret
  bearer_token: your-bearer-token
  access_token: your-access-token
  access_token_secret: your-access-token-secret

mongodb:
  connection_string: mongodb://localhost:27017/
  database: sentiment_analysis
  collection: tweets

processing:
  batch_size: 10
  max_tweets: 100

3. Run MongoDB (if not already running)

docker run -d -p 27017:27017 --name mongodb mongo:latest

4. Run the Pipeline

python Social_Media_Sentiment_Pipeline.py

How It Works

  • Ingest: Fetches tweets matching a query using Tweepy.
  • Analyze: Runs sentiment analysis on tweet batches using DistilBERT.
  • Store: Saves results (including sentiment and score) in MongoDB (see the sketch below).
  • Configurable: Easily adjust batch size, query, and credentials.
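
The Analyze and Store steps, as a minimal sketch assuming the MongoDB settings from the example config above (the function name and the shape of the tweet dicts are illustrative, not the script's actual API):

from transformers import pipeline
from pymongo import MongoClient

# Load the default DistilBERT sentiment model and connect to MongoDB
# (connection details match the example config above).
sentiment_model = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
)
collection = MongoClient("mongodb://localhost:27017/")["sentiment_analysis"]["tweets"]

def analyze_and_store(tweets):
    """Classify a batch of tweet dicts (each with a 'text' field) and store them."""
    results = sentiment_model([t["text"] for t in tweets])
    for tweet, result in zip(tweets, results):
        tweet["sentiment"] = result["label"]  # e.g. POSITIVE or NEGATIVE
        tweet["score"] = result["score"]
    collection.insert_many(tweets)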

Extending

  • Add support for more social platforms (Reddit, Facebook, etc.)
  • Integrate with dashboards (e.g., Power BI, Tableau)
  • Add advanced NLP features (topic modeling, entity recognition)
  • Schedule with Airflow or Prefect

License

MIT License


Contributions and feedback are welcome!
For questions or suggestions, please open an issue or submit a pull request.


Batch Data Lake Ingestion Pipeline

A robust Python pipeline for batch ingestion of large CSV/JSON files from AWS S3 or Google Cloud Storage, distributed processing with Dask, and efficient storage in Parquet format for analytics.


Features

  • Cloud Storage Integration: Supports AWS S3 and Google Cloud Storage.
  • Distributed Processing: Uses Dask for scalable, parallel data processing.
  • Flexible Input: Handles both CSV and JSON files.
  • Efficient Output: Stores processed data as Parquet for downstream analytics.
  • Configurable: All settings managed via YAML config file.
  • Logging & Error Handling: Informative logs for monitoring and debugging.

Architecture

[Cloud Storage (S3/GCS)]
         │
         ▼
[Batch Ingestion Pipeline]
         │
         ▼
[Distributed Processing (Dask)]
         │
         ▼
[Parquet Data Lake]

Quick Start

1. Install Dependencies

pip install dask pandas boto3 google-cloud-storage pyyaml

2. Configure Cloud Access

Edit config/config.yml with your cloud credentials, bucket, and prefix.

Example:

cloud:
  provider: aws  # or 'gcp'
  bucket: your-bucket
  prefix: raw-data/
  aws_access_key: your-access-key
  aws_secret_key: your-secret-key
  gcp_credentials_path: path/to/credentials.json

local:
  staging_dir: ./staging
  output_dir: ./processed

processing:
  batch_size: 1000
  num_workers: 4

3. Run the Pipeline

python Batch_Data_Lake.py

How It Works

  • Ingest: Lists and downloads files from S3 or GCS to a local staging directory.
  • Process: Uses Dask to process each file (CSV/JSON), apply transformations, and add metadata.
  • Store: Saves the processed data as Parquet files in the output directory (see the sketch below).
  • Cleanup: Removes temporary files after processing.
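
A minimal sketch of the Process and Store steps, assuming the staging and output directories from the example config above; transform_data here is an illustrative stand-in for the pipeline's own transformation logic:

import dask.dataframe as dd
from datetime import datetime, timezone

def transform_data(ddf):
    """Illustrative transformation: tag every record with an ingestion timestamp."""
    ddf["ingested_at"] = datetime.now(timezone.utc).isoformat()
    return ddf

# Read the staged CSV files in parallel (use dd.read_json for JSON input),
# apply the transformation, and write Parquet to the output directory.
ddf = dd.read_csv("./staging/*.csv")
ddf = transform_data(ddf)
ddf.to_parquet("./processed/", write_index=False)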

Extending

  • Add new file formats or custom transformations in transform_data.
  • Integrate with data catalogs or workflow orchestrators (e.g., Airflow).
  • Add data validation or quality checks before storage.

License

MIT License


Contributions and feedback are welcome!
For questions or suggestions, please open an issue or submit a pull request.


Real-Time Log Processing Pipeline

A Python-based real-time log processing pipeline that ingests web server logs, parses and enriches them, streams them through Kafka, and indexes them in Elasticsearch for search, alerting, and analytics.


Features

  • Log Parsing: Extracts structured fields from raw Apache log lines.
  • Streaming: Publishes parsed logs to Kafka topics in real time.
  • Storage & Search: Consumes logs from Kafka and stores them in Elasticsearch with schema mapping.
  • Configurable: Easily adjust Kafka and Elasticsearch settings via YAML config.
  • Extensible: Modular design for adding monitoring, alerting, or downstream analytics.

Architecture

[Web Server Logs] 
      │
      ▼
[LogParser] ──► [KafkaProducer] ──► [Kafka Topic] ──► [KafkaConsumer] ──► [Elasticsearch]

Quick Start

1. Install Dependencies

pip install apache-log-parser kafka-python elasticsearch pyyaml

2. Start Kafka and Elasticsearch

You can use Docker Compose for local development:

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports: ["2181:2181"]
  kafka:
    image: wurstmeister/kafka
    ports: ["9092:9092"]
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.9.3
    ports: ["9200:9200"]
    environment:
      - discovery.type=single-node

Then bring the services up:

docker-compose up -d

3. Run the Pipeline

python Real-Time_Log_Processing.py

Configuration

The pipeline uses a YAML config file (config/config.yml) for Kafka and Elasticsearch settings. It is auto-generated on first run.

Example:

kafka:
  bootstrap_servers:
    - localhost:9092
  topic: web_logs
elasticsearch:
  host: http://localhost:9200
  index: web_logs
monitoring:
  alert_threshold: 100
  window_seconds: 300

How It Works

  • LogParser: Parses Apache log lines into structured dictionaries.
  • LogProducer: Sends parsed logs to a Kafka topic (see the sketch below).
  • LogConsumer: Reads logs from Kafka and writes them to Elasticsearch.
  • ElasticsearchHandler: Ensures index exists and stores logs for search and analytics.
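
A minimal sketch of the LogParser and LogProducer halves, using the Kafka settings from the example config above; the Apache log format string and the selected fields are assumptions, not the script's exact behavior:

import json
import apache_log_parser
from kafka import KafkaProducer

# Parser for the Apache combined log format (an assumption about the input logs).
parse_line = apache_log_parser.make_parser(
    '%h %l %u %t "%r" %>s %b "%{Referer}i" "%{User-Agent}i"'
)
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def publish(raw_line):
    """Parse one raw log line and send a structured record to the web_logs topic."""
    record = parse_line(raw_line)
    producer.send("web_logs", {
        "remote_host": record["remote_host"],
        "request": record["request_first_line"],
        "status": record["status"],
        "timestamp": record["time_received_isoformat"],
    })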

Extending

  • Add alerting or monitoring logic in the consumer.
  • Integrate with dashboards (e.g., Kibana) for visualization.
  • Support additional log formats or enrichments.

License

MIT License


Contributions and feedback are welcome!
For questions or suggestions, please open an issue or submit a pull request.


E-Commerce ETL Pipeline

A modular Python ETL pipeline for extracting, transforming, and loading e-commerce sales data into a data warehouse. This project demonstrates best practices in configuration management, logging, and extensibility for real-world data engineering.


Features

  • Extract sales data from CSV or database sources
  • Transform data with derived metrics (total amount, profit) and time dimensions (see the sketch after this list)
  • Load processed data into a SQL data warehouse (SQLite by default)
  • Configurable via YAML file
  • Robust logging and error handling
  • Easily extensible for new data sources or transformations
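
A minimal sketch of the transform step, assuming the sales.csv columns described under Prepare Data below (date, quantity, unit_price, unit_cost); the derived-column names are illustrative:

import pandas as pd

def transform_sales(df: pd.DataFrame) -> pd.DataFrame:
    """Add derived metrics and time dimensions to the raw sales data."""
    df = df.copy()
    df["date"] = pd.to_datetime(df["date"])
    df["total_amount"] = df["quantity"] * df["unit_price"]
    df["profit"] = (df["unit_price"] - df["unit_cost"]) * df["quantity"]
    # Time dimensions for warehouse-style reporting
    df["year"] = df["date"].dt.year
    df["month"] = df["date"].dt.month
    df["day_of_week"] = df["date"].dt.day_name()
    return df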

Project Structure

E-Comm_Pipeline.py
config/
    config.yml
data/
    sales.csv
    customers.csv
    products.csv

Quick Start

1. Install Dependencies

pip install pandas numpy sqlalchemy pyyaml

2. Prepare Data

  • Place your sales data as data/sales.csv (with columns: date, quantity, unit_price, unit_cost, etc.)
  • Optionally, add data/customers.csv and data/products.csv for future extensions.

3. Run the Pipeline

python E-Comm_Pipeline.py

Configuration

The pipeline uses a YAML config (config/config.yml) for the warehouse connection and data source paths. If the file is not present, it is created automatically with sensible defaults, as sketched after the example below.

Example:

warehouse:
  connection_string: sqlite:///ecommerce.db
source_paths:
  sales: data/sales.csv
  customers: data/customers.csv
  products: data/products.csv
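
The auto-generation might look roughly like the following sketch; the defaults mirror the example above, while the helper name is illustrative rather than the script's actual API:

import os
import yaml

DEFAULT_CONFIG = {
    "warehouse": {"connection_string": "sqlite:///ecommerce.db"},
    "source_paths": {
        "sales": "data/sales.csv",
        "customers": "data/customers.csv",
        "products": "data/products.csv",
    },
}

def load_config(path="config/config.yml"):
    """Return the YAML config, writing the defaults above if the file is missing."""
    if not os.path.exists(path):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w") as f:
            yaml.safe_dump(DEFAULT_CONFIG, f)
    with open(path) as f:
        return yaml.safe_load(f)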

Logging

All ETL steps and errors are logged to the console for easy monitoring and debugging.


Extending the Pipeline

  • Add new extract/transform/load methods for customers, products, or other entities
  • Integrate with cloud data warehouses (Redshift, BigQuery, etc.) by updating the connection string
  • Schedule with cron, Airflow, or Prefect for production use

License

MIT License


Contributions and feedback are welcome!
For questions or suggestions, please open an issue or submit a pull request.


Video Game Data Pipeline

A scalable, modular data pipeline for ingesting, processing, and serving video game telemetry and user action data. This project is designed for modern gaming platforms and supports analytics, leaderboards, and integration with orchestration tools like Apache Airflow.


Features

  • Data Ingestion: FastAPI endpoints for game telemetry and user actions.
  • Raw Event Logging: Events are staged to a local or cloud data lake in partitioned JSONL files.
  • ETL/ELT Processing: Uses Dask for scalable transformation and aggregation of raw logs.
  • Aggregations: Computes hourly and per-player engagement metrics.
  • Leaderboard Serving: FastAPI endpoint for real-time leaderboards with in-memory caching (sketched below).
  • Data Storage: Stores processed data in Parquet (data lake) and aggregates in a relational database.
  • Orchestration: Conceptual Apache Airflow DAG for daily ETL and data warehouse loading.
  • Structured Logging: JSON logs for observability and debugging.
  • Local Testing: Main block simulates ingestion, processing, and serving for rapid prototyping.
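
The leaderboard serving noted above pairs a FastAPI route with an in-memory TTL cache. A minimal sketch under those assumptions (the cache parameters and placeholder rows are illustrative, not the pipeline's actual implementation):

from cachetools import TTLCache
from fastapi import FastAPI

serving_app = FastAPI()
leaderboard_cache = TTLCache(maxsize=128, ttl=60)  # cache each board for 60 seconds

@serving_app.get("/v2/leaderboard/{game_id}")
def get_leaderboard(game_id: str, limit: int = 10):
    """Serve the cached leaderboard, recomputing only when the cache entry expires."""
    if game_id not in leaderboard_cache:
        # In the real pipeline this would query the aggregates database;
        # the placeholder rows below are purely illustrative.
        leaderboard_cache[game_id] = [
            {"player_id": f"player-{i}", "score": 1000 - i * 10} for i in range(100)
        ]
    return leaderboard_cache[game_id][:limit]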

Tech Stack

Python, FastAPI (ingestion and serving APIs), Dask (distributed ETL/ELT processing), PyArrow/Parquet (data lake storage), SQLAlchemy (relational aggregate storage), cachetools (in-memory caching), Pydantic (request validation), and Apache Airflow (orchestration, via a conceptual DAG).

Quick Start

1. Install Dependencies

pip install fastapi uvicorn dask[complete] pandas pyarrow sqlalchemy cachetools python-dotenv pydantic

2. Configure Environment

Create a .env file or set environment variables as needed:

DATA_LAKE_PATH=./data_lake
DB_CONNECTION_STRING=sqlite:///videogame.db
LOG_LEVEL=INFO
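
These settings can then be read at startup with python-dotenv, which is already in the dependency list; a minimal sketch (the fallback defaults are assumptions):

import os
from dotenv import load_dotenv

load_dotenv()  # picks up .env from the working directory if present

DATA_LAKE_PATH = os.getenv("DATA_LAKE_PATH", "./data_lake")
DB_CONNECTION_STRING = os.getenv("DB_CONNECTION_STRING", "sqlite:///videogame.db")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")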

3. Run the Ingestion API

uvicorn Video_Game_Pipeline:ingestion_app --reload --port 8000

4. Run the Serving API

uvicorn Video_Game_Pipeline:serving_app --reload --port 8001

5. Local Testing

You can run the script directly to simulate ingestion, processing, and serving:

python Video_Game_Pipeline.py

6. Airflow Integration

  • The script includes a conceptual Airflow DAG for daily ETL and data warehouse loading.
  • To use with Airflow, place the DAG definition in your Airflow DAGs folder and configure as needed.

API Endpoints

Ingestion

  • POST /v2/telemetry/game_event
    Upload game telemetry events (player actions, scores, etc.)

  • POST /v2/telemetry/user_action
    Record user actions (menu navigation, settings changes, etc.)

Serving

  • GET /v2/leaderboard/{game_id}?limit=10
    Get the top players for a game
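
An illustrative way to exercise these endpoints with the requests library; the payload fields are assumptions based on the descriptions above, not the exact request schema:

import requests

# Send one telemetry event to the ingestion API (port 8000 per the Quick Start).
requests.post("http://localhost:8000/v2/telemetry/game_event", json={
    "player_id": "player-123",
    "game_id": "space-raiders",
    "event_type": "score_update",
    "score": 4200,
})

# Fetch the top 10 players from the serving API (port 8001 per the Quick Start).
top_players = requests.get(
    "http://localhost:8001/v2/leaderboard/space-raiders",
    params={"limit": 10},
).json()
print(top_players)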

Project Structure

Video_Game_Pipeline.py
.env
/data_lake/
    raw/
    processed/

Extending the Pipeline

  • Add new event types by extending the Pydantic models and ingestion endpoints.
  • Integrate real ML models for player behavior prediction or matchmaking.
  • Connect to cloud storage for the data lake (e.g., S3, GCS).
  • Deploy with Docker/Kubernetes for production scalability.

Social Media Video Pipeline

A robust, modular data pipeline for ingesting, processing, and serving social media video and interaction data. This project is designed for scalable, production-grade analytics and machine learning on platforms like TikTok, YouTube Shorts, or Instagram Reels.

Features

  • Data Ingestion: FastAPI endpoints for uploading video metadata, user interactions, and moderation events.
  • Raw Event Logging: Events are staged to a local or cloud data lake in partitioned JSONL files.
  • ETL/ELT Processing: Uses Dask for scalable transformation and aggregation of raw logs.
  • Engagement Metrics: Computes daily video engagement (views, likes, comments, shares, watch time), sketched below.
  • User Feature Generation: Extracts user activity features for downstream ML models.
  • Data Storage: Stores processed data in Parquet (data lake) and aggregates in a relational database.
  • Machine Learning Hooks: Conceptual feature store and model training integration for recommendations.
  • Serving APIs: FastAPI endpoints for personalized feeds and trending videos, with in-memory caching.
  • Orchestration: Apache Airflow DAGs for daily ETL, feature generation, and model retraining.
  • Structured Logging: JSON logs for observability and debugging.
  • Local Testing: Main block simulates ingestion, processing, and serving for rapid prototyping.
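
A minimal sketch of the engagement aggregation with Dask, assuming raw events land as JSONL under ./tiktok_data_lake/raw/ and carry video_id, event_type, and watch_time_seconds fields (all of which are assumptions about the schema):

import dask.dataframe as dd

# Read the partitioned raw event logs in parallel.
events = dd.read_json("./tiktok_data_lake/raw/*.jsonl", lines=True)

# Aggregate per video: total interaction count and total watch time.
daily = events.groupby("video_id").agg(
    {"event_type": "count", "watch_time_seconds": "sum"}
)
daily = daily.rename(columns={
    "event_type": "interactions",
    "watch_time_seconds": "total_watch_time",
})

# Persist the aggregates as Parquet in the processed zone of the data lake.
daily.to_parquet("./tiktok_data_lake/processed/daily_engagement/")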

Tech Stack

Python, FastAPI (ingestion and serving APIs), Dask (distributed ETL/ELT processing), PyArrow/Parquet (data lake storage), SQLAlchemy (relational aggregate storage), cachetools (in-memory caching), Pydantic (request validation), and Apache Airflow (orchestration via conceptual DAGs).

Quick Start

1. Install Dependencies

pip install fastapi uvicorn dask[complete] pandas pyarrow sqlalchemy cachetools python-dotenv pydantic

2. Configure Environment

Create a .env file or set environment variables as needed:

DATA_LAKE_PATH=./tiktok_data_lake
DB_CONNECTION_STRING=sqlite:///tiktok.db
LOG_LEVEL=INFO

3. Run the Ingestion API

uvicorn Social_Media_Video_Pipeline:ingestion_app --reload --port 8080

4. Run the Serving API

uvicorn Social_Media_Video_Pipeline:serving_app --reload --port 8081

5. Local Testing

You can run the script directly to simulate ingestion, processing, and serving:

python Social_Media_Video_Pipeline.py

6. Airflow Integration

  • The script includes conceptual Airflow DAGs for daily ETL and model retraining.
  • To use with Airflow, place the DAG definition in your Airflow DAGs folder and configure as needed.

API Endpoints

Ingestion

  • POST /v1/video/upload_metadata
    Upload video metadata (user, title, tags, etc.)

  • POST /v1/event/interaction
    Record user interaction events (view, like, comment, share, follow)

  • POST /v1/event/moderation
    Record moderation events (flag, approve, reject)

Serving

  • GET /v1/feed/for_you?user_id=...&count=...
    Get personalized video recommendations

  • GET /v1/feed/trending?region=...&count=...
    Get trending videos for a region
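
As with the video game pipeline, these endpoints can be exercised with the requests library; the payload fields below are assumptions based on the descriptions above:

import requests

# Register a video's metadata with the ingestion API (port 8080 per the Quick Start).
requests.post("http://localhost:8080/v1/video/upload_metadata", json={
    "video_id": "vid-001",
    "user_id": "user-42",
    "title": "My first clip",
    "tags": ["gaming", "highlights"],
})

# Request a personalized feed from the serving API (port 8081 per the Quick Start).
feed = requests.get(
    "http://localhost:8081/v1/feed/for_you",
    params={"user_id": "user-42", "count": 5},
).json()
print(feed)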

Project Structure

Social_Media_Video_Pipeline.py
.env
/tiktok_data_lake/
    raw/
    processed/

Extending the Pipeline

  • Add new event types by extending the Pydantic models and ingestion endpoints.
  • Integrate real ML models for recommendations and trending feeds.
  • Connect to cloud storage for the data lake (e.g., S3, GCS).
  • Deploy with Docker/Kubernetes for production scalability.
