
Commit

initial code for dag synthetic and collector
Pratistha authored and Pratistha committed Apr 7, 2024
1 parent cd19bf2 commit f11c4e1
Showing 4 changed files with 110 additions and 0 deletions.
Binary file modified .DS_Store
1 change: 1 addition & 0 deletions config.ini
@@ -1,5 +1,6 @@
[COMMON]
raw_data_dir=C:\UPC\BDM\Project\VBP_Joint_Project\data\raw
synthetic_dir=./landing_zone/synthetic/

[EAT_BY_DATE]
url=https://eatbydate.com/
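For reference, a minimal sketch of how these keys can be read with configparser; the section and key names come from the diff above, while the read location and the fallback value are assumptions:

import configparser

config = configparser.ConfigParser()
config.read('config.ini')  # assumes the process runs from the repository root

# 'COMMON', 'raw_data_dir' and 'synthetic_dir' are the entries shown above;
# the fallback is illustrative only
raw_data_dir = config.get('COMMON', 'raw_data_dir')
synthetic_dir = config.get('COMMON', 'synthetic_dir', fallback='./landing_zone/synthetic/')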
65 changes: 65 additions & 0 deletions dags/collector.py
@@ -0,0 +1,65 @@
from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 2.x path; the old bash_operator module is deprecated
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag_collectors = DAG(
    'collectors_monthly_processing',
    default_args=default_args,
    description='Run collector scripts monthly',
    schedule_interval='0 0 1 * *',  # monthly, at midnight on the first day
    catchup=False,
)

# Task for the Scrapy crawler for approvedfood_groceries
scrapy_approvedfood_task = BashOperator(
    task_id='scrapy_approvedfood_groceries',
    bash_command='scrapy crawl approvedfood_groceries -O ./data/raw/Approvedfood.json',
    dag=dag_collectors,
)

# Task for the Scrapy crawler for mealdb
scrapy_mealdb_task = BashOperator(
    task_id='scrapy_mealdb',
    bash_command='scrapy crawl mealdb -O ./data/raw/mealdb.json',
    dag=dag_collectors,
)

# Tasks for running the standalone Python collector scripts
big_basket_task = BashOperator(
    task_id='big_basket',
    bash_command='python ./landing_zone/collectors/big_basket/big_basket.py',
    dag=dag_collectors,
)

establishments_scraper_task = BashOperator(
    task_id='establishments_scraper',
    bash_command='python ./landing_zone/collectors/catalonia_establishment_location/establishments_scraper.py',
    dag=dag_collectors,
)

sm_retail_customer_task = BashOperator(
    task_id='sm_retail_customer',
    bash_command='python ./landing_zone/collectors/customers/sm_retail_customer.py',
    dag=dag_collectors,
)

eat_by_date_task = BashOperator(
    task_id='eat_by_date',
    bash_command='python ./landing_zone/collectors/eat_by_date/eat_by_date.py',
    dag=dag_collectors,
)

scrap_flipkart_pages_task = BashOperator(
    task_id='scrap_flipkart_pages',
    bash_command='python ./landing_zone/collectors/flipkart/scrap_flipkart_pages_sel.py',
    dag=dag_collectors,
)
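As written, none of these collector tasks depends on another, so Airflow runs them all in parallel within the monthly run. If ordering ever matters, one possible wiring using Airflow's bitshift syntax — the ordering itself is an assumption for illustration, not part of this commit:

# hypothetical ordering: let the Scrapy crawls finish before the script-based collectors
[scrapy_approvedfood_task, scrapy_mealdb_task] >> big_basket_task
big_basket_task >> [establishments_scraper_task, sm_retail_customer_task, eat_by_date_task, scrap_flipkart_pages_task]

The DAG can also be exercised locally with a single run via `airflow dags test collectors_monthly_processing 2024-01-01`.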
44 changes: 44 additions & 0 deletions dags/synthetic.py
@@ -0,0 +1,44 @@
from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x path; the old python_operator module is deprecated
from datetime import datetime, timedelta
import os
import configparser


def load_config(config_path):
    config = configparser.ConfigParser()
    config.read(config_path)
    return config

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Initialize the DAG for the synthetic scripts
dag_synthetic = DAG(
    'synthetic_daily_processing',
    default_args=default_args,
    description='Process synthetic scripts daily',
    schedule_interval='0 0 * * *',  # daily at midnight
    catchup=False,
)

# config.ini sits at the repository root, one level above dags/ (as in this commit)
repo_root = os.path.join(os.path.dirname(__file__), '..')
config_path = os.path.join(repo_root, 'config.ini')
config = load_config(config_path)
# the configured path is relative, so resolve it against the repository root
synthetic_dir = os.path.join(repo_root, config['COMMON']['synthetic_dir'])

def run_synthetic_script(filename):
    # run the script in its own namespace, as if invoked directly
    with open(os.path.join(synthetic_dir, filename)) as f:
        exec(f.read(), {'__name__': '__main__'})

# Define one task per synthetic script found in the synthetic directory
for filename in os.listdir(synthetic_dir):
    if filename.endswith('.py'):
        task = PythonOperator(
            task_id=f'run_{filename[:-3]}',  # remove the .py extension for the task_id
            python_callable=run_synthetic_script,
            op_kwargs={'filename': filename},
            dag=dag_synthetic,
        )
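The exec-based runner above works, but the standard library's runpy achieves the same effect (running a file as `__main__`) without manual file handling; a possible alternative, not what this commit uses:

import runpy

def run_synthetic_script(filename):
    # same effect as the exec() version: the script executes as __main__
    runpy.run_path(os.path.join(synthetic_dir, filename), run_name='__main__')

Note that re-binding `task` on every loop iteration is fine: passing `dag=dag_synthetic` registers each operator with the DAG, so the name does not need to be kept.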
