From 45edda48e0233aca3a470e140e9c3a0ddb2b1ced Mon Sep 17 00:00:00 2001
From: Mitchell Laferla <mlaferla@headset.io>
Date: Mon, 3 Jul 2023 09:37:55 -0700
Subject: [PATCH] converted weekly new brand combos script

---
 .../weekly_autolinker_100_links_audit.py      |   2 +-
 monitoring_weekly/weekly_new_brand_combos.py  | 163 ++++++++++++++++++
 2 files changed, 164 insertions(+), 1 deletion(-)
 create mode 100644 monitoring_weekly/weekly_new_brand_combos.py

diff --git a/monitoring_weekly/weekly_autolinker_100_links_audit.py b/monitoring_weekly/weekly_autolinker_100_links_audit.py
index af28076..e9f9965 100644
--- a/monitoring_weekly/weekly_autolinker_100_links_audit.py
+++ b/monitoring_weekly/weekly_autolinker_100_links_audit.py
@@ -78,7 +78,7 @@ def main(mytimer: func.TimerRequest) -> None:
 
     if len(sql_df.count()) >0:
 
-        filename = dir_path+r'\azure_results\autolinker_100_sample_audit.csv'
+        filename = dir_path+r'\autolinker_100_sample_audit.csv'
         sql_df.to_csv(filename, index = False)
 
         msgHtml = """
diff --git a/monitoring_weekly/weekly_new_brand_combos.py b/monitoring_weekly/weekly_new_brand_combos.py
new file mode 100644
index 0000000..760eb7f
--- /dev/null
+++ b/monitoring_weekly/weekly_new_brand_combos.py
@@ -0,0 +1,163 @@
+import datetime
+import logging
+import os
+
+from sqlalchemy import create_engine
+import azure.functions as func
+import pandas as pd
+
+from sparkpost import SparkPost
+from sparkpost.exceptions import SparkPostAPIException
+
+
+def main(mytimer: func.TimerRequest) -> None:
+    utc_timestamp = datetime.datetime.utcnow().replace(
+        tzinfo=datetime.timezone.utc).isoformat()
+
+    if mytimer.past_due:
+        logging.info('The timer is past due!')
+
+    logging.info('Python timer trigger function ran at %s', utc_timestamp)
+    logging.info("Weekly Wednesday New Brand Combos")
+
+    sp = SparkPost(os.environ["SPARKPOST_KEY"])
+
+    dir_path = os.getcwd()
+
+    engine = create_engine(
+        'snowflake://{user}:{password}@{account}/'.format(
+            user=os.environ["SNOWFLAKE_USER"],
+            password=os.environ["SNOWFLAKE_PASSWORD"],
+            account=os.environ["SNOWFLAKE_ACCOUNT"],
+        )
+    )
+
+    usedb_string = "use FIVETRAN;"
+
+    # Brand/state combos whose first product link happened in the last 7 days.
+    sql_string_s1 = """
+create or replace temp table lizland.public.new_state as
+select d.state,
+       c.name as brand,
+       min(link_date) as link_dt
+from staging.imported_products a
+join staging.products b
+on a.product_id = b.product_id
+join insights.dim_brand c
+on b.brand_id = c.brand_id
+join base.stores d
+on a.store_id = d.store_id
+group by 1, 2
+having min(link_date) >= current_date()-7;"""
+
+    # New brands in a state, with imported product ids rolled up per combo.
+    sql_string_s2 = """
+select bb.*, listagg(imported_product_id, ', ') as variations
+from staging.imported_products a
+join staging.products b
+on a.product_id = b.product_id
+join insights.dim_brand c
+on b.brand_id = c.brand_id
+join base.stores d
+on a.store_id = d.store_id
+join lizland.public.new_state bb
+on d.state = bb.state
+and c.name = bb.brand
+group by 1, 2, 3
+order by state;
+"""
+
+    # Brand/category combos whose first product link happened in the last 7 days.
+    sql_string_c1 = """
+create or replace temp table lizland.public.new_cat as
+select b.category,
+       c.name as brand,
+       min(link_date) as link_dt
+from staging.imported_products a
+join staging.products b
+on a.product_id = b.product_id
+join insights.dim_brand c
+on b.brand_id = c.brand_id
+join base.stores d
+on a.store_id = d.store_id
+group by 1, 2
+having min(link_date) >= current_date()-7;
+"""
+
+    # New brands in a category, with imported product ids rolled up per combo.
+    sql_string_c2 = """
+select bb.*, listagg(imported_product_id, ', ') as variations
+from staging.imported_products a
+join staging.products b
+on a.product_id = b.product_id
+join insights.dim_brand c
+on b.brand_id = c.brand_id
+join lizland.public.new_cat bb
+on b.category = bb.category
+and c.name = bb.brand
+group by 1, 2, 3
+order by category;
+"""
+
+    # Create a connection to the DB and point it at the Fivetran database.
+    connection = engine.connect()
+    try:
+        connection.execute(usedb_string)
+
+        logging.info("executing query")
+        connection.execute(sql_string_c1)
+        connection.execute(sql_string_s1)
+        state_df = pd.read_sql_query(sql_string_s2, connection)
+        cat_df = pd.read_sql_query(sql_string_c2, connection)
+        logging.info("query executed")
+    finally:
+        connection.close()
+        engine.dispose()
+
+    filename_s = dir_path+r'\new_store_brand_state_combos.csv'
+    filename_c = dir_path+r'\new_store_brand_cat_combos.csv'
+    state_df.to_csv(filename_s, index=False)
+    cat_df.to_csv(filename_c, index=False)
+
+    # Only send the email when at least one result set has rows;
+    # DataFrame.count() counts per column, so len(df) is used instead.
+    if len(state_df) > 0 or len(cat_df) > 0:
+        msgHtml = """
+        <p>Hello, friend.</p>
+        <p>See attached for your data:</p>
+        <p>Regards,</p>
+        <p>Me</p>
+        """
+        try:
+            sp.transmissions.send(
+                recipients=['mlaferla@headset.io'],
+                html=msgHtml,
+                from_email='analytics-monitor@headset.io',
+                subject="New Brand Combos",
+                attachments=[
+                    {
+                        "name": filename_c.split('\\')[-1],
+                        "type": "text/csv",
+                        "filename": filename_c
+                    },
+                    {
+                        "name": filename_s.split('\\')[-1],
+                        "type": "text/csv",
+                        "filename": filename_s
+                    }
+                ]
+            )
+        except SparkPostAPIException as err:
+            # http response status code
+            logging.error(err.status)
+            # python requests library response object
+            # http://docs.python-requests.org/en/master/api/#requests.Response
+            logging.error(err.response.json())
+            # list of formatted errors
+            logging.error(err.errors)
+
+    # Finally, remove the files.
+    os.remove(filename_s)
+    os.remove(filename_c)
+
+    logging.info("finished")
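
---

Note on the row guard: DataFrame.count() returns one non-null count per
column, so len(df.count()) is the number of columns and stays positive even
for an empty result set. That is why this script guards on len(df); the older
check in weekly_autolinker_100_links_audit.py (the len(sql_df.count()) context
line above) still follows the column-counting pattern. A minimal sketch of the
difference (the column names here are illustrative only):

    import pandas as pd

    # An empty result set that still has a known schema.
    empty = pd.DataFrame(columns=["state", "brand", "link_dt"])

    print(len(empty.count()))  # 3 -- one count per column, even with no rows
    print(len(empty))          # 0 -- the actual number of rows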
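Note on the attachments: supplying "filename" in each attachment dict asks the
python-sparkpost client to read the file from disk and base64-encode it before
calling the API. A sketch of the equivalent explicit payload, assuming the
same client (the CSV path here is illustrative only):

    import base64

    with open("new_store_brand_cat_combos.csv", "rb") as f:
        encoded = base64.b64encode(f.read()).decode("ascii")

    attachment = {
        "name": "new_store_brand_cat_combos.csv",
        "type": "text/csv",
        "data": encoded,  # base64 content supplied directly instead of a path
    }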