diff --git a/.DS_Store b/.DS_Store
index f12a3f9..ad0a900 100644
Binary files a/.DS_Store and b/.DS_Store differ
diff --git a/config.ini b/config.ini
index 0a04bda..7a36141 100644
--- a/config.ini
+++ b/config.ini
@@ -1,5 +1,6 @@
 [COMMON]
 raw_data_dir=C:\UPC\BDM\\Project\\VBP_Joint_Project\\data\\raw
+synthetic_dir = ./landing_zone/synthetic/
 
 [EAT_BY_DATE]
 url=https://eatbydate.com/
diff --git a/dags/collector.py b/dags/collector.py
new file mode 100644
index 0000000..498ebe4
--- /dev/null
+++ b/dags/collector.py
@@ -0,0 +1,65 @@
+from airflow import DAG
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.python_operator import PythonOperator
+from datetime import datetime, timedelta
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': datetime(2024, 1, 1),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+dag_collectors = DAG(
+    'collectors_monthly_processing',
+    default_args=default_args,
+    description='Run collector scripts monthly',
+    schedule_interval='0 0 1 * *',  # Monthly on the first day
+    catchup=False
+)
+
+# Task for Scrapy crawler for approvedfood_groceries
+scrapy_approvedfood_task = BashOperator(
+    task_id='scrapy_approvedfood_groceries',
+    bash_command='scrapy crawl approvedfood_groceries -O ./data/raw/Approvedfood.json',
+    dag=dag_collectors,
+)
+
+# Task for Scrapy crawler for mealdb
+scrapy_mealdb_task = BashOperator(
+    task_id='scrapy_mealdb',
+    bash_command='scrapy crawl mealdb -O ./data/raw/mealdb.json',
+    dag=dag_collectors,
+)
+
+# Tasks for running Python scripts
+big_basket_task = BashOperator(
+    task_id='big_basket',
+    bash_command='python ./landing_zone/collectors/big_basket/big_basket.py',
+    dag=dag_collectors,
+)
+
+establishments_scraper_task = BashOperator(
+    task_id='establishments_scraper',
+    bash_command='python ./landing_zone/collectors/catalonia_establishment_location/establishments_scraper.py',
+    dag=dag_collectors,
+)
+
+sm_retail_customer_task = BashOperator(
+    task_id='sm_retail_customer',
+    bash_command='python ./landing_zone/collectors/customers/sm_retail_customer.py',
+    dag=dag_collectors,
+)
+
+eat_by_date_task = BashOperator(
+    task_id='eat_by_date',
+    bash_command='python ./landing_zone/collectors/eat_by_date/eat_by_date.py',
+    dag=dag_collectors,
+)
+
+scrap_flipkart_pages_task = BashOperator(
+    task_id='scrap_flipkart_pages',
+    bash_command='python ./landing_zone/collectors/flipkart/scrap_flipkart_pages_sel.py',
+    dag=dag_collectors,
+)
diff --git a/dags/synthetic.py b/dags/synthetic.py
new file mode 100644
index 0000000..55d2431
--- /dev/null
+++ b/dags/synthetic.py
@@ -0,0 +1,46 @@
+from airflow import DAG
+from airflow.operators.python_operator import PythonOperator
+from datetime import datetime, timedelta
+import os
+import configparser
+
+
+def load_config(config_path):
+    config = configparser.ConfigParser()
+    config.read(config_path)
+    return config
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': datetime(2024, 1, 1),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+# Initialize the DAG for synthetic scripts
+dag_synthetic = DAG(
+    'synthetic_daily_processing',
+    default_args=default_args,
+    description='Process synthetic scripts daily',
+    schedule_interval='0 0 * * *',  # Daily at midnight
+    catchup=False
+)
+
+# config.ini is assumed to sit at the repository root, one level above dags/
+config_path = os.path.join(os.path.dirname(__file__), '..', 'config.ini')
+config = load_config(config_path)
+synthetic_dir = config['COMMON']['synthetic_dir']
+
+def run_synthetic_script(filename):
+    exec(open(f"{synthetic_dir}/{filename}").read(), {'__name__': '__main__'})
+
+# Define tasks for each synthetic script
+for filename in os.listdir(synthetic_dir):
+    if filename.endswith('.py'):
+        task = PythonOperator(
+            task_id=f'run_{filename[:-3]}',  # Remove .py extension for task_id
+            python_callable=run_synthetic_script,
+            op_kwargs={'filename': filename},
+            dag=dag_synthetic,
+        )
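
A quick local sanity check for the two new DAG files is to load the dags/ folder through Airflow's DagBag and fail on import errors. The helper below is a minimal sketch, not part of the patch above; it assumes Airflow is installed locally and that it runs from the repository root, so the relative dags/ path and the synthetic_dir value (./landing_zone/synthetic/) resolve.

# smoke_test_dags.py -- hypothetical helper, not part of the patch above
from airflow.models import DagBag

def main():
    # Parse only this project's dags/ folder, skipping Airflow's bundled examples
    dag_bag = DagBag(dag_folder="dags", include_examples=False)

    # Any exception raised while importing collector.py or synthetic.py lands here
    assert not dag_bag.import_errors, f"DAG import errors: {dag_bag.import_errors}"

    # Both DAG ids come from the files added in this diff
    for dag_id in ("collectors_monthly_processing", "synthetic_daily_processing"):
        assert dag_id in dag_bag.dags, f"missing DAG: {dag_id}"

    print("Both DAGs parsed without errors.")

if __name__ == "__main__":
    main()

Note that synthetic.py calls os.listdir(synthetic_dir) at parse time with a relative path, so both this check and the scheduler need a working directory in which ./landing_zone/synthetic/ exists; otherwise the import shows up in dag_bag.import_errors as a FileNotFoundError.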