diff --git a/dags/harvest_dag.py b/dags/harvest_dag.py
index 914ee772b..4244a7550 100644
--- a/dags/harvest_dag.py
+++ b/dags/harvest_dag.py
@@ -78,13 +78,15 @@ def merge_children(version):
 
 
 @task()
-def get_mapped_page_filenames_task(mapped_pages):
-    mapped_pages = [
-        [mapped['mapped_page_path'] for mapped in mapped_page_list
-         if mapped['mapped_page_path']]
-        for mapped_page_list in mapped_pages]
+def get_mapped_page_filenames_task(mapped_page_batches: list[list[dict]]):
+    batches = []
+    for mapped_page_batch in mapped_page_batches:
+        batch = [
+            mapped_page['mapped_page_path'] for mapped_page in mapped_page_batch
+            if mapped_page['mapped_page_path']]
+        batches.append(json.dumps(batch))
 
-    return json.dumps(mapped_pages)
+    return batches
 
 
 @dag(
@@ -104,24 +106,24 @@ def harvest():
     collection = get_collection_metadata_task()
 
     vernacular_version = create_vernacular_version_task(collection=fetchdata)
-    fetched_pages = fetch_collection_task(
+    fetched_page_batches = fetch_collection_task(
         collection=fetchdata, vernacular_version=vernacular_version)
 
     mapped_data_version = create_mapped_version_task(
         collection=collection,
-        vernacular_pages=fetched_pages
+        vernacular_page_batches=fetched_page_batches
     )
-    mapped_pages = (
+    mapped_page_batches = (
         map_page_task
         .partial(collection=collection, mapped_data_version=mapped_data_version)
-        .expand(vernacular_pages=fetched_pages)
+        .expand(vernacular_page_batch=fetched_page_batches)
     )
 
-    mapping_status = get_mapping_status_task(collection, mapped_pages)
+    mapping_status = get_mapping_status_task(collection, mapped_page_batches)
     validate_collection_task(collection['id'], mapping_status['mapped_page_paths'])
-    mapped_page_paths = get_mapped_page_filenames_task(mapped_pages)
+    mapped_page_batches = get_mapped_page_filenames_task(mapped_page_batches)
     with_content_urls_version = create_with_content_urls_version_task(
-        collection, mapped_pages)
+        collection, mapped_page_batches)
 
     content_harvest_task = (
         ContentHarvestOperator
@@ -132,14 +134,14 @@ def harvest():
             mapper_type=collection['rikolti_mapper_type']
         )
         .expand(
-            pages=mapped_page_paths
+            pages=mapped_page_batches
         )
     )
 
-    merged_parent_records = merge_children(with_content_urls_version)
-    content_harvest_task >> merged_parent_records
+    merged_pages = merge_children(with_content_urls_version)
+    content_harvest_task >> merged_pages
 
-    stage_index = create_stage_index_task(collection, merged_parent_records)
+    stage_index = create_stage_index_task(collection, merged_pages)
     cleanup_failed_index_creation_task(stage_index)
diff --git a/dags/mapper_dag.py b/dags/mapper_dag.py
index ffb15b88f..6923bd600 100644
--- a/dags/mapper_dag.py
+++ b/dags/mapper_dag.py
@@ -1,3 +1,4 @@
+import math
 from datetime import datetime
 from typing import Optional
 
@@ -9,7 +10,7 @@
 from rikolti.dags.shared_tasks import map_page_task
 from rikolti.dags.shared_tasks import get_mapping_status_task
 from rikolti.dags.shared_tasks import validate_collection_task
-from rikolti.dags.shared_tasks import paginate_filepaths_for_fanout
+from rikolti.dags.shared_tasks import batched
 from rikolti.utils.versions import get_most_recent_vernacular_version
 from rikolti.utils.versions import get_most_recent_mapped_version
 from rikolti.utils.versions import get_vernacular_pages
@@ -17,14 +18,18 @@
 
 
 @task()
-def get_vernacular_pages_task(collection: dict, params: Optional[dict]=None):
+def get_vernacular_page_batches_task(
+        collection: dict, params: Optional[dict]=None) -> list[list[str]]:
     collection_id = collection['id']
     vernacular_version = params.get('vernacular_version') if params else None
     if not vernacular_version:
         vernacular_version = get_most_recent_vernacular_version(collection_id)
     pages = get_vernacular_pages(vernacular_version)
     # TODO: split page_list into pages and children?
-    return paginate_filepaths_for_fanout(pages)
+
+    # 1024 is the maximum number of fanout tasks allowed
+    batch_size = math.ceil(len(pages) / 1024)
+    return batched(pages, batch_size)
 
 @task()
 def get_mapped_pages_task(params: Optional[dict] = None):
@@ -65,18 +70,18 @@
 )
 def mapper_dag():
     collection = get_collection_metadata_task()
-    page_list = get_vernacular_pages_task(collection=collection)
+    page_batches = get_vernacular_page_batches_task(collection=collection)
 
     mapped_data_version = create_mapped_version_task(
         collection=collection,
-        vernacular_pages=page_list
+        vernacular_page_batches=page_batches
     )
-    mapped_pages = (
+    mapped_page_batches = (
         map_page_task
         .partial(collection=collection, mapped_data_version=mapped_data_version)
-        .expand(vernacular_pages=page_list)
+        .expand(vernacular_page_batch=page_batches)
     )
-    mapping_status = get_mapping_status_task(collection, mapped_pages)
+    mapping_status = get_mapping_status_task(collection, mapped_page_batches)
     validate_collection_task(collection['id'], mapping_status['mapped_page_paths'])
 
 mapper_dag()
diff --git a/dags/shared_tasks.py b/dags/shared_tasks.py
index 4bd8644fb..ece275b1d 100644
--- a/dags/shared_tasks.py
+++ b/dags/shared_tasks.py
@@ -1,5 +1,6 @@
 import boto3
 import pprint
+import json
 import os
 import math
 from datetime import datetime
@@ -42,13 +43,12 @@ def get_collection_fetchdata_task(params=None):
     return resp.json()
 
 
-def paginate_filepaths_for_fanout(filepaths):
-    # 1024 is the maximum number of fanout tasks allowed
-    page_size = math.ceil(len(filepaths) / 1024)
-    paginated_filepaths = []
-    for i in range(0, len(filepaths), page_size):
-        paginated_filepaths.append(filepaths[i:i+page_size])
-    return paginated_filepaths
+# TODO: in python3.12 we can use itertools.batched
+def batched(l, batch_size):
+    batches = []
+    for i in range(0, len(l), batch_size):
+        batches.append(l[i:i+batch_size])
+    return batches
 
 
 @task()
@@ -58,7 +58,8 @@ def create_vernacular_version_task(collection) -> str:
 
 
 @task()
-def fetch_collection_task(collection: dict, vernacular_version: str):
+def fetch_collection_task(
+        collection: dict, vernacular_version: str) -> list[list[str]]:
     """
     returns a list of the filepaths of the vernacular metadata relative to
     the collection id, ex: [
@@ -137,7 +138,9 @@ def flatten_stats(stats):
             f"\n{pprint.pprint(fetch_status)}\n{stats['success']}\n{stats['filepaths']}"
         )
 
-    return paginate_filepaths_for_fanout(stats['filepaths'])
+    # 1024 is the maximum number of fanout tasks allowed
+    batch_size = math.ceil(len(stats['filepaths']) / 1024)
+    return batched(stats['filepaths'], batch_size)
 
 
 @task(multiple_outputs=True)
@@ -159,11 +162,11 @@ def get_collection_metadata_task(params=None):
 # instances can be running at the same time, *across all DAG runs*
 @task()
 def map_page_task(
-        vernacular_pages: Union[str,list[str]],
+        vernacular_page_batch: Union[str,list[str]],
         collection: dict, mapped_data_version: str):
     """
-    vernacular_pages is a list of filepaths relative to the collection id, ex:
+    vernacular_page_batch is a list of filepaths relative to the collection id, ex:
     [ 3433/vernacular_metadata_2023-01-01T00:00:00/data/1 ] or:
     [
@@ -184,7 +187,7 @@
         return False
 
     mapped_pages = []
-    for vernacular_page in vernacular_pages:
+    for vernacular_page in vernacular_page_batch:
         mapped_page = map_page(
             collection_id, vernacular_page, mapped_data_version, collection)
         mapped_pages.append(mapped_page)
@@ -192,7 +195,7 @@
 
 
 @task(multiple_outputs=True)
-def get_mapping_status_task(collection: dict, paginated_mapped_pages: list):
+def get_mapping_status_task(collection: dict, mapped_page_batches: list) -> dict:
     """
     mapped_pages is a list of a list of dicts with the following keys:
         status: success
@@ -208,15 +211,15 @@
         ]
     """
     mapped_pages = []
-    for pages in paginated_mapped_pages:
-        mapped_pages.extend(pages)
+    for batch in mapped_page_batches:
+        mapped_pages.extend(batch)
 
     mapping_status = get_mapping_status(collection, mapped_pages)
     return mapping_status
 
 
 @task()
-def create_mapped_version_task(collection, vernacular_pages):
+def create_mapped_version_task(collection, vernacular_page_batches) -> str:
     """
     vernacular pages is a list of lists of the filepaths of the vernacular
     metadata relative to the collection id, ex: [
@@ -226,10 +229,12 @@ def create_mapped_version_task(collection, vernacular_pages):
     returns the path to a new mapped version, ex:
         "3433/vernacular_metadata_2023-01-01T00:00:00/mapped_metadata_2023-01-01T00:00:00/"
     """
-    vernacular_version = get_version(collection.get('id'), vernacular_pages[0][0])
+    vernacular_page_batch = vernacular_page_batches[0]
+    vernacular_page = vernacular_page_batch[0]
+    vernacular_version = get_version(collection.get('id'), vernacular_page)
     if not vernacular_version:
         raise ValueError(
-            f"Vernacular version not found in {vernacular_pages[0][0]}")
+            f"Vernacular version not found in {vernacular_page}")
 
     mapped_data_version = create_mapped_version(vernacular_version)
     return mapped_data_version
@@ -264,9 +269,15 @@ def validate_collection_task(collection_id: int, mapped_metadata_pages: dict) ->
 
 
 @task()
-def create_with_content_urls_version_task(collection: dict, mapped_pages: list[list[dict]]):
-    mapped_page_path = [page['mapped_page_path'] for page in mapped_pages[0]
-                        if page['mapped_page_path']][0]
+def create_with_content_urls_version_task(
+        collection: dict, mapped_page_batches: list[str]):
+    mapped_page_path = None
+    for batch in mapped_page_batches:
+        batch = json.loads(batch)
+        for mapped_page in batch:
+            if mapped_page.get('mapped_page_path'):
+                mapped_page_path = mapped_page['mapped_page_path']
+                break
     mapped_version = get_version(collection['id'], mapped_page_path)
     return create_with_content_urls_version(mapped_version)
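
Note: the following is a minimal sketch, not part of the diff, of how the new `batched` helper works together with the `math.ceil(len(pages) / 1024)` batch-size calculation to keep the Airflow fan-out at or below the 1024 mapped-task limit; the page paths are hypothetical stand-ins for `get_vernacular_pages()` output.

    import math

    # mirrors the `batched` helper added to dags/shared_tasks.py
    def batched(l, batch_size):
        batches = []
        for i in range(0, len(l), batch_size):
            batches.append(l[i:i + batch_size])
        return batches

    # hypothetical page list standing in for get_vernacular_pages(...) output
    pages = [
        f"3433/vernacular_metadata_2023-01-01T00:00:00/data/{i}"
        for i in range(2500)
    ]

    # 1024 is the maximum number of fanout tasks allowed
    batch_size = math.ceil(len(pages) / 1024)   # 3 pages per batch
    page_batches = batched(pages, batch_size)   # 834 batches, i.e. at most 1024 mapped tasks
    assert len(page_batches) <= 1024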