Commit

added fact table generation script for c2c
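The new script builds a customer-to-customer purchase fact table: it joins the platform pricing output with the customer, product, and date dimensions, replacing natural keys with surrogate buyer/seller location, product, and selling-date ids.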
Authored and committed by Pratistha on May 23, 2024
1 parent 0ac3577 commit 0e46613
Showing 1 changed file with 80 additions and 0 deletions.
exploitation_zone/customer_purchase_fact_table.py (+80, -0)
@@ -0,0 +1,80 @@
from pyspark.sql import SparkSession

# Initialize the Spark session
spark = SparkSession.builder \
    .appName("customer_2_customer_facts") \
    .getOrCreate()
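# Note: no master URL is set on the builder, so one has to come from the
# environment (e.g. the spark-submit --master flag) for the session to start.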

# Load the main DataFrame
main_parquet_path = "platform_customer_pricing_data_output"
main_df = spark.read.parquet(main_parquet_path)

# Load the dimension tables
location_parquet_path = "dim_customer_location.parquet"
customer_parquet_path = "dim_customer.parquet"
products_parquet_path = "dim_product.parquet"
date_parquet_path = "dim_date.parquet"

location_df = spark.read.parquet(location_parquet_path)
customer_df = spark.read.parquet(customer_parquet_path)
product_df = spark.read.parquet(products_parquet_path)
date_df = spark.read.parquet(date_parquet_path)
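# Note: location_df is loaded but never joined below; the fact table keeps
# only the location_id surrogate keys taken from the customer dimension.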

# Join for buyer location: look up the buyer's location_id in the customer
# dimension and expose it as buyer_location_id.
buyer_locations = customer_df.select("customer_id", "location_id") \
    .withColumnRenamed("location_id", "buyer_location_id")
joined_df = main_df.join(
    buyer_locations,
    main_df["buying_customer_id"] == buyer_locations["customer_id"],
    "left"
).drop(buyer_locations["customer_id"])

# The customer_id column that remains comes from the main DataFrame and
# identifies the seller, so rename it to selling_customer_id.
joined_df = joined_df.withColumnRenamed("customer_id", "selling_customer_id")

# Join for seller location: the same lookup, keyed on the seller this time.
seller_locations = customer_df.select("customer_id", "location_id") \
    .withColumnRenamed("location_id", "seller_location_id")
joined_df = joined_df.join(
    seller_locations,
    joined_df["selling_customer_id"] == seller_locations["customer_id"],
    "left"
).drop(seller_locations["customer_id"])

# Join for product information: attach the surrogate product_id by matching
# on product_name, then drop the dimension's duplicate name column.
product_lookup = product_df.select("product_id", "product_name")
joined_df = joined_df.join(
    product_lookup,
    joined_df["product_name"] == product_lookup["product_name"],
    "left"
).drop(product_lookup["product_name"])

# Join for selling date information: map the raw selling_date to the
# selling_date_id surrogate key from the date dimension.
date_lookup = date_df.select("date_id", "date") \
    .withColumnRenamed("date_id", "selling_date_id")
joined_df = joined_df.join(
    date_lookup,
    joined_df["selling_date"] == date_lookup["date"],
    "left"
).drop(date_lookup["date"])

# Keep only the fact table columns: surrogate keys plus the price and quantity measures
selected_columns_df = joined_df.select(
    "id",
    "selling_customer_id",
    "buying_customer_id",
    "seller_location_id",
    "buyer_location_id",
    "selling_date_id",
    "product_id",
    "unit_price",
    "quantity",
    "expected_price",
    "dynamic_price"
)
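# Assumption: the source `id` column uniquely identifies one purchase record,
# making it the grain of this fact table.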

# Preview the last few rows; tail() collects them to the driver, which is
# fine for 5 rows but should be avoided for large counts.
last_rows = selected_columns_df.tail(5)
for row in last_rows:
    print(row)

# Write the resulting DataFrame to a new parquet file
# output_parquet_path = "customer_purchase_fact_table.parquet"
# selected_columns_df.write.mode("overwrite").parquet(output_parquet_path)
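# When enabled, mode("overwrite") replaces any existing data at the output path.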

# Stop the Spark session
spark.stop()
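
A minimal way to run the new script, assuming Spark is installed and the parquet inputs listed above sit in the working directory:

spark-submit exploitation_zone/customer_purchase_fact_table.py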
