The Fetcher Component
This documentation assumes a working knowledge of Python, the requests library, and boto3 as a mechanism for writing Python to interact with AWS services. This documentation additionally assumes a theoretical understanding of stateless iterators and/or generator functions and a basic understanding of what the AWS Lambda service does (if not all the particulars).
Fetching metadata from a source institution is the very first thing that must happen before any other harvester activities. The fetcher must hit an institution's API, check that we've gotten a meaningful response, stash the response locally, and increment to get the next page of results.
The fetcher must gracefully fail in the following particular cases:
- We have for one reason or another been unable to hit an institution's API
- The institution's API has returned something other than a 2xx status code
- The institution's API has returned a 2xx status code, but the response is empty or we are otherwise unable to understand it
- We are unable to stash the response locally
- We are unable to increment to the next page of results
In these cases, failing gracefully typically means continuing to retrieve as much as we are able, while reporting out as much detail as possible about what we were unable to fetch. It also means that our fetcher should be able to operate both at the collection level (paginating over many pages of data) and at the level of a single page, so that we can retry a failed page manually.
The fetcher is a stateless iterator, written in Python and inspired by Python generators. (PyCon 2013 talk by Ned Batchelder on generators: https://www.youtube.com/watch?v=EnSu9hHGq5o) Because of its very simple job (hit API, check response, stash results, increment page), we've chosen to deploy the fetcher as an AWS Lambda function for a serverless solution with very low start-up cost as compared to creating a Fargate container. We make the following assumptions about our choice to use Lambda:
- It will be reasonably trivial to switch to using Fargate if, for some reason, Lambda doesn't quite meet our needs.
- It will be reasonably trivial to trigger the Lambda to run via boto3 or the command line, by setting up an API Gateway to send it HTTP requests, by setting it up to listen for changes in s3, or by setting it up to listen to an Amazon SQS queue.
- Given this range of input options, we also assume it will be reasonably trivial to hook a Lambda function into some workflow management system such as AWS Step Functions, Apache Airflow, etc.
Our AWS Lambda is currently configured to use Amazon Linux 2, Kernel 4.14, x86_64 architecture, Python 3.8, in the US West 2 region.
AWS Lambda functions require the author to specify an entry point that takes a payload and a context. The entry point to our AWS Lambda function is fetch_collection(payload, context). This entry point takes a json payload that generically looks like:
{
"collection_id": 466, # Collection ID from [Collection Registry](https://registry.cdlib.org/)
"harvest_type": "nuxeo", # name prefix of the fetcher module and fetcher subclass that should be used to handle this fetch task TODO: add link from registry api definition
"write_page": 0, # the next page number to write to in s3, as in 0.jsonl
"harvest_data": {} # dictionary of information used by NuxeoFetcher
}
fetch_collection creates a NuxeoFetcher object, passing it the entire payload on initialization. The NuxeoFetcher object requests a page, stashes it in s3, and increments itself in its fetch_page() method. If there is more to fetch, fetch_collection will kick off a new lambda function with the fetcher's internal state as the payload to the next lambda function.
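A minimal sketch of how this entry point might chain Lambda invocations follows. The module layout, Lambda function name, and error handling here are assumptions, not the exact Rikolti implementation:

```python
import json

import boto3

from nuxeo_fetcher import NuxeoFetcher  # assumed module layout


def fetch_collection(payload, context):
    """Fetch one page for a collection, then chain to the next Lambda call."""
    if isinstance(payload, str):
        payload = json.loads(payload)

    fetcher = NuxeoFetcher(payload)
    fetcher.fetch_page()           # hit the API, check the response, stash it, increment

    next_payload = fetcher.json()  # None when there is nothing left to fetch
    if next_payload:
        # hand the fetcher's internal state to a fresh Lambda invocation
        boto3.client('lambda').invoke(
            FunctionName='fetch_collection',  # assumed function name
            InvocationType='Event',           # asynchronous invocation
            Payload=next_payload,
        )
```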
Each AWS Lambda function call should typically retrieve only one page of metadata records, making only one request to the institutional API. This is because AWS Lambda functions time out at 15 minutes, and we don't know when we start the job how many pages of records we'll be retrieving or how long it will take to retrieve them.
Note: There may be some exceptions to the 1 request per lambda rule. For example, data about the feed might need to be retrieved on fetcher initialization before we can fetch page results. Relevant results of these exceptional requests should be cached in the harvest data dictionary for subsequent calls to the AWS Lambda function.
Note: While AWS Lambda functions do time out at 15 minutes, under the hood AWS Lambda does keep instances warm.
The Fetcher abstract class defines fetch_page(self), which generally enforces the following workflow:
- Building a request
- Issuing the request using requests.get() and handling/logging any errors
- Getting the records
- Stashing records on s3
- Incrementing the Fetcher's internal state
To this end, the Fetcher abstract class generically handles any errors raised by requests.get(), as well as the stashing of files on s3. The following method signatures: build_fetch_request(self), check_page(self, http_resp), increment(self, http_resp), and json(self) are left to be implemented by Fetcher subclasses. Any errors encountered during check_page are left for the subclass implementation to raise.
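A rough sketch of how the abstract class might wire this together is below. Only the four method signatures come from the documentation above; the attribute names, logging, and stashing call are assumptions:

```python
import logging

import requests

logger = logging.getLogger(__name__)


class Fetcher(object):
    def __init__(self, params):
        self.collection_id = params.get('collection_id')
        self.write_page = params.get('write_page', 0)

    def fetch_page(self):
        """Request one page, validate it, stash it, and advance internal state."""
        request_kwargs = self.build_fetch_request()
        try:
            response = requests.get(**request_kwargs)
            response.raise_for_status()
        except requests.exceptions.RequestException as err:
            logger.error(
                f"[{self.collection_id}]: error fetching {request_kwargs} - {err}")
            raise

        if self.check_page(response):
            self.fetch_to_s3(response.content)  # or fetch_to_local; see the note below

        self.increment(response)

    # left to subclasses
    def build_fetch_request(self):
        raise NotImplementedError

    def check_page(self, http_resp):
        raise NotImplementedError

    def increment(self, http_resp):
        raise NotImplementedError

    def json(self):
        raise NotImplementedError
```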
Note: The Fetcher is designed to use two environment variables to, respectively, run locally and stash metadata on the local filesystem. To run the fetcher locally, set FETCHER_LOCAL_RUN=True. To stash metadata locally, set FETCHER_DATA_DEST='local'. If FETCHER_LOCAL_RUN is not set in the environment, the metadata fetcher will run in AWS, and if FETCHER_DATA_DEST is not set, it will stash metadata on s3. The Fetcher abstract class also generically handles this difference with fetch_to_local and fetch_to_s3. To unset FETCHER_LOCAL_RUN in your environment, use unset FETCHER_LOCAL_RUN (not export FETCHER_LOCAL_RUN=False).
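Continuing the sketch above, the stashing helpers and the environment-variable dispatch might look roughly like this. The bucket name, key layout, and the stash_page helper are assumptions:

```python
import os

import boto3


class Fetcher(object):
    # ...continuing the sketch above...

    def stash_page(self, content):
        """Dispatch on FETCHER_DATA_DEST; default to s3 when it is not set."""
        if os.environ.get('FETCHER_DATA_DEST') == 'local':
            self.fetch_to_local(content)
        else:
            self.fetch_to_s3(content)

    def fetch_to_local(self, content):
        """Write the page to the local filesystem as <write_page>.jsonl."""
        directory = os.path.join('fetched', str(self.collection_id))  # assumed layout
        os.makedirs(directory, exist_ok=True)
        with open(os.path.join(directory, f"{self.write_page}.jsonl"), 'wb') as out:
            out.write(content)

    def fetch_to_s3(self, content):
        """Write the page to s3 as <collection_id>/<write_page>.jsonl."""
        boto3.client('s3').put_object(
            Bucket='rikolti-fetched-metadata',  # assumed bucket name
            Key=f"{self.collection_id}/{self.write_page}.jsonl",
            Body=content,
        )
```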
It is assumed the __init__(self) method will have to be subclassed in order to initialize the fetcher with data specific to a particular fetcher type that is stored in the payload['harvest_data'] dictionary. For the NuxeoFetcher, a harvest_data value of:
{
"root": "/asset-library/UCM/Wilma_McDaniel/Publish/"
}
will start a new fetch task at the beginning of the McDaniel collection, while a harvest_data value of:
{
"root": "/asset-library/UCM/Wilma_McDaniel/Publish/",
"current_path": {
"path": "/asset-library/UCM/Wilma_McDaniel/Publish/Box13/",
"uid": "2a50c01b-c7fa-471a-9960-052ecc4b180f"
},
"query_type": "documents",
"api_page": 0,
"prefix": ["r", "fp0", "f1"],
}
will start fetching the documents in the Nuxeo folder Box 13 of the McDaniel collection. You can imagine an OAI harvest_data that included, for example, a resumption token.
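A subclassed __init__ might look roughly like the following, filling in defaults when only root is provided. The exact defaults and attribute names here are illustrative, not the real NuxeoFetcher:

```python
class NuxeoFetcher(Fetcher):
    def __init__(self, params):
        super().__init__(params)

        # start from defaults, then overlay whatever harvest_data provides
        nuxeo_defaults = {
            'current_path': None,   # derived from `root` on the first fetch
            'query_type': 'documents',
            'api_page': 0,
            'prefix': ['r'],
        }
        nuxeo_defaults.update(params.get('harvest_data', {}))
        self.harvest_data = nuxeo_defaults
```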
build_fetch_request(self) builds a dictionary of parameters for the call to the institution's API using requests.get(). This method should minimally return {'url': <str>} but may also include {'headers': {}, 'params': {}} or any other options accepted by requests.get(). This method takes no arguments - the fetcher's own internal state as set by __init__ should know enough to produce this fetch request's parameters.
check_page(self, http_resp) checks that the http_resp from the institution's API contains metadata records. It takes a requests.Response object as an argument and should return a boolean indicating whether the page contains records and should be saved. This method should handle, for example, instances where the institutional API returns a 200 status code, but the response contains an error message stating there are no records found.
increment(self, http_resp) increments the internal state of the fetcher for fetching the next page. This method has no return value, but instead updates class member variables. It takes as an argument the requests.Response object from the institutional API call, which often includes either a resumptionToken or page information relevant for producing the next request. This is comparable to Python's __next__() method.
json(self) builds a json serialization of the current internal state of the fetcher to be passed as a payload to the next lambda function. Returns json.dumps(<dict>), or None if there are no more pages to fetch.
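Pulling these four methods together, a hypothetical OAI-style subclass might look something like the sketch below. The class itself, its attribute names, and the string-matching on the response body are all illustrative; a real implementation would more likely parse the XML properly:

```python
import json
import re


class OAIFetcher(Fetcher):
    def __init__(self, params):
        super().__init__(params)
        harvest_data = params.get('harvest_data', {})
        self.url = harvest_data.get('url')
        self.resumption_token = harvest_data.get('resumption_token')

    def build_fetch_request(self):
        # the first page asks for ListRecords; subsequent pages resume via token
        params = {'verb': 'ListRecords'}
        if self.resumption_token:
            params['resumptionToken'] = self.resumption_token
        else:
            params['metadataPrefix'] = 'oai_dc'
        return {'url': self.url, 'params': params}

    def check_page(self, http_resp):
        # a 200 from an OAI provider can still be an error document,
        # e.g. <error code="noRecordsMatch">
        return '<record' in http_resp.text and 'noRecordsMatch' not in http_resp.text

    def increment(self, http_resp):
        # pull the resumptionToken out of the previous response; a missing or
        # empty token means there are no more pages to fetch
        match = re.search(
            r'<resumptionToken[^>]*>([^<]*)</resumptionToken>', http_resp.text)
        self.resumption_token = match.group(1).strip() if match else None
        self.write_page += 1

    def json(self):
        if not self.resumption_token:
            return None
        return json.dumps({
            'collection_id': self.collection_id,
            'harvest_type': 'oai',
            'write_page': self.write_page,
            'harvest_data': {
                'url': self.url,
                'resumption_token': self.resumption_token,
            },
        })
```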
You can run a fetcher in a variety of ways:
- Run the fetcher in a local environment from the command line with FETCHER_LOCAL_RUN=True and FETCHER_DATA_DEST='local|s3' for one collection:
  python lambda_function.py '{"collection_id": 26695, "harvest_type": "NuxeoFetcher", "write_page": 0, "nuxeo": {"path": "/asset-library/UCI/SCA_UniversityArchives/Publications/Anthill/"}}'
- Run the fetcher in a local environment from the command line with FETCHER_LOCAL_RUN=True and FETCHER_DATA_DEST='local|s3' for multiple collections:
  - Add your sample collections to a file in the sample_data folder
  - Modify metadata_fetcher/tests.py to import that sample data
  - Run python tests.py
- Run the fetcher from your local environment on Lambda:
  - Get AWS permissions locally however you do that (profile, env variables, sts, etc)
  - aws lambda TODO: add full command here
- Run the fetcher from the AWS console:
  - Login to the AWS console, navigate to Lambda
  - Create new "test data" and add your Lambda input
  - Click "test"
- Identify a few sample collections
- Look up the old fetcher in the existing harvester codebase.
- Isolate out any fetcher-level mapping that may exist in the old fetcher - track it for the mapper migration. Creating a link like the following: https://github.com/ucldc/harvester/blob/master/harvester/fetcher/oac_fetcher.py#L69-L133 and attaching it to a GitHub issue is a great way to track this.
- Determine what's in the collection registry for "harvest endpoint" and "harvest extra data" for collections that use this fetcher.
- Determine how pagination works for this fetcher from looking at the old fetcher and curling harvest endpoint/harvest extra data.
- Determine a data model for the harvest_data dictionary for this source type.
- Write the build_fetch_request method.
- Determine that the response looks good and is worth saving. Implement in the check_page method.
- Write the increment method.
From the Collection Registry, under Limit by harvest type, select the fetcher type you're interested in migrating and then select a few sample collections to develop against. It's easiest to develop against a smaller collection, but try selecting at least one collection with fewer than 100 objects, one collection with 200-300 objects, and one collection with a few thousand objects. Since the fetcher operates on pages of data at a time, it's important to select a sample case with several pages of data (typically more than 100 objects).
Identify the Collection ID of your sample collections by looking at the collection's URL. For example, the Collection ID for the William Randolph Hearst, Jr. Photograph Collection is 26327.
- Collections are currently fetched by a user (typically Gabriela or Adrian) selecting "Queue Harvest to CouchDB Stage" in the action drop-down in the Django Admin interface of the Collection Registry for a given collection.
- Selecting this action runs the queue_harvest.sh shell script on the registry machine, which sets some environment variables, sources a virtual environment, and then runs the queue_harvest.py python script within that virtual environment.
  - The queue_harvest.py python script then adds the harvester.run_ingest.main function to a Redis queue, passing it a user email (the user who selected the action) and a collection id (the collection to be fetched).
- A worker machine picks up the run_ingest.py job, which does several different things, but on L133 it calls fetcher.main().
  - fetcher.main() is actually implemented in controller.main()
  - controller.main() initializes a HarvestController. This initialization process:
    - Looks up the harvest type in the Collection Registry
    - Maps the registry harvest type value to a Fetcher class
    - Initializes a Fetcher object. The Fetcher initialization process:
      - Seems to set up data we'll need for fetching from the institutional API but does not seem to make that first fetch request; but also seems to lack some standardization. Some examples:
  - Back in controller.main(), we then call HarvestController.harvest()
    - The HarvestController.harvest() method iterates over fetched results:
      - By using the fetcher's next() method (OAC Fetcher used here as an example)
- Key Takeaway: Given that the existing source code uses actual generator classes, much of the code we are interested in migrating lives in the existing fetcher's __init__() and next() methods.
As stated above, the Fetcher initialization process seems to set up data we'll need for fetching from the institutional API but does not seem to make that first fetch request; but also seems to lack some standardization. Some examples:
A Fetcher's next() method frequently calls methods defined in Fetcher subclasses to do some preliminary parsing and mapping work before returning the next set of records to the HarvestController. Again, there seems to be a lack of standardization on what counts as 'preliminary' parsing and mapping, vs. what rightly belongs in a mapper. Some examples:
- NuxeoFetcher.next() sets isShownBy
- OAIFetcher.next() uses Sickle to parse the OAI XML feed
- OACFetcher.next() does all kinds of XML parsing in _docHits_to_objset
The new fetcher will not perform any of this parsing or preliminary mapping work. The objective of the new fetcher is simply and explicitly to make a request to an external API and, given a valid response, stash the response body in s3. However, much of this parsing and mapping logic will be critical to migrate into the Rikolti mappers, so it's important to not just extract this logic, but also track what has been extracted for later use in migrating the corresponding mapper functions.
- TODO: should we store response metadata alongside the response body? Caching?
The existing fetchers and the new Rikolti fetchers both utilize the harvest api url and harvest extra data stored in the Collection Registry. To start developing a Rikolti fetcher, you'll want to get a few sample api urls for that harvest type and survey the kinds of information stored in harvest extra data. While it is just one field, sometimes multiple pieces of data are stored there using a delimiter.
At https://registry.cdlib.org, select limit by Solr count not empty, and then choose the relevant fetcher in the sidebar under the heading "Limit by harvest type". Note the harvest type abbreviation used in the URL. You can explore the harvest url and harvest extra data values using the following API endpoint:
https://registry.cdlib.org/api/v1/rikoltifetcher/?harvest_type=&format=json
This API endpoint also accepts the following filtering parameters: offset, id__in, harvest_extra_data__isnull, solr_count__gt, and solr_count__lt.
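For example, a quick way to survey those fields for a given harvest type might look like the snippet below. The harvest_type abbreviation and the response field names are assumptions - check the registry's actual output and the abbreviation shown in the registry URL:

```python
import requests

# illustrative sample query: collections of one harvest type that have
# harvest_extra_data and at least one object in Solr
resp = requests.get(
    'https://registry.cdlib.org/api/v1/rikoltifetcher/',
    params={
        'harvest_type': 'OAC',                 # assumed abbreviation from the registry URL
        'harvest_extra_data__isnull': 'false',
        'solr_count__gt': 0,
        'format': 'json',
    },
)
resp.raise_for_status()
for collection in resp.json().get('objects', []):
    print(
        collection.get('id'),
        collection.get('url_harvest'),
        collection.get('harvest_extra_data'),
    )
```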
5. Determine how pagination works for this fetcher from looking at the old fetcher and curling harvest endpoint/harvest extra data.
Is there a resumption token in the response from the previous page? Is there a page number?
The harvest_data data model typically includes at least the existing url_harvest and harvest_extra_data. Rename harvest_extra_data so that it makes sense for the given harvest type.
Sometimes we've shoe-horned several pieces of "extra data" into the harvest_extra_data field with some delimiter character. For bonus points, it is better to be explicit with the data needed to fetch a collection - split this out into multiple keys in the harvest_data model, but also allow for the historical use of one string with delimiter characters.
The harvest_data model should also include some sort of way to specify the page we are interested in fetching, but assume the first page if none is provided.
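As a sketch, a harvest_data model for a hypothetical OAI-style harvest type might look something like this (key names and values are illustrative):

```python
# hypothetical harvest_data for an OAI-style harvest type
harvest_data = {
    # existing registry fields, renamed so they make sense for this source
    "url": "https://example.edu/oai",            # from url_harvest
    "metadata_set": "special_collections",       # from harvest_extra_data
    # historically, harvest_extra_data may pack several values into one
    # delimited string, e.g. "special_collections,oai_dc" - accept that too
    "extra_data": "special_collections,oai_dc",
    # which page to fetch next; assume the first page when not provided
    "resumption_token": None,
}
```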
The build_fetch_request method should use the harvest_data data model to construct, at a minimum, a dictionary containing the key url as well as any other parameters to pass through to requests.get().
Once this step is complete, you should be able to run your Fetcher, though it will only fetch the first page and the results won't get stashed anywhere.
The check_page method should check, in whatever way makes sense for the source, that the response is a meaningful response.
Once this step is complete, you should be able to run your Fetcher. It will only fetch the first page of results, but the results should get stashed (either locally or on s3, depending on your environment).
The increment method should update the harvest_data data model for the next page.
The json method should return a json representation of the current state for the next page.
Once this step is complete, you should be able to run your Fetcher to fetch all pages of results.
Deployment assumes you have access to our AWS account: ./deploy-version.sh
Currently, testing consists of running the fetcher with FETCHER_LOCAL_RUN=True and FETCHER_DATA_DEST='local' set in the environment with a sample collection set, and spot checking the results saved locally against manually hitting the API in the browser or via curl. Here's some sample data for harvesting Nuxeo collections: https://github.com/ucldc/rikolti/blob/main/metadata_fetcher/sample_data/nuxeo_harvests.py. It's pretty easy to create this sample data from the registry by hitting the rikoltifetcher API endpoint and using filters to produce a specific sample set: https://registry.cdlib.org/api/v1/rikoltifetcher/?format=json. Filters currently supported: harvest_type, id__in, harvest_extra_data__isnull, solr_count__gt, solr_count__lt, and offset for pagination. Django filter reference: https://docs.djangoproject.com/en/4.1/ref/models/querysets/#filter and collection model reference: https://github.com/ucldc/avram/blob/master/library_collection/models.py
Then you can simply point the tests.py script at your sample data and run python tests.py.