Skip to content

Commit

Permalink
fix: all dags use shared tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
bibliotechy committed Sep 29, 2023
1 parent 91b3d6c commit 96f181b
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 39 deletions.
4 changes: 2 additions & 2 deletions dags/fetcher_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from airflow.decorators import dag
from airflow.models.param import Param

from rikolti.dags.harvest_dag import get_collection_fetchdata_task
from rikolti.dags.harvest_dag import fetch_collection_task
from rikolti.dags.shared_tasks import get_collection_fetchdata_task
from rikolti.dags.shared_tasks import fetch_collection_task

@dag(
dag_id="fetch_collection",
Expand Down
38 changes: 4 additions & 34 deletions dags/harvest_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from airflow.models.param import Param

from rikolti.metadata_fetcher.lambda_function import fetch_collection
from rikolti.metadata_mapper.lambda_function import map_page
from rikolti.metadata_mapper.lambda_shepherd import get_mapping_status

from rikolti.dags.shared_tasks import get_collection_metadata_task
from rikolti.dags.shared_tasks import map_page_task
from rikolti.dags.shared_tasks import get_mapping_status_task


# TODO: remove the rikoltifetcher registry endpoint and restructure
Expand Down Expand Up @@ -62,38 +64,6 @@ def fetch_collection_task(collection: dict):
]


@task()
def get_collection_metadata_task(params=None):
if not params or not params.get('collection_id'):
raise ValueError("Collection ID not found in params")
collection_id = params.get('collection_id')

resp = requests.get(
"https://registry.cdlib.org/api/v1/"
f"rikolticollection/{collection_id}/?format=json"
)
resp.raise_for_status()

return resp.json()


# max_active_tis_per_dag - setting on the task to restrict how many
# instances can be running at the same time, *across all DAG runs*
@task()
def map_page_task(page: str, collection: dict):
collection_id = collection.get('id')
if not collection_id:
return False
mapped_page = map_page(collection_id, page, collection)
return mapped_page


@task()
def get_mapping_status_task(collection: dict, mapped_pages: list):
mapping_status = get_mapping_status(collection, mapped_pages)
return mapping_status


@dag(
dag_id="harvest_collection",
schedule=None,
Expand Down
6 changes: 3 additions & 3 deletions dags/mapper_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from airflow.decorators import dag, task
from airflow.models.param import Param
from shared_tasks import get_collection_metadata_task
from shared_tasks import map_page_task
from shared_tasks import get_mapping_status_task
from rikolti.dags.shared_tasks import get_collection_metadata_task
from rikolti.dags.shared_tasks import map_page_task
from rikolti.dags.shared_tasks import get_mapping_status_task
from rikolti.metadata_mapper.lambda_shepherd import get_vernacular_pages


Expand Down

0 comments on commit 96f181b

Please sign in to comment.