diff --git a/dags/fetcher_dag.py b/dags/fetcher_dag.py index f43735359..25e510a30 100644 --- a/dags/fetcher_dag.py +++ b/dags/fetcher_dag.py @@ -3,8 +3,8 @@ from airflow.decorators import dag from airflow.models.param import Param -from rikolti.dags.harvest_dag import get_collection_fetchdata_task -from rikolti.dags.harvest_dag import fetch_collection_task +from rikolti.dags.shared_tasks import get_collection_fetchdata_task +from rikolti.dags.shared_tasks import fetch_collection_task @dag( dag_id="fetch_collection", diff --git a/dags/harvest_dag.py b/dags/harvest_dag.py index f445d68e5..fb5d591e1 100644 --- a/dags/harvest_dag.py +++ b/dags/harvest_dag.py @@ -5,8 +5,10 @@ from airflow.models.param import Param from rikolti.metadata_fetcher.lambda_function import fetch_collection -from rikolti.metadata_mapper.lambda_function import map_page -from rikolti.metadata_mapper.lambda_shepherd import get_mapping_status + +from rikolti.dags.shared_tasks import get_collection_metadata_task +from rikolti.dags.shared_tasks import map_page_task +from rikolti.dags.shared_tasks import get_mapping_status_task # TODO: remove the rikoltifetcher registry endpoint and restructure @@ -62,38 +64,6 @@ def fetch_collection_task(collection: dict): ] -@task() -def get_collection_metadata_task(params=None): - if not params or not params.get('collection_id'): - raise ValueError("Collection ID not found in params") - collection_id = params.get('collection_id') - - resp = requests.get( - "https://registry.cdlib.org/api/v1/" - f"rikolticollection/{collection_id}/?format=json" - ) - resp.raise_for_status() - - return resp.json() - - -# max_active_tis_per_dag - setting on the task to restrict how many -# instances can be running at the same time, *across all DAG runs* -@task() -def map_page_task(page: str, collection: dict): - collection_id = collection.get('id') - if not collection_id: - return False - mapped_page = map_page(collection_id, page, collection) - return mapped_page - - -@task() -def get_mapping_status_task(collection: dict, mapped_pages: list): - mapping_status = get_mapping_status(collection, mapped_pages) - return mapping_status - - @dag( dag_id="harvest_collection", schedule=None, diff --git a/dags/mapper_dag.py b/dags/mapper_dag.py index 000656590..c62234cef 100644 --- a/dags/mapper_dag.py +++ b/dags/mapper_dag.py @@ -2,9 +2,9 @@ from airflow.decorators import dag, task from airflow.models.param import Param -from shared_tasks import get_collection_metadata_task -from shared_tasks import map_page_task -from shared_tasks import get_mapping_status_task +from rikolti.dags.shared_tasks import get_collection_metadata_task +from rikolti.dags.shared_tasks import map_page_task +from rikolti.dags.shared_tasks import get_mapping_status_task from rikolti.metadata_mapper.lambda_shepherd import get_vernacular_pages