Skip to content

Commit

Permalink
fix: factor out shared tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
bibliotechy committed Sep 29, 2023
1 parent 62b0f50 commit 91b3d6c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 3 deletions.
6 changes: 3 additions & 3 deletions dags/mapper_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from airflow.decorators import dag, task
from airflow.models.param import Param
from rikolti.dags.harvest_dag import get_collection_metadata_task
from rikolti.dags.harvest_dag import map_page_task
from rikolti.dags.harvest_dag import get_mapping_status_task
from shared_tasks import get_collection_metadata_task
from shared_tasks import map_page_task
from shared_tasks import get_mapping_status_task
from rikolti.metadata_mapper.lambda_shepherd import get_vernacular_pages


Expand Down
35 changes: 35 additions & 0 deletions dags/shared_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import requests
from airflow.decorators import task
from rikolti.metadata_mapper.lambda_function import map_page
from rikolti.metadata_mapper.lambda_shepherd import get_mapping_status

@task()
def get_collection_metadata_task(params=None):
    """Fetch a collection's metadata record from the CDL registry API.

    Args:
        params: Mapping that must contain a truthy ``collection_id``
            entry (presumably the Airflow DAG-run params injected by
            the task decorator -- TODO confirm against callers).

    Returns:
        The decoded JSON body of the registry response.

    Raises:
        ValueError: If ``params`` is empty/None or lacks a truthy
            ``collection_id``.
        requests.HTTPError: If the registry answers with an error status.
        requests.Timeout: If the registry does not respond in time.
    """
    collection_id = (params or {}).get('collection_id')
    if not collection_id:
        raise ValueError("Collection ID not found in params")

    # requests has no default timeout; without one a stalled connection
    # would block this task (and its worker slot) indefinitely.
    resp = requests.get(
        "https://registry.cdlib.org/api/v1/"
        f"rikolticollection/{collection_id}/?format=json",
        timeout=30,
    )
    resp.raise_for_status()

    return resp.json()


# max_active_tis_per_dag - setting on the task to restrict how many
# instances can be running at the same time, *across all DAG runs*
@task()
def map_page_task(page: str, collection: dict):
    """Run ``map_page`` for a single page of a collection.

    Args:
        page: Identifier of the page to map, passed through to
            ``map_page`` unchanged.
        collection: Collection record; its ``'id'`` entry is used as the
            collection id and the whole dict is forwarded to ``map_page``.

    Returns:
        Whatever ``map_page`` returns, or ``False`` when the collection
        record has no truthy ``'id'``.
    """
    collection_id = collection.get('id')
    if collection_id:
        return map_page(collection_id, page, collection)
    # No usable id: report failure to the caller instead of mapping.
    return False


@task()
def get_mapping_status_task(collection: dict, mapped_pages: list):
    """Airflow task wrapper around ``get_mapping_status``.

    Args:
        collection: Collection record, forwarded unchanged.
        mapped_pages: Per-page mapping results, forwarded unchanged
            (presumably the outputs of ``map_page_task`` -- TODO confirm).

    Returns:
        The value returned by ``get_mapping_status``.
    """
    return get_mapping_status(collection, mapped_pages)

0 comments on commit 91b3d6c

Please sign in to comment.