Skip to content

Commit

Permalink
Paginate filepaths for Airflow task fanout
Browse files Browse the repository at this point in the history
  • Loading branch information
amywieliczka committed Feb 14, 2024
1 parent 751b7aa commit 445b304
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 30 deletions.
26 changes: 19 additions & 7 deletions content_harvester/by_page.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,26 @@ def harvest_page_content(
parser = argparse.ArgumentParser(
description="Harvest content using a page of mapped metadata")
parser.add_argument('collection_id', help="Collection ID")
parser.add_argument('mapped_page_path', help="URI-formatted path to a mapped metadata page")
parser.add_argument('mapped_page_path', help="URI-formatted path to a mapped metadata page, optionally a list")
parser.add_argument('with_content_urls_version', help="URI-formatted path to a with_content_urls version")
parser.add_argument('mapper_type', help="If 'nuxeo.nuxeo', use Nuxeo auth")
args = parser.parse_args()

print(harvest_page_content(
args.collection_id,
args.mapper_type,
args.mapped_page_path,
args.with_content_urls_version
))
# mapped_page_path may be either a single URI or a JSON-encoded list of
# URIs (the list form is used by Airflow to fan work out in batches).
if args.mapped_page_path.startswith('['):
    # Batch mode: harvest every page in the list; print a list of results.
    print_value = [
        harvest_page_content(
            args.collection_id,
            args.mapper_type,
            mapped_page_path,
            args.with_content_urls_version
        )
        for mapped_page_path in json.loads(args.mapped_page_path)
    ]
else:
    # Single-page mode: print the lone result (not wrapped in a list).
    print_value = harvest_page_content(
        args.collection_id,
        args.mapper_type,
        args.mapped_page_path,
        args.with_content_urls_version
    )
print(print_value)
4 changes: 2 additions & 2 deletions dags/docker_content_harvest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def docker_content_harvest():
task_id="page_content_harvester_on_local_docker",
collection_id="{{ params.collection_id }}",
with_content_urls_version="{{ params.with_content_urls_version }}",
page="{{ params.page_filename }}",
pages="{{ params.page_filename }}",
mapper_type="{{ params.mapper_type }}",
)
harvest_content_for_page_task
Expand All @@ -38,7 +38,7 @@ def docker_content_harvest():
command=["{{ params.collection_id }}"],
collection_id="{{ params.collection_id }}",
with_content_urls_version="{{ params.with_content_urls_version }}",
page="all",
pages="all",
mapper_type="{{ params.mapper_type }}"
)
harvest_content_for_collection_task
Expand Down
4 changes: 2 additions & 2 deletions dags/ecs_content_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def ecs_content_harvest():
task_id="page_content_harvester_on_ecs",
collection_id="{{ params.collection_id }}",
with_content_urls_version="{{ params.with_content_urls_version }}",
page="{{ params.page_filename }}",
pages="{{ params.page_filename }}",
mapper_type="{{ params.mapper_type }}",
)
harvest_content_for_page
Expand All @@ -47,7 +47,7 @@ def ecs_content_harvest():
},
collection_id = "{{ params.collection_id }}",
with_content_urls_version="{{ params.with_content_urls_version }}",
page="all",
pages="all",
mapper_type="{{ params.mapper_type }}",
)
harvest_content_for_collection
Expand Down
9 changes: 6 additions & 3 deletions dags/harvest_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
create_merged_version, put_merged_page)
from rikolti.dags.shared_tasks import create_stage_index_task
from rikolti.dags.shared_tasks import cleanup_failed_index_creation_task
from rikolti.dags.shared_tasks import paginate_filepaths_for_fanout


def get_child_records(version, parent_id) -> list:
Expand Down Expand Up @@ -79,9 +80,11 @@ def merge_children(version):

@task()
def get_mapped_page_filenames_task(mapped_pages):
return [mapped['mapped_page_path'] for mapped in mapped_pages
mapped_pages = [mapped['mapped_page_path'] for mapped in mapped_pages
if mapped['mapped_page_path']]

return paginate_filepaths_for_fanout(mapped_pages)


@dag(
dag_id="harvest_collection",
Expand Down Expand Up @@ -109,7 +112,7 @@ def harvest():
mapped_pages = (
map_page_task
.partial(collection=collection, mapped_data_version=mapped_data_version)
.expand(vernacular_page=fetched_pages)
.expand(vernacular_pages=fetched_pages)
)

mapping_status = get_mapping_status_task(collection, mapped_pages)
Expand All @@ -128,7 +131,7 @@ def harvest():
mapper_type=collection['rikolti_mapper_type']
)
.expand(
page=mapped_page_paths
pages=mapped_page_paths
)
)

Expand Down
5 changes: 3 additions & 2 deletions dags/mapper_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from rikolti.dags.shared_tasks import map_page_task
from rikolti.dags.shared_tasks import get_mapping_status_task
from rikolti.dags.shared_tasks import validate_collection_task
from rikolti.dags.shared_tasks import paginate_filepaths_for_fanout
from rikolti.utils.versions import get_most_recent_vernacular_version
from rikolti.utils.versions import get_most_recent_mapped_version
from rikolti.utils.versions import get_vernacular_pages
Expand All @@ -23,7 +24,7 @@ def get_vernacular_pages_task(collection: dict, params: Optional[dict]=None):
vernacular_version = get_most_recent_vernacular_version(collection_id)
pages = get_vernacular_pages(vernacular_version)
# TODO: split page_list into pages and children?
return pages
return paginate_filepaths_for_fanout(pages)

@task()
def get_mapped_pages_task(params: Optional[dict] = None):
Expand Down Expand Up @@ -72,7 +73,7 @@ def mapper_dag():
mapped_pages = (
map_page_task
.partial(collection=collection, mapped_data_version=mapped_data_version)
.expand(vernacular_page=page_list)
.expand(vernacular_pages=page_list)
)

mapping_status = get_mapping_status_task(collection, mapped_pages)
Expand Down
10 changes: 5 additions & 5 deletions dags/shared_content_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def get_awsvpc_config():


class ContentHarvestEcsOperator(EcsRunTaskOperator):
def __init__(self, collection_id=None, with_content_urls_version=None, page=None, mapper_type=None, **kwargs):
def __init__(self, collection_id=None, with_content_urls_version=None, pages=None, mapper_type=None, **kwargs):
container_name = "rikolti-content_harvester"

args = {
Expand All @@ -52,7 +52,7 @@ def __init__(self, collection_id=None, with_content_urls_version=None, page=None
"name": container_name,
"command": [
f"{collection_id}",
page,
pages,
with_content_urls_version,
mapper_type
],
Expand Down Expand Up @@ -106,7 +106,7 @@ def execute(self, context):


class ContentHarvestDockerOperator(DockerOperator):
def __init__(self, collection_id, with_content_urls_version, page, mapper_type, **kwargs):
def __init__(self, collection_id, with_content_urls_version, pages, mapper_type, **kwargs):
mounts = []
if os.environ.get("METADATA_MOUNT"):
mounts.append(Mount(
Expand Down Expand Up @@ -142,7 +142,7 @@ def __init__(self, collection_id, with_content_urls_version, page, mapper_type,
)
container_version = os.environ.get(
'CONTENT_HARVEST_VERSION', 'latest')
page_basename = page.split('/')[-1]
page_basename = pages[0].split('/')[-1]
container_name = (
f"content_harvester_{collection_id}_{page_basename.split('.')[0]}")

Expand All @@ -166,7 +166,7 @@ def __init__(self, collection_id, with_content_urls_version, page, mapper_type,
"container_name": container_name,
"command": [
f"{collection_id}",
page,
pages,
with_content_urls_version,
mapper_type
],
Expand Down
45 changes: 36 additions & 9 deletions dags/shared_tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import boto3
import pprint
import os
import math
from datetime import datetime
from typing import Union

import requests

Expand Down Expand Up @@ -40,6 +42,15 @@ def get_collection_fetchdata_task(params=None):
return resp.json()


def paginate_filepaths_for_fanout(filepaths):
    """Chunk filepaths into at most 1024 sublists for Airflow task fanout.

    1024 is the maximum number of fanout tasks allowed (Airflow's cap on
    dynamically mapped task instances — TODO confirm against the deployed
    Airflow config). When more than 1024 filepaths are given, each
    fanned-out task receives a sublist of several paths instead of one.

    Returns a list of lists of filepaths; an empty input yields an empty
    list (previously this raised ValueError via range(0, 0, 0)).
    """
    max_fanout = 1024
    if not filepaths:
        # Guard: math.ceil(0 / max_fanout) == 0 would make the range step
        # zero, which raises ValueError.
        return []
    page_size = math.ceil(len(filepaths) / max_fanout)
    return [
        filepaths[i:i + page_size]
        for i in range(0, len(filepaths), page_size)
    ]


@task()
def create_vernacular_version_task(collection) -> str:
# returns: '3433/vernacular_metadata_v1/'
Expand Down Expand Up @@ -126,7 +137,7 @@ def flatten_stats(stats):
f"\n{pprint.pprint(fetch_status)}\n{stats['success']}\n{stats['filepaths']}"
)

return stats['filepaths']
return paginate_filepaths_for_fanout(stats['filepaths'])


@task(multiple_outputs=True)
Expand All @@ -147,10 +158,18 @@ def get_collection_metadata_task(params=None):
# max_active_tis_per_dag - setting on the task to restrict how many
# instances can be running at the same time, *across all DAG runs*
@task()
def map_page_task(vernacular_page: str, collection: dict, mapped_data_version: str):
def map_page_task(
vernacular_pages: Union[str,list[str]],
collection: dict,
mapped_data_version: str):
"""
vernacular_page is a filepath relative to the collection id, ex:
3433/vernacular_metadata_2023-01-01T00:00:00/data/1
vernacular_pages is a list of filepaths relative to the collection id, ex:
[ 3433/vernacular_metadata_2023-01-01T00:00:00/data/1 ]
or:
[
3433/vernacular_metadata_2023-01-01T00:00:00/data/1,
3433/vernacular_metadata_2023-01-01T00:00:00/data/2
]
mapped_data_version is a path relative to the collection id, ex:
3433/vernacular_metadata_2023-01-01T00:00:00/mapped_metadata_2023-01-01T00:00:00/
returns a dictionary with the following keys:
Expand All @@ -163,15 +182,19 @@ def map_page_task(vernacular_page: str, collection: dict, mapped_data_version: s
collection_id = collection.get('id')
if not collection_id or not mapped_data_version:
return False
mapped_page = map_page(
collection_id, vernacular_page, mapped_data_version, collection)
return mapped_page

mapped_pages = []
for vernacular_page in vernacular_pages:
mapped_page = map_page(
collection_id, vernacular_page, mapped_data_version, collection)
mapped_pages.append(mapped_page)
return mapped_pages


@task(multiple_outputs=True)
def get_mapping_status_task(collection: dict, mapped_pages: list):
def get_mapping_status_task(collection: dict, paginated_mapped_pages: list):
"""
mapped_pages is a list of dicts with the following keys:
mapped_pages is a list of a list of dicts with the following keys:
status: success
num_records_mapped: int
page_exceptions: TODO
Expand All @@ -184,7 +207,11 @@ def get_mapping_status_task(collection: dict, mapped_pages: list):
3433/vernacular_metadata_2023-01-01T00:00:00/mapped_metadata_2023-01-01T00:00:00/3.jsonl
]
"""
mapped_pages = []
for pages in paginated_mapped_pages:
mapped_pages.extend(pages)
mapping_status = get_mapping_status(collection, mapped_pages)

return mapping_status


Expand Down

0 comments on commit 445b304

Please sign in to comment.