From f11c4e1678757d181cb49b2decbe4c3cd0636bdc Mon Sep 17 00:00:00 2001
From: Pratistha
Date: Sun, 7 Apr 2024 19:20:08 +0200
Subject: [PATCH] initial code for dag synthetic and collector

---
 .DS_Store         | Bin 6148 -> 8196 bytes
 config.ini        |  1 +
 dags/collector.py | 66 ++++++++++++++++++++++++++++++++++++++++++++++
 dags/synthetic.py | 57 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 124 insertions(+)
 create mode 100644 dags/collector.py
 create mode 100644 dags/synthetic.py

diff --git a/.DS_Store b/.DS_Store
index f12a3f9fa02564cc27a6aca39d742244fd794847..ad0a9007ce9b849d576d54093e1518b84acaff0c 100644
GIT binary patch
delta 305
zcmZoMXmOBWU|?W$DortDU;r^WfEYvza8E20o2aMAD7Z0TH}hr%jz7$c**Q2SHn1=X
zPUc~;txsnt_RPsoPRhwo0x1U?@)JnU`ws>{76StdLkdG815jN~x?yl~er^F+6hgq|
za`RnWpk{EGtZ5Bvd3e+jS#NHB3RZbBnBfH($cAsWWBJRdna+^RP|Og|P{NQ3q)Qm|
z7z!9F(OlaNv}N9AB{pTo&FVbQnYkpmfdZ}|k8c*__|80;U&M2=KMw~7BP2{1HplbK
GVFmzGMMl2>

delta 112
zcmZp1XfcprU|?W$DortDU=RQ@Ie-{Mvv5r;6q~50$jH4hU^g=(_hcRc+s!WpJ~3{V
y73N}`*l=huI|qj#Gf*WE2yg=lSCFQSh2NPc^UHXGj9_4b7zeU}VRJms9A*H6z!QA{

diff --git a/config.ini b/config.ini
index 0a04bda..7a36141 100644
--- a/config.ini
+++ b/config.ini
@@ -1,5 +1,6 @@
 [COMMON]
 raw_data_dir=C:\UPC\BDM\\Project\\VBP_Joint_Project\\data\\raw
+synthetic_dir = ./landing_zone/synthetic/
 
 [EAT_BY_DATE]
 url=https://eatbydate.com/
diff --git a/dags/collector.py b/dags/collector.py
new file mode 100644
index 0000000..498ebe4
--- /dev/null
+++ b/dags/collector.py
@@ -0,0 +1,66 @@
+from airflow import DAG
+from airflow.operators.bash_operator import BashOperator
+from datetime import datetime, timedelta
+
+# Default arguments shared by every collector task in this DAG.
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': datetime(2024, 1, 1),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+# DAG that runs all data-collector scripts on the first day of each month.
+dag_collectors = DAG(
+    'collectors_monthly_processing',
+    default_args=default_args,
+    description='Run collector scripts monthly',
+    schedule_interval='0 0 1 * *',  # Monthly on the first day
+    catchup=False
+)
+
+# Task for Scrapy crawler for approvedfood_groceries
+scrapy_approvedfood_task = BashOperator(
+    task_id='scrapy_approvedfood_groceries',
+    bash_command='scrapy crawl approvedfood_groceries -O ./data/raw/Approvedfood.json',
+    dag=dag_collectors,
+)
+
+# Task for Scrapy crawler for mealdb
+scrapy_mealdb_task = BashOperator(
+    task_id='scrapy_mealdb',
+    bash_command='scrapy crawl mealdb -O ./data/raw/mealdb.json',
+    dag=dag_collectors,
+)
+
+# Tasks for running standalone Python collector scripts
+big_basket_task = BashOperator(
+    task_id='big_basket',
+    bash_command='python ./landing_zone/collectors/big_basket/big_basket.py',
+    dag=dag_collectors,
+)
+
+establishments_scraper_task = BashOperator(
+    task_id='establishments_scraper',
+    bash_command='python ./landing_zone/collectors/catalonia_establishment_location/establishments_scraper.py',
+    dag=dag_collectors,
+)
+
+sm_retail_customer_task = BashOperator(
+    task_id='sm_retail_customer',
+    bash_command='python ./landing_zone/collectors/customers/sm_retail_customer.py',
+    dag=dag_collectors,
+)
+
+eat_by_date_task = BashOperator(
+    task_id='eat_by_date',
+    bash_command='python ./landing_zone/collectors/eat_by_date/eat_by_date.py',
+    dag=dag_collectors,
+)
+
+scrap_flipkart_pages_task = BashOperator(
+    task_id='scrap_flipkart_pages',
+    bash_command='python ./landing_zone/collectors/flipkart/scrap_flipkart_pages_sel.py',
+    dag=dag_collectors,
+)
diff --git a/dags/synthetic.py b/dags/synthetic.py
new file mode 100644
index 0000000..55d2431
--- /dev/null
+++ b/dags/synthetic.py
@@ -0,0 +1,57 @@
+from airflow import DAG
+from airflow.operators.python_operator import PythonOperator
+from datetime import datetime, timedelta
+import os
+import configparser
+
+# Path to the project config, resolved relative to this DAG file so the DAG
+# parses correctly regardless of the scheduler's working directory.
+CONFIG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'config.ini')
+
+
+def load_config(config_path):
+    """Read an INI config file and return the parsed ConfigParser."""
+    config = configparser.ConfigParser()
+    config.read(config_path)
+    return config
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': datetime(2024, 1, 1),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+# Initialize the DAG for synthetic scripts
+dag_synthetic = DAG(
+    'synthetic_daily_processing',
+    default_args=default_args,
+    description='Process synthetic scripts daily',
+    schedule_interval='0 0 * * *',  # Daily at midnight
+    catchup=False
+)
+
+config = load_config(CONFIG_PATH)
+synthetic_dir = config['COMMON']['synthetic_dir']
+
+
+def run_synthetic_script(filename):
+    """Execute one synthetic-data script as if it were run as a program."""
+    # NOTE(review): exec() of repo-controlled scripts only — never point this
+    # at untrusted input.
+    script_path = os.path.join(synthetic_dir, filename)
+    with open(script_path) as f:
+        exec(f.read(), {'__name__': '__main__'})
+
+
+# Define one task per synthetic script found in the configured directory.
+for filename in os.listdir(synthetic_dir):
+    if filename.endswith('.py'):
+        task = PythonOperator(
+            task_id=f'run_{filename[:-3]}',  # drop the .py extension
+            python_callable=run_synthetic_script,
+            op_kwargs={'filename': filename},
+            dag=dag_synthetic,
+        )