Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Add docs and template ProviderDataIngester #790

Merged
merged 18 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,7 @@ generate-dag-docs fail_on_diff="false":
exit 1
fi
fi

# Generate files for a new provider
#
# Arguments:
#   provider_name  name of the new provider (quote it if it contains spaces)
#   endpoint       API endpoint the provider will be ingested from
#   media_types    optional space-separated media types; defaults to "image"
add-provider provider_name endpoint +media_types="image":
python3 openverse_catalog/templates/create_provider_ingester.py "{{ provider_name }}" "{{ endpoint }}" -m {{ media_types }}
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,20 @@ class ProviderDataIngester(ABC):
@abstractmethod
def providers(self) -> dict[str, str]:
"""
A dictionary whose keys are the supported `media_types`, and values are
the `provider` string in the `media` table of the DB for that type.
A dictionary mapping each supported media type to its corresponding
`provider` string (the string that will populate the `provider` field
in the Catalog DB). These strings should be defined as constants in
`common.loader.provider_details`

By convention, when a provider supports multiple media types we set
separate provider strings for each type. For example:

```
providers = {
"image": provider_details.MYPROVIDER_IMAGE_PROVIDER,
"audio": provider_details.MYPROVIDER_AUDIO_PROVIDER,
}
```
"""
pass

Expand Down Expand Up @@ -105,7 +117,7 @@ def __init__(self, conf: dict = None, date: str = None):
self.delayed_requester = DelayedRequester(
delay=self.delay, headers=self.headers
)
self.media_stores = self.init_media_stores()
self.media_stores = self._init_media_stores()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really good call formalizing the public/private nature of these methods!

self.date = date

# dag_run configuration options
Expand All @@ -126,7 +138,7 @@ def __init__(self, conf: dict = None, date: str = None):
# Create a generator to facilitate fetching the next set of query_params.
self.override_query_params = (qp for qp in query_params_list)

def init_media_stores(self) -> dict[str, MediaStore]:
def _init_media_stores(self) -> dict[str, MediaStore]:
"""
Initialize a media store for each media type supported by this
provider.
Expand All @@ -153,7 +165,7 @@ def ingest_records(self, **kwargs) -> None:
logger.info(f"Begin ingestion for {self.__class__.__name__}")

while should_continue:
query_params = self.get_query_params(query_params, **kwargs)
query_params = self._get_query_params(query_params, **kwargs)
if query_params is None:
# Break out of ingestion if no query_params are supplied. This can
# happen when the final `override_query_params` is processed.
Expand All @@ -175,7 +187,7 @@ def ingest_records(self, **kwargs) -> None:

# If errors have already been caught during processing, raise them
# as well.
if error_summary := self.get_ingestion_errors():
if error_summary := self._get_ingestion_errors():
raise error_summary from error
raise

Expand All @@ -192,21 +204,21 @@ def ingest_records(self, **kwargs) -> None:

# Commit whatever records we were able to process, and rethrow the
# exception so the taskrun fails.
self.commit_records()
self._commit_records()
raise error from ingestion_error

if self.limit and record_count >= self.limit:
logger.info(f"Ingestion limit of {self.limit} has been reached.")
should_continue = False

# Commit whatever records we were able to process
self.commit_records()
self._commit_records()

# If errors were caught during processing, raise them now
if error_summary := self.get_ingestion_errors():
if error_summary := self._get_ingestion_errors():
raise error_summary

def get_ingestion_errors(self) -> AggregateIngestionError | None:
def _get_ingestion_errors(self) -> AggregateIngestionError | None:
"""
If any errors were skipped during ingestion, log them as well as the
associated query parameters. Then return an AggregateIngestionError.
Expand Down Expand Up @@ -235,10 +247,13 @@ def get_ingestion_errors(self) -> AggregateIngestionError | None:
)
return None

