Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Barbara/airflow mapper #502

Merged
merged 13 commits into from
Sep 15, 2023
Merged
130 changes: 130 additions & 0 deletions dags/mapper_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
from datetime import datetime
import sys

from airflow.decorators import dag, task
from airflow.models.param import Param

from rikolti.metadata_mapper.lambda_shepherd import \
get_vernacular_pages, get_collection, \
get_mapping_summary, check_for_missing_enrichments
from rikolti.metadata_mapper.lambda_function import map_page
# from rikolti.metadata_mapper import validate_mapping


@task()
def get_registry_metadata_for_collection_task(params=None):
    """Fetch registry metadata for the collection named in DAG params.

    Returns False when no params were supplied, an empty list when
    params lack a collection_id, otherwise whatever get_collection
    returns for that id.
    """
    if not params:
        return False

    collection_id = params.get('collection_id')
    if not collection_id:
        # raise an error?
        return []

    return get_collection(collection_id)


@task()
def get_vernacular_pages_for_collection_task(params=None):
    """List the vernacular (source) metadata page filenames to be mapped.

    Returns False when no params were supplied, an empty list when
    params lack a collection_id, otherwise the page list from
    get_vernacular_pages.
    """
    if not params:
        return False

    collection_id = params.get('collection_id')
    if not collection_id:
        # raise an error?
        return []

    return get_vernacular_pages(collection_id)


@task()
def map_page_task(page: str, collection: dict, params=None):
    """Map one vernacular page of records for the given collection.

    One task instance is expanded per page via dynamic task mapping.
    """
    # max_active_tis_per_dag - setting on the task to restrict how many
    # instances can be running at the same time, *across all DAG runs*
    if not params:
        return False

    collection_id = params.get('collection_id')
    # raise an error?
    if not collection_id:
        return {}

    try:
        mapped_page = map_page(collection_id, page, collection)
    except KeyError:
        print(
            f"[{collection_id}]: {collection['rikolti_mapper_type']} "
            "not yet implemented", file=sys.stderr
        )
        # Bug fix: the original fell through to `return mapped_page`,
        # which raised UnboundLocalError here. Re-raise so the task
        # fails with the real error after logging the message above.
        raise

    return mapped_page


@task()
def get_mapping_summary_task(mapped_pages: list, collection: dict, params=None):
    """Aggregate per-page mapping results into a collection-level report."""
    if not params:
        return False

    collection_id = params.get('collection_id')
    # validate = params.get('validate')

    summary = get_mapping_summary(mapped_pages)

    # TODO
    #if validate:
    #    opts = validate if isinstance(validate, dict) else {}
    #    validate_mapping.create_collection_validation_csv(
    #        collection_id,
    #        **opts
    #    )

    return {
        'status': 'success',
        'collection_id': collection_id,
        'missing_enrichments': check_for_missing_enrichments(collection),
        'records_mapped': summary.get('count'),
        'pages_mapped': summary.get('page_count'),
        'exceptions': summary.get('group_exceptions'),
    }


@dag(
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    params={'collection_id': Param(None, description="Collection ID to map")},
    tags=["rikolti"],
)
def mapper_dag():
    """Map a collection's vernacular metadata, one airflow task per page."""
    # This is a functional duplicate of
    # rikolti.metadata_mapper.lambda_shepherd.map_collection

    # Within an airflow runtime context, we take advantage of airflow's dynamic
    # task mapping to fan out all calls to map_page.
    # Outside the airflow runtime context, on the command line for example,
    # map_collection performs manual "fan out" in the for loop below.

    # Any changes to mapper_dag should be carefully considered, duplicated
    # to map_collection, and tested in both contexts.

    # Registry metadata for the collection; passed unchanged to every
    # mapped task instance via .partial() below.
    collection = get_registry_metadata_for_collection_task()

    # simple dynamic task mapping
    # max_map_length=1024 by default.
    # if get_vernacular_pages_for_collection_task() generates
    # more than this, that task will fail
    # need to somehow chunk up pages into groups of 1024?
    page_list = get_vernacular_pages_for_collection_task()
    mapped_pages = (
        map_page_task
        .partial(collection=collection)
        .expand(page=page_list)
    )

    get_mapping_summary_task(mapped_pages, collection)
# Instantiate the DAG at import time so the airflow scheduler discovers it.
mapper_dag()
31 changes: 15 additions & 16 deletions metadata_mapper/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ def parse_enrichment_url(enrichment_url):
return enrichment_func, kwargs


