Commit dc73fa9

Merge pull request #485 from ucldc/refine_output

Refine output

amywieliczka authored Aug 15, 2023
2 parents 9f7273c + ea44bd2 commit dc73fa9
Showing 12 changed files with 304 additions and 197 deletions.
111 changes: 69 additions & 42 deletions metadata_fetcher/fetch_registry_collections.py
@@ -3,62 +3,89 @@
import sys
import lambda_function
import logging
logger = logging.getLogger(__name__)

def registry_endpoint(url):
page = url
while page:
response = requests.get(url=page)
response.raise_for_status()
page = response.json().get('meta', {}).get('next', None)
if page:
page = f"https://registry.cdlib.org{page}"

collections = response.json().get('objects', [response.json()])
for collection in collections:
yield collection


def fetch_endpoint(url, limit=None):
response = requests.get(url=url)
response.raise_for_status()
total = response.json().get('meta', {}).get('total_count', 1)
progress = 0
fetch_report_headers = (
"Collection ID, Status, Total Pages, Rikolti Count, Solr Count, "
"Diff Count, Solr Last Updated"
)

if not limit:
limit = total

print(f">>> Fetching {limit}/{total} collections described at {url}")
print(fetch_report_headers)

collection_page = url
results = {}

while collection_page and (not limit or len(results) < limit):
response = requests.get(url=collection_page)
response.raise_for_status()
total_collections = response.json().get('meta', {}).get('total_count', 1)
print(
f">>> Fetching {total_collections} collections "
f"described at {collection_page}"
)
for collection in registry_endpoint(url):
collection_id = collection['collection_id']

collection_page = response.json().get('meta', {}).get('next')
if collection_page:
collection_page = f"https://registry.cdlib.org{collection_page}"
logging.debug(f"Next page: {collection_page}")
collections = response.json().get('objects', [response.json()])
for collection in collections:
if limit and len(results) >= limit:
break
log_msg = f"[{collection['collection_id']}]: " + "{}"
print(log_msg.format(
f"Fetching collection {collection['collection_id']} - "
f"{collection['solr_count']} items in solr as of "
f"{collection['solr_last_updated']}"
))
logging.debug(log_msg.format(f"lambda payload: {collection}"))
return_val = lambda_function.fetch_collection(
collection, None)
results[collection['collection_id']] = return_val

if return_val[-1]['status'] != 'success':
print(log_msg.format(f"Error: {return_val}"))
else:
logging.debug(log_msg.format(f"Fetch successful: {return_val}"))
collection_page = False

for collection_id, collection_result in results.items():
success = all([page['status'] == 'success' for page in collection_result])
total_items = sum([page['document_count'] for page in collection_result])
total_pages = collection_result[-1]['page'] + 1
print(
f"[{collection_id}]: Fetch {'successful' if success else 'errored'} - "
f"Fetched {total_items} items over {total_pages} pages"
progress = progress + 1
sys.stderr.write('\r')
progress_bar = f"{progress}/{limit}"
sys.stderr.write(
f"{progress_bar:<9}: start fetching {collection_id:<6}")
sys.stderr.flush()

logger.debug(
f"[{collection_id}]: call lambda with payload: {collection}")

fetch_result = lambda_function.fetch_collection(collection, None)
results[collection_id] = fetch_result

success = all([page['status'] == 'success' for page in fetch_result])
total_items = sum([page['document_count'] for page in fetch_result])
total_pages = fetch_result[-1]['page'] + 1
diff_items = total_items - collection['solr_count']

fetch_report_row = (
f"{collection_id}, {'success' if success else 'error'}, "
f"{total_pages}, {total_items}, {collection['solr_count']}, "
f"{diff_items}, {collection['solr_last_updated']}"
)
print(fetch_report_row)

if not success:
print(f"[{collection_id}]: {collection_result[-1]}")
fetch_report_failure_row = (
f"{collection_id}, {fetch_result[-1]}, -, -, -, -, -")
print(fetch_report_failure_row)

sys.stderr.write('\r')
progress_bar = f"{progress}/{limit}"
sys.stderr.write(
f"{progress_bar:<9}: finish fetching {collection_id:<5}")
sys.stderr.flush()

if limit and len(results.keys()) >= limit:
break

sys.stderr.write('\n')
return results


if __name__ == "__main__":
logging.basicConfig(
filename='fetch_endpoint.log', encoding='utf-8', level=logging.DEBUG)
parser = argparse.ArgumentParser(
description="Run fetcher for registry endpoint")
parser.add_argument('endpoint', help='registry api endpoint')
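The refactor above replaces the inline pagination loop with the registry_endpoint generator and moves per-collection progress onto stderr, where carriage-return rewrites keep the report on a single updating line. A minimal standalone sketch of that progress pattern, assuming a plain list in place of the registry API (paged and collections below are hypothetical stand-ins):

```python
import sys
import time

def paged(items, page_size=2):
    # Hypothetical stand-in for registry_endpoint: walk the results
    # page by page and yield one collection at a time.
    for start in range(0, len(items), page_size):
        for item in items[start:start + page_size]:
            yield item

collections = ["26098", "26695", "27012"]
for progress, collection_id in enumerate(paged(collections), start=1):
    # '\r' returns the cursor to column 0 so each write overwrites
    # the previous progress line; flush() shows the partial line now.
    sys.stderr.write('\r')
    progress_bar = f"{progress}/{len(collections)}"
    sys.stderr.write(f"{progress_bar:<9}: start fetching {collection_id:<6}")
    sys.stderr.flush()
    time.sleep(0.1)  # simulate the fetch
sys.stderr.write('\n')
```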
9 changes: 6 additions & 3 deletions metadata_fetcher/fetchers/Fetcher.py
@@ -1,7 +1,10 @@
import requests
import os
import sys
import settings
import boto3
import logging
logger = logging.getLogger(__name__)


class InvalidHarvestEndpoint(Exception):
@@ -67,12 +70,12 @@ def fetchtos3(self, page):
),
Body=page)
except Exception as e:
print(e)
print(e, file=sys.stderr)

def fetch_page(self):
page = self.build_fetch_request()
print(
f"[{self.collection_id}]: Fetching page {self.write_page} "
logger.debug(
f"[{self.collection_id}]: fetching page {self.write_page} "
f"at {page.get('url')}"
)
try:
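The recurring change across the fetcher modules is swapping print for a module-level logger created with logging.getLogger(__name__). A minimal sketch of the split this enables, with hypothetical function names: library modules only emit records, and the entry point alone decides where records go.

```python
import logging

# In each library module: a logger named after the module, so
# records can be filtered or routed per fetcher.
logger = logging.getLogger(__name__)

def fetch_page(collection_id, write_page):
    # Emits a DEBUG record; no handler decisions are made here.
    logger.debug(f"[{collection_id}]: fetching page {write_page}")

if __name__ == "__main__":
    # Only the entry point configures handlers, as
    # fetch_registry_collections.py does with basicConfig.
    logging.basicConfig(
        filename='fetch_endpoint.log', encoding='utf-8',
        level=logging.DEBUG)
    fetch_page("26098", 0)  # record lands in fetch_endpoint.log
```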
12 changes: 7 additions & 5 deletions metadata_fetcher/fetchers/flickr_fetcher.py
@@ -8,6 +8,8 @@
from requests.adapters import Retry
from urllib.parse import urlencode
import settings
import logging
logger = logging.getLogger(__name__)


class FlickrFetcher(Fetcher):
@@ -116,7 +118,7 @@ def aggregate_vernacular_content(self, content: str) -> str:
"""
photos = json.loads(content)

print(
logger.debug(
f"[{self.collection_id}]: Starting to fetch all photos for page"
f" {self.write_page}"
)
@@ -132,14 +134,14 @@ def aggregate_vernacular_content(self, content: str) -> str:
if response.ok:
break
time.sleep(math.pow(i * 2, 2))
print(
logger.debug(
f"[{self.collection_id}]: Retrying request, response was not 2xx"
)

photo_data.append(json.loads(response.content).get("photo"))
self.photo_index += 1

print(
logger.debug(
f"[{self.collection_id}]: Fetched all photos for page"
f" {self.write_page}"
)
@@ -162,7 +164,7 @@ def get_photo_metadata(self, id: str) -> requests.Response:
})
url = self.build_request_url(params)

print(
logger.debug(
f"[{self.collection_id}]: Fetching photo {id} "
f"({self.photo_index} of {self.photo_total}) at {url}"
)
@@ -183,7 +185,7 @@ def check_page(self, http_resp: requests.Response) -> int:
self.photo_total = len(data.get(self.response_items_attribute, {}).
get("photo", []))

print(
logger.debug(
f"[{self.collection_id}]: Fetched ids for page {self.write_page} "
f"at {http_resp.url} with {self.photo_total} hits"
)
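The retry loop above sleeps math.pow(i * 2, 2) seconds after each failed attempt — 0, 4, 16, 36 seconds for attempts 0 through 3 — before asking Flickr again. A standalone sketch of that backoff, assuming a bare requests.get and a hypothetical attempts limit:

```python
import math
import time
import requests

def get_with_retries(url, attempts=4):
    # Quadratic backoff as in the fetcher: after a non-2xx
    # response, sleep (i * 2)^2 seconds, then retry.
    response = None
    for i in range(attempts):
        response = requests.get(url)
        if response.ok:
            break
        time.sleep(math.pow(i * 2, 2))
    return response

# response = get_with_retries("https://api.flickr.com/services/rest/")
```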
8 changes: 5 additions & 3 deletions metadata_fetcher/fetchers/nuxeo_fetcher.py
@@ -6,6 +6,8 @@
import boto3
import settings
import subprocess
import logging
logger = logging.getLogger(__name__)


# {'harvest_data': {'harvest_extra_data'}}
@@ -137,7 +139,7 @@ def build_fetch_request(self):
'query': query
}
}
print(
logger.debug(
f"[{self.collection_id}]: Fetching page {page} of {query_type} at "
f"{self.nuxeo['prefix']} - {current_path['path']}"
)
@@ -162,7 +164,7 @@ def check_page(self, http_resp: requests.Response) -> int:

documents = 0
if query_type in ['documents', 'children'] and response.get('entries'):
print(
logger.debug(
f"[{self.collection_id}]: "
f"Fetched page {self.nuxeo.get('api_page')} of "
f"{query_type} at {self.nuxeo['prefix']} - "
@@ -192,7 +194,7 @@ def check_page(self, http_resp: requests.Response) -> int:
)

if not documents:
print(
logger.debug(
f"[{self.collection_id}]: Fetched page is empty")

return documents
6 changes: 4 additions & 2 deletions metadata_fetcher/fetchers/oac_fetcher.py
@@ -2,6 +2,8 @@
import requests
from xml.etree import ElementTree
from .Fetcher import Fetcher
import logging
logger = logging.getLogger(__name__)


class OacFetcher(Fetcher):
@@ -70,7 +72,7 @@ def build_fetch_request(self):
f"&startDoc={harvested+1}"
f"&group={current_group}"
)}
print(
logger.debug(
f"[{self.collection_id}]: Fetching page "
f"at {request.get('url')}")

@@ -86,7 +88,7 @@ def check_page(self, http_resp: requests.Response) -> int:
f"{self.oac.get('url')}&docsPerPage=100&"
f"startDoc={harvested+1}&group={current_group}"
)
print(
logger.debug(
f"[{self.collection_id}]: Fetched page "
f"at {requested_url} "
f"with {len(xml_hits)} hits"
7 changes: 4 additions & 3 deletions metadata_fetcher/fetchers/oai_fetcher.py
@@ -4,6 +4,7 @@
from urllib.parse import parse_qs
from sickle import Sickle
import requests
import logging

NAMESPACE = {'oai2': 'http://www.openarchives.org/OAI/2.0/'}

@@ -69,9 +70,9 @@ def check_page(self, http_resp: requests.Response) -> int:
'oai2:ListRecords', NAMESPACE).findall('oai2:record', NAMESPACE)

if len(xml_hits) > 0:
print(
f"[{self.collection_id}]: Fetched page {self.write_page}; "
f"{len(xml_hits)} hits; {self.build_fetch_request()['url']}"
logging.debug(
f"{self.collection_id}, fetched page {self.write_page} - "
f"{len(xml_hits)} hits,-,-,-,-,-"
)
return len(xml_hits)

31 changes: 21 additions & 10 deletions metadata_fetcher/lambda_function.py
@@ -3,8 +3,9 @@
import sys
import settings
import importlib
import logging
from fetchers.Fetcher import Fetcher, InvalidHarvestEndpoint

logger = logging.getLogger(__name__)

def import_fetcher(harvest_type):
fetcher_module = importlib.import_module(
@@ -23,34 +24,36 @@ def fetch_collection(payload, context):
if settings.LOCAL_RUN and isinstance(payload, str):
payload = json.loads(payload)

logger.debug(f"fetch_collection payload: {payload}")

fetcher_class = import_fetcher(payload.get('harvest_type'))

return_val = {'page': payload.get('write_page', 0), 'document_count': 0}
fetch_report = {'page': payload.get('write_page', 0), 'document_count': 0}
try:
fetcher = fetcher_class(payload)
return_val['document_count'] = fetcher.fetch_page()
fetch_report['document_count'] = fetcher.fetch_page()
except InvalidHarvestEndpoint as e:
print(e)
return_val.update({
logger.error(e)
fetch_report.update({
'status': 'error',
'body': json.dumps({
'error': repr(e),
'payload': payload
})
})
return [return_val]
return [fetch_report]

next_page = fetcher.json()
return_val.update({
fetch_report.update({
'status': 'success',
'next_page': next_page
})

return_val = [return_val]
fetch_report = [fetch_report]

if not json.loads(next_page).get('finished'):
if settings.LOCAL_RUN:
return_val.extend(fetch_collection(next_page, {}))
fetch_report.extend(fetch_collection(next_page, {}))
else:
lambda_client = boto3.client('lambda', region_name="us-west-2",)
lambda_client.invoke(
@@ -59,7 +62,7 @@ def fetch_collection(payload, context):
Payload=next_page.encode('utf-8')
)

return return_val
return fetch_report


if __name__ == "__main__":
@@ -68,5 +71,13 @@ def fetch_collection(payload, context):
description="Fetch metadata in the institution's vernacular")
parser.add_argument('payload', help='json payload')
args = parser.parse_args(sys.argv[1:])
# argparse delivers the payload as a JSON string, so parse it
# before reading collection_id out of it.
payload = json.loads(args.payload)

logging.basicConfig(
filename=f"fetch_collection_{payload.get('collection_id')}.log",
encoding='utf-8',
level=logging.DEBUG
)
print(f"Starting to fetch collection {payload.get('collection_id')}")
fetch_collection(args.payload, {})
print(f"Finished fetching collection {payload.get('collection_id')}")
sys.exit(0)
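fetch_collection now returns one fetch_report dict per fetched page, recursing locally (or re-invoking the Lambda on AWS) until the fetcher marks the payload finished. A simplified sketch of that control flow, with a hypothetical fetch_one_page standing in for the real Fetcher and its json() next-page payload:

```python
def fetch_one_page(payload):
    # Hypothetical stand-in for Fetcher.fetch_page() plus
    # Fetcher.json(): returns a document count and the next payload.
    page = payload.get('write_page', 0)
    next_page = {**payload, 'write_page': page + 1,
                 'finished': page + 1 >= 3}
    return 100, next_page

def fetch_collection(payload):
    fetch_report = {'page': payload.get('write_page', 0),
                    'document_count': 0}
    fetch_report['document_count'], next_page = fetch_one_page(payload)
    fetch_report['status'] = 'success'
    reports = [fetch_report]
    if not next_page.get('finished'):
        # Locally this recurses; on AWS the real code instead invokes
        # the same Lambda again with next_page as the payload.
        reports.extend(fetch_collection(next_page))
    return reports

print(fetch_collection({'write_page': 0}))
# [{'page': 0, 'document_count': 100, 'status': 'success'}, ...]
```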
10 changes: 8 additions & 2 deletions metadata_fetcher/settings.py
@@ -1,6 +1,8 @@
import os

import sys
import logging
from dotenv import load_dotenv
logger = logging.getLogger(__name__)

load_dotenv()

@@ -14,6 +16,10 @@
if not LOCAL_RUN and DATA_DEST == 'local':
print(
"A local data destination is only valid "
"when the application is run locally"
"when the application is run locally",
file=sys.stderr
)
exit()

for key, value in os.environ.items():
logger.debug(f"{key}={value}")