A containerized automated ETL pipeline built with Python, PySpark, Spark SQL, PostgreSQL, and Docker.

Nonppk/pyspark-etl-automation

Transaction Aggregation ETL Pipeline

Overview

This project implements an ETL pipeline using PySpark and Spark SQL to process large transactional datasets stored in CSV files.
The pipeline performs both DataFrame-based and SQL-based transformations to aggregate transactions by customer and stores the final results in a PostgreSQL database for analytical querying.
The design emphasizes automation and reproducibility: the whole stack runs in containers, so runs are repeatable and results are consistent across environments.


Features

  • Extract: Read raw transaction data from CSV.
  • Transform:
    • Use Spark SQL for complex aggregations and filtering logic.
    • Aggregate by customer_id.
    • Determine each customer's favourite_product (the product with the highest total units sold).
    • Calculate longest_streak (the longest sequence of consecutive purchase days).
  • Load: Write the transformed data into PostgreSQL.
  • Automation: Support for scheduled daily runs and idempotent re-runs via orchestration (e.g., Airflow or cron).
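The two non-trivial aggregations above can be sketched in plain Python. The pipeline itself implements them in PySpark/Spark SQL; the helper names below are illustrative, not taken from main.py:

```python
from collections import Counter
from datetime import date, timedelta

def favourite_product(purchases):
    """Product with the highest total units sold for one customer.

    `purchases` is an iterable of (product_id, units) pairs.
    """
    totals = Counter()
    for product, units in purchases:
        totals[product] += units
    return totals.most_common(1)[0][0] if totals else None

def longest_streak(purchase_dates):
    """Length of the longest run of consecutive purchase days."""
    days = sorted(set(purchase_dates))
    if not days:
        return 0
    best = current = 1
    for prev, nxt in zip(days, days[1:]):
        # Extend the streak on a one-day gap, otherwise restart it.
        current = current + 1 if nxt - prev == timedelta(days=1) else 1
        best = max(best, current)
    return best

print(favourite_product([("apples", 2), ("pears", 5), ("apples", 1)]))  # → pears
print(longest_streak([date(2024, 1, 1), date(2024, 1, 2), date(2024, 1, 4),
                      date(2024, 1, 5), date(2024, 1, 6)]))             # → 3
```

In Spark SQL the streak is typically computed with the gaps-and-islands pattern: subtracting `row_number()` (ordered by purchase date) from the date itself yields a constant key for each run of consecutive days, which can then be grouped and counted.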

Tech Stack

Component                    Technology
Programming Language         Python 3.9
Processing Engine            Apache Spark (PySpark & Spark SQL)
Database                     PostgreSQL 13
Containerization             Docker, Docker Compose
Dependency Management        Poetry
Orchestration (Automation)   Apache Airflow (optional)

Usage

Run ETL pipeline

docker-compose build
docker-compose run etl poetry run python main.py \
  --source /opt/data/transaction.csv \
  --database warehouse \
  --table customers
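main.py is not reproduced in this README, but the flags in the command above imply a command-line interface along these lines. A minimal sketch using argparse: only the flag names come from the command; everything else is assumed:

```python
import argparse

def parse_args(argv=None):
    """Parse the ETL's CLI flags (names taken from the run command above)."""
    parser = argparse.ArgumentParser(description="Transaction aggregation ETL")
    parser.add_argument("--source", required=True, help="path to the input CSV")
    parser.add_argument("--database", required=True, help="target PostgreSQL database")
    parser.add_argument("--table", required=True, help="target table for the aggregates")
    return parser.parse_args(argv)

args = parse_args(["--source", "/opt/data/transaction.csv",
                   "--database", "warehouse", "--table", "customers"])
print(args.database, args.table)  # → warehouse customers
```

The parsed values would then feed the Spark reader (`--source`) and the JDBC writer, whose connection URL takes the form `jdbc:postgresql://db:5432/<database>`.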

Verify output

docker-compose exec db psql --user postgres -d warehouse \
  -c 'SELECT * FROM customers LIMIT 10;'

Integration Test

The integration test verifies the correctness of the ETL pipeline by asserting that for customer_id = 0023938:

  • favourite_product matches the most frequently sold product.
  • longest_streak equals the correct number of consecutive purchase days.
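The shape of such a check can be sketched with the database fetch stubbed out. In the real test the row would come from the loaded PostgreSQL table (e.g. via a JDBC or psycopg2 query), and the expected values would be computed independently from the source CSV; the values below are placeholders, not the actual answers for customer 0023938:

```python
def assert_customer_aggregates(row, expected_product, expected_streak):
    """Compare one aggregated row against independently computed expectations."""
    assert row["favourite_product"] == expected_product, (
        f"favourite_product: got {row['favourite_product']!r}, "
        f"expected {expected_product!r}")
    assert row["longest_streak"] == expected_streak, (
        f"longest_streak: got {row['longest_streak']}, "
        f"expected {expected_streak}")

# Stubbed row standing in for:
#   SELECT favourite_product, longest_streak
#   FROM customers WHERE customer_id = '0023938';
row = {"favourite_product": "widget", "longest_streak": 3}
assert_customer_aggregates(row, expected_product="widget", expected_streak=3)
print("ok")  # → ok
```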

Cloud Deployment Proposal

A scalable, automated deployment can be built on AWS:

Layer                                    AWS Service                                          Purpose
Data Ingestion                           AWS DMS                                              Move data from an on-premises RDBMS to AWS
Scheduling & Orchestration (Automation)  Amazon MWAA (Managed Airflow) or AWS Step Functions  Automate daily jobs, retries, SLAs
Data Processing                          AWS Glue or Amazon EMR (Spark + Spark SQL)           Perform ETL transformations at scale
Data Storage                             Amazon Redshift                                      Store aggregated data for interactive analytics
Monitoring & Alerts                      Amazon CloudWatch, SNS                               Centralized logs, metrics, failure notifications

This design provides automation, scalability, and fault tolerance with managed services and auto-scaling compute.


Deliverables

  • main.py – ETL entry point (PySpark + Spark SQL pipeline implementation)
  • docker-compose.yml – Defines Spark, PostgreSQL, and ETL containers
  • Dockerfile – Python 3.9 image with Spark and dependencies
  • tests/ – Integration tests for ETL validation
  • deployment.pdf – Cloud architecture proposal and explanation
  • changes.patch – Git patch file containing implementation changes

Project Structure

TAKE-HOME-TEST/
├── data/
│   └── transaction.csv
├── main.py
├── integration_test.py
├── docker-compose.yml
├── Dockerfile
├── pyproject.toml
├── poetry.lock
├── .env
├── .gitignore
└── README.md

Keywords

ETL, PySpark, Spark SQL, PostgreSQL, Docker, Docker Compose, Data Engineering, Data Pipeline, Orchestration, Scheduling, Automation, Airflow, AWS Glue, EMR, Redshift, CloudWatch, CI/CD, Integration Test.


Summary

A reproducible and containerized ETL pipeline built with PySpark, Spark SQL, PostgreSQL, and Docker.
Designed for large-scale transaction aggregation, automated scheduling, SQL-based transformations, testing, and cloud-ready deployment.
