feat: add dags ci and cd github actions
bibliotechy committed Aug 21, 2023
1 parent 3d4f5a1 commit d5a20f2
Showing 7 changed files with 121 additions and 1 deletion.
30 changes: 30 additions & 0 deletions .github/workflows/dags_test.yml
@@ -0,0 +1,30 @@
name: Dags Check
on:

  pull_request:
    branches:
      - main
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.9"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r dags/requirements.txt
          pip check
      - name: Lint dags with ruff
        run: |
          pip install ruff
          ruff check --format=github ./dags
      - name: Test with Pytest
        run: |
          pip install pytest apache-airflow
          cd dags || exit
          pytest tests.py -v
10 changes: 10 additions & 0 deletions README.md
@@ -99,6 +99,16 @@ We use PR reviews to approve or reject, comment on, and request further iteratio
- LOCALFOLDER


## Dags

Dags that use the rikolti modules in Airflow are defined in the `dags` folder.

A very basic level of testing is done on the dags to ensure they load as expected.
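A minimal sketch of that load check (the same idea as the `dags/tests.py` added in this commit), assuming it is run from inside the `dags` folder with Airflow installed:

```python
from airflow.models import DagBag

# Parse every DAG file in the current folder, skipping Airflow's bundled examples.
dag_bag = DagBag(dag_folder=".", include_examples=False)

# Any module that fails to import is recorded in import_errors; an empty dict
# means every DAG loaded as expected.
assert not dag_bag.import_errors, dag_bag.import_errors
```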

### Continuous Deployment

On each merge to `main`, an `AWS CodeBuild` task runs, triggered by a GitHub webhook that the `CodeBuild` project listens for.

## Airflow Development

### Set up `aws-mwaa-local-runner`
3 changes: 3 additions & 0 deletions dags/.airflowignore
@@ -0,0 +1,3 @@
tests.py
test_dag.py
README.md
1 change: 0 additions & 1 deletion dags/example_dag.py

This file was deleted.

Empty file added dags/requirements.txt
64 changes: 64 additions & 0 deletions dags/test_dag.py
@@ -0,0 +1,64 @@
# DAG exhibiting task flow paradigm in airflow 2.0
# https://airflow.apache.org/docs/apache-airflow/2.0.2/tutorial_taskflow_api.html
# Modified for our use case

import json

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
}


@dag(
    default_args=default_args,
    schedule_interval="@daily",
    start_date=days_ago(2),
    tags=['test'])
def dag_with_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple ETL data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/stable/tutorial_taskflow_api.html)
    """
    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print("Total order value is: %.2f" % total_order_value)

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


dag_with_taskflow_api = dag_with_taskflow_api()
14 changes: 14 additions & 0 deletions dags/tests.py
@@ -0,0 +1,14 @@
from unittest import TestCase

from airflow.models import DagBag


DAGS_FOLDER = "."


class HarvestDagsTest(TestCase):
    def dag_bag(self):
        return DagBag(dag_folder=DAGS_FOLDER, include_examples=False)

    def test_no_import_errors(self):
        assert not self.dag_bag().import_errors
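
A possible follow-on check, not part of this commit, would assert that specific DAG ids actually end up in the bag once real harvest DAGs are added; a sketch with a hypothetical `EXPECTED_DAG_IDS` list (note that files named in `.airflowignore`, like `test_dag.py`, are skipped by `DagBag`):

```python
from unittest import TestCase

from airflow.models import DagBag


DAGS_FOLDER = "."
# Hypothetical: ids of DAGs that must be present once real DAG files exist.
EXPECTED_DAG_IDS: list[str] = []


class ExpectedDagsTest(TestCase):
    def test_expected_dags_are_loaded(self):
        dag_bag = DagBag(dag_folder=DAGS_FOLDER, include_examples=False)
        for dag_id in EXPECTED_DAG_IDS:
            # dag_bag.dag_ids lists every DAG id that parsed successfully.
            self.assertIn(dag_id, dag_bag.dag_ids)
```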
