
Commit e4b0d1a

Merge pull request #28 from axiom-of-choice/move-aws-s3-modules
Move aws s3 modules
2 parents 7a02675 + c493144 commit e4b0d1a

File tree: 24 files changed (+71, -558 lines)

Dockerfile

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@ RUN apt-get update \
 ADD requirements.txt .
 COPY ./bq_sa.json /opt/airflow/bq_sa.json
 USER "${AIRFLOW_UID}:0"
+ENV PYTHONPATH="${PYTHONPATH}:/opt/airflow"
 RUN pip install -r requirements.txt
 RUN pip install --upgrade cffi
 RUN airflow db upgrade
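
The added ENV line puts /opt/airflow on PYTHONPATH, so the packages at the repository root resolve as top-level imports inside the container. A minimal sketch, assuming the layout introduced by this PR (no new APIs, only paths from this diff):

# Minimal sketch, assuming the layout introduced by this PR.
# With /opt/airflow on PYTHONPATH, the shared package is importable from any
# DAG without sys.path manipulation:
#
#   /opt/airflow/
#     common/   <- shared package (aws/, gcp/, custom_operators/, utils.py)
#     dags/     <- daily_pipeline.py, hourly_pipeline.py

from common.aws.s3 import S3_Connector                  # resolved via /opt/airflow
from common.custom_operators.S3_BQ import S3ToBigQuery  # same package, no relative-path hacks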

README.md

Lines changed: 10 additions & 8 deletions
@@ -1,4 +1,4 @@
-# Data Engineering Challenge
+# Data ELT Pipeline Project
 
 ## What is this repo for?
 
@@ -30,14 +30,16 @@ I used Pandas as data manipulation layer because it offers a complete solution t
 ### File structure
 The file directory follows a standard ETL pipeline structure; we have:
 * *airflow/* Includes:
-    - dags/: Directory containing the ETL modules and custom operators used by the pipeline DAGs, to keep the DAG files clean and organized
-        - custom_operators: Directory containing all the custom operators
-        - daily_etl_modules: Modules used in the daily_pipeline.py DAG
-        - hourly_etl_modules: Modules used in the hourly_pipeline.py DAG
-        - daily_pipeline.py and hourly_pipeline.py DAGs
-        - utils.py: Common helper functions
+    - common/: Directory containing all the common modules
+        - custom_operators: Directory containing all the Airflow custom operators
+        - aws: AWS custom classes for the use case (wrapper of boto3)
+        - gcp: Google Cloud custom classes for the use case (wrapper of the GCP library)
+        - utils.py: Common helper functions
+    - dags/: Directory containing the DAGs for both pipelines
+        - daily_pipeline.py: DAG
+        - hourly_pipeline.py: DAG
 - *data/* Includes:
-    - *data_municipios/*: Where the static data about municipios is stored (it should live somewhere else, such as a database or storage service)
+    - *data_municipios/*: Where the static data about municipios is stored (it is also stored in BigQuery, but kept here for other users)
 - Airflow config files: *airflow.cfg, airflow.db, webserver_config.py*
 - queries.toml (File containing the queries to be run in BigQuery)
 * *example.env* Example env file (**you need to replace some env variables, such as the S3 and BigQuery configuration**)

airflow/common/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+from .custom_operators.S3_BQ import S3ToBigQuery
+from .aws.s3 import S3_Connector
+from .gcp.bigquery import BigQueryConnector
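
Because the package __init__.py re-exports these three classes, callers can also import them from the package root. A small convenience sketch (the DAG diffs below still import from the submodules directly):

# Convenience import enabled by the re-exports above (optional; the DAGs
# below keep importing from the submodules):
from common import S3_Connector, BigQueryConnector, S3ToBigQuery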
File renamed without changes.

airflow/dags/custom_operators/S3_BQ.py renamed to airflow/common/custom_operators/S3_BQ.py

Lines changed: 2 additions & 2 deletions
@@ -1,8 +1,8 @@
 from typing import Any, Optional
 from airflow.models.baseoperator import BaseOperator
 from airflow.utils.context import Context
-from daily_etl_modules.aws.s3 import S3_Connector
-from daily_etl_modules.gcp.bigquery import BigQueryConnector
+from common.aws.s3 import S3_Connector
+from common.gcp.bigquery import BigQueryConnector
 import pandas as pd
 import datetime as dt

airflow/dags/daily_etl_modules/utils.py renamed to airflow/common/utils.py

Lines changed: 37 additions & 1 deletion
@@ -7,6 +7,13 @@
 logger.setLevel(logging.DEBUG)
 from pathlib import Path
 from inspect import getargs
+from dotenv import load_dotenv
+import requests
+import gzip
+from common.aws.s3 import S3_Connector
+
+API_URL = os.environ['CONAGUA_API']
+
 
 BASE_PAHT = Path(__file__).resolve().parent
 
@@ -72,4 +79,33 @@ def newfoo(*args, **kwargs):
         #print "newfoo called with args=%r kwargs=%r"%(args,kwargs)
         some_args = dict((k,kwargs[k]) for k in argspec.args if k in kwargs)
         return func(*args, **some_args)
-    return newfoo
+    return newfoo
+
+
+# Extract funcs
+
+def _extract_raw_file(url: str) -> gzip.GzipFile:
+    ''' Requests the endpoint and retrieves the compressed file
+
+    Args:
+        url (str): url of the endpoint. Defaults to "https://smn.conagua.gob.mx/tools/GUI/webservices/?method=1"
+
+    Returns:
+        gzip.GzipFile: Route of the compressed file
+    '''
+    try:
+        logger.info(msg='Requesting endpoint')
+        ftpstream = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=10)
+    except Exception as e:
+        logger.exception(e)
+        logger.error(msg='Extract raw file failed')
+        raise ValueError
+    logger.info(msg='Request successful')
+    return ftpstream.content
+
+def extract_process(s3_client: S3_Connector, url: str) -> None:
+    ''' Requests the endpoint and uploads the file to the S3 bucket'''
+    try:
+        s3_client.upload_s3(bucket=os.environ['S3_BUCKET'], obj=_extract_raw_file(url), key='HourlyForecast_MX.gz')
+    except Exception as e:
+        logger.exception(e)
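
The extract helpers now live in common/utils.py. A hypothetical usage sketch follows; the S3_Connector constructor arguments are assumptions (its signature is not part of this diff), while CONAGUA_API and S3_BUCKET are the env variables the module itself reads:

# Hypothetical usage sketch of the relocated helpers. S3_Connector's
# constructor arguments are assumed for illustration; its real signature lives
# in common/aws/s3.py and is not shown in this diff.
import os
from common.aws.s3 import S3_Connector
from common.utils import extract_process

s3_client = S3_Connector(
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],          # assumed parameter name
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],  # assumed parameter name
)

# Fetches the gzip payload from the CONAGUA endpoint and uploads it to
# s3://$S3_BUCKET/HourlyForecast_MX.gz, as implemented in common/utils.py.
extract_process(s3_client, url=os.environ["CONAGUA_API"])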

airflow/dags/daily_etl_modules/extract.py

Lines changed: 0 additions & 65 deletions
This file was deleted.

airflow/dags/daily_etl_modules/gcp/__init__.py

Whitespace-only changes.

airflow/dags/daily_etl_modules/load.py

Lines changed: 0 additions & 18 deletions
This file was deleted.

airflow/dags/daily_etl_modules/transform.py

Lines changed: 0 additions & 62 deletions
This file was deleted.

airflow/dags/daily_pipeline.py

Lines changed: 4 additions & 5 deletions
@@ -7,12 +7,11 @@
 from airflow.models import DAG
 from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator, S3Hook
 from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
-from custom_operators.S3_BQ import S3ToBigQuery
-from daily_etl_modules.aws.s3 import S3_Connector
-from daily_etl_modules.gcp.bigquery import BigQueryConnector
+from common.custom_operators.S3_BQ import S3ToBigQuery
+from common.aws.s3 import S3_Connector
+from common.gcp.bigquery import BigQueryConnector
 import os
-from daily_etl_modules.extract import _extract_raw_file, extract_process
-from daily_etl_modules.transform import generate_table_1, generate_table_2
+from common.utils import _extract_raw_file
 from dotenv import load_dotenv
 load_dotenv()
 import toml
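
The DAG now pulls everything through common, and it already imports toml and BigQueryExecuteQueryOperator. A hypothetical wiring sketch of how those imports can fit together (the DAG body itself is unchanged by this commit and not shown here); the TOML keys are placeholders, while queries.toml is the file described in the README:

# Hypothetical wiring sketch; the actual DAG body is not part of this diff.
import toml
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

queries = toml.load("queries.toml")       # file referenced in the README
run_daily_query = BigQueryExecuteQueryOperator(
    task_id="run_daily_query",
    sql=queries["daily"]["table_1"],      # placeholder section/key names
    use_legacy_sql=False,
)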

airflow/dags/hourly_etl_modules/__init__.py

Whitespace-only changes.

airflow/dags/hourly_etl_modules/extract.py

Lines changed: 0 additions & 106 deletions
This file was deleted.
