This project is an ETL pipeline that uses Apache Airflow to automate data processing on Google Cloud Platform (GCP). The pipeline runs daily analyses of vehicle data, handling weekday and weekend processing dynamically with Google Cloud Dataproc clusters and storing results in Google Cloud Storage (GCS). The DAG uploads PySpark scripts to GCS, creates a Dataproc cluster, processes data according to the day type, and cleans up resources once processing finishes.
The pipeline automates data ingestion and processing in GCP, combining Airflow's scheduling with GCP services such as Dataproc and GCS. It branches on whether the run falls on a weekday or a weekend and submits separate Spark jobs accordingly.
- Dynamic Task Scheduling: Configured to run daily, adjusting tasks based on the day of the week.
- PySpark Workflows: Processes data with specific PySpark scripts for weekend and weekday analyses.
- Google Cloud Integration: Utilizes GCS for storage and Dataproc for Spark job execution.
- Airflow DAG: Defines the workflow, with tasks including file upload, Dataproc cluster management, and job submission.
- Branching Logic: A BranchPythonOperator determines whether the run falls on a weekday or a weekend, routing it to the targeted analyses (a sketch of the branching callable follows this list).
- Dataproc: Clusters are created on demand and deleted after job execution, so compute costs are limited to the duration of the jobs.
- GCS: Stores input scripts and job outputs.
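The branching callable can be as simple as a day-of-week check. The sketch below is illustrative: it assumes Airflow 2.2+ (where logical_date is available in the task context) and placeholder task IDs (weekday_analysis, weekend_analysis) rather than the DAG's actual ones. With BranchPythonOperator, the returned task_id decides which downstream branch runs.

```python
from datetime import datetime


def choose_branch(logical_date: datetime, **_) -> str:
    """Pick the downstream task based on the run's logical date."""
    # Monday=0 ... Sunday=6; Saturday and Sunday route to the weekend analysis.
    if logical_date.weekday() >= 5:
        return "weekend_analysis"   # illustrative task_id
    return "weekday_analysis"       # illustrative task_id
```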
- Upload Tasks: Uploads all local PySpark scripts from the specified directory to a GCS bucket.
- Cluster Creation: Creates a Dataproc cluster to run PySpark jobs.
- Branching: Uses a BranchPythonOperator to identify weekday/weekend, routing to the appropriate jobs.
- Data Processing: Executes PySpark jobs based on day type:
  - Weekday Jobs: Runs multiple scripts for metrics such as average speed, temperature, and tire pressure.
  - Weekend Job: Analyzes gas composition data.
- Cluster Deletion: Deletes the Dataproc cluster once the jobs complete, freeing resources (see the condensed DAG wiring sketch after this list).
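The following is a condensed sketch of how these tasks might be wired together, assuming Airflow 2.4+ with the apache-airflow-providers-google package installed and a provider version whose LocalFilesystemToGCSOperator accepts a glob pattern in src. Project ID, region, cluster name, machine types, and script names are illustrative placeholders, not the project's actual configuration; only one weekday job is shown.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from airflow.providers.google.cloud.transfers.local_to_gcs import (
    LocalFilesystemToGCSOperator,
)
from airflow.utils.trigger_rule import TriggerRule

PROJECT_ID = "my-gcp-project"            # placeholder
REGION = "us-central1"                   # placeholder
BUCKET = "airflow-bigquery-project"      # bucket named in the configuration below
CLUSTER_NAME = "vehicle-etl-cluster"     # placeholder


def pyspark_job(script: str) -> dict:
    """Dataproc job spec pointing at a PySpark script previously uploaded to GCS."""
    return {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER_NAME},
        "pyspark_job": {"main_python_file_uri": f"gs://{BUCKET}/pyspark/{script}"},
    }


def choose_branch(logical_date, **_) -> str:
    # Same day-of-week check as in the earlier sketch.
    return "weekend_analysis" if logical_date.weekday() >= 5 else "weekday_analysis"


with DAG(
    dag_id="vehicle_etl_sketch",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    upload_scripts = LocalFilesystemToGCSOperator(
        task_id="upload_scripts",
        src="/usr/local/airflow/include/data/pyspark/*.py",  # local PySpark scripts
        dst="pyspark/",                                       # prefix in the bucket
        bucket=BUCKET,
        gcp_conn_id="gcp_default",
    )

    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        cluster_config={
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
        gcp_conn_id="gcp_default",
    )

    branch = BranchPythonOperator(
        task_id="branch_weekday_weekend",
        python_callable=choose_branch,
    )

    # The real DAG submits several weekday scripts (average speed, temperature,
    # tire pressure); a single job is shown here for brevity.
    weekday_analysis = DataprocSubmitJobOperator(
        task_id="weekday_analysis",
        project_id=PROJECT_ID,
        region=REGION,
        job=pyspark_job("avg_speed.py"),        # placeholder script name
        gcp_conn_id="gcp_default",
    )

    weekend_analysis = DataprocSubmitJobOperator(
        task_id="weekend_analysis",
        project_id=PROJECT_ID,
        region=REGION,
        job=pyspark_job("gas_composition.py"),  # placeholder script name
        gcp_conn_id="gcp_default",
    )

    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        trigger_rule=TriggerRule.ALL_DONE,  # clean up even if a branch is skipped or fails
        gcp_conn_id="gcp_default",
    )

    upload_scripts >> create_cluster >> branch
    branch >> [weekday_analysis, weekend_analysis] >> delete_cluster
```

The ALL_DONE trigger rule matters here: branching skips one of the analysis tasks, and the cleanup task should still run regardless of which branch executed or whether it succeeded.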
- A Google Cloud project with the Dataproc and Cloud Storage APIs enabled and the required IAM permissions.
- Apache Airflow environment configured with GCP connections.
- Python 3.7+
Set up the following connection and bucket in your Airflow environment (a short verification snippet follows this list):
- gcp_default: Airflow connection to GCP, used by the Dataproc and GCS operators.
- airflow-bigquery-project: GCS bucket for storing PySpark scripts and output data.
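To confirm the connection and bucket are reachable before triggering the DAG, a quick check from a Python shell on the Airflow host might look like the sketch below, using the connection id and bucket name configured above.

```python
from airflow.providers.google.cloud.hooks.gcs import GCSHook

# Uses the gcp_default connection configured in Airflow to list uploaded scripts.
hook = GCSHook(gcp_conn_id="gcp_default")
print(hook.list(bucket_name="airflow-bigquery-project", prefix="pyspark/"))
```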
- Place your PySpark scripts in the specified local directory (/usr/local/airflow/include/data/pyspark); a minimal example script is sketched after this list.
- Deploy the DAG file to your Airflow environment.
- Trigger the DAG or set it to run on a schedule.
- Review logs and outputs in the Airflow UI and GCS bucket.
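For reference, one of the weekday scripts might look roughly like the following. The input path, output path, and column names (vehicle_id, speed) are assumptions for illustration, not the project's actual schema.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("avg_speed").getOrCreate()

# Read raw vehicle telemetry from GCS (path and schema are assumptions).
df = spark.read.csv("gs://airflow-bigquery-project/input/vehicle_data.csv", header=True)

# Average speed per vehicle, written back to GCS for downstream use.
result = (
    df.withColumn("speed", F.col("speed").cast("double"))
    .groupBy("vehicle_id")
    .agg(F.avg("speed").alias("avg_speed"))
)
result.write.mode("overwrite").csv(
    "gs://airflow-bigquery-project/output/avg_speed", header=True
)

spark.stop()
```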