diff --git a/exploitation_zone/customer_purchase_fact_table.py b/exploitation_zone/customer_purchase_fact_table.py
new file mode 100644
index 0000000..60f08e7
--- /dev/null
+++ b/exploitation_zone/customer_purchase_fact_table.py
@@ -0,0 +1,86 @@
+from pyspark.sql import SparkSession
+
+# Initialize the Spark session
+spark = SparkSession.builder \
+    .appName("customer_2_customer_facts") \
+    .getOrCreate()
+
+# Load the main DataFrame
+main_parquet_path = "platform_customer_pricing_data_output"
+main_df = spark.read.parquet(main_parquet_path)
+
+# Load the dimension tables
+customer_parquet_path = "dim_customer.parquet"
+products_parquet_path = "dim_product.parquet"
+date_parquet_path = "dim_date.parquet"
+
+customer_df = spark.read.parquet(customer_parquet_path)
+product_df = spark.read.parquet(products_parquet_path)
+date_df = spark.read.parquet(date_parquet_path)
+
+# Join for buyer location. Keep a handle to the projected dimension so the
+# join condition and the drop resolve against it, not the original customer_df.
+buyer_customer_df = customer_df.select("customer_id", "location_id") \
+    .withColumnRenamed("location_id", "buyer_location_id")
+joined_df = main_df.join(
+    buyer_customer_df,
+    main_df["buying_customer_id"] == buyer_customer_df["customer_id"],
+    "left"
+).drop(buyer_customer_df["customer_id"])
+
+# Rename customer_id to selling_customer_id in the main DataFrame
+joined_df = joined_df.withColumnRenamed("customer_id", "selling_customer_id")
+
+# Join for seller location, using the same pattern
+seller_customer_df = customer_df.select("customer_id", "location_id") \
+    .withColumnRenamed("location_id", "seller_location_id")
+joined_df = joined_df.join(
+    seller_customer_df,
+    joined_df["selling_customer_id"] == seller_customer_df["customer_id"],
+    "left"
+).drop(seller_customer_df["customer_id"])
+
+# Join for product information (resolve product_id by product_name)
+product_lookup_df = product_df.select("product_id", "product_name")
+joined_df = joined_df.join(
+    product_lookup_df,
+    joined_df["product_name"] == product_lookup_df["product_name"],
+    "left"
+).drop(product_lookup_df["product_name"])
+
+# Join for selling date information (resolve selling_date_id by date)
+date_lookup_df = date_df.select("date_id", "date") \
+    .withColumnRenamed("date_id", "selling_date_id")
+joined_df = joined_df.join(
+    date_lookup_df,
+    joined_df["selling_date"] == date_lookup_df["date"],
+    "left"
+).drop(date_lookup_df["date"])
+
+# Select the fact-table columns from the resulting DataFrame
+selected_columns_df = joined_df.select(
+    "id",
+    "selling_customer_id",
+    "buying_customer_id",
+    "seller_location_id",
+    "buyer_location_id",
+    "selling_date_id",
+    "product_id",
+    "unit_price",
+    "quantity",
+    "expected_price",
+    "dynamic_price"
+)
+
+# Show the last few rows of the resulting DataFrame
+# (tail() collects these rows to the driver, so keep the count small)
+last_rows = selected_columns_df.tail(5)
+for row in last_rows:
+    print(row)
+
+# Write the resulting DataFrame to a new parquet file
+# output_parquet_path = "customer_purchase_fact_table.parquet"
+# selected_columns_df.write.mode("overwrite").parquet(output_parquet_path)
+
+# Stop the Spark session
+spark.stop()
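
A note on the join pattern used in this file: when a dimension DataFrame is projected and renamed inline, the join condition and the subsequent `drop` should reference the projected DataFrame rather than the original, otherwise Spark may resolve the columns against the wrong lineage or fail with an ambiguous-column error. A minimal, self-contained sketch of the pattern, using hypothetical toy data that is not part of this change:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join_pattern_demo").getOrCreate()

# Hypothetical toy frames standing in for main_df and customer_df
facts = spark.createDataFrame(
    [(1, 10), (2, 20)], ["buying_customer_id", "quantity"]
)
customers = spark.createDataFrame(
    [(1, 100), (2, 200)], ["customer_id", "location_id"]
)

# Keep a handle to the projected dimension so the join condition and the
# drop both resolve against it rather than against `customers`.
buyer_dim = customers.withColumnRenamed("location_id", "buyer_location_id")
out = facts.join(
    buyer_dim,
    facts["buying_customer_id"] == buyer_dim["customer_id"],
    "left",
).drop(buyer_dim["customer_id"])

out.show()
spark.stop()
```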