Refine output #485

Merged 7 commits on Aug 15, 2023
111 changes: 69 additions & 42 deletions metadata_fetcher/fetch_registry_collections.py
@@ -3,62 +3,89 @@
 import sys
 import lambda_function
 import logging
+logger = logging.getLogger(__name__)
 
+
+def registry_endpoint(url):
+    page = url
+    while page:
+        response = requests.get(url=page)
+        response.raise_for_status()
+        page = response.json().get('meta', {}).get('next', None)
+        if page:
+            page = f"https://registry.cdlib.org{page}"
+
+        collections = response.json().get('objects', [response.json()])
+        for collection in collections:
+            yield collection
+
 
 def fetch_endpoint(url, limit=None):
+    response = requests.get(url=url)
+    response.raise_for_status()
+    total = response.json().get('meta', {}).get('total_count', 1)
+    progress = 0
+    fetch_report_headers = (
+        "Collection ID, Status, Total Pages, Rikolti Count, Solr Count, "
+        "Diff Count, Solr Last Updated"
+    )
+
+    if not limit:
+        limit = total
+
+    print(f">>> Fetching {limit}/{total} collections described at {url}")
+    print(fetch_report_headers)
+
-    collection_page = url
     results = {}
 
-    while collection_page and (not limit or len(results) < limit):
-        response = requests.get(url=collection_page)
-        response.raise_for_status()
-        total_collections = response.json().get('meta', {}).get('total_count', 1)
-        print(
-            f">>> Fetching {total_collections} collections "
-            f"described at {collection_page}"
-        )
+    for collection in registry_endpoint(url):
+        collection_id = collection['collection_id']
 
-        collection_page = response.json().get('meta', {}).get('next')
-        if collection_page:
-            collection_page = f"https://registry.cdlib.org{collection_page}"
-        logging.debug(f"Next page: {collection_page}")
-        collections = response.json().get('objects', [response.json()])
-        for collection in collections:
-            if limit and len(results) >= limit:
-                break
-            log_msg = f"[{collection['collection_id']}]: " + "{}"
-            print(log_msg.format(
-                f"Fetching collection {collection['collection_id']} - "
-                f"{collection['solr_count']} items in solr as of "
-                f"{collection['solr_last_updated']}"
-            ))
-            logging.debug(log_msg.format(f"lambda payload: {collection}"))
-            return_val = lambda_function.fetch_collection(
-                collection, None)
-            results[collection['collection_id']] = return_val
-
-            if return_val[-1]['status'] != 'success':
-                print(log_msg.format(f"Error: {return_val}"))
-            else:
-                logging.debug(log_msg.format(f"Fetch successful: {return_val}"))
-        collection_page = False
-
-    for collection_id, collection_result in results.items():
-        success = all([page['status'] == 'success' for page in collection_result])
-        total_items = sum([page['document_count'] for page in collection_result])
-        total_pages = collection_result[-1]['page'] + 1
-        print(
-            f"[{collection_id}]: Fetch {'successful' if success else 'errored'} - "
-            f"Fetched {total_items} items over {total_pages} pages"
+        progress = progress + 1
+        sys.stderr.write('\r')
+        progress_bar = f"{progress}/{limit}"
+        sys.stderr.write(
+            f"{progress_bar:<9}: start fetching {collection_id:<6}")
+        sys.stderr.flush()
+
+        logger.debug(
+            f"[{collection_id}]: call lambda with payload: {collection}")
+
+        fetch_result = lambda_function.fetch_collection(collection, None)
+        results[collection_id] = fetch_result
+
+        success = all([page['status'] == 'success' for page in fetch_result])
+        total_items = sum([page['document_count'] for page in fetch_result])
+        total_pages = fetch_result[-1]['page'] + 1
+        diff_items = total_items - collection['solr_count']
+
+        fetch_report_row = (
+            f"{collection_id}, {'success' if success else 'error'}, "
+            f"{total_pages}, {total_items}, {collection['solr_count']}, "
+            f"{diff_items}, {collection['solr_last_updated']}"
         )
+        print(fetch_report_row)
 
         if not success:
-            print(f"[{collection_id}]: {collection_result[-1]}")
+            fetch_report_failure_row = (
+                f"{collection_id}, {fetch_result[-1]}, -, -, -, -, -")
+            print(fetch_report_failure_row)
+
+        sys.stderr.write('\r')
+        progress_bar = f"{progress}/{limit}"
+        sys.stderr.write(
+            f"{progress_bar:<9}: finish fetching {collection_id:<5}")
+        sys.stderr.flush()
+
+        if limit and len(results.keys()) >= limit:
+            break
+
+    sys.stderr.write('\n')
     return results
 
 
 if __name__ == "__main__":
+    logging.basicConfig(
+        filename='fetch_endpoint.log', encoding='utf-8', level=logging.DEBUG)
     parser = argparse.ArgumentParser(
         description="Run fetcher for registry endpoint")
     parser.add_argument('endpoint', help='registry api endpoint')
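Note: the rewritten fetch_endpoint hides pagination behind the new registry_endpoint generator and prints one CSV-style report row per collection. A minimal usage sketch follows; the endpoint URL shape is a guess based on the registry.cdlib.org references above, not something this PR specifies:

# Hypothetical driver, assumed to run from inside metadata_fetcher/.
from fetch_registry_collections import fetch_endpoint

results = fetch_endpoint(
    "https://registry.cdlib.org/api/v1/collection/?format=json", limit=2)
# stdout carries fetch_report_headers plus one report row per collection;
# per-collection progress is written to stderr.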
9 changes: 6 additions & 3 deletions metadata_fetcher/fetchers/Fetcher.py
@@ -1,7 +1,10 @@
 import requests
 import os
+import sys
 import settings
 import boto3
+import logging
+logger = logging.getLogger(__name__)
 
 
 class InvalidHarvestEndpoint(Exception):
@@ -67,12 +70,12 @@ def fetchtos3(self, page):
                 ),
                 Body=page)
         except Exception as e:
-            print(e)
+            print(e, file=sys.stderr)
 
     def fetch_page(self):
         page = self.build_fetch_request()
-        print(
-            f"[{self.collection_id}]: Fetching page {self.write_page} "
+        logger.debug(
+            f"[{self.collection_id}]: fetching page {self.write_page} "
             f"at {page.get('url')}"
         )
         try:
12 changes: 7 additions & 5 deletions metadata_fetcher/fetchers/flickr_fetcher.py
@@ -8,6 +8,8 @@
 from requests.adapters import Retry
 from urllib.parse import urlencode
 import settings
+import logging
+logger = logging.getLogger(__name__)
 
 
 class FlickrFetcher(Fetcher):
@@ -116,7 +118,7 @@ def aggregate_vernacular_content(self, content: str) -> str:
         """
         photos = json.loads(content)
 
-        print(
+        logger.debug(
             f"[{self.collection_id}]: Starting to fetch all photos for page"
             f" {self.write_page}"
         )
@@ -132,14 +134,14 @@ def aggregate_vernacular_content(self, content: str) -> str:
                 if response.ok:
                     break
                 time.sleep(math.pow(i * 2, 2))
-                print(
+                logger.debug(
                     f"[{self.collection_id}]: Retrying request, response was not 2xx"
                 )
 
             photo_data.append(json.loads(response.content).get("photo"))
             self.photo_index += 1
 
-        print(
+        logger.debug(
             f"[{self.collection_id}]: Fetched all photos for page"
             f" {self.write_page}"
         )
@@ -162,7 +164,7 @@ def get_photo_metadata(self, id: str) -> requests.Response:
         })
         url = self.build_request_url(params)
 
-        print(
+        logger.debug(
             f"[{self.collection_id}]: Fetching photo {id} "
             f"({self.photo_index} of {self.photo_total}) at {url}"
         )
@@ -183,7 +185,7 @@ def check_page(self, http_resp: requests.Response) -> int:
         self.photo_total = len(data.get(self.response_items_attribute, {}).
                                get("photo", []))
 
-        print(
+        logger.debug(
             f"[{self.collection_id}]: Fetched ids for page {self.write_page} "
             f"at {http_resp.url} with {self.photo_total} hits"
         )
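Note: the retry loop in aggregate_vernacular_content backs off quadratically between attempts. A standalone sketch of that pattern, where the helper name and attempt count are ours rather than the PR's:

import math
import time

import requests


def get_with_backoff(url: str, attempts: int = 5) -> requests.Response:
    # Sleeps 0s, 4s, 16s, 36s, ... between failed attempts, mirroring the
    # time.sleep(math.pow(i * 2, 2)) call in the diff above. Returns the
    # first 2xx response, or the last failed response if all attempts fail.
    response = None
    for i in range(attempts):
        response = requests.get(url)
        if response.ok:
            break
        time.sleep(math.pow(i * 2, 2))
    return response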
8 changes: 5 additions & 3 deletions metadata_fetcher/fetchers/nuxeo_fetcher.py
@@ -6,6 +6,8 @@
 import boto3
 import settings
 import subprocess
+import logging
+logger = logging.getLogger(__name__)
 
 
 # {'harvest_data': {'harvest_extra_data'}}
@@ -137,7 +139,7 @@ def build_fetch_request(self):
                 'query': query
             }
         }
-        print(
+        logger.debug(
             f"[{self.collection_id}]: Fetching page {page} of {query_type} at "
             f"{self.nuxeo['prefix']} - {current_path['path']}"
         )
@@ -162,7 +164,7 @@ def check_page(self, http_resp: requests.Response) -> int:
 
         documents = 0
         if query_type in ['documents', 'children'] and response.get('entries'):
-            print(
+            logger.debug(
                 f"[{self.collection_id}]: "
                 f"Fetched page {self.nuxeo.get('api_page')} of "
                 f"{query_type} at {self.nuxeo['prefix']} - "
@@ -192,7 +194,7 @@ def check_page(self, http_resp: requests.Response) -> int:
             )
 
         if not documents:
-            print(
+            logger.debug(
                 f"[{self.collection_id}]: Fetched page is empty")
 
         return documents
6 changes: 4 additions & 2 deletions metadata_fetcher/fetchers/oac_fetcher.py
@@ -2,6 +2,8 @@
 import requests
 from xml.etree import ElementTree
 from .Fetcher import Fetcher
+import logging
+logger = logging.getLogger(__name__)
 
 
 class OacFetcher(Fetcher):
@@ -70,7 +72,7 @@ def build_fetch_request(self):
             f"&startDoc={harvested+1}"
             f"&group={current_group}"
         )}
-        print(
+        logger.debug(
             f"[{self.collection_id}]: Fetching page "
             f"at {request.get('url')}")
 
@@ -86,7 +88,7 @@ def check_page(self, http_resp: requests.Response) -> int:
             f"{self.oac.get('url')}&docsPerPage=100&"
             f"startDoc={harvested+1}&group={current_group}"
         )
-        print(
+        logger.debug(
             f"[{self.collection_id}]: Fetched page "
             f"at {requested_url} "
             f"with {len(xml_hits)} hits"
7 changes: 4 additions & 3 deletions metadata_fetcher/fetchers/oai_fetcher.py
@@ -4,6 +4,7 @@
 from urllib.parse import parse_qs
 from sickle import Sickle
 import requests
+import logging
 
 NAMESPACE = {'oai2': 'http://www.openarchives.org/OAI/2.0/'}
 
@@ -69,9 +70,9 @@ def check_page(self, http_resp: requests.Response) -> int:
             'oai2:ListRecords', NAMESPACE).findall('oai2:record', NAMESPACE)
 
         if len(xml_hits) > 0:
-            print(
-                f"[{self.collection_id}]: Fetched page {self.write_page}; "
-                f"{len(xml_hits)} hits; {self.build_fetch_request()['url']}"
+            logging.debug(
+                f"{self.collection_id}, fetched page {self.write_page} - "
+                f"{len(xml_hits)} hits,-,-,-,-,-"
             )
         return len(xml_hits)

31 changes: 21 additions & 10 deletions metadata_fetcher/lambda_function.py
@@ -3,8 +3,9 @@
 import sys
 import settings
 import importlib
+import logging
 from fetchers.Fetcher import Fetcher, InvalidHarvestEndpoint
-
+logger = logging.getLogger(__name__)
 
 def import_fetcher(harvest_type):
     fetcher_module = importlib.import_module(
@@ -23,34 +24,36 @@ def fetch_collection(payload, context):
     if settings.LOCAL_RUN and isinstance(payload, str):
         payload = json.loads(payload)
 
+    logger.debug(f"fetch_collection payload: {payload}")
+
     fetcher_class = import_fetcher(payload.get('harvest_type'))
 
-    return_val = {'page': payload.get('write_page', 0), 'document_count': 0}
+    fetch_report = {'page': payload.get('write_page', 0), 'document_count': 0}
     try:
         fetcher = fetcher_class(payload)
-        return_val['document_count'] = fetcher.fetch_page()
+        fetch_report['document_count'] = fetcher.fetch_page()
     except InvalidHarvestEndpoint as e:
-        print(e)
-        return_val.update({
+        logger.error(e)
+        fetch_report.update({
             'status': 'error',
             'body': json.dumps({
                 'error': repr(e),
                 'payload': payload
             })
         })
-        return [return_val]
+        return [fetch_report]
 
     next_page = fetcher.json()
-    return_val.update({
+    fetch_report.update({
         'status': 'success',
         'next_page': next_page
     })
 
-    return_val = [return_val]
+    fetch_report = [fetch_report]
 
     if not json.loads(next_page).get('finished'):
         if settings.LOCAL_RUN:
-            return_val.extend(fetch_collection(next_page, {}))
+            fetch_report.extend(fetch_collection(next_page, {}))
         else:
             lambda_client = boto3.client('lambda', region_name="us-west-2",)
             lambda_client.invoke(
@@ -59,7 +62,7 @@ def fetch_collection(payload, context):
                 Payload=next_page.encode('utf-8')
             )
 
-    return return_val
+    return fetch_report
 
 
 if __name__ == "__main__":
@@ -68,5 +71,13 @@ def fetch_collection(payload, context):
         description="Fetch metadata in the institution's vernacular")
     parser.add_argument('payload', help='json payload')
     args = parser.parse_args(sys.argv[1:])
+
+    logging.basicConfig(
+        filename=f"fetch_collection_{args.payload.get('collection_id')}.log",
+        encoding='utf-8',
+        level=logging.DEBUG
+    )
+    print(f"Starting to fetch collection {args.payload.get('collection_id')}")
     fetch_collection(args.payload, {})
+    print(f"Finished fetching collection {args.payload.get('collection_id')}")
     sys.exit(0)
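Note: fetch_collection accepts a dict, or a JSON string when settings.LOCAL_RUN is set. A hedged example invocation follows; the keys are the ones this diff actually reads (harvest_type, write_page, collection_id), the values are invented, and real registry payloads carry more fields:

# Hypothetical call, assumed to run from metadata_fetcher/ with settings in place.
from lambda_function import fetch_collection

payload = {
    "harvest_type": "flickr",  # import_fetcher derives the fetcher module from
                               # this; the exact value depends on its naming scheme
    "collection_id": 12345,    # invented; also names the per-collection log file
    "write_page": 0,           # page counter carried through the fetch report
}
fetch_report = fetch_collection(payload, {})
# fetch_report is a list with one {'page': ..., 'document_count': ...,
# 'status': ...} dict per fetched page.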
10 changes: 8 additions & 2 deletions metadata_fetcher/settings.py
@@ -1,6 +1,8 @@
 import os
-
+import sys
+import logging
 from dotenv import load_dotenv
+logger = logging.getLogger(__name__)
 
 load_dotenv()
 
@@ -14,6 +16,10 @@
 if not LOCAL_RUN and DATA_DEST == 'local':
     print(
         "A local data destination is only valid "
-        "when the application is run locally"
+        "when the application is run locally",
+        file=sys.stderr
     )
     exit()
+
+for key, value in os.environ.items():
+    logger.debug(f"{key}={value}")
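Note: with every module now logging through logging.getLogger(__name__), nothing reaches a file or the console until an entry point configures the root logger, as fetch_registry_collections.py now does in its __main__ block. A minimal sketch of that wiring for any other driver script:

import logging

# Mirrors the basicConfig call added in fetch_registry_collections.py;
# the filename here is arbitrary.
logging.basicConfig(
    filename='fetch.log', encoding='utf-8', level=logging.DEBUG)

# Imported after configuration so the module-level logger.debug calls in
# settings.py (the env-var dump above) are actually captured.
import settings  # noqa: E402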