Python code to predict expiry date modified
SonyShrestha committed Apr 21, 2024
1 parent 954cc68 commit 8f67768
Showing 1 changed file with 19 additions and 34 deletions.
53 changes: 19 additions & 34 deletions formatted_zone/estimate_expiry_date.py
@@ -1,22 +1,15 @@
from pyspark import SparkConf
from pyspark.sql import SparkSession
import logging
import os
import configparser
import json
from pyspark.sql import functions as F
- from pyspark import SparkConf
- from pyspark.sql import SparkSession
- from pyspark.sql.functions import regexp_replace, udf, lower, trim, when, initcap, to_date, date_format, datediff, monotonically_increasing_id, col, expr, lit
- from pyspark.sql.types import IntegerType, StringType
+ from pyspark.sql.functions import regexp_replace, udf, lower, trim, monotonically_increasing_id, col, expr, lit, row_number
+ from pyspark.sql.types import IntegerType
from fuzzywuzzy import process, fuzz
import re
from pyspark.sql.window import Window
- from pyspark.sql.functions import row_number
- import spacy
- from pyspark.sql import SparkSession
- from pyspark.sql.functions import udf
- from pyspark.sql.types import ArrayType, StringType
from nltk.tokenize import word_tokenize
from nltk.tag import pos_tag

@@ -105,41 +98,39 @@ def spacy_match_score(spacy_filter, spacy_threshold, s1, s2):
gcs_config = config["GCS"]["credentials_path"]
raw_bucket_name = config["GCS"]["raw_bucket_name"]

- requires_fuzzy_matching = config_json["product_matching"]["fuzzy_matching"]["apply"]
- requires_spacy_matching = config_json["product_matching"]["spacy_matching"]["apply"]

- if requires_fuzzy_matching == "yes":
-     fuzzy_score_calc_method = config_json["product_matching"]["fuzzy_matching"]["score_calc_method"]
-     fuzzy_threshold = config_json["product_matching"]["fuzzy_matching"]["threshold"]

- if requires_spacy_matching == "yes":
-     spacy_filter_method = config_json["product_matching"]["spacy_matching"]["filter"]
-     spacy_threshold = config_json["product_matching"]["spacy_matching"]["threshold"]
+ fuzzy_score_calc_method = config_json["product_matching"]["fuzzy_matching"]["score_calc_method"]
+ fuzzy_threshold = config_json["product_matching"]["fuzzy_matching"]["threshold"]
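# For reference, a minimal sketch of the fuzzy-matching block these lookups assume in config.json
# (key names are taken from the lookups above; the values shown are illustrative only):
#   {"product_matching": {"fuzzy_matching": {"score_calc_method": "token_set_ratio", "threshold": 80}}}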

spark = SparkSession.builder \
.appName("Read Parquet File") \
.config("spark.sql.repl.eagerEval.enabled", True) \
.getOrCreate()

# Specify the path to the Parquet file
- customer_purachase = "./data/formatted_zone/customer_purchase.parquet"
+ cust_purachase = "./data/formatted_zone/customer_purchase"

- expected_avg_expiry = "./data/cleaned/estimated_avg_expiry"
+ cust_email = "./data/formatted_zone/customers"

+ expected_avg_expiry = "./data/formatted_zone/estimated_avg_expiry"

# Read the Parquet file into a DataFrame
- customer_purachase_df = spark.read.parquet(customer_purachase).limit(100)
+ cust_purachase_df = spark.read.parquet(cust_purachase).limit(100)
+ cust_email_df = spark.read.parquet(cust_email)
+ cust_email_df = cust_email_df.select("customer_id","email_id")

- # customer_purachase_df = customer_purachase_df.filter(customer_purachase_df["product_name"] == "Organic Tattva Moong Dal (Muga Dali) (1 kg)")

+ customer_purachase_df = cust_purachase_df.join(cust_email_df, 'customer_id', 'inner')
+ customer_purachase_df = customer_purachase_df.select("id","customer_id","customer_name","email_id","product_name","unit_price","quantity","purchase_date")

+ # customer_purachase_df = customer_purachase_df.filter(customer_purachase_df["product_name"] == "Natureland Organics Chana Besan (500 g)")

customer_purachase_df = customer_purachase_df.withColumn("original_product_name", customer_purachase_df["product_name"])

customer_purachase_df = customer_purachase_df.withColumn("id", monotonically_increasing_id())
- customer_purachase_df = customer_purachase_df.select("id","customer_id","customer_name","product_name","unit_price","quantity","purchase_date","original_product_name")\
+ customer_purachase_df = customer_purachase_df.select("id","customer_id","customer_name","email_id","product_name","unit_price","quantity","purchase_date","original_product_name")\
.withColumn("product_name", lower(regexp_replace(customer_purachase_df["product_name"], "[^a-zA-Z ]", "")))\
.withColumn("product_name", regexp_replace(trim(col("product_name")), "\\s+", " "))


expected_avg_expiry_df = spark.read.json(expected_avg_expiry)
expected_avg_expiry_df = expected_avg_expiry_df.select("product_name","avg_expiry_days")\
.withColumn("product_name", lower(regexp_replace(expected_avg_expiry_df["product_name"], "[^a-zA-Z ]", "")))\
@@ -150,13 +141,9 @@ def spacy_match_score(spacy_filter, spacy_threshold, s1, s2):

joined_df = customer_purachase_df.crossJoin(expected_avg_expiry_df)
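# Note: crossJoin pairs every sampled purchase with every product in the average-expiry lookup,
# so a similarity score can be computed per pair; the result grows as purchases x products,
# which is presumably why the purchases were sampled with .limit(100) above.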

- filtered_df = joined_df.select("id","customer_id","customer_name","product_name","unit_price","quantity","purchase_date","original_product_name","product_in_avg_expiry_file","avg_expiry_days")
+ filtered_df = joined_df.select("id","customer_id","customer_name","email_id","product_name","unit_price","quantity","purchase_date","original_product_name","product_in_avg_expiry_file","avg_expiry_days")

- if requires_fuzzy_matching == "yes":
-     filtered_df = filtered_df.withColumn("score", fuzzy_match_score(lit(fuzzy_score_calc_method), lit(fuzzy_threshold), filtered_df["product_name"], filtered_df["product_in_avg_expiry_file"]))

- elif requires_spacy_matching == "yes":
-     filtered_df = filtered_df.withColumn("score", spacy_match_score(lit(spacy_filter_method), lit(spacy_threshold), filtered_df["product_name"], filtered_df["product_in_avg_expiry_file"]))
+ filtered_df = filtered_df.withColumn("score", fuzzy_match_score(lit(fuzzy_score_calc_method), lit(fuzzy_threshold), filtered_df["product_name"], filtered_df["product_in_avg_expiry_file"]))


filtered_df = filtered_df.withColumn("token_count", count_tokens(filtered_df["product_name"], filtered_df["product_in_avg_expiry_file"]))
@@ -177,7 +164,5 @@ def spacy_match_score(spacy_filter, spacy_threshold, s1, s2):
df_with_rn = df_with_rn.withColumnRenamed("original_product_name", "product_name")

df_with_rn = df_with_rn.withColumn("expected_expiry_date", expr("date_add(purchase_date, cast(ceil(avg_expiry_days/2) AS INT))"))
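# Example: with avg_expiry_days = 7, ceil(7/2) = 4, so a purchase made on 2024-04-21 gets
# expected_expiry_date = 2024-04-25 (dates illustrative).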

- df_with_rn.show()

- df_with_rn.write.json("./data/formatted_zone/fuzzy_result")
+ df_with_rn.write.parquet("./data/formatted_zone/purchases_nearing_expiry")
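# Downstream consumers are presumably expected to read this output back with
# spark.read.parquet("./data/formatted_zone/purchases_nearing_expiry").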
