Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/chad/dags-deployment' into dags-…
Browse files Browse the repository at this point in the history
…development
  • Loading branch information
amywieliczka committed Aug 15, 2023
2 parents dc73fa9 + 57ffbd8 commit 71abc79
Show file tree
Hide file tree
Showing 42 changed files with 2,235 additions and 105 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/dags_deploy.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Sync DAG files to the Airflow S3 bucket once the CI check workflow passes.
name: Sync DAGs

on:
  workflow_run:
    workflows:
      # Must match the `name:` of the check workflow exactly
      # (dags_test.yml declares `name: Dags Check`).
      - 'Dags Check'
    types:
      - completed
  pull_request:
    types:
      - closed

jobs:
  deploy:
    runs-on: ubuntu-latest
    # Only deploy when the upstream check run succeeded.
    # NOTE(review): for `pull_request: closed` events there is no
    # `workflow_run` payload, so this condition is false and the job is
    # skipped for that trigger — confirm that is intended.
    if: ${{ github.event.workflow_run.conclusion == 'success' }}
    steps:
      # NOTE(review): @master is unpinned; consider pinning to a tag/SHA.
      - uses: actions/checkout@master
      - uses: jakejarvis/s3-sync-action@master
        with:
          # Only sync files matching *dags.py; --delete removes stale keys.
          args: --exclude "*" --include "*dags.py" --delete
        env:
          AWS_S3_BUCKET: ${{ secrets.DAGS_S3_BUCKET }}
          AWS_ACCESS_KEY_ID: ${{ secrets.DAGS_S3_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.DAGS_S3_SECRET_ACCESS_KEY }}
          AWS_REGION: 'us-west-2'
          SOURCE_DIR: 'dags'
          DEST_DIR: 'dags/rikolti'
32 changes: 32 additions & 0 deletions .github/workflows/dags_test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Lint and test the Airflow DAG definitions whenever files under dags/ change.
name: Dags Check
on:
  push:
    paths:
      - 'dags/**'
  pull_request:
    branches:
      - main
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          # Matches the python_version pinned in dags/Pipfile.
          python-version: "3.9"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r dags/requirements.txt
          pip check
      - name: Lint dags with ruff
        run: |
          pip install ruff
          ruff check --format=github ./dags
      - name: Test with Pytest
        run: |
          pip install pytest apache-airflow
          cd dags || exit
          pytest tests.py -v
Empty file added __init__.py
Empty file.
3 changes: 1 addition & 2 deletions content_harvester/by_collection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import os
import settings
import boto3
from . import settings
import boto3
from by_page import harvest_page_content


Expand Down
3 changes: 1 addition & 2 deletions content_harvester/by_page.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import json
import settings
import boto3
from . import settings
import boto3
import os
import requests
import derivatives
Expand Down
3 changes: 1 addition & 2 deletions content_harvester/tests.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
import json
import settings
from sample_data.nuxeo_harvests import nuxeo_complex_object_harvests
from . import settings
from sample_data.nuxeo_harvests import nuxeo_complex_object_harvests
# nuxeo_harvests, \
# nuxeo_nested_complex_object_harvests
# from sample_data.oac_harvests import oac_harvests
Expand Down
2 changes: 2 additions & 0 deletions dags/.airflowignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
tests.py
README.md
13 changes: 13 additions & 0 deletions dags/Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Pipenv manifest for the Airflow DAGs environment; exact versions are
# pinned in the accompanying Pipfile.lock.
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

# Runtime and test dependencies for the DAGs.
[packages]
pytest = "*"
apache-airflow = "*"

[dev-packages]

# Interpreter version; the CI workflow (dags_test.yml) installs 3.9 to match.
[requires]
python_version = "3.9"
1,936 changes: 1,936 additions & 0 deletions dags/Pipfile.lock

Large diffs are not rendered by default.

Empty file added dags/README.md
Empty file.
Empty file added dags/requirements.txt
Empty file.
62 changes: 62 additions & 0 deletions dags/test_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# DAG exhibiting task flow paradigm in airflow 2.0
# https://airflow.apache.org/docs/apache-airflow/2.0.2/tutorial_taskflow_api.html
# Modified for our use case

import json

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
# These args will get passed on to each operator.
# You can override them on a per-task basis during operator initialization.
default_args = {
    'owner': 'airflow',
}


