Merge pull request #29 from axiom-of-choice/develop
Move common modules to another folder
axiom-of-choice authored Sep 26, 2023
2 parents a66cbc2 + e4b0d1a commit 8f5f9b4
Showing 24 changed files with 71 additions and 558 deletions.
1 change: 1 addition & 0 deletions Dockerfile
@@ -8,6 +8,7 @@ RUN apt-get update \
ADD requirements.txt .
COPY ./bq_sa.json /opt/airflow/bq_sa.json
USER "${AIRFLOW_UID}:0"
ENV PYTHONPATH="${PYTHONPATH}:/opt/airflow"
RUN pip install -r requirements.txt
RUN pip install --upgrade cffi
RUN airflow db upgrade
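
Extending PYTHONPATH with /opt/airflow is what lets the DAGs import the relocated modules as a top-level common package. A minimal sketch of the resulting imports, assuming the repository's airflow/ directory ends up at /opt/airflow inside the container:

```python
# Sketch only: assumes airflow/common/ is copied or mounted to /opt/airflow/common.
# With PYTHONPATH including /opt/airflow, these absolute imports resolve from any DAG file.
from common.aws.s3 import S3_Connector
from common.gcp.bigquery import BigQueryConnector
from common.custom_operators.S3_BQ import S3ToBigQuery
```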
18 changes: 10 additions & 8 deletions README.md
@@ -1,4 +1,4 @@
# Data Engineering Challenge
# Data ELT Pipeline Project

## What is this repo for?

@@ -30,14 +30,16 @@ I used Pandas as data manipulation layer because it offers a complete solution t
### File structure
The file directory follows a standard ETL pipeline structure; we have:
* *airflow/* Includes:
  - dags/ directory where the ETL modules and custom operators used in the pipeline DAGs are located, to keep the DAG files clean and organized
    - custom_operators: Directory containing all the custom operators
    - daily_etl_modules: Modules used in the daily_pipeline.py DAG
    - hourly_etl_modules: Modules used in the hourly_pipeline.py DAG
    - daily_pipeline.py and hourly_pipeline.py DAGs
    - utils.py: Common helper functions
  - common/ Directory containing all the common modules
    - custom_operators: Directory containing all the Airflow custom operators
    - aws: AWS custom classes for the use case (wrapper around boto3)
    - gcp: Google Cloud custom classes for the use case (wrapper around the GCP client library)
    - utils.py: Common helper functions
  - dags/ directory where the DAGs for both pipelines are located
    - daily_pipeline.py: Daily pipeline DAG
    - hourly_pipeline.py: Hourly pipeline DAG
- *data/* Includes:
  - *data_municipios/*: Where static data about municipios is stored (it should live somewhere else, like a database or storage service)
  - *data_municipios/*: Where static data about municipios is stored (it is also stored in BigQuery, but I placed it here for other users.)
  - Airflow config files: *airflow.cfg, airflow.db, webserver_config.py*
  - queries.toml (File containing the queries to be run in BigQuery)
* *example.env*: Example env file (**you need to replace some env variables, like the S3 and BigQuery configuration**)
3 changes: 3 additions & 0 deletions airflow/common/__init__.py
@@ -0,0 +1,3 @@
from .custom_operators.S3_BQ import S3ToBigQuery
from .aws.s3 import S3_Connector
from .gcp.bigquery import BigQueryConnector
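
With these re-exports in place, callers can import the connectors and the operator directly from the common package instead of spelling out the submodule paths, e.g.:

```python
# Equivalent shorter imports enabled by airflow/common/__init__.py
from common import S3ToBigQuery, S3_Connector, BigQueryConnector
```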
File renamed without changes.
File renamed without changes.
@@ -1,8 +1,8 @@
from typing import Any, Optional
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context
from daily_etl_modules.aws.s3 import S3_Connector
from daily_etl_modules.gcp.bigquery import BigQueryConnector
from common.aws.s3 import S3_Connector
from common.gcp.bigquery import BigQueryConnector
import pandas as pd
import datetime as dt

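
The body of the operator is collapsed in this view. For orientation, a custom operator of this kind typically follows the standard Airflow BaseOperator pattern sketched below; the constructor arguments and connector method names are illustrative assumptions, not the actual signature of S3ToBigQuery:

```python
# Hypothetical skeleton of an S3-to-BigQuery operator; names and methods are assumptions.
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context
from common.aws.s3 import S3_Connector
from common.gcp.bigquery import BigQueryConnector


class S3ToBigQueryExample(BaseOperator):
    def __init__(self, s3_connector: S3_Connector, bq_connector: BigQueryConnector,
                 bucket: str, key: str, table_id: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.s3_connector = s3_connector
        self.bq_connector = bq_connector
        self.bucket = bucket
        self.key = key
        self.table_id = table_id

    def execute(self, context: Context) -> None:
        # Fetch the object from S3, then load it into the target BigQuery table.
        # download_s3 and load_dataframe are hypothetical method names.
        obj = self.s3_connector.download_s3(bucket=self.bucket, key=self.key)
        self.bq_connector.load_dataframe(obj, self.table_id)
```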
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
@@ -7,6 +7,13 @@
logger.setLevel(logging.DEBUG)
from pathlib import Path
from inspect import getargs
from dotenv import load_dotenv
import requests
import gzip
from common.aws.s3 import S3_Connector

API_URL = os.environ['CONAGUA_API']


BASE_PAHT = Path(__file__).resolve().parent

@@ -72,4 +79,33 @@ def newfoo(*args, **kwargs):
#print "newfoo called with args=%r kwargs=%r"%(args,kwargs)
some_args = dict((k,kwargs[k]) for k in argspec.args if k in kwargs)
return func(*args, **some_args)
return newfoo
return newfoo


# Extract funcs

def _extract_raw_file(url: str) -> bytes:
    ''' Requests the endpoint and retrieves the compressed file
    Args:
        url (str): URL of the endpoint. Defaults to "https://smn.conagua.gob.mx/tools/GUI/webservices/?method=1"
    Returns:
        bytes: Raw content of the compressed (gzip) file
    '''
    try:
        logger.info(msg='Requesting endpoint')
        ftpstream = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=10)
    except Exception as e:
        logger.exception(e)
        logger.error(msg='Extract raw file failed')
        raise ValueError('Extract raw file failed') from e
    logger.info(msg='Request successful')
    return ftpstream.content

def extract_process(s3_client: S3_Connector, url: str) -> None:
    ''' Requests the endpoint and uploads the file to the S3 bucket'''
    try:
        s3_client.upload_s3(bucket=os.environ['S3_BUCKET'], obj=_extract_raw_file(url), key='HourlyForecast_MX.gz')
    except Exception as e:
        logger.exception(e)
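
These helpers are plain functions, so the natural way to schedule them is via a PythonOperator. A usage sketch, assuming extract_process lives in common.utils alongside _extract_raw_file as the surrounding diff suggests; the S3_Connector constructor arguments are hypothetical:

```python
# Usage sketch; constructor parameter names for S3_Connector are assumptions.
import os
from airflow.operators.python import PythonOperator
from common.aws.s3 import S3_Connector
from common.utils import extract_process

s3_client = S3_Connector(
    aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),          # hypothetical parameter
    aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),  # hypothetical parameter
)

extract_task = PythonOperator(
    task_id='extract_raw_forecast',
    python_callable=extract_process,
    op_kwargs={'s3_client': s3_client, 'url': os.environ['CONAGUA_API']},
    # dag=... would come from the surrounding DAG definition
)
```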
65 changes: 0 additions & 65 deletions airflow/dags/daily_etl_modules/extract.py

This file was deleted.

Empty file.
18 changes: 0 additions & 18 deletions airflow/dags/daily_etl_modules/load.py

This file was deleted.

62 changes: 0 additions & 62 deletions airflow/dags/daily_etl_modules/transform.py

This file was deleted.

9 changes: 4 additions & 5 deletions airflow/dags/daily_pipeline.py
@@ -7,12 +7,11 @@
from airflow.models import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator, S3Hook
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from custom_operators.S3_BQ import S3ToBigQuery
from daily_etl_modules.aws.s3 import S3_Connector
from daily_etl_modules.gcp.bigquery import BigQueryConnector
from common.custom_operators.S3_BQ import S3ToBigQuery
from common.aws.s3 import S3_Connector
from common.gcp.bigquery import BigQueryConnector
import os
from daily_etl_modules.extract import _extract_raw_file, extract_process
from daily_etl_modules.transform import generate_table_1, generate_table_2
from common.utils import _extract_raw_file
from dotenv import load_dotenv
load_dotenv()
import toml
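
The rest of the DAG body is collapsed here. Since the file imports toml and BigQueryExecuteQueryOperator, the transformation steps presumably read queries.toml and submit each query to BigQuery. A hedged sketch of that pattern; the file path, TOML key names, task id, and connection id are assumptions:

```python
# Illustrative sketch only; the actual DAG body is not shown in this diff.
import toml
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

queries = toml.load('/opt/airflow/queries.toml')  # assumed location of queries.toml

generate_table_1 = BigQueryExecuteQueryOperator(
    task_id='generate_table_1',
    sql=queries['daily']['table_1'],  # hypothetical key names
    use_legacy_sql=False,
    gcp_conn_id='google_cloud_default',
)
```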
Empty file.
106 changes: 0 additions & 106 deletions airflow/dags/hourly_etl_modules/extract.py

This file was deleted.

