This project implements a distributed co-purchase analysis using:
- Apache Spark for big data processing
- Google Cloud Dataproc (on Google Cloud Platform) for managed Spark clusters
- Scala as the primary programming language

The script performs performance testing across single-node and multi-node Spark cluster configurations.
The core algorithm performs these key steps:
- Read and parse order-product pairs from CSV
- Group products by order
- Generate all unique product pair combinations within each order
- Count co-purchase frequencies
- Output results as product pairs with their co-purchase count
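The steps above can be sketched on plain Scala collections (a minimal illustration, not the actual `CoPurchaseAnalysis.scala` implementation, which applies the same logic with Spark RDD transformations):

```scala
object CoPurchaseSketch {
  // Mirrors the pipeline: group products by order, emit all unique
  // product pairs within each order, then count pair frequencies.
  def coPurchaseCounts(orders: Seq[(Int, Int)]): Map[(Int, Int), Int] =
    orders
      .groupBy(_._1)                            // group (order_id, product_id) rows by order
      .values
      .flatMap { rows =>
        val ps = rows.map(_._2).distinct.sorted // products in this order, canonical order
        for {
          i <- ps.indices
          j <- (i + 1) until ps.size
        } yield (ps(i), ps(j))                  // each unique pair once, smaller id first
      }
      .groupBy(identity)
      .map { case (pair, occurrences) => pair -> occurrences.size }

  def main(args: Array[String]): Unit = {
    // Two orders: order 1 contains products 10, 20, 30; order 2 contains 10, 20
    val sample = Seq((1, 10), (1, 20), (1, 30), (2, 10), (2, 20))
    coPurchaseCounts(sample).toSeq.sortBy(_._1).foreach(println)
  }
}
```

In the distributed version the grouping and counting become shuffles, which is why the partitioning choices below matter.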
Key performance optimizations:
- Dynamic partitioning based on cluster configuration
- HashPartitioner for even data distribution
- Minimized data shuffling
- Configurable executor and memory settings
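`HashPartitioner` spreads keys evenly by assigning each key the non-negative modulo of its hash code; a minimal sketch of that mapping (names here are illustrative, not from the project):

```scala
object PartitionSketch {
  // How a hash partitioner maps a key to a partition index:
  // key.hashCode mod numPartitions, shifted into the range [0, numPartitions).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq((10, 20), (10, 30), (20, 30))
    keys.foreach(k => println(s"$k -> partition ${partitionFor(k, 4)}"))
  }
}
```

Because the same key always lands on the same partition, pre-partitioning the pair RDD lets the subsequent count aggregation run without an extra shuffle.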
Prerequisites:
- Google Cloud Platform (GCP) Account
- Google Cloud SDK installed
- gcloud CLI configured
- SBT (Scala Build Tool)
- Bash shell
Required versions:
- Java 17 JDK
- Scala 2.12.18
- SBT 1.10.x
- Google Cloud SDK
Create a .env file in the project root with the following environment variables:
```bash
GCP_BUCKET=your-gcp-bucket-name
REGION=your-gcp-region   # e.g., us-central1
ZONE=your-gcp-zone       # e.g., us-central1-a
CLUSTER_NAME=dataproc-scaling-test
```

- Upload your `order_products.csv` to the specified GCP bucket
- Ensure the CSV has two columns: `order_id` and `product_id`
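The expected row format can be illustrated with a small parse helper (a hypothetical sketch; it assumes no header row and no quoted fields):

```scala
object CsvParseSketch {
  // Parse one "order_id,product_id" CSV line into a pair of ints.
  def parseLine(line: String): (Int, Int) = {
    val cols = line.trim.split(",")
    (cols(0).toInt, cols(1).toInt)
  }

  def main(args: Array[String]): Unit =
    println(parseLine("2,33120"))
}
```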
To upload via the Cloud Console:
- Open Google Cloud Console
- Navigate to Cloud Storage
- Select or create your bucket
- Click "Upload Files"
- Select `order_products.csv`
Or via the gcloud CLI:

```bash
# Upload with specific metadata/access control
gcloud storage buckets create gs://your-bucket-name \
    --location=your-gcp-region \
    --project=your-project-id

gcloud storage cp order_products.csv gs://your-bucket-name/
```

Project structure:

- `gcp_cluster_scaling_test.sh`: Bash script for cluster management and Spark job execution
- `build.sbt`: SBT build configuration
- `src/main/scala/CoPurchaseAnalysis.scala`: Spark job implementation
- `.env`: Environment configuration (not tracked in version control)
Key functions in the script:

- `create_single_node_cluster()`: Creates a single-node Dataproc cluster
- `create_multi_node_cluster()`: Creates a multi-node Dataproc cluster
- `run_spark_job()`: Submits the Spark job to the cluster
- `update_cluster_workers()`: Dynamically updates the cluster worker count
- `drop_all_clusters()`: Removes all existing clusters in the specified region
Cluster creation:

```bash
gcloud dataproc clusters create $CLUSTER_NAME
    --region $REGION                      # GCP region for deployment
    --zone $ZONE                          # Specific zone within the region
    --single-node / --num-workers N       # Cluster type and worker count
    --master-machine-type n2-standard-4   # Master node machine type
    --worker-machine-type n2-standard-4   # Worker node machine type
    --image-version 2.2-debian12          # Dataproc image version
    --project $GCP_BUCKET                 # GCP project ID
```

Job submission:

```bash
gcloud dataproc jobs submit spark
    --cluster=$CLUSTER_NAME                     # Target cluster
    --region=$REGION                            # Cluster's region
    --class=CoPurchaseAnalysis                  # Main Spark job class
    --jars="gs://$GCP_BUCKET/jar_file"          # JAR location in GCS
    --properties="spark.executor.instances=X"   # Dynamic configuration
```

Cluster update:

```bash
gcloud dataproc clusters update $CLUSTER_NAME
    --region $REGION   # Cluster's region
    --num-workers X    # New worker count
```

Cluster deletion:

```bash
gcloud dataproc clusters delete $CLUSTER_NAME
    --region $REGION   # Cluster's region
    --quiet            # Suppress confirmation prompt
```

Running the test:

```bash
# Make the script executable
chmod +x gcp_cluster_scaling_test.sh

# Run the scaling test
./gcp_cluster_scaling_test.sh
```

In `CoPurchaseAnalysis.scala`:
- Adjust the `numPartitions` calculation
- Modify the executor core count
- Uncomment/configure Spark optimizations
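A `numPartitions` calculation might look like the following (the names and the multiplier are illustrative assumptions, not the project's actual formula):

```scala
object PartitionSizing {
  // Scale partition count with cluster size: a few tasks per available core
  // keeps work evenly balanced and limits the cost of stragglers.
  def numPartitions(executors: Int, coresPerExecutor: Int, tasksPerCore: Int = 3): Int =
    executors * coresPerExecutor * tasksPerCore

  def main(args: Array[String]): Unit =
    println(numPartitions(executors = 4, coresPerExecutor = 4))
}
```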
The script performs two primary tests:
- Single-node cluster performance
- Scaling test with 2-4 worker nodes
Each test:
- Creates/updates a Dataproc cluster
- Builds and uploads the Spark JAR
- Executes the co-purchase analysis
- Collects performance metrics
- Deletes the cluster
Modify these files to adapt the test:
- `build.sbt`: Change Scala/Spark versions
- `CoPurchaseAnalysis.scala`: Adjust Spark configurations
- `gcp_cluster_scaling_test.sh`: Change worker counts, machine types
Modify `gcp_cluster_scaling_test.sh`:
- Adjust `--master-machine-type`
- Change `--worker-machine-type`
- Customize disk sizes
Results are stored in:
- `gs://$GCP_BUCKET/output/$instances/co_purchase_results/`
- Performance logs printed to console
Troubleshooting:
- Ensure all environment variables are set
- Verify GCP credentials and permissions
- Check input data format and location