Big Data + ML Platform powered by Apache Spark for large-scale analytics and machine learning. Inspired by Trafilea's recommender system architecture.
This template provides a complete Spark-based ML platform for processing massive datasets and building production ML models at scale.
vs. Other ML Templates:
- Big Data First - Designed for TBs of data, not GBs
- Spark Native - Leverages Spark's distributed computing power
- ETL + ML - Combined data engineering and machine learning
- Production Ready - EMR, Glue, and Athena integration
- Cost Optimized - Spot Instances, auto-scaling, efficient resource usage
Data Sources (Athena/S3/Redshift)
        ↓
Spark ETL (EMR/Glue)
        ↓
Feature Engineering (Spark ML)
        ↓
Model Training (Spark MLlib/XGBoost)
        ↓
Model Evaluation & Analytics
        ↓
Model Registry (S3/MLflow)
        ↓
Batch Scoring & Insights
| Component | AWS Services | Purpose |
|---|---|---|
| Compute | EMR, Glue, EC2 Spot | Distributed Spark processing |
| Storage | S3 (data lake), EFS | Raw data, features, models |
| Databases | Athena, Redshift, Glue Catalog | Data querying and metadata |
| Orchestration | Step Functions, Airflow | Workflow management |
| Monitoring | CloudWatch, EMR Notebooks | Observability and debugging |
| ML | Spark MLlib, SageMaker (optional) | Model training and inference |
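As a sketch of how these pieces fit together from Spark: the bucket, database, and table names below are illustrative, and the catalog query assumes EMR is configured to use the AWS Glue Data Catalog as its Hive metastore.

# Reading from the S3 data lake and the Glue Data Catalog (illustrative names)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("catalog-demo").enableHiveSupport().getOrCreate()

# Raw files straight from S3
interactions = spark.read.parquet("s3a://trafilea-spark-ml-dev/raw/interactions/")

# Tables registered in the Glue Data Catalog
spark.sql("SELECT COUNT(*) AS n FROM analytics_db.interactions").show()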
# Clone the repository
git clone https://github.com/trafilea/spark-ml-analytics.git
cd spark-ml-analytics
# Deploy infrastructure
make deploy-infra ENV=dev
# Check AWS configuration
make check-aws
# Create sample dataset
mkdir -p data/raw
echo "user_id,product_id,rating,timestamp" > data/raw/interactions.csv
echo "user1,product1,4.5,2024-01-01" >> data/raw/interactions.csv
# Upload to S3
make upload-data ENV=dev
# Run full ML pipeline
make pipeline ENV=dev
# Or run individual steps
make extract ENV=dev
make transform ENV=dev
make feature-eng ENV=dev
make train ENV=dev
spark-ml-analytics/
├── src/
│   ├── main.py                      # CLI entry point
│   ├── controllers/                 # Configuration and pipeline controllers
│   ├── etl/
│   │   ├── extraction/              # Data extraction from various sources
│   │   ├── transformation/          # Spark ETL processing
│   │   └── loading/                 # Data loading utilities
│   ├── analytics/
│   │   ├── feature_engineering/     # ML feature generation
│   │   ├── model_training/          # Spark MLlib training
│   │   ├── evaluation/              # Model evaluation
│   │   └── insights/                # Business analytics
│   ├── connectors/                  # AWS service connectors
│   │   ├── s3_connector.py
│   │   ├── athena_connector.py
│   │   ├── emr_connector.py
│   │   └── glue_connector.py
│   └── utils/                       # Spark utilities and helpers
├── infrastructure/                  # Terraform infrastructure
│   └── terraform/
│       ├── modules/
│       │   ├── emr/                 # EMR cluster configuration
│       │   ├── glue/                # Glue jobs and catalog
│       │   ├── s3/                  # Data lake setup
│       │   └── athena/              # Athena workgroup
│       └── environments/
├── configs/                         # Environment configurations
│   ├── dev.yaml
│   ├── staging.yaml
│   └── prod.yaml
├── notebooks/                       # Jupyter/Zeppelin notebooks
├── scripts/                         # Utility scripts
├── tests/                           # Test suite
└── data/                            # Local data directory
Each environment has its own YAML configuration in `configs/`:
# configs/dev.yaml
environment: dev
aws:
  region: us-east-1
  s3_bucket: "trafilea-spark-ml-dev"
spark:
  app_name: "TrafileaSparkMLAnalytics-Dev"
  master: "local[*]"  # or "yarn" for EMR
  driver_memory: "2g"
  executor_memory: "2g"
cluster:
  type: "local"  # local, emr, glue
  emr:
    instance_type: "m5.xlarge"
    instance_count: 3
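A minimal sketch of how such a file could be turned into a SparkSession; the loader below is hypothetical and may differ from the actual controllers/ implementation.

# Hypothetical config loader; the real controllers/ code may differ
import yaml
from pyspark.sql import SparkSession

def build_spark_session(config_path: str) -> SparkSession:
    with open(config_path) as f:
        cfg = yaml.safe_load(f)["spark"]
    return (
        SparkSession.builder
        .appName(cfg["app_name"])
        .master(cfg["master"])
        .config("spark.driver.memory", cfg["driver_memory"])
        .config("spark.executor.memory", cfg["executor_memory"])
        .getOrCreate()
    )

spark = build_spark_session("configs/dev.yaml")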
Optimized Spark settings for different workloads:
spark:
  config:
    "spark.sql.adaptive.enabled": "true"
    "spark.sql.adaptive.coalescePartitions.enabled": "true"
    "spark.sql.adaptive.skewJoin.enabled": "true"
    "spark.dynamicAllocation.enabled": "true"
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension"
# Extract data from Athena
python src/main.py extract \
--env dev \
--execution-datetime "2024-01-01 10:00:00" \
--data-source athena
# Transform with Spark
python src/main.py transform \
--env dev \
--execution-datetime "2024-01-01 10:00:00" \
--job-type etl \
--cluster-mode emr
# Generate user features
python src/main.py feature-engineering \
--env dev \
--execution-datetime "2024-01-01 10:00:00" \
--feature-type user
# Generate item features
python src/main.py feature-engineering \
--env dev \
--execution-datetime "2024-01-01 10:00:00" \
--feature-type item
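Under the hood, user features reduce to Spark aggregations over the interactions data. A hedged sketch using the sample schema above; paths and feature names are illustrative, and an active SparkSession named spark is assumed.

# Illustrative user-feature aggregation over the interactions dataset
from pyspark.sql import functions as F

interactions = spark.read.parquet("s3a://trafilea-spark-ml-dev/curated/interactions/")

user_features = (
    interactions.groupBy("user_id")
    .agg(
        F.count("*").alias("n_interactions"),
        F.countDistinct("product_id").alias("n_products"),
        F.avg("rating").alias("avg_rating"),
        F.max("timestamp").alias("last_interaction"),
    )
)
user_features.write.mode("overwrite").parquet("s3a://trafilea-spark-ml-dev/features/user/")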
# Train collaborative filtering model
python src/main.py train \
--env dev \
--execution-datetime "2024-01-01 10:00:00" \
--model-type collaborative_filtering \
--framework spark_mllib
# Train XGBoost model
python src/main.py train \
--env dev \
--execution-datetime "2024-01-01 10:00:00" \
--model-type xgboost \
--framework spark_mllib
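For the collaborative filtering path, a minimal Spark MLlib ALS sketch; column names follow the sample dataset, while paths and hyperparameters are illustrative.

# ALS collaborative filtering sketch with Spark MLlib
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS

ratings = spark.read.parquet("s3a://trafilea-spark-ml-dev/curated/interactions/")

indexers = [
    StringIndexer(inputCol="user_id", outputCol="user_idx", handleInvalid="skip"),
    StringIndexer(inputCol="product_id", outputCol="item_idx", handleInvalid="skip"),
]
als = ALS(userCol="user_idx", itemCol="item_idx", ratingCol="rating",
          rank=32, regParam=0.1, coldStartStrategy="drop")

model = Pipeline(stages=[*indexers, als]).fit(ratings)
model.write().overwrite().save("s3a://trafilea-spark-ml-dev/models/als/")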
# Run descriptive analytics
python src/main.py analytics \
--env dev \
--execution-datetime "2024-01-01 10:00:00" \
--analysis-type descriptive
- Spark ALS (Alternating Least Squares)
- Matrix Factorization
- Implicit Feedback models
- Random Forest with item features
- Gradient Boosting with user/item features
- Feature-based similarity models
- XGBoost (via Spark ML)
- LightGBM integration
- Model Stacking
- Trend Analysis
- Seasonality Detection
- Forecasting Models
- RFM Analysis (Recency, Frequency, Monetary), sketched after this list
- User Segmentation
- Behavioral Patterns
- Churn Prediction
- Product Performance
- Cross-sell Analysis
- Inventory Optimization
- Price Elasticity
- Revenue Analytics
- Conversion Funnels
- A/B Test Analysis
- Market Basket Analysis
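As an example of the customer analytics above, RFM scores come down to a single Spark aggregation. The sketch below assumes an orders table with user_id, order_ts, and amount columns; all names are illustrative.

# Illustrative RFM (Recency, Frequency, Monetary) aggregation
from pyspark.sql import functions as F

orders = spark.read.parquet("s3a://trafilea-spark-ml-dev/curated/orders/")

rfm = (
    orders.groupBy("user_id")
    .agg(
        F.datediff(F.current_date(), F.max("order_ts")).alias("recency_days"),
        F.count("*").alias("frequency"),
        F.sum("amount").alias("monetary"),
    )
)
rfm.show(5)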
# Start local Spark
make transform ENV=dev
# Run Jupyter notebooks
make start-jupyter
# Deploy EMR cluster
make start-emr ENV=prod
# Submit Spark job
make submit-spark ENV=prod
# Monitor via Spark UI
make spark-ui ENV=prod
# Deploy Glue infrastructure
make deploy-infra ENV=prod
# Run Glue ETL job
aws glue start-job-run --job-name spark-ml-analytics-prod
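The same job can be triggered programmatically with boto3; the job argument below is an assumption about how the job reads its environment.

# Triggering the Glue job from Python (job name from the CLI example above)
import boto3

glue = boto3.client("glue", region_name="us-east-1")
run = glue.start_job_run(
    JobName="spark-ml-analytics-prod",
    Arguments={"--ENV": "prod"},  # assumed argument; adapt to the job's parameters
)
print(run["JobRunId"])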
- Spark History Server - Job execution history
- YARN ResourceManager - Cluster resource usage
- CloudWatch Metrics - System and application metrics
- Data Validation checks (see the sketch after this list)
- Schema Evolution tracking
- Data Lineage documentation
- Query Performance insights
- Resource Utilization monitoring
- Cost Optimization recommendations
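A data validation check can be as simple as a couple of Spark assertions run after each load; an illustrative sketch, with paths and rules as assumptions.

# Illustrative data-quality checks on the curated interactions table
from pyspark.sql import functions as F

df = spark.read.parquet("s3a://trafilea-spark-ml-dev/curated/interactions/")

null_ratings = df.filter(F.col("rating").isNull()).count()
duplicates = df.count() - df.dropDuplicates(["user_id", "product_id", "timestamp"]).count()

if null_ratings > 0 or duplicates > 0:
    raise ValueError(
        f"Data quality check failed: {null_ratings} null ratings, {duplicates} duplicate rows")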
# DAG for the daily ML pipeline (cluster ID, role ARN, and release label are placeholders)
from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_containers import EMRContainerOperator

dag = DAG("spark_ml_pipeline", schedule_interval="@daily",
          start_date=datetime(2024, 1, 1), catchup=False)

extract_task = EMRContainerOperator(
    task_id="extract_data",
    name="extract-data",
    virtual_cluster_id="<emr-on-eks-virtual-cluster-id>",  # placeholder
    execution_role_arn="<emr-execution-role-arn>",         # placeholder
    release_label="emr-6.15.0-latest",
    job_driver={
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://bucket/src/main.py",
            "entryPointArguments": ["extract", "--env", "prod"],
        }
    },
    dag=dag,
)
# Stream processing with Kinesis
spark.readStream \
    .format("kinesis") \
    .option("streamName", "user-events") \
    .load() \
    .writeStream \
    .foreachBatch(process_batch) \
    .start()
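Note that the kinesis source relies on an external connector; it is not bundled with open-source Spark. The process_batch callback referenced above is user-defined; a minimal sketch that lands each micro-batch in the data lake (path is illustrative):

# Hypothetical micro-batch handler for foreachBatch
from pyspark.sql import DataFrame, functions as F

def process_batch(batch_df: DataFrame, batch_id: int) -> None:
    (batch_df
        .withColumn("batch_id", F.lit(batch_id))
        .write.mode("append")
        .parquet("s3a://trafilea-spark-ml-dev/streaming/user-events/"))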
- Product Recommendations at scale
- Cross-sell & Upsell optimization
- Personalized Marketing campaigns
- Fraud Detection models
- Risk Assessment analytics
- Customer Lifetime Value modeling
- Predictive Maintenance
- Quality Control analytics
- Supply Chain optimization
- Customer Segmentation
- Attribution Modeling
- Marketing Mix optimization
# Run all tests
make test
# Spark-specific tests
pytest tests/spark/ -v
# Integration tests with EMR
pytest tests/integration/ -v
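Spark tests typically share one local session; a minimal pytest fixture sketch follows, with the file location and scope as assumptions.

# tests/conftest.py (assumed location): shared local SparkSession for tests
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    session = (
        SparkSession.builder
        .master("local[2]")
        .appName("spark-ml-analytics-tests")
        .getOrCreate()
    )
    yield session
    session.stop()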
- Fork the repository
- Create a feature branch: `git checkout -b feature/amazing-feature`
- Commit changes: `git commit -m 'Add amazing feature'`
- Push to the branch: `git push origin feature/amazing-feature`
- Open a Pull Request
Copyright © 2024 Trafilea
- Apache Spark Documentation
- EMR Best Practices
- Contact: data-engineering@trafilea.com
Ready to process TBs of data with Spark! This template is a good fit when you need:

- ✅ Processing > 100 GB datasets
- ✅ Complex feature engineering
- ✅ Distributed ML training
- ✅ Real-time and batch analytics
- ✅ Cost-sensitive big data workloads