def get_query_params(self, prev_query_params: dict | None, **kwargs) -> dict | None:
def _get_query_params(
self, prev_query_params: dict | None, **kwargs
) -> dict | None:
"""
Returns the next set of query_params for the next request, handling
optional overrides via the dag_run conf.
optional overrides via the dag_run conf. This method should not be overridden;
instead override get_next_query_params.
"""
# If we are getting query_params for the first batch and initial_query_params
# have been set, return them.
Expand Down Expand Up @@ -391,7 +406,7 @@ def get_record_data(self, data: dict) -> dict | list[dict] | None:
"""
pass

def commit_records(self) -> int:
def _commit_records(self) -> int:
total = 0
for store in self.media_stores.values():
total += store.commit()
Expand Down
99 changes: 99 additions & 0 deletions openverse_catalog/docs/adding_a_new_provider.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Openverse Providers

## Overview

The Openverse Catalog collects data from the APIs of sites that share openly-licensed media, and saves them in our Catalog database. This process is automated by [Airflow DAGs](https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html) generated for each provider. A simple provider DAG looks like this:

![Example DAG](assets/provider_dags/simple_dag.png)

At a high level the steps are:

1. `generate_filename`: Generates the name of a TSV (tab-separated values) text file that will be used for saving the data to the disk in later steps
2. `pull_data`: Pulls records from the provider API, collects just the data we need, and commits it to local storage in TSVs.
3. `load_data`: Loads the data from TSVs into the Catalog database, updating old records and discarding duplicates.
4. `report_load_completion`: Reports a summary of added and updated records.

When a provider supports multiple media types (for example, `audio` *and* `images`), the `pull` step consumes data of all types, but separate `load` steps are generated:

![Example Multi-Media DAG](assets/provider_dags/multi_media_dag.png)

## Adding a New Provider

Adding a new provider to Openverse means adding a new provider DAG. Fortunately, our DAG factories automate most of this process. To generate a fully functioning provider DAG, you need to:

1. Implement a `ProviderDataIngester`
2. Add a `ProviderWorkflow` configuration class

### Implementing a `ProviderDataIngester` class

We call the code that pulls data from our provider APIs "Provider API scripts". You can find examples in [`provider_api_scripts` folder](../dags/providers/provider_api_scripts). This code will be run during the `pull` steps of the provider DAG.

At a high level, a provider script should iteratively request batches of records from the provider API, extract data in the format required by Openverse, and commit it to local storage. Much of this logic is implemented in a [`ProviderDataIngester` base class](../dags/providers/provider_api_scripts/provider_data_ingester.py) (which also provides additional testing features *<TODO: link to documentation for testing features like ingestion_limit, skip_ingestion_errors etc>*). To add a new provider, extend this class and implement its abstract methods.

We provide a [script](../dags/templates/create_provider_ingester.py) that can be used to generate the files you'll need and get you started:

```
# PROVIDER_NAME: The name of the provider
# ENDPOINT: The API endpoint from which to fetch data
# MEDIA: Optionally, a space-delimited list of media types ingested by this provider
# (and supported by Openverse). If not provided, defaults to "image".

> just add-provider <PROVIDER_NAME> <ENDPOINT> <MEDIA>

# Example usages:

# Creates a provider that supports just audio
> just add-provider TestProvider https://test.test/search audio

# Creates a provider that supports images and audio
> just add-provider "Foobar Museum" https://foobar.museum.org/api/v1 image audio

# Creates a provider that supports the default, just image
> just add-provider TestProvider https://test.test/search
```

You should see output similar to this:
```
Creating files in /Users/staci/projects/openverse-projects/openverse-catalog
API script: openverse-catalog/openverse_catalog/dags/providers/provider_api_scripts/foobar_museum.py
API script test: openverse-catalog/tests/dags/providers/provider_api_scripts/test_foobar_museum.py

NOTE: You will also need to add a new ProviderWorkflow dataclass configuration to the PROVIDER_WORKFLOWS list in `openverse-catalog/dags/providers/provider_workflows.py`.
```

This generates a provider script with a templated `ProviderDataIngester` for you in the [`provider_api_scripts` folder](../dags/providers/provider_api_scripts), as well as a corresponding test file. Complete the TODOs detailed in the generated files to implement behavior specific to your API.

Some APIs may not fit perfectly into the established `ProviderDataIngester` pattern. For advanced use cases and examples of how to modify the ingestion flow, see the [`ProviderDataIngester` FAQ](provider_data_ingester_faq.md).


### Add a `ProviderWorkflow` configuration class

Now that you have an ingester class, you're ready to wire up a provider DAG in Airflow to automatically pull data and load it into our Catalog database. This is done by defining a `ProviderWorkflow` configuration dataclass and adding it to the `PROVIDER_WORKFLOWS` list in [`provider_workflows.py`](../dags/providers/provider_workflows.py). Our DAG factories will pick up the configuration and generate a complete new DAG in Airflow!

At minimum, you'll need to provide the following in your configuration:
* `provider_script`: the name of the file where you defined your `ProviderDataIngester` class
* `ingestion_callable`: the `ProviderDataIngester` class itself
* `media_types`: the media types your provider handles
Comment on lines +74 to +76
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm realizing that we should make an issue for removing the legacy wrapper handling logic, which also means that we should be able to remove provider_script here eventually. I can make that issue 🙂 Also, I think we could even determine media_types automatically by using ProviderDataIngester::providers.keys() 😮 Lots of paring down we could do on these configs once the refactors are all done! 🥳


Example:
```python
# In openverse_catalog/dags/providers/provider_workflows.py
from providers.provider_api_scripts.foobar_museum import FoobarMuseumDataIngester

...

PROVIDER_WORKFLOWS = [
...
ProviderWorkflow(
provider_script='foobar_museum',
        ingestion_callable=FoobarMuseumDataIngester,
media_types=("image", "audio",)
)
]
```

There are many other options that allow you to tweak the `schedule` (when and how often your DAG is run), timeouts for individual steps of the DAG, and more. These are documented in the definition of the `ProviderWorkflow` dataclass. *<TODO: add docs for other options.>*

After adding your configuration, run `just up` and you should now have a fully functioning provider DAG! *<TODO: add and link to docs for how to run provider DAGs locally, preferably with images.>*

*NOTE*: when your code is merged, the DAG will become available in production but will be disabled by default. A contributor with Airflow access will need to manually turn the DAG on in production.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
57 changes: 57 additions & 0 deletions openverse_catalog/docs/data_models.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
*<TODO: This documentation is temporary and should be replaced by more thorough documentation of our DB fields in https://github.com/WordPress/openverse-catalog/issues/783>*

# Data Models

The following is temporary, limited documentation of the columns for each of our Catalog data models.

## Required Fields

| field name | description |
| --- | --- |
| *foreign_identifier* | Unique identifier for the record on the source site. |
| *foreign_landing_url* | URL of page where the record lives on the source website. |
| *audio_url* / *image_url* | Direct link to the media file. Note that until [issue #784 is addressed](https://github.com/WordPress/openverse-catalog/issues/784) the field name differs depending on media type. |
| *license_info* | [LicenseInfo object](https://github.com/WordPress/openverse-catalog/blob/8423590fd86a0a3272ca91bc11f2f37979048181/openverse_catalog/dags/common/licenses/licenses.py#L25) that has (1) the URL of the license for the record, (2) string representation of the license, (3) version of the license, (4) raw license URL provided by the provider, if different from the canonical URL. Usually generated by calling [`get_license_info`](https://github.com/WordPress/openverse-catalog/blob/8423590fd86a0a3272ca91bc11f2f37979048181/openverse_catalog/dags/common/licenses/licenses.py#L29) on the respective fields returned/available from the API. |

## Optional Fields

The following fields are optional, but it is highly encouraged to populate as much data as possible:

| field name | description |
| --- | --- |
| *thumbnail_url* | Direct link to a thumbnail-sized version of the record. |
| *filesize* | Size of the main file in bytes. |
| *filetype* | The filetype of the main file, eg. 'mp3', 'jpg', etc. |
Copy link
Contributor

@obulat obulat Oct 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add something about the fact that filetype can be extracted from the file URL? And will be 'validated', that is, different names of the same filetype (jpeg - jpg) will be unified?

def _validate_filetype(self, filetype: str | None, url: str) -> str | None:
"""
Extracts filetype from the media URL if filetype is None.
Unifies filetypes that have variants such as jpg/jpeg and tiff/tif.
:param filetype: Optional filetype string.
:return: filetype string or None
"""
if filetype is None:
filetype = extract_filetype(url, self.media_type)
if self.media_type != "image":
return filetype
return FILETYPE_EQUIVALENTS.get(filetype, filetype)

| *creator* | The creator of the image. |
| *creator_url* | The user page, or home page of the creator. |
| *title* | Title of the record. |
| *meta_data* | Dictionary of metadata about the record. Currently, a key we prefer to have is `description`. |
| *raw_tags* | List of tags associated with the record. |
| *watermarked* | Boolean, true if the record has a watermark. |

#### Image-specific fields

Image also has the following fields:

| field_name | description |
| --- | --- |
| *width* | Image width in pixels. |
| *height* | Image height in pixels. |

#### Audio-specific fields

Audio has the following fields:

| field_name | description |
| --- | --- |
| *duration* | Audio duration in milliseconds. |
| *bit_rate* | Audio bit rate as int. |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add the measurement units here, too.

| *sample_rate* | Audio sample rate as int. |
| *category* | Category such as 'music', 'sound', 'audio_book', or 'podcast'. |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is available to all media types. Can we add a reference to it in the image-specific fields as well? Also it might be worth referencing the category enums we have:

(or, it looks like just enum? 🤔 doesn't seem like we have an audio category enum, I'll make an issue for that)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely! This documentation at the moment is basically just copying docstrings from the image stores. This doc in particular should be totally rewritten in the Data normalization project.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've left a lot of comments before I got to this one, @stacimc :) Totally fine if we leave everything as is before the re-write. Except for the foreign_identifier being required.

| *genres* | List of genres. |
| *set_foreign_id* | Unique identifier for the audio set on the source site. |
| *audio_set* | The name of the set (album, pack, etc) the audio is part of. |
| *set_position* | Position of the audio in the audio_set. |
| *set_thumbnail* | URL of the audio_set thumbnail. |
| *set_url* | URL of the audio_set. |
| *alt_files* | A dictionary with information about alternative files for the audio (different formats/quality). Dict should have the following keys: *url*, *filesize*, *bit_rate*, *sample_rate*.
Loading