def run_enrichments(records, payload, enrichment_set):
collection = payload.get('collection', {})
def run_enrichments(records, collection, enrichment_set, page_filename):
for enrichment_url in collection.get(enrichment_set, []):
enrichment_func, kwargs = parse_enrichment_url(enrichment_url)
if not enrichment_func and settings.SKIP_UNDEFINED_ENRICHMENTS:
Expand All @@ -64,41 +63,38 @@ def run_enrichments(records, payload, enrichment_set):
kwargs.update({'collection': collection})
logging.debug(
f"[{collection['id']}]: running enrichment: {enrichment_func} "
f"for page {payload['page_filename']} with kwargs: {kwargs}")
f"for page {page_filename} with kwargs: {kwargs}")
records = [
record.enrich(enrichment_func, **kwargs)
for record in records
]
return records


# {"collection_id": 26098, "rikolti_mapper_type": "nuxeo.nuxeo", "page_filename": "r-0"}
# {"collection_id": 26098, "rikolti_mapper_type": "nuxeo.nuxeo", "page_filename": 2}
# AWS Lambda entry point
def map_page(payload: Union[dict, str], context: dict = {}):
if isinstance(payload, str):
payload = json.loads(payload)
def map_page(collection_id: int, page_filename: str, collection: Union[dict, str]):
if isinstance(collection, str):
collection = json.loads(collection)

vernacular_reader = import_vernacular_reader(
payload.get('rikolti_mapper_type'))
source_vernacular = vernacular_reader(payload)
collection.get('rikolti_mapper_type'))
source_vernacular = vernacular_reader(collection_id, page_filename)
api_resp = source_vernacular.get_api_response()
source_metadata_records = source_vernacular.parse(api_resp)

source_metadata_records = run_enrichments(
source_metadata_records, payload, 'rikolti__pre_mapping')
source_metadata_records, collection, 'rikolti__pre_mapping', page_filename)

for record in source_metadata_records:
record.to_UCLDC()
mapped_records = source_metadata_records

writer = UCLDCWriter(payload)
writer = UCLDCWriter(collection_id, page_filename)
if settings.DATA_DEST["STORE"] == 'file':
writer.write_local_mapped_metadata(
[record.to_dict() for record in mapped_records])

mapped_records = run_enrichments(
mapped_records, payload, 'rikolti__enrichments')
mapped_records, collection, 'rikolti__enrichments', page_filename)

# TODO: analyze and simplify this straight port of the
# solr updater module into the Rikolti framework
Expand Down Expand Up @@ -140,9 +136,12 @@ def map_page(payload: Union[dict, str], context: dict = {}):
import argparse
parser = argparse.ArgumentParser(
description="Map metadata from the institution's vernacular")
parser.add_argument('payload', help='json payload')
parser.add_argument('collection_id', help='collection id')
parser.add_argument('page_filename', help='vernauclar metadata page filename')
parser.add_argument('collection', help='json collection metadata from registry')

args = parser.parse_args(sys.argv[1:])
mapped_page = map_page(args.payload, {})
mapped_page = map_page(args.collection_id, args.page_filename, args.collection)

print(f"{mapped_page.get('num_records_mapped')} records mapped")

Expand Down
76 changes: 41 additions & 35 deletions metadata_mapper/lambda_shepherd.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,77 +76,83 @@ def get_vernacular_pages(collection_id):
return page_list


# {"collection_id": 26098, "source_type": "nuxeo"}
# {"collection_id": 26098, "source_type": "nuxeo"}
# AWS Lambda entry point
def map_collection(payload, context):
if isinstance(payload, str):
payload = json.loads(payload)

collection_id = payload.get('collection_id')
collection = get_collection(collection_id)
payload.update({'collection': collection})
def get_mapping_summary(mapped_pages):
    """Roll per-page mapping results up into collection-level totals.

    Each entry in mapped_pages is expected to carry a
    'num_records_mapped' count and, optionally, a 'page_exceptions'
    mapping of exception message -> list of couch ids.
    """
    record_total = 0
    merged_exceptions = {}
    for page in mapped_pages:
        record_total += page['num_records_mapped']
        # Merge this page's exceptions into the collection-wide groups.
        for exc_message, couch_ids in page.get('page_exceptions', {}).items():
            merged_exceptions.setdefault(exc_message, []).extend(couch_ids)

    return {
        'count': record_total,
        'page_count': len(mapped_pages),
        'group_exceptions': merged_exceptions
    }

def map_collection(collection_id, validate=False):
# This is a functional duplicate of rikolti.dags.mapper_dag.mapper_dag

if not collection_id:
print('collection_id required', file=sys.stderr)
exit()
# Within an airflow runtime context, we take advantage of airflow's dynamic
# task mapping to fan out all calls to map_page.
# Outside the airflow runtime context, on the command line for example,
# map_collection performs manual "fan out" in the for loop below.

count = 0
page_count = 0
collection_exceptions = []
# Any changes to map_collection should be carefully considered, duplicated
# to mapper_dag, and tested in both contexts.

if isinstance(validate, str):
validate = json.loads(validate)

collection = get_collection(collection_id)

page_list = get_vernacular_pages(collection_id)
mapped_pages = []
for page in page_list:
payload.update({'page_filename': page})

try:
mapped_page = map_page(json.dumps(payload), {})
mapped_page = map_page(collection_id, page, collection)
mapped_pages.append(mapped_page)
except KeyError:
print(
f"[{collection_id}]: {collection['rikolti_mapper_type']} "
"not yet implemented", file=sys.stderr
)
continue

count += mapped_page['num_records_mapped']
page_count += 1
collection_exceptions.append(mapped_page.get('page_exceptions', {}))

collection_stats = get_mapping_summary(mapped_pages)

validate = payload.get("validate")
if validate:
opts = validate if isinstance(validate, dict) else {}
validate_mapping.create_collection_validation_csv(
collection_id,
**opts
)

group_exceptions = {}
for page_exceptions in collection_exceptions:
for exception, couch_ids in page_exceptions.items():
group_exceptions.setdefault(exception, []).extend(couch_ids)

return {
'status': 'success',
'collection_id': collection_id,
'missing_enrichments': check_for_missing_enrichments(collection),
'records_mapped': count,
'pages_mapped': page_count,
'exceptions': group_exceptions
'records_mapped': collection_stats.get('count'),
'pages_mapped': collection_stats.get('page_count'),
'exceptions': collection_stats.get('group_exceptions')
}


if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Map metadata from the institution's vernacular")
parser.add_argument('payload', help='json payload')
parser.add_argument('collection_id', help='collection ID from registry')
parser.add_argument('--validate', help='validate mapping; may provide json opts',
const=True, nargs='?')
args = parser.parse_args(sys.argv[1:])
mapped_collection = map_collection(args.payload, {})
mapped_collection = map_collection(args.collection_id, args.validate)
missing_enrichments = mapped_collection.get('missing_enrichments')
if len(missing_enrichments) > 0:
print(
f"{args.payload.get('collection_id')}, missing enrichments, ",
f"{args.collection_id}, missing enrichments, ",
f"ALL, -, -, {missing_enrichments}"
)

Expand Down
6 changes: 3 additions & 3 deletions metadata_mapper/map_registry_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ def map_endpoint(url, limit=None):
sys.stderr.write('\r')
progress_bar = f"{progress}/{limit}"
sys.stderr.write(
f"{progress_bar:<9}: start mapping {collection_id:<6}")
f"{progress_bar:<9}: start mapping {collection_id:<6}\n")
sys.stderr.flush()

logger.debug(
f"[{collection_id}]: call lambda with payload: {collection}")
f"[{collection_id}]: call lambda with collection_id: {collection_id}")

try:
map_result = lambda_shepherd.map_collection(
collection, None)
collection_id)
except FileNotFoundError:
print(f"[{collection_id}]: not fetched yet", file=sys.stderr)
continue
Expand Down
12 changes: 6 additions & 6 deletions metadata_mapper/mappers/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@


class UCLDCWriter(object):
def __init__(self, payload):
self.collection_id = payload.get('collection_id')
self.page_filename = payload.get('page_filename')
def __init__(self, collection_id: int, page_filename: str):
self.collection_id = collection_id
self.page_filename = page_filename

def write_local_mapped_metadata(self, mapped_metadata):
local_path = settings.local_path(
Expand All @@ -50,9 +50,9 @@ def write_s3_mapped_metadata(self, mapped_metadata):


class Vernacular(ABC, object):
def __init__(self, payload: dict) -> None:
self.collection_id = payload.get('collection_id')
self.page_filename = payload.get('page_filename')
def __init__(self, collection_id: int, page_filename: str) -> None:
self.collection_id = collection_id
self.page_filename = page_filename

def get_api_response(self) -> dict:
if settings.DATA_SRC["STORE"] == 'file':
Expand Down