
Commit

SonyShrestha committed May 24, 2024
2 parents fe39973 + 47567c0 commit f1fb32c
Showing 6 changed files with 175 additions and 0 deletions.
88 changes: 88 additions & 0 deletions exploitation_zone/fact_business_review.py
@@ -0,0 +1,88 @@
import logging
import os
import configparser
import json
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import udf, monotonically_increasing_id, col, regexp_replace, lit, from_unixtime

# Configure logging
logging.basicConfig(level=logging.INFO) # Set log level to INFO

# Create logger object
logger = logging.getLogger()

# Get base directory
root_dir = os.path.abspath(os.path.join(os.getcwd()))

# Specify the path to config file
config_file_path = os.path.join(root_dir, "config.ini")
config = configparser.ConfigParser()
config.read(config_file_path)

config_file_path_json = os.path.join(root_dir, "config.json")
with open(config_file_path_json) as f:
    config_json = json.load(f)
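# Expected configuration layout (assumed from the keys read below; values are deployment-specific):
#   config.ini  -> [GCS] credentials_path, raw_bucket_name, formatted_bucket_name, exploitation_bucket_name
#                  [BIGQUERY] project_id, dataset_id
#   config.json -> loaded into config_json (not referenced further in this script)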


if __name__ == "__main__":
    gcs_config = config["GCS"]["credentials_path"]
    raw_bucket_name = config["GCS"]["raw_bucket_name"]
    formatted_bucket_name = config["GCS"]["formatted_bucket_name"]
    exploitation_bucket_name = config["GCS"]["exploitation_bucket_name"]
    project_id = config['BIGQUERY']['project_id']
    dataset_id = config['BIGQUERY']['dataset_id']

    spark = SparkSession.builder \
        .appName("Business review fact table creation") \
        .config("spark.driver.host", "127.0.0.1") \
        .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
        .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
        .config('spark.jars', '/home/pce/Documents/VBP_Joint_Project-main/spark-bigquery-with-dependencies_2.12-0.27.0.jar') \
        .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
        .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
        .config("temporaryGcsBucket", raw_bucket_name) \
        .config("parentProject", project_id) \
        .config("project", project_id) \
        .getOrCreate()
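    # The spark-bigquery connector's indirect write mode stages data in the bucket given by
    # "temporaryGcsBucket" before loading it into BigQuery; here the raw bucket is reused for staging.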


    logger.info('-----------------------------------------------------')
    logger.info("Creating business review fact table")

    exploitation_zone_parquet_file_path = os.path.join(root_dir, 'data', 'exploitation_zone')

    # Read the Parquet files from the local exploitation zone into DataFrames
    business_purchase_df = spark.read.parquet(os.path.join(exploitation_zone_parquet_file_path, 'supermarket_products.parquet'))
    cust_purchase_df = spark.read.parquet(os.path.join(exploitation_zone_parquet_file_path, 'customer_purchase.parquet'))
    dim_product_df = spark.read.parquet(os.path.join(exploitation_zone_parquet_file_path, 'dim_product.parquet'))
    dim_customer_df = spark.read.parquet(os.path.join(exploitation_zone_parquet_file_path, 'dim_customer.parquet'))

    business_sentiment_df = spark.read.parquet(os.path.join(exploitation_zone_parquet_file_path, 'business_sentiment.parquet'))
    business_sentiment_df = business_sentiment_df.withColumnRenamed("business_name", "store_name")


    fact_business_sentiment_df = business_purchase_df.join(business_sentiment_df, 'store_name', 'inner') \
        .select(business_purchase_df['store_id'], business_purchase_df['product_id'],
                business_purchase_df['product_name'], business_sentiment_df['date'],
                business_sentiment_df['sentiment_label'], business_sentiment_df['sentiment_score'])

    fact_business_sentiment_df = fact_business_sentiment_df.withColumn("date", regexp_replace("date", "-", "")).withColumnRenamed("date", "date_id")
    # fact_business_sentiment_df = fact_business_sentiment_df.join(cust_purchase_df, 'product_name', 'inner').select(fact_business_sentiment_df['*'], cust_purchase_df['customer_id'])
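    # monotonically_increasing_id() yields unique but non-consecutive 64-bit IDs; the +10000
    # offset below appears intended to keep business-review IDs in their own range.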
    fact_business_sentiment_df = fact_business_sentiment_df.withColumn("review_id", monotonically_increasing_id() + 10000)
    fact_business_sentiment_df = fact_business_sentiment_df.withColumn("created_on", lit(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))  # Add created_on
    fact_business_sentiment_df = fact_business_sentiment_df.select("review_id", "store_id", "product_id", "date_id", "sentiment_label", "sentiment_score", "created_on")

    fact_business_sentiment_df.show()
    fact_business_sentiment_df.printSchema()


    fact_business_sentiment_df.write \
        .format('bigquery') \
        .option('table', f'{project_id}:{dataset_id}.fact_business_review') \
        .mode('overwrite') \
        .save()

    # fact_customer_inventory_df.write.mode('overwrite').parquet(os.path.join(exploitation_zone_parquet_file_path, 'fact_customer_inventory.parquet'))
    fact_business_sentiment_df.write.mode('overwrite').parquet('/home/pce/Documents/VBP_Joint_Project-main/data/exploitation_zone/fact_business_review.parquet')
87 changes: 87 additions & 0 deletions exploitation_zone/fact_customer_review.py
@@ -0,0 +1,87 @@
import logging
import os
import configparser
import json
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import udf, monotonically_increasing_id, col, regexp_replace, lit, from_unixtime

# Configure logging
logging.basicConfig(level=logging.INFO) # Set log level to INFO

# Create logger object
logger = logging.getLogger()

# Get base directory
root_dir = os.path.abspath(os.path.join(os.getcwd()))

# Specify the path to config file
config_file_path = os.path.join(root_dir, "config.ini")
config = configparser.ConfigParser()
config.read(config_file_path)

config_file_path_json = os.path.join(root_dir, "config.json")
with open(config_file_path_json) as f:
    config_json = json.load(f)


if __name__ == "__main__":
    gcs_config = config["GCS"]["credentials_path"]
    raw_bucket_name = config["GCS"]["raw_bucket_name"]
    formatted_bucket_name = config["GCS"]["formatted_bucket_name"]
    exploitation_bucket_name = config["GCS"]["exploitation_bucket_name"]
    project_id = config['BIGQUERY']['project_id']
    dataset_id = config['BIGQUERY']['dataset_id']

    spark = SparkSession.builder \
        .appName("Customer review fact table creation") \
        .config("spark.driver.host", "127.0.0.1") \
        .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
        .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
        .config('spark.jars', '/home/pce/Documents/VBP_Joint_Project-main/spark-bigquery-with-dependencies_2.12-0.27.0.jar') \
        .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
        .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
        .config("temporaryGcsBucket", raw_bucket_name) \
        .config("parentProject", project_id) \
        .config("project", project_id) \
        .getOrCreate()


    logger.info('-----------------------------------------------------')
    logger.info("Creating customer review fact table")

    exploitation_zone_parquet_file_path = os.path.join(root_dir, 'data', 'exploitation_zone')

    # Read the Parquet files from the local exploitation zone into DataFrames
    cust_purchase_df = spark.read.parquet(os.path.join(exploitation_zone_parquet_file_path, 'customer_purchase.parquet'))
    dim_product_df = spark.read.parquet(os.path.join(exploitation_zone_parquet_file_path, 'dim_product.parquet'))
    dim_customer_df = spark.read.parquet(os.path.join(exploitation_zone_parquet_file_path, 'dim_customer.parquet'))

    customer_sentiment_df = spark.read.parquet(os.path.join(exploitation_zone_parquet_file_path, 'customer_sentiment.parquet'))
    customer_sentiment_df = customer_sentiment_df.withColumnRenamed("user_name", "customer_name")


    fact_customer_sentiment_df = cust_purchase_df.join(customer_sentiment_df, 'customer_name', 'inner') \
        .select(cust_purchase_df['customer_id'], cust_purchase_df['product_name'],
                customer_sentiment_df['sentiment_label'], customer_sentiment_df['sentiment_score'],
                cust_purchase_df['purchase_date'])
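    # The steps below convert purchase_date into a YYYYMMDD date_id, resolve product_id from
    # dim_product, and add a synthetic review_id (monotonically_increasing_id() is unique but
    # non-consecutive; the +20000 offset appears intended to keep customer-review IDs apart
    # from the business-review range).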

    fact_customer_sentiment_df = fact_customer_sentiment_df.withColumn("purchase_date", regexp_replace("purchase_date", "-", "")).withColumnRenamed("purchase_date", "date_id")
    fact_customer_sentiment_df = fact_customer_sentiment_df.join(dim_product_df, 'product_name', 'inner').select(fact_customer_sentiment_df['*'], dim_product_df['product_id'])
    fact_customer_sentiment_df = fact_customer_sentiment_df.withColumn("review_id", monotonically_increasing_id() + 20000)
    fact_customer_sentiment_df = fact_customer_sentiment_df.withColumn("created_on", lit(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))  # Add created_on
    fact_customer_sentiment_df = fact_customer_sentiment_df.select("review_id", "customer_id", "product_id", "date_id", "sentiment_label", "sentiment_score", "created_on")

    fact_customer_sentiment_df.show()
    fact_customer_sentiment_df.printSchema()


    fact_customer_sentiment_df.write \
        .format('bigquery') \
        .option('table', f'{project_id}:{dataset_id}.fact_customer_review') \
        .mode('overwrite') \
        .save()

    # fact_customer_inventory_df.write.mode('overwrite').parquet(os.path.join(exploitation_zone_parquet_file_path, 'fact_customer_inventory.parquet'))
    # fact_customer_sentiment_df.write.mode('overwrite').parquet('/home/pce/Documents/VBP_Joint_Project-main/data/exploitation_zone/fact_customer_review.parquet')