# Decorator arguments wrapped one-per-line to stay under the 88-character
# limit (Ruff E501 flagged the original single-line form in CI).
@dag(
    default_args=default_args,
    schedule_interval="@daily",
    start_date=days_ago(2),
    tags=['example'],
)
def dag_with_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple ETL data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/stable/tutorial_taskflow_api.html)
    """

    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print("Total order value is: %.2f" % total_order_value)

    # Wire the tasks together: extract -> transform -> load.
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


# Instantiate the DAG so Airflow's DagBag discovers it at import time.
dag_with_taskflow_api = dag_with_taskflow_api()
13 changes: 13 additions & 0 deletions dags/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from unittest import TestCase

from airflow.models import DagBag

DAGS_FOLDER = "."


class HarvestDagsTest(TestCase):
    """Sanity-checks that every DAG in DAGS_FOLDER imports cleanly."""

    def dag_bag(self):
        """Build a fresh DagBag from DAGS_FOLDER, excluding example DAGs."""
        return DagBag(dag_folder=DAGS_FOLDER, include_examples=False)

    def test_no_import_errors(self):
        """Loading the DagBag must not record any import errors."""
        bag = self.dag_bag()
        assert not bag.import_errors
11 changes: 8 additions & 3 deletions metadata_fetcher/fetch_registry_collections.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import requests
import argparse
import sys
import lambda_function
import logging
import sys

import requests

from . import lambda_function


logger = logging.getLogger(__name__)


def registry_endpoint(url):
page = url
while page:
Expand Down
10 changes: 7 additions & 3 deletions metadata_fetcher/fetchers/Fetcher.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import requests
import logging
import os
import sys
import settings

import boto3
import logging
import requests

from .. import settings


logger = logging.getLogger(__name__)


Expand Down
15 changes: 9 additions & 6 deletions metadata_fetcher/fetchers/flickr_fetcher.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import json
import logging
import math
import time

from .Fetcher import Fetcher
import requests
from requests.adapters import HTTPAdapter
from requests.adapters import Retry
from urllib.parse import urlencode
import settings
import logging

import requests
from requests.adapters import HTTPAdapter, Retry

from .. import settings
from .Fetcher import Fetcher


logger = logging.getLogger(__name__)


Expand Down
15 changes: 10 additions & 5 deletions metadata_fetcher/fetchers/nuxeo_fetcher.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import json
from .Fetcher import Fetcher, InvalidHarvestEndpoint
import logging
import os
import requests
import subprocess

from urllib.parse import quote as urllib_quote

import boto3
import settings
import subprocess
import logging
import requests

from .. import settings
from .Fetcher import Fetcher, InvalidHarvestEndpoint


logger = logging.getLogger(__name__)


Expand Down
4 changes: 3 additions & 1 deletion metadata_fetcher/fetchers/oac_fetcher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
import requests
from xml.etree import ElementTree

import requests

from .Fetcher import Fetcher
import logging
logger = logging.getLogger(__name__)
Expand Down
17 changes: 12 additions & 5 deletions metadata_fetcher/fetchers/oai_fetcher.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import json
from xml.etree import ElementTree
from .Fetcher import Fetcher
import logging
from urllib.parse import parse_qs
from sickle import Sickle
from xml.etree import ElementTree

import requests
import logging

from sickle import Sickle

from .Fetcher import Fetcher


logger = logging.getLogger(__name__)


NAMESPACE = {'oai2': 'http://www.openarchives.org/OAI/2.0/'}

Expand Down Expand Up @@ -70,7 +77,7 @@ def check_page(self, http_resp: requests.Response) -> int:
'oai2:ListRecords', NAMESPACE).findall('oai2:record', NAMESPACE)

if len(xml_hits) > 0:
logging.debug(
logger.debug(
f"{self.collection_id}, fetched page {self.write_page} - "
f"{len(xml_hits)} hits,-,-,-,-,-"
)
Expand Down
15 changes: 10 additions & 5 deletions metadata_fetcher/lambda_function.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import json
import boto3
import sys
import settings
import importlib
import json
import logging
from fetchers.Fetcher import Fetcher, InvalidHarvestEndpoint
import sys

import boto3

from . import settings
from .fetchers.Fetcher import Fetcher, InvalidHarvestEndpoint


logger = logging.getLogger(__name__)


def import_fetcher(harvest_type):
fetcher_module = importlib.import_module(
f"fetchers.{harvest_type}_fetcher", package="metadata_fetcher")
Expand Down
1 change: 1 addition & 0 deletions metadata_fetcher/old_media_instructions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import os

import boto3


Expand Down
13 changes: 8 additions & 5 deletions metadata_fetcher/tests.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import json
import settings
import argparse
import json
import logging

from lambda_function import fetch_collection
from sample_data.nuxeo_harvests import nuxeo_harvests, \
nuxeo_complex_object_harvests, nuxeo_nested_complex_object_harvests
from sample_data.nuxeo_harvests import (nuxeo_complex_object_harvests,
nuxeo_harvests,
nuxeo_nested_complex_object_harvests)
from sample_data.oac_harvests import oac_harvests
from sample_data.oai_harvests import oai_harvests
from fetch_registry_collections import fetch_endpoint

from . import settings
from .fetch_registry_collections import fetch_endpoint


def main():
harvests = [
Expand Down
10 changes: 4 additions & 6 deletions metadata_mapper/lambda_function.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import importlib
import json
import logging
import sys

from typing import Union
from urllib.parse import urlparse, parse_qs

import settings
import logging
from urllib.parse import parse_qs, urlparse

from mappers.mapper import UCLDCWriter, Record, Vernacular
from . import settings
from .mappers.mapper import Record, UCLDCWriter, Vernacular

logger = logging.getLogger(__name__)

Expand Down
Loading

0 comments on commit 71abc79

Please sign in to comment.