Static code modified
SonyShrestha committed May 31, 2024
1 parent 03b98eb commit cb99a05
Showing 12 changed files with 109 additions and 35 deletions.
42 changes: 34 additions & 8 deletions formatted_zone/business_review_sentiment.py
@@ -8,12 +8,38 @@
from pyspark.sql.types import StringType, FloatType
from dotenv import load_dotenv
from datetime import datetime
import configparser
import json
import logging

# # Load environment variables
# os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
# os.environ["PYSPARK_PYTHON"] = "/home/pce/anaconda3/envs/spark_env/bin/python3.11"
# os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/pce/anaconda3/envs/spark_env/bin/python3.11"
# load_dotenv()


logging.basicConfig(level=logging.INFO) # Set log level to INFO

# Create logger object
logger = logging.getLogger()

# Get the path to the parent parent directory
config_dir = os.path.abspath(os.path.join(os.getcwd()))

# Specify the path to config file
config_file_path = os.path.join(config_dir, "config.ini")
config = configparser.ConfigParser()
config.read(config_file_path)

config_file_path_json = os.path.join(config_dir, "config.json")
with open(config_file_path_json) as f:
config_json = json.load(f)

gcs_config = config["GCS"]["credentials_path"]
raw_bucket_name = config["GCS"]["raw_bucket_name"]
formatted_bucket_name = config["GCS"]["formatted_bucket_name"]

# Load environment variables
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
os.environ["PYSPARK_PYTHON"] = "/home/pce/anaconda3/envs/spark_env/bin/python3.11"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/pce/anaconda3/envs/spark_env/bin/python3.11"
load_dotenv()

def create_spark_session():
spark = SparkSession.builder \
@@ -24,7 +50,7 @@ def create_spark_session():
.config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
.config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
.config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
.config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/home/pce/Documents/VBP_Joint_Project-main/formal-atrium-418823-7fbbc75ebbc6.json") \
.config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR") # Set log level to ERROR
return spark
@@ -54,7 +80,7 @@ def analyze_sentiment(text):

if __name__ == "__main__":
spark = create_spark_session()
bussiness_df = preprocess_data(spark, "gs://spicy_1/business_reviews_20240407221817.parquet")
bussiness_df = preprocess_data(spark, f"gs://{raw_bucket_name}/business_reviews_*")

# Load the sentiment-analysis pipeline
sentiment_pipeline = pipeline("sentiment-analysis", model="cardiffnlp/twitter-roberta-base-sentiment-latest", tokenizer="cardiffnlp/twitter-roberta-base-sentiment-latest")
@@ -73,7 +99,7 @@ def analyze_sentiment(text):

# Save the results to a Parquet file
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
output_path = f"gs://formatted_zone/business_sentiment_{timestamp}.parquet"
output_path = f"gs://{formatted_bucket_name}/business_sentiment_{timestamp}.parquet"

bussiness_df.write.mode("overwrite").parquet(output_path)
bussiness_df.show()
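For context, a minimal sketch of the config.ini these scripts now read is shown below. The [GCS] key names come from the config["GCS"][...] lookups above; the values are placeholders, not values from the repository.

# Assumed layout of config.ini (values are illustrative):
#
#   [GCS]
#   credentials_path = /path/to/service-account-key.json
#   raw_bucket_name = example-raw-bucket
#   formatted_bucket_name = example-formatted-bucket

import configparser

config = configparser.ConfigParser()
config.read("config.ini")

gcs_config = config["GCS"]["credentials_path"]
raw_bucket_name = config["GCS"]["raw_bucket_name"]
formatted_bucket_name = config["GCS"]["formatted_bucket_name"]
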
2 changes: 1 addition & 1 deletion formatted_zone/customer_location.py
@@ -41,7 +41,7 @@
.config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
.getOrCreate()

df_customer_loc = spark.read.parquet('gs://'+raw_bucket_name+'/customer_location*')
df_customer_loc = spark.read.parquet(f'gs://{raw_bucket_name}/customer_location*')

logger.info('-----------------------------------------------------')
logger.info("Cleaning data for customer_location")
2 changes: 1 addition & 1 deletion formatted_zone/customer_sales.py
@@ -12,7 +12,7 @@
.getOrCreate()


df = spark.read.parquet('purchases_nearing_expiry')
df = spark.read.parquet('purchases_nearing_expiry*')


def generate_expiry_date(purchase_date):
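The wildcard added here relies on Spark's glob matching for input paths, so every timestamped purchases_nearing_expiry output is picked up instead of a single hard-coded file. A minimal sketch, with an illustrative path prefix:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("glob_read_sketch").getOrCreate()

# Matches purchases_nearing_expiry_20240531..., purchases_nearing_expiry_20240601..., etc.
df = spark.read.parquet("purchases_nearing_expiry*")
df.show(5)
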
2 changes: 1 addition & 1 deletion formatted_zone/customers.py
@@ -42,7 +42,7 @@
.getOrCreate()

# Read the Parquet file into a DataFrame from GCS Raw Bucket
customers_df = spark.read.parquet('gs://'+raw_bucket_name+'/customers*.parquet')
customers_df = spark.read.parquet(f'gs://{raw_bucket_name}/customers*.parquet')

logger.info('-----------------------------------------------------')
logger.info("Cleaning data for customers")
2 changes: 1 addition & 1 deletion formatted_zone/dynamic_pricing.py
@@ -102,7 +102,7 @@ def rule_based_pricing(days_to_expiry, consumption_rate, base_price, avg_expiry_
pricing_udf = udf(rule_based_pricing, DoubleType())

# Define the path to the parquet file
parquet_path = "platform_customer_pricing_data_output"
parquet_path = "platform_customer_pricing_data_output*"
df = spark.read.parquet(parquet_path)

df.show(20)
2 changes: 1 addition & 1 deletion formatted_zone/establishments_catalonia.py
@@ -41,7 +41,7 @@
# Read the Parquet file into a DataFrame from GCS Raw Bucket
# supermarket_loc = "./data/gcs_raw_parquet/establishments_catalonia.parquet"
# df_supermarket_info = spark.read.parquet(supermarket_loc)
df_supermarket_info = spark.read.parquet('gs://'+raw_bucket_name+'/establishments_catalonia*')
df_supermarket_info = spark.read.parquet(f'gs://{raw_bucket_name}/establishments_catalonia*')

logger.info('-----------------------------------------------------')
logger.info("Cleaning data for establishments_catalonia")
2 changes: 1 addition & 1 deletion formatted_zone/estimate_expiry_date.py
@@ -92,7 +92,7 @@ def count_tokens(s1, s2):
# Specify the path to the Parquet file
cust_purachase = f'gs://{formatted_bucket_name}/customer_purchase*'
cust_email = f'gs://{formatted_bucket_name}/customers*'
expected_avg_expiry = 'gs://'+formatted_bucket_name+'/estimated_avg_expiry*'
expected_avg_expiry = f'gs://{formatted_bucket_name}/estimated_avg_expiry*'

# Read the Parquet file into a DataFrame
cust_purachase_df = spark.read.parquet(cust_purachase)
6 changes: 3 additions & 3 deletions formatted_zone/estimate_perishability.py
@@ -280,17 +280,17 @@ def preprocess_approved_food(approved_food_df, scrapped_date):
break

# Read the Parquet file into a DataFrame
flipkart_df = spark.read.parquet('gs://'+raw_bucket_name+'/flipkart*')
flipkart_df = spark.read.parquet(f'gs://{raw_bucket_name}/flipkart*')

avg_expiry_date_flipkart_df = preprocess_flipkart(flipkart_df)

# Read the Parquet file into a DataFrame
eat_by_date_df = spark.read.parquet('gs://'+raw_bucket_name+'/eat_by_date*')
eat_by_date_df = spark.read.parquet(f'gs://{raw_bucket_name}/eat_by_date*')

avg_expiry_date_eat_by_date_df = preprocess_eat_by_date(eat_by_date_df, item_desc_filter_out)

# Read the Parquet file into a DataFrame
approved_food_df = spark.read.parquet('gs://'+raw_bucket_name+'/Approved*')
approved_food_df = spark.read.parquet(f'gs://{raw_bucket_name}/Approved*')

avg_expiry_date_approved_food_df = preprocess_approved_food(approved_food_df,scrapped_date)

2 changes: 1 addition & 1 deletion formatted_zone/expiry_notification.py
@@ -188,7 +188,7 @@ def send_email(row):
.getOrCreate()

# Specify the path to the Parquet file
estimated_avg_expiry = spark.read.parquet('gs://'+formatted_bucket_name+'/purchases_nearing_expiry.*')
estimated_avg_expiry = spark.read.parquet(f'gs://{formatted_bucket_name}/purchases_nearing_expiry.*')

# Read the Parquet file into a DataFrame
estimated_avg_expiry_df = spark.read.parquet(estimated_avg_expiry)
40 changes: 32 additions & 8 deletions formatted_zone/individual_review_sentiment.py
@@ -8,12 +8,36 @@
from pyspark.sql.types import StringType, FloatType
from dotenv import load_dotenv
from datetime import datetime
import configparser
import logging
import json

# Load environment variables
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
os.environ["PYSPARK_PYTHON"] = "/home/pce/anaconda3/envs/spark_env/bin/python3.11"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/pce/anaconda3/envs/spark_env/bin/python3.11"
load_dotenv()
# # Load environment variables
# os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
# os.environ["PYSPARK_PYTHON"] = "/home/pce/anaconda3/envs/spark_env/bin/python3.11"
# os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/pce/anaconda3/envs/spark_env/bin/python3.11"
# load_dotenv()

logging.basicConfig(level=logging.INFO) # Set log level to INFO

# Create logger object
logger = logging.getLogger()

# Get the path to the parent parent directory
config_dir = os.path.abspath(os.path.join(os.getcwd()))

# Specify the path to config file
config_file_path = os.path.join(config_dir, "config.ini")
config = configparser.ConfigParser()
config.read(config_file_path)

config_file_path_json = os.path.join(config_dir, "config.json")
with open(config_file_path_json) as f:
config_json = json.load(f)

gcs_config = config["GCS"]["credentials_path"]
raw_bucket_name = config["GCS"]["raw_bucket_name"]
formatted_bucket_name = config["GCS"]["formatted_bucket_name"]

def create_spark_session():
spark = SparkSession.builder \
@@ -24,7 +48,7 @@ def create_spark_session():
.config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
.config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
.config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
.config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/home/pce/Documents/VBP_Joint_Project-main/formal-atrium-418823-7fbbc75ebbc6.json") \
.config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR") # Set log level to ERROR
return spark
@@ -54,7 +78,7 @@ def analyze_sentiment(text):

if __name__ == "__main__":
spark = create_spark_session()
review_df = preprocess_data(spark, "gs://spicy_1/individual_reviews_20240407221822.parquet")
review_df = preprocess_data(spark, f"gs://{raw_bucket_name}/individual_reviews_*")

# Load the sentiment-analysis pipeline
sentiment_pipeline = pipeline("sentiment-analysis", model="cardiffnlp/twitter-roberta-base-sentiment-latest", tokenizer="cardiffnlp/twitter-roberta-base-sentiment-latest")
@@ -73,7 +97,7 @@ def analyze_sentiment(text):
# Save the results to a Parquet file
# Generate a timestamp for the output file
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
output_path = f"gs://formatted_zone/customer_sentiment_{timestamp}.parquet"
output_path = f"gs://{formatted_bucket_name}/customer_sentiment_{timestamp}.parquet"

review_df.write.mode("overwrite").parquet(output_path)
review_df.show()
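The analyze_sentiment body is collapsed in this view; the sketch below shows the general pattern of wrapping a Hugging Face sentiment pipeline in a Spark UDF. The DataFrame, column names, and character-level truncation are illustrative assumptions, not code from this commit.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from transformers import pipeline

spark = SparkSession.builder.appName("sentiment_udf_sketch").getOrCreate()

# Same model the scripts load; the pipeline returns e.g. [{"label": "positive", "score": 0.98}]
sentiment_pipeline = pipeline(
    "sentiment-analysis",
    model="cardiffnlp/twitter-roberta-base-sentiment-latest",
    tokenizer="cardiffnlp/twitter-roberta-base-sentiment-latest",
)

def analyze_sentiment(text):
    if text is None or not text.strip():
        return None
    # Naive character-level truncation to keep inputs within the model's limit
    return sentiment_pipeline(text[:512])[0]["label"]

sentiment_udf = udf(analyze_sentiment, StringType())

df = spark.createDataFrame([("Great product!",), ("Terrible service.",)], ["review_text"])
df.withColumn("sentiment", sentiment_udf("review_text")).show(truncate=False)
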
2 changes: 1 addition & 1 deletion formatted_zone/location.py
@@ -39,7 +39,7 @@
.getOrCreate()

# Read the Parquet file into a DataFrame from GCS Raw Bucket
df_location = spark.read.parquet('gs://'+raw_bucket_name+'/location*')
df_location = spark.read.parquet(f'gs://{raw_bucket_name}/location*')

logger.info('-----------------------------------------------------')
logger.info("Cleaning data for location (customers)")
40 changes: 32 additions & 8 deletions formatted_zone/mealdbrecomend.py
@@ -10,13 +10,37 @@
import json
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import torch
import configparser
import logging
import json

# # Set environment variables
# os.environ["PYSPARK_PYTHON"] = "/home/pce/anaconda3/envs/spark_env/bin/python3.11"
# os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/pce/anaconda3/envs/spark_env/bin/python3.11"
# os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"

# load_dotenv()

logging.basicConfig(level=logging.INFO) # Set log level to INFO

# Create logger object
logger = logging.getLogger()

# Get the path to the parent parent directory
config_dir = os.path.abspath(os.path.join(os.getcwd()))

# Specify the path to config file
config_file_path = os.path.join(config_dir, "config.ini")
config = configparser.ConfigParser()
config.read(config_file_path)

# Set environment variables
os.environ["PYSPARK_PYTHON"] = "/home/pce/anaconda3/envs/spark_env/bin/python3.11"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/pce/anaconda3/envs/spark_env/bin/python3.11"
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
config_file_path_json = os.path.join(config_dir, "config.json")
with open(config_file_path_json) as f:
config_json = json.load(f)

load_dotenv()
gcs_config = config["GCS"]["credentials_path"]
raw_bucket_name = config["GCS"]["raw_bucket_name"]
formatted_bucket_name = config["GCS"]["formatted_bucket_name"]

def create_spark_session():
spark = SparkSession.builder \
@@ -25,7 +49,7 @@ def create_spark_session():
.config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
.config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
.config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
.config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/home/pce/Documents/VBP_Joint_Project-main/formal-atrium-418823-7fbbc75ebbc6.json") \
.config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_config) \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
return spark
@@ -95,8 +119,8 @@ def save_to_json_file(data, file_path):
print(f"Data has been written to {file_path}")

def initialize():
input_path = "gs://spicy_1/mealdb_20240407221823.parquet"
output_path = "gs://formatted_zone"
input_path = f"gs://{raw_bucket_name}/mealdb_*"
output_path = f"gs://{formatted_bucket_name}"
processed_df, full_path = preprocess_and_save_data(input_path, output_path)
return processed_df

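The create_spark_session change in this file follows the same pattern as the other scripts: the service-account keyfile path now comes from config.ini instead of a hard-coded path. A condensed sketch, assuming the GCS connector jar is already on the Spark classpath and using a placeholder keyfile location:

from pyspark.sql import SparkSession

# In the scripts this path is read from config["GCS"]["credentials_path"]; placeholder here.
gcs_keyfile = "/path/to/service-account-key.json"

spark = (
    SparkSession.builder
    .appName("gcs_session_sketch")
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", gcs_keyfile)
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
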
