diff --git a/catalog/tests/utilities/dag_doc_gen/test_dag_doc_generation.py b/catalog/tests/utilities/dag_doc_gen/test_dag_doc_generation.py index 95df1e6cbd7..0e21440689f 100644 --- a/catalog/tests/utilities/dag_doc_gen/test_dag_doc_generation.py +++ b/catalog/tests/utilities/dag_doc_gen/test_dag_doc_generation.py @@ -27,6 +27,15 @@ class DagMock(NamedTuple): _MODULE = "utilities.dag_doc_gen.dag_doc_generation" +@pytest.mark.parametrize( + "dag_id, expected_mapped_dag_id", + [ + ("sample_dag_123", "sample_dag_123"), + ("sample_dag_audio_123", "sample_dag_{media_type}_123"), + ("staging_database_restore", "staging_database_restore"), + ("wikimedia_reingestion_workflow", "wikimedia_commons_workflow"), + ], +) @pytest.mark.parametrize("schedule", ["@daily", None]) @pytest.mark.parametrize( "doc, expected_doc", @@ -75,20 +84,30 @@ class DagMock(NamedTuple): ], ) def test_get_dags_info( - schedule, doc, expected_doc, tags, type_, provider_workflow, catchup, expected_dated + dag_id, + expected_mapped_dag_id, + schedule, + doc, + expected_doc, + tags, + type_, + provider_workflow, + catchup, + expected_dated, ): dag = DagMock(schedule_interval=schedule, doc_md=doc, catchup=catchup, tags=tags) expected = DagInfo( - dag_id=DAG_ID, + dag_id=dag_id, schedule=schedule, doc=expected_doc, dated=expected_dated, type_=type_, provider_workflow=provider_workflow, + mapped_dag_id=expected_mapped_dag_id, ) with mock.patch(f"{_MODULE}.get_provider_workflows") as provider_workflow_mock: provider_workflow_mock.return_value.get.return_value = provider_workflow - actual = dag_doc_generation.get_dags_info({DAG_ID: dag})[0] + actual = dag_doc_generation.get_dags_info({dag_id: dag})[0] assert actual == expected @@ -104,6 +123,7 @@ def test_get_dags_info( type_="", dated=False, provider_workflow=None, + mapped_dag_id=DAG_ID, ), False, """ @@ -123,6 +143,7 @@ def test_get_dags_info( type_="", dated=False, provider_workflow=None, + mapped_dag_id=DAG_ID, ), False, """ @@ -142,6 +163,7 @@ def test_get_dags_info( type_="", dated=False, provider_workflow=PROVIDER_WORKFLOW_INSTANCE, + mapped_dag_id=DAG_ID, ), True, """ @@ -150,13 +172,34 @@ def test_get_dags_info( | DAG ID | Schedule Interval | Dated | Media Type(s) | | --- | --- | --- | --- | | [`sample_dag_123`](#sample_dag_123) | `@daily` | `False` | m1, m2 | +""", + ), + # Separate mapped DAG ID + ( + DagInfo( + dag_id=DAG_ID, + schedule="@daily", + doc="A doc does exist here", + type_="", + dated=False, + provider_workflow=None, + mapped_dag_id="something_entirely_different", + ), + False, + """ +### Special Name + +| DAG ID | Schedule Interval | +| --- | --- | +| [`sample_dag_123`](#something_entirely_different) | `@daily` | """, ), ], ) def test_generate_type_subsection(dag_info, is_provider, expected): + dag_by_doc_md = {dag_info.doc: dag_info.mapped_dag_id} actual = dag_doc_generation.generate_type_subsection( - "Special Name", [dag_info], is_provider + "Special Name", [dag_info], is_provider, dag_by_doc_md ) assert actual.strip() == expected.strip() @@ -188,8 +231,8 @@ def test_generate_dag_doc(): with mock.patch(f"{_MODULE}.get_dags_info") as get_dags_info_mock: # Return in reverse order to ensure they show up in the correct order get_dags_info_mock.return_value = [ - DagInfo("b", None, "this one has a doc", "t1", False, None), - DagInfo("a", None, None, "t1", False, None), + DagInfo("b", None, "this one has a doc", "t1", False, None, "b"), + DagInfo("a", None, None, "t1", False, None, "a"), ] actual = dag_doc_generation.generate_dag_doc() assert actual == 
expected diff --git a/catalog/utilities/dag_doc_gen/dag_doc_generation.py b/catalog/utilities/dag_doc_gen/dag_doc_generation.py index aabec1dffa7..190bf1abd23 100644 --- a/catalog/utilities/dag_doc_gen/dag_doc_generation.py +++ b/catalog/utilities/dag_doc_gen/dag_doc_generation.py @@ -57,6 +57,23 @@ The following is documentation associated with each DAG (where available): """ +# Mapping of terms in DAG IDs to collapse into a single term when displaying +# the single-documentation section. +DAG_ID_TERMS_TO_COLLAPSE = { + "image": "{media_type}", + "audio": "{media_type}", + "production": "{environment}", + "staging": "{environment}", +} +# DAG IDs to ignore when collapsing reference documentation for a mapped term +DAG_IDS_TO_IGNORE_COLLAPSE = { + "recreate_full_staging_index", + "staging_database_restore", + "create_proportional_by_source_staging_index", +} +REINGESTION_SPECIFIC_MAPPING = { + "wikimedia_reingestion_workflow": "wikimedia_commons_workflow" +} # Typing DagMapping = dict[str, DAG] @@ -69,6 +86,7 @@ class DagInfo(NamedTuple): type_: str dated: bool provider_workflow: ProviderWorkflow | None + mapped_dag_id: str def load_dags(dag_folder: str) -> DagMapping: @@ -112,6 +130,26 @@ def fix_headings(doc: str) -> str: return doc +def determine_mapped_dag_id(dag_id: str) -> str: + """ + Determine the mapped DAG ID for the provided DAG ID. + + This is used to collapse multiple references to the same DAG documentation into + a single definition. + """ + if dag_id in DAG_IDS_TO_IGNORE_COLLAPSE: + return dag_id + if dag_id in REINGESTION_SPECIFIC_MAPPING: + return REINGESTION_SPECIFIC_MAPPING[dag_id] + if "_reingestion" in dag_id: + return dag_id.replace("_reingestion", "") + parts = dag_id.split("_") + for idx, part in enumerate(parts): + if part in DAG_ID_TERMS_TO_COLLAPSE: + parts[idx] = DAG_ID_TERMS_TO_COLLAPSE[part] + return "_".join(parts) + + def get_dags_info(dags: DagMapping) -> list[DagInfo]: """ Convert the provided DAG ID -> DAG mapping into a list of DagInfo instances. 
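For reference, the collapsing rules introduced above can be exercised with a minimal standalone sketch. The dictionaries are copied from the diff; `collapse_dag_id` is a self-contained stand-in for `determine_mapped_dag_id`, not an import from the catalog module.

```python
# Standalone sketch of the ID-collapsing rules added in this diff. The mappings
# mirror DAG_ID_TERMS_TO_COLLAPSE, DAG_IDS_TO_IGNORE_COLLAPSE, and
# REINGESTION_SPECIFIC_MAPPING above; nothing here is imported from the catalog.
TERMS_TO_COLLAPSE = {
    "image": "{media_type}",
    "audio": "{media_type}",
    "production": "{environment}",
    "staging": "{environment}",
}
IDS_TO_IGNORE = {
    "recreate_full_staging_index",
    "staging_database_restore",
    "create_proportional_by_source_staging_index",
}
REINGESTION_MAPPING = {"wikimedia_reingestion_workflow": "wikimedia_commons_workflow"}


def collapse_dag_id(dag_id: str) -> str:
    """Collapse media-type/environment terms so documentation can be shared."""
    if dag_id in IDS_TO_IGNORE:
        return dag_id
    if dag_id in REINGESTION_MAPPING:
        return REINGESTION_MAPPING[dag_id]
    if "_reingestion" in dag_id:
        return dag_id.replace("_reingestion", "")
    parts = dag_id.split("_")
    return "_".join(TERMS_TO_COLLAPSE.get(part, part) for part in parts)


# Expected outputs mirror the new test_get_dags_info parametrization
# and the renamed anchors in DAGs.md:
assert collapse_dag_id("sample_dag_audio_123") == "sample_dag_{media_type}_123"
assert collapse_dag_id("staging_database_restore") == "staging_database_restore"
assert collapse_dag_id("wikimedia_reingestion_workflow") == "wikimedia_commons_workflow"
assert collapse_dag_id("flickr_reingestion_workflow") == "flickr_workflow"
```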
@@ -139,6 +177,7 @@ def get_dags_info(dags: DagMapping) -> list[DagInfo]: type_=type_, dated=dated, provider_workflow=provider_workflow, + mapped_dag_id=determine_mapped_dag_id(dag_id), ) ) @@ -146,7 +185,10 @@ def get_dags_info(dags: DagMapping) -> list[DagInfo]: def generate_type_subsection( - name: str, dags_info: list[DagInfo], is_provider: bool + name: str, + dags_info: list[DagInfo], + is_provider: bool, + dag_by_doc_md: dict[str, str], ) -> str: """Generate the documentation for a "DAGs by type" subsection.""" log.info(f"Building subsection for '{name}'") @@ -166,12 +208,12 @@ def generate_type_subsection( text += header + "\n" text += "| " + " | ".join(["---"] * column_count) + " |" - for dag in dags_info: + for dag in sorted(dags_info, key=lambda d: d.mapped_dag_id): dag_id = f"`{dag.dag_id}`" # If we have documentation for the DAG, we'll want to link to it within the # markdown, so we reference it using the heading text (the DAG ID) if dag.doc: - dag_id = f"[{dag_id}](#{dag.dag_id})" + dag_id = f"[{dag_id}](#{dag_by_doc_md[dag.doc]})" text += f"\n| {dag_id} | `{dag.schedule}` |" if is_provider: text += f" `{dag.dated}` | {', '.join(dag.provider_workflow.media_types)} |" @@ -184,7 +226,7 @@ def generate_type_subsection( def generate_single_documentation(dag: DagInfo) -> str: """Generate the documentation for a single DAG.""" return f""" -### `{dag.dag_id}` +### `{dag.mapped_dag_id}` {dag.doc} @@ -210,7 +252,14 @@ def generate_dag_doc(dag_folder: Path = DAG_FOLDER) -> str: for dag in dags_info: dags_by_type[dag.type_].append(dag) + dag_by_doc_md: dict[str, str] = {} for type_, dags in sorted(dags_by_type.items()): + for dag in dags: + if dag.doc not in dag_by_doc_md: + # Sphinx removes the brackets from heading references + dag_by_doc_md[dag.doc] = dag.mapped_dag_id.replace("{", "").replace( + "}", "" + ) # Create a more human-readable name name = type_.replace("_", " ").replace("-", " ").title() # Special case for provider tables since they have extra information @@ -219,23 +268,25 @@ def generate_dag_doc(dag_folder: Path = DAG_FOLDER) -> str: # sub-list as part of a table of contents, but defer adding the sub-lists until # all are generated. text += f" 1. [{name}](#{type_.replace('_', '-')})\n" - dag_types.append(generate_type_subsection(name, dags, is_provider)) + dag_types.append( + generate_type_subsection(name, dags, is_provider, dag_by_doc_md) + ) text += "\n" + "\n\n".join(dag_types) text += MIDAMBLE - dag_docs = [] - for dag in sorted(dags_info, key=lambda d: d.dag_id): + dag_docs = set() + for dag in sorted(dags_info, key=lambda d: d.mapped_dag_id): # This section only contains subsections for DAGs where we have documentation if not dag.doc: continue # Similar to the DAGs-by-type section, we add the reference to a table of # contents first, and then defer adding all the generated individual docs until # the very end. - text += f" 1. [`{dag.dag_id}`](#{dag.dag_id})\n" - dag_docs.append(generate_single_documentation(dag)) + text += f" 1. [`{dag.dag_id}`](#{dag_by_doc_md[dag.doc]})\n" + dag_docs.add(generate_single_documentation(dag)) - text += "\n" + "".join(dag_docs) + text += "\n" + "".join(sorted(dag_docs)) # Normalize the newlines at the end of the file and add one more to make sure # our pre-commit checks are happy! 
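The documentation-deduplication flow that `generate_dag_doc` now performs can be summarized with a small sketch. It uses hypothetical `(dag_id, mapped_dag_id, doc)` tuples rather than real `DagInfo` objects, but follows the same steps as the diff above.

```python
# Sketch of the dedup flow added to generate_dag_doc (hypothetical sample data).
shared_doc = "Data Refresh DAG Factory docs..."
dags = [
    # (dag_id, mapped_dag_id, doc_md)
    ("audio_data_refresh", "{media_type}_data_refresh", shared_doc),
    ("image_data_refresh", "{media_type}_data_refresh", shared_doc),
    ("batched_update", "batched_update", "Batched Update DAG docs..."),
]

# One anchor per unique doc; braces are stripped because Sphinx removes them
# from heading references (see the comment in the diff above).
dag_by_doc_md: dict[str, str] = {}
for _dag_id, mapped_id, doc in dags:
    dag_by_doc_md.setdefault(doc, mapped_id.replace("{", "").replace("}", ""))

# Table/TOC links keep the original DAG ID as text but point at the shared anchor.
toc = [f"1. [`{dag_id}`](#{dag_by_doc_md[doc]})" for dag_id, _mapped, doc in dags]
# Identical generated sections collapse because they are collected in a set.
sections = {f"### `{mapped}`\n\n{doc}\n" for _dag_id, mapped, doc in dags}

assert dag_by_doc_md[shared_doc] == "media_type_data_refresh"
assert len(sections) == 2  # audio/image share a single generated section
print("\n".join(toc))
```

Because `audio_data_refresh` and `image_data_refresh` share both a doc and a mapped ID, only one `{media_type}_data_refresh` section is emitted, and every table and TOC link resolves to the brace-stripped anchor.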
diff --git a/documentation/catalog/reference/DAGs.md b/documentation/catalog/reference/DAGs.md index 65675922782..fa2ca5fbf8e 100644 --- a/documentation/catalog/reference/DAGs.md +++ b/documentation/catalog/reference/DAGs.md @@ -32,36 +32,36 @@ The following are DAGs grouped by their primary tag: ### Data Refresh -| DAG ID | Schedule Interval | -| ------------------------------------------------------------- | ----------------- | -| [`audio_data_refresh`](#audio_data_refresh) | `0 0 * * 1` | -| [`create_filtered_audio_index`](#create_filtered_audio_index) | `None` | -| [`create_filtered_image_index`](#create_filtered_image_index) | `None` | -| [`image_data_refresh`](#image_data_refresh) | `0 0 * * 1` | +| DAG ID | Schedule Interval | +| ------------------------------------------------------------------ | ----------------- | +| [`create_filtered_audio_index`](#create_filtered_media_type_index) | `None` | +| [`create_filtered_image_index`](#create_filtered_media_type_index) | `None` | +| [`audio_data_refresh`](#media_type_data_refresh) | `0 0 * * 1` | +| [`image_data_refresh`](#media_type_data_refresh) | `0 0 * * 1` | ### Database -| DAG ID | Schedule Interval | -| --------------------------------------------------------------------------------- | ----------------- | -| [`batched_update`](#batched_update) | `None` | -| [`delete_records`](#delete_records) | `None` | -| [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) | `None` | -| [`recreate_full_staging_index`](#recreate_full_staging_index) | `None` | -| [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) | `None` | -| [`report_pending_reported_media`](#report_pending_reported_media) | `@weekly` | -| [`staging_database_restore`](#staging_database_restore) | `@monthly` | +| DAG ID | Schedule Interval | +| -------------------------------------------------------------------------------------- | ----------------- | +| [`batched_update`](#batched_update) | `None` | +| [`delete_records`](#delete_records) | `None` | +| [`recreate_full_staging_index`](#recreate_full_staging_index) | `None` | +| [`recreate_audio_popularity_calculation`](#recreate_media_type_popularity_calculation) | `None` | +| [`recreate_image_popularity_calculation`](#recreate_media_type_popularity_calculation) | `None` | +| [`report_pending_reported_media`](#report_pending_reported_media) | `@weekly` | +| [`staging_database_restore`](#staging_database_restore) | `@monthly` | ### Elasticsearch -| DAG ID | Schedule Interval | -| ----------------------------------------------------------------------------------------------- | ----------------- | -| [`create_new_production_es_index`](#create_new_production_es_index) | `None` | -| [`create_new_staging_es_index`](#create_new_staging_es_index) | `None` | -| [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index) | `None` | -| [`point_production_es_alias`](#point_production_es_alias) | `None` | -| [`point_staging_es_alias`](#point_staging_es_alias) | `None` | -| [`production_elasticsearch_cluster_healthcheck`](#production_elasticsearch_cluster_healthcheck) | `*/15 * * * *` | -| [`staging_elasticsearch_cluster_healthcheck`](#staging_elasticsearch_cluster_healthcheck) | `*/15 * * * *` | +| DAG ID | Schedule Interval | +| ------------------------------------------------------------------------------------------------ | ----------------- | +| [`create_new_production_es_index`](#create_new_environment_es_index) | `None` | +| 
[`create_new_staging_es_index`](#create_new_environment_es_index) | `None` | +| [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index) | `None` | +| [`point_production_es_alias`](#point_environment_es_alias) | `None` | +| [`point_staging_es_alias`](#point_environment_es_alias) | `None` | +| [`production_elasticsearch_cluster_healthcheck`](#environment_elasticsearch_cluster_healthcheck) | `*/15 * * * *` | +| [`staging_elasticsearch_cluster_healthcheck`](#environment_elasticsearch_cluster_healthcheck) | `*/15 * * * *` | ### Maintenance @@ -82,10 +82,10 @@ The following are DAGs grouped by their primary tag: ### Popularity Refresh -| DAG ID | Schedule Interval | -| ------------------------------------------------------- | ----------------- | -| [`audio_popularity_refresh`](#audio_popularity_refresh) | `@monthly` | -| [`image_popularity_refresh`](#image_popularity_refresh) | `@monthly` | +| DAG ID | Schedule Interval | +| ------------------------------------------------------------ | ----------------- | +| [`audio_popularity_refresh`](#media_type_popularity_refresh) | `@monthly` | +| [`image_popularity_refresh`](#media_type_popularity_refresh) | `@monthly` | ### Provider @@ -117,12 +117,12 @@ The following are DAGs grouped by their primary tag: ### Provider Reingestion -| DAG ID | Schedule Interval | -| --------------------------------------------------------------------------------------- | ----------------- | -| [`flickr_reingestion_workflow`](#flickr_reingestion_workflow) | `@weekly` | -| [`metropolitan_museum_reingestion_workflow`](#metropolitan_museum_reingestion_workflow) | `@weekly` | -| [`phylopic_reingestion_workflow`](#phylopic_reingestion_workflow) | `@weekly` | -| [`wikimedia_reingestion_workflow`](#wikimedia_reingestion_workflow) | `@weekly` | +| DAG ID | Schedule Interval | +| --------------------------------------------------------------------------- | ----------------- | +| [`flickr_reingestion_workflow`](#flickr_workflow) | `@weekly` | +| [`metropolitan_museum_reingestion_workflow`](#metropolitan_museum_workflow) | `@weekly` | +| [`phylopic_reingestion_workflow`](#phylopic_workflow) | `@weekly` | +| [`wikimedia_reingestion_workflow`](#wikimedia_commons_workflow) | `@weekly` | ## DAG documentation @@ -131,54 +131,54 @@ The following is documentation associated with each DAG (where available): 1. [`add_license_url`](#add_license_url) 1. [`airflow_log_cleanup`](#airflow_log_cleanup) 1. [`auckland_museum_workflow`](#auckland_museum_workflow) -1. [`audio_data_refresh`](#audio_data_refresh) -1. [`audio_popularity_refresh`](#audio_popularity_refresh) 1. [`batched_update`](#batched_update) 1. [`cc_mixter_workflow`](#cc_mixter_workflow) 1. [`check_silenced_dags`](#check_silenced_dags) -1. [`create_filtered_audio_index`](#create_filtered_audio_index) -1. [`create_filtered_image_index`](#create_filtered_image_index) -1. [`create_new_production_es_index`](#create_new_production_es_index) -1. [`create_new_staging_es_index`](#create_new_staging_es_index) +1. [`create_filtered_audio_index`](#create_filtered_media_type_index) +1. [`create_filtered_image_index`](#create_filtered_media_type_index) +1. [`create_new_production_es_index`](#create_new_environment_es_index) +1. [`create_new_staging_es_index`](#create_new_environment_es_index) 1. [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index) 1. [`delete_records`](#delete_records) 1. [`europeana_workflow`](#europeana_workflow) 1. 
[`finnish_museums_workflow`](#finnish_museums_workflow) 1. [`flickr_audit_sub_provider_workflow`](#flickr_audit_sub_provider_workflow) -1. [`flickr_reingestion_workflow`](#flickr_reingestion_workflow) +1. [`flickr_reingestion_workflow`](#flickr_workflow) 1. [`flickr_workflow`](#flickr_workflow) 1. [`freesound_workflow`](#freesound_workflow) -1. [`image_data_refresh`](#image_data_refresh) -1. [`image_popularity_refresh`](#image_popularity_refresh) 1. [`inaturalist_workflow`](#inaturalist_workflow) 1. [`jamendo_workflow`](#jamendo_workflow) 1. [`justtakeitfree_workflow`](#justtakeitfree_workflow) -1. [`metropolitan_museum_reingestion_workflow`](#metropolitan_museum_reingestion_workflow) +1. [`metropolitan_museum_reingestion_workflow`](#metropolitan_museum_workflow) 1. [`metropolitan_museum_workflow`](#metropolitan_museum_workflow) 1. [`nappy_workflow`](#nappy_workflow) 1. [`oauth2_authorization`](#oauth2_authorization) 1. [`oauth2_token_refresh`](#oauth2_token_refresh) -1. [`phylopic_reingestion_workflow`](#phylopic_reingestion_workflow) +1. [`phylopic_reingestion_workflow`](#phylopic_workflow) 1. [`phylopic_workflow`](#phylopic_workflow) -1. [`point_production_es_alias`](#point_production_es_alias) -1. [`point_staging_es_alias`](#point_staging_es_alias) +1. [`point_production_es_alias`](#point_environment_es_alias) +1. [`point_staging_es_alias`](#point_environment_es_alias) 1. [`pr_review_reminders`](#pr_review_reminders) -1. [`production_elasticsearch_cluster_healthcheck`](#production_elasticsearch_cluster_healthcheck) 1. [`rawpixel_workflow`](#rawpixel_workflow) -1. [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) 1. [`recreate_full_staging_index`](#recreate_full_staging_index) -1. [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) +1. [`recreate_audio_popularity_calculation`](#recreate_media_type_popularity_calculation) +1. [`recreate_image_popularity_calculation`](#recreate_media_type_popularity_calculation) 1. [`report_pending_reported_media`](#report_pending_reported_media) 1. [`rotate_db_snapshots`](#rotate_db_snapshots) 1. [`science_museum_workflow`](#science_museum_workflow) 1. [`smithsonian_workflow`](#smithsonian_workflow) 1. [`smk_workflow`](#smk_workflow) 1. [`staging_database_restore`](#staging_database_restore) -1. [`staging_elasticsearch_cluster_healthcheck`](#staging_elasticsearch_cluster_healthcheck) 1. [`stocksnap_workflow`](#stocksnap_workflow) 1. [`wikimedia_commons_workflow`](#wikimedia_commons_workflow) -1. [`wikimedia_reingestion_workflow`](#wikimedia_reingestion_workflow) +1. [`wikimedia_reingestion_workflow`](#wikimedia_commons_workflow) 1. [`wordpress_workflow`](#wordpress_workflow) +1. [`production_elasticsearch_cluster_healthcheck`](#environment_elasticsearch_cluster_healthcheck) +1. [`staging_elasticsearch_cluster_healthcheck`](#environment_elasticsearch_cluster_healthcheck) +1. [`audio_data_refresh`](#media_type_data_refresh) +1. [`image_data_refresh`](#media_type_data_refresh) +1. [`audio_popularity_refresh`](#media_type_popularity_refresh) +1. [`image_popularity_refresh`](#media_type_popularity_refresh) ### `add_license_url` @@ -235,53 +235,6 @@ https://github.com/AucklandMuseum/API/wiki/Tutorial | /search, /id | 10 | 1000 | | /id/media | 10 | 1000 | -### `audio_data_refresh` - -#### Data Refresh DAG Factory - -This file generates our data refresh DAGs using a factory function. 
For the -given media type these DAGs will initiate a data refresh on the ingestion server -and await the success or failure of that task. - -A data refresh occurs on the Ingestion server in the Openverse project. This is -a task which imports data from the upstream Catalog database into the API, -copies contents to a new Elasticsearch index, and finally makes the index -"live". This process is necessary to make new content added to the Catalog by -our provider DAGs available to the API. You can read more in the -[README](https://github.com/WordPress/openverse/blob/main/ingestion_server/README.md) -Importantly, the data refresh TaskGroup is also configured to handle concurrency -requirements of the Ingestion server. Finally, once the origin indexes have been -refreshed, the corresponding filtered index creation DAG is triggered. - -You can find more background information on this process in the following issues -and related PRs: - -- [[Feature] Data refresh orchestration DAG](https://github.com/WordPress/openverse-catalog/issues/353) -- [[Feature] Merge popularity calculations and data refresh into a single DAG](https://github.com/WordPress/openverse-catalog/issues/453) - -### `audio_popularity_refresh` - -#### Popularity Refresh DAG Factory - -This file generates our popularity refresh DAGs using a factory function. - -For the given media type these DAGs will first update the popularity metrics -table, adding any new metrics and updating the percentile that is used in -calculating the popularity constants. It then refreshes the popularity constants -view, which recalculates the popularity constant for each provider. - -Once the constants have been updated, the DAG will trigger a `batched_update` -DagRun for each provider of this media_type that is configured to support -popularity data. The batched update recalculates standardized popularity scores -for all records, using the new constant. When the updates are complete, all -records have up-to-date popularity data. This DAG can be run concurrently with -data refreshes and regular ingestion. - -You can find more background information on this process in the following -implementation plan: - -- [[Implementation Plan] Decoupling Popularity Calculations from the Data Refresh](https://docs.openverse.org/projects/proposals/popularity_optimizations/20230420-implementation_plan_popularity_optimizations.html) - ### `batched_update` #### Batched Update DAG @@ -396,63 +349,7 @@ issue has been resolved. The DAG runs weekly. -### `create_filtered_audio_index` - -#### Create filtered index DAG factory - -This module creates the filtered index creation DAGs for each media type using a -factory function. - -Filtered index creation is handled by the ingestion server. The DAGs generated -by the `create_filtered_index_creation_dag` function in this module are -responsible for triggering the ingestion server action to create and populate -the filtered index for a given media type. The DAG awaits the completion of the -filtered index creation and then points the filtered index alias for the media -type to the newly created index. They make use of the -`create_filtered_index_creation_task_groups` factory, which is also used by the -data refreshes to perform the same functions. The purpose of these DAGs is to -allow the filtered index creation steps to be run in isolation from the data -refresh. 
- -##### When this DAG runs - -The DAGs generated by the `create_filtered_index_creation_dag` can be used to -manually run the filtered index creation and promotion steps described above in -isolation from the rest of the data refresh. These DAGs also include checks to -ensure that race conditions with the data refresh DAGs are not encountered (see -`Race conditions` section below). - -The DAGs generated in this module are on a `None` schedule and are only -triggered manually. This is primarily useful in two cases: for testing changes -to the filtered index creation; and for re-running filtered index creation if an -urgent change to the sensitive terms calls for an immediate recreation of the -filtered indexes. - -##### Race conditions - -Because filtered index creation employs the `reindex` Elasticsearch API to -derive the filtered index from an existing index, we need to be mindful of the -race condition that potentially exists between the data refresh DAG and this -DAG. The race condition is caused by the fact that the data refresh DAG always -deletes the previous index once the new index for the media type is finished -being created. Consider the situation where filtered index creation is triggered -to run during a data refresh. The filtered index is being derived from the -previous index for the media type. Once the data refresh is finished, it will -delete that index, causing the reindex to halt because suddenly it has no data -source from which to pull documents. - -There are two mechanisms that prevent this from happening: - -1. The filtered index creation DAGs fail immediately if any of the DAGs that are - tagged as part of the `production-es-concurrency` group (including the data - refreshes) are currently running. -2. The data refresh DAGs will wait for any pre-existing filtered index creation - DAG runs for the media type to finish before continuing. - -This ensures that neither are depending on or modifying the origin indexes -critical for the creation of the filtered indexes. - -### `create_filtered_image_index` +### `create_filtered_{media_type}_index` #### Create filtered index DAG factory @@ -508,118 +405,7 @@ There are two mechanisms that prevent this from happening: This ensures that neither are depending on or modifying the origin indexes critical for the creation of the filtered indexes. -### `create_new_production_es_index` - -#### Create New ES Index DAG - -This file generates our Create New ES Index DAGs using a factory function. A -separate DAG is generated for the staging and production environments. - -Each DAG can be used to create new Elasticsearch indices in their respective -environment, based on an existing index. The following configuration options are -available: - -- `media_type` : media type for which to create the new index -- `index_suffix` : optional suffix to be added to the new index name. If not - supplied, a creation timestamp is used. -- `source_index` : the existing index on which to base the new index, and from - which to copy records -- `index_config` : a JSON object containing the configuration for the new index. - By default, this will be merged into the configuration of the source index - according to the merging policy documented below. -- `query` : an optional Elasticsearch query, used to filter the documents copied - from the source index into the new index. If not supplied, all records are - copied. 
-- `override_config`: boolean override; when True, the `index_config` will be - used for the new index configuration _without_ merging any values from the - source index config. -- `target_alias` : optional alias to be applied to the new index after - reindexing. If the alias already applies to an existing index, it will be - removed first. -- `should_delete_old_index`: whether to remove the index previously pointed to - by the target_alias, if it exists. Defaults to False. - -##### Merging policy - -The configuration will be merged such that a leaf key in the `index_config` -overwrites the entire value present in the source configuration at that key. The -leaf values are merged naively, so a list for instance is replaced entirely -(rather than appending values). For example, if the base configuration is: - -``` -{ - "settings": { - "index": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, - "analysis": { - "filter": { - "stem_overrides": { - "type": "stemmer_override", - "rules": [ - "animals => animal", - "animal => animal", - "anime => anime", - "animate => animate", - "animated => animate", - "universe => universe" - ] - } - } - } - } -} -``` - -And the `index_config` passed in is: - -``` -{ - "settings": { - "index": { - "number_of_shards": 2, - }, - "analysis": { - "filter": { - "stem_overrides": { - "rules": ["crim => cribble"] - } - } - } - } -} -``` - -The resulting, merged configuration will be: - -``` -{ - "settings": { - "index": { - "number_of_shards": 2, - "number_of_replicas": 1 - }, - "analysis": { - "filter": { - "stem_overrides": { - "type": "stemmer_override", - "rules": ["crim => cribble"] - } - } - } - } -} -``` - -##### Race conditions - -Each DAG will fail immediately if any of the DAGs tagged as part of the -es-concurrency group for the DAG's environment is running. (E.g., the -`create_new_staging_es_index` DAG fails immediately if any DAGs tagged with -`staging-es-concurrency` are running.) - -### `create_new_staging_es_index` +### `create_new_{environment}_es_index` #### Create New ES Index DAG @@ -850,16 +636,6 @@ Check the list of member institutions of the Flickr Commons for institutions that have cc-licensed images and are not already configured as sub-providers for the Flickr DAG. Report suggestions for new sub-providers to Slack. -### `flickr_reingestion_workflow` - -Content Provider: Flickr - -ETL Process: Use the API to identify all CC licensed images. - -Output: TSV file containing the images and the respective meta-data. - -Notes: https://www.flickr.com/help/terms/api Rate limit: 3600 requests per hour. - ### `flickr_workflow` Content Provider: Flickr @@ -881,76 +657,29 @@ Output: TSV file containing the image, the respective meta-data. Notes: https://freesound.org/docs/api/ Rate limit: No limit for our API key. This script can be run either to ingest the full dataset or as a dated DAG. -### `image_data_refresh` +### `inaturalist_workflow` -#### Data Refresh DAG Factory +Provider: iNaturalist -This file generates our data refresh DAGs using a factory function. For the -given media type these DAGs will initiate a data refresh on the ingestion server -and await the success or failure of that task. +Output: Records loaded to the image catalog table. -A data refresh occurs on the Ingestion server in the Openverse project. This is -a task which imports data from the upstream Catalog database into the API, -copies contents to a new Elasticsearch index, and finally makes the index -"live". 
This process is necessary to make new content added to the Catalog by -our provider DAGs available to the API. You can read more in the -[README](https://github.com/WordPress/openverse/blob/main/ingestion_server/README.md) -Importantly, the data refresh TaskGroup is also configured to handle concurrency -requirements of the Ingestion server. Finally, once the origin indexes have been -refreshed, the corresponding filtered index creation DAG is triggered. +Notes: The iNaturalist API is not intended for data scraping. +https://api.inaturalist.org/v1/docs/ But there is a full dump intended for +sharing on S3. +https://github.com/inaturalist/inaturalist-open-data/tree/documentation/Metadata +Because these are exceptionally large normalized tables, as opposed to more +document oriented API responses, we found that bringing the data into postgres +first was the most effective approach. More detail in slack here: +https://wordpress.slack.com/archives/C02012JB00N/p1653145643080479?thread_ts=1653082292.714469&cid=C02012JB00N +We use the table structure defined here, +https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql +except for adding ancestry tags to the taxa table. -You can find more background information on this process in the following issues -and related PRs: +### `jamendo_workflow` -- [[Feature] Data refresh orchestration DAG](https://github.com/WordPress/openverse-catalog/issues/353) -- [[Feature] Merge popularity calculations and data refresh into a single DAG](https://github.com/WordPress/openverse-catalog/issues/453) +Content Provider: Jamendo -### `image_popularity_refresh` - -#### Popularity Refresh DAG Factory - -This file generates our popularity refresh DAGs using a factory function. - -For the given media type these DAGs will first update the popularity metrics -table, adding any new metrics and updating the percentile that is used in -calculating the popularity constants. It then refreshes the popularity constants -view, which recalculates the popularity constant for each provider. - -Once the constants have been updated, the DAG will trigger a `batched_update` -DagRun for each provider of this media_type that is configured to support -popularity data. The batched update recalculates standardized popularity scores -for all records, using the new constant. When the updates are complete, all -records have up-to-date popularity data. This DAG can be run concurrently with -data refreshes and regular ingestion. - -You can find more background information on this process in the following -implementation plan: - -- [[Implementation Plan] Decoupling Popularity Calculations from the Data Refresh](https://docs.openverse.org/projects/proposals/popularity_optimizations/20230420-implementation_plan_popularity_optimizations.html) - -### `inaturalist_workflow` - -Provider: iNaturalist - -Output: Records loaded to the image catalog table. - -Notes: The iNaturalist API is not intended for data scraping. -https://api.inaturalist.org/v1/docs/ But there is a full dump intended for -sharing on S3. -https://github.com/inaturalist/inaturalist-open-data/tree/documentation/Metadata -Because these are exceptionally large normalized tables, as opposed to more -document oriented API responses, we found that bringing the data into postgres -first was the most effective approach. 
More detail in slack here: -https://wordpress.slack.com/archives/C02012JB00N/p1653145643080479?thread_ts=1653082292.714469&cid=C02012JB00N -We use the table structure defined here, -https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql -except for adding ancestry tags to the taxa table. - -### `jamendo_workflow` - -Content Provider: Jamendo - -ETL Process: Use the API to identify all CC-licensed audio. +ETL Process: Use the API to identify all CC-licensed audio. Output: TSV file containing the audio meta-data. @@ -970,30 +699,6 @@ Output: TSV file containing the media and the respective meta-data. Notes: https://justtakeitfree.com/api/api.php This API requires an API key. For more details, see https://github.com/WordPress/openverse/pull/2793 -### `metropolitan_museum_reingestion_workflow` - -Content Provider: Metropolitan Museum of Art - -ETL Process: Use the API to identify all CC0 artworks. - -Output: TSV file containing the image, their respective meta-data. - -Notes: https://metmuseum.github.io/#search "Please limit requests to 80 requests -per second." May need to bump up the delay (e.g. to 3 seconds), to avoid of -blocking during local development testing. - - Some analysis to improve data quality was conducted using a - separate csv file here: https://github.com/metmuseum/openaccess - - Get a list of object IDs: - https://collectionapi.metmuseum.org/public/collection/v1/objects?metadataDate=2022-08-10 - Get a specific object: - https://collectionapi.metmuseum.org/public/collection/v1/objects/1027 - The search functionality requires a specific query (term search) - in addition to date and public domain. It seems like it won't - connect with just date and license. - https://collectionapi.metmuseum.org/public/collection/v1/search?isPublicDomain=true&metadataDate=2022-08-07 - ### `metropolitan_museum_workflow` Content Provider: Metropolitan Museum of Art @@ -1055,16 +760,6 @@ will update the tokens stored in the Variable upon successful refresh. - Freesound -### `phylopic_reingestion_workflow` - -Content Provider: PhyloPic - -ETL Process: Use the API to identify all CC licensed images. - -Output: TSV file containing the image, their respective meta-data. - -Notes: http://api-docs.phylopic.org/v2/ No rate limit specified. - ### `phylopic_workflow` Content Provider: PhyloPic @@ -1075,30 +770,7 @@ Output: TSV file containing the image, their respective meta-data. Notes: http://api-docs.phylopic.org/v2/ No rate limit specified. -### `point_production_es_alias` - -#### Point ES Alias DAG - -This file generates our Point ES Alias DAGs using a factory function. A separate -DAG is generated for the staging and production environments. - -The DAGs are used to point a `target_alias` to a `target_index` in the given -environment's elasticsearch cluster. When the alias is applied, it is first -removed from any existing index to which it already applies; optionally, it can -also delete that index afterward. - -##### When this DAG runs - -This DAG is on a `None` schedule and is run manually. - -##### Race conditions - -Each DAG will fail immediately if any of the DAGs tagged as part of the -es-concurrency group for the DAG's environment is running. (E.g., the -`point_staging_alias` DAG fails immediately if any DAGs tagged with -`staging-es-concurrency` are running.) - -### `point_staging_es_alias` +### `point_{environment}_es_alias` #### Point ES Alias DAG @@ -1144,23 +816,6 @@ Unfortunately the DAG does not know when someone is on vacation. 
It is up to the author of the PR to re-assign review if one of the randomly selected reviewers is unavailable for the time period during which the PR should be reviewed. -### `production_elasticsearch_cluster_healthcheck` - -Monitor staging and production Elasticsearch cluster health endpoint. - -Requests the cluster health and alerts under the following conditions: - -- Red cluster health -- Unexpected number of nodes -- Unresponsive cluster - -Additionally, the DAG will notify (rather than alert) when the cluster health is -yellow. Yellow cluster health may or may not be an issue, depending on whether -it is expected, and occurs whenever shards and replicas are being relocated -(e.g., during reindexes). It is worthwhile to notify in these cases, as an -assurance, but we could choose to add logic that ignores yellow cluster health -during data refresh or other similar operations. - ### `rawpixel_workflow` Content Provider: Rawpixel @@ -1175,20 +830,6 @@ issues. The public API max results range is limited to 100,000 results, although the API key we've been given can circumvent this limit. https://www.rawpixel.com/api/v1/search?tags=$publicdomain&page=1&pagesize=100 -### `recreate_audio_popularity_calculation` - -This file generates Apache Airflow DAGs that, for the given media type, -completely wipes out and recreates the PostgreSQL functions involved in -calculating our standardized popularity metric. - -Note that they do not drop any tables or views related to popularity, and they -do not perform any popularity calculations. Once this DAG has been run, the -associated popularity refresh DAG must be run in order to actually recalculate -popularity constants and standardized popularity scores using the new functions. - -These DAGs are not on a schedule, and should only be run manually when new SQL -code is deployed for the calculation. - ### `recreate_full_staging_index` #### Recreate Full Staging Index DAG @@ -1230,7 +871,7 @@ However, as the DAG operates on the staging API database and ES cluster it will exit immediately if any of the DAGs tagged as part of the `staging_es_concurrency` group are already running. -### `recreate_image_popularity_calculation` +### `recreate_{media_type}_popularity_calculation` This file generates Apache Airflow DAGs that, for the given media type, completely wipes out and recreates the PostgreSQL functions involved in @@ -1330,23 +971,6 @@ the RDS operations run using a different hook: - `AIRFLOW_CONN_`: The connection string to use for RDS operations (per the above example, it might be `AIRFLOW_CONN_AWS_RDS`) -### `staging_elasticsearch_cluster_healthcheck` - -Monitor staging and production Elasticsearch cluster health endpoint. - -Requests the cluster health and alerts under the following conditions: - -- Red cluster health -- Unexpected number of nodes -- Unresponsive cluster - -Additionally, the DAG will notify (rather than alert) when the cluster health is -yellow. Yellow cluster health may or may not be an issue, depending on whether -it is expected, and occurs whenever shards and replicas are being relocated -(e.g., during reindexes). It is worthwhile to notify in these cases, as an -assurance, but we could choose to add logic that ignores yellow cluster health -during data refresh or other similar operations. - ### `stocksnap_workflow` Content Provider: StockSnap @@ -1478,132 +1102,77 @@ parameter to avoid this issue on subsequent iterations. 
For these requests, we can remove the `globalusage` property from the `prop` parameter entirely and eschew the popularity data for these items. -### `wikimedia_reingestion_workflow` - -**Content Provider:** Wikimedia Commons - -**ETL Process:** Use the API to identify all CC-licensed images. - -**Output:** TSV file containing the image, the respective meta-data. - -#### Notes - -Rate limit of no more than 200 requests/second, and we are required to set a -unique User-Agent field -([docs](https://www.mediawiki.org/wiki/Wikimedia_REST_API#Terms_and_conditions)). - -Wikimedia Commons uses an implementation of the -[MediaWiki API](https://www.mediawiki.org/wiki/API:Main_page). This API is -incredibly complex in the level of configuration you can provide when querying, -and as such it can also be quite abstruse. The most straightforward docs can be -found on the -[Wikimedia website directly](https://commons.wikimedia.org/w/api.php?action=help&modules=query), -as these show all the parameters available. Specifications on queries can also -be found on the [query page](https://www.mediawiki.org/wiki/API:Query). - -Different kinds of queries can be made against the API using "modules", we use -the [allimages module](https://www.mediawiki.org/wiki/API:Allimages), which lets -us search for images in a given time range (see `"generator": "allimages"` in -the query params). +### `wordpress_workflow` -Many queries will return results in batches, with the API supplying a "continue" -token. This token is used on subsequent calls to tell the API where to start the -next set of results from; it functions as a page offset -([docs](https://www.mediawiki.org/wiki/API:Query#Continuing_queries)). +Content Provider: WordPress Photo Directory -We can also specify what kinds of information we want for the query. Wikimedia -has a massive amount of data on it, so it only returns what we ask for. The -fields that are returned are defined by the -[properties](https://www.mediawiki.org/wiki/API:Properties) or "props" -parameter. Sub-properties can also be defined, with the parameter name of the -sub-property determined by an abbreviation of the higher-level property. For -instance, if our property is "imageinfo" and we want to set sub-property values, -we would define those in "iiprops". +ETL Process: Use the API to identify all openly licensed media. -The data within a property is paginated as well, Wikimedia will handle iteration -through the property sub-pages using the "continue" token -([see here for more details](https://www.mediawiki.org/wiki/API:Properties#Additional_notes)). +Output: TSV file containing the media metadata. -Depending on the kind of property data that's being returned, it's possible for -the API to iterate extensively on a specific media item. What Wikimedia is -iterating over in these cases can be gleaned from the "continue" token. Those -tokens take the form of, as I understand it, -"||", paired with an "continue" value -for the property being iterated over. For example, if we're were iterating over -a set of image properties, the token might look like: +Notes: https://wordpress.org/photos/wp-json/wp/v2 Provide photos, media, users +and more related resources. No rate limit specified. 
-``` -{ - "iicontinue": "The_Railway_Chronicle_1844.pdf|20221209222801", - "gaicontinue": "20221209222614|NTUL-0527100_英國產業革命史略.pdf", - "continue": "gaicontinue||globalusage", -} -``` +### `{environment}_elasticsearch_cluster_healthcheck` -In this case, we're iterating over the "global all images" generator -(gaicontinue) as our primary iterator, with the "image properties" (iicontinue) -as the secondary continue iterator. The "globalusage" property would be the next -property to iterate over. It's also possible for multiple sub-properties to be -iterated over simultaneously, in which case the "continue" token would not have -a secondary value (e.g. `gaicontinue||`). +Monitor staging and production Elasticsearch cluster health endpoint. -In most runs, the "continue" key will be `gaicontinue||` after the first -request, which means that we have more than one batch to iterate over for the -primary iterator. Some days will have fewer images than the batch limit but -still have multiple batches of content on the secondary iterator, which means -the "continue" key may not have a primary iteration component (e.g. -`||globalusage`). This token can also be seen when the first request has more -data in the secondary iterator, before we've processed any data on the primary -iterator. +Requests the cluster health and alerts under the following conditions: -Occasionally, the ingester will come across a piece of media that has many -results for the property it's iterating over. An example of this can include an -item being on many pages, this it would have many "global usage" results. In -order to process the entire batch, we have to iterate over _all_ of the returned -results; Wikimedia does not provide a mechanism to "skip to the end" of a batch. -On numerous occasions, this iteration has been so extensive that the pull media -task has hit the task's timeout. To avoid this, we limit the number of -iterations we make for parsing through a sub-property's data. If we hit the -limit, we re-issue the original query _without_ requesting properties that -returned large amounts of data. Unfortunately, this means that we will **not** -have that property's data for these items the second time around (e.g. -popularity data if we needed to skip global usage). In the case of popularity, -especially since the problem with these images is that they're so popular, we -want to preserve that information where possible! So we cache the popularity -data from previous iterations and use it in subsequent ones if we come across -the same item again. +- Red cluster health +- Unexpected number of nodes +- Unresponsive cluster -Below are some specific references to various properties, with examples for -cases where they might exceed the limit. Technically, it's feasible for almost -any property to exceed the limit, but these are the ones that we've seen in -practice. +Additionally, the DAG will notify (rather than alert) when the cluster health is +yellow. Yellow cluster health may or may not be an issue, depending on whether +it is expected, and occurs whenever shards and replicas are being relocated +(e.g., during reindexes). It is worthwhile to notify in these cases, as an +assurance, but we could choose to add logic that ignores yellow cluster health +during data refresh or other similar operations. 
-##### `imageinfo` +### `{media_type}_data_refresh` -[Docs](https://commons.wikimedia.org/w/api.php?action=help&modules=query%2Bimageinfo) +#### Data Refresh DAG Factory -[Example where metadata has hundreds of data points](https://commons.wikimedia.org/wiki/File:The_Railway_Chronicle_1844.pdf#metadata) -(see "Metadata" table, which may need to be expanded). +This file generates our data refresh DAGs using a factory function. For the +given media type these DAGs will initiate a data refresh on the ingestion server +and await the success or failure of that task. -For these requests, we can remove the `metadata` property from the `iiprops` -parameter to avoid this issue on subsequent iterations. +A data refresh occurs on the Ingestion server in the Openverse project. This is +a task which imports data from the upstream Catalog database into the API, +copies contents to a new Elasticsearch index, and finally makes the index +"live". This process is necessary to make new content added to the Catalog by +our provider DAGs available to the API. You can read more in the +[README](https://github.com/WordPress/openverse/blob/main/ingestion_server/README.md) +Importantly, the data refresh TaskGroup is also configured to handle concurrency +requirements of the Ingestion server. Finally, once the origin indexes have been +refreshed, the corresponding filtered index creation DAG is triggered. -##### `globalusage` +You can find more background information on this process in the following issues +and related PRs: -[Docs](https://commons.wikimedia.org/w/api.php?action=help&modules=query%2Bglobalusage) +- [[Feature] Data refresh orchestration DAG](https://github.com/WordPress/openverse-catalog/issues/353) +- [[Feature] Merge popularity calculations and data refresh into a single DAG](https://github.com/WordPress/openverse-catalog/issues/453) -[Example where an image is used on almost every wiki](https://commons.wikimedia.org/w/index.php?curid=4298234). +### `{media_type}_popularity_refresh` -For these requests, we can remove the `globalusage` property from the `prop` -parameter entirely and eschew the popularity data for these items. +#### Popularity Refresh DAG Factory -### `wordpress_workflow` +This file generates our popularity refresh DAGs using a factory function. -Content Provider: WordPress Photo Directory +For the given media type these DAGs will first update the popularity metrics +table, adding any new metrics and updating the percentile that is used in +calculating the popularity constants. It then refreshes the popularity constants +view, which recalculates the popularity constant for each provider. -ETL Process: Use the API to identify all openly licensed media. +Once the constants have been updated, the DAG will trigger a `batched_update` +DagRun for each provider of this media_type that is configured to support +popularity data. The batched update recalculates standardized popularity scores +for all records, using the new constant. When the updates are complete, all +records have up-to-date popularity data. This DAG can be run concurrently with +data refreshes and regular ingestion. -Output: TSV file containing the media metadata. +You can find more background information on this process in the following +implementation plan: -Notes: https://wordpress.org/photos/wp-json/wp/v2 Provide photos, media, users -and more related resources. No rate limit specified. 
+- [[Implementation Plan] Decoupling Popularity Calculations from the Data Refresh](https://docs.openverse.org/projects/proposals/popularity_optimizations/20230420-implementation_plan_popularity_optimizations.html) diff --git a/documentation/projects/proposals/data_normalization/20240227-implementation_plan_catalog_data_cleaning.md b/documentation/projects/proposals/data_normalization/20240227-implementation_plan_catalog_data_cleaning.md index 1a7db196b93..8cf23270fc8 100644 --- a/documentation/projects/proposals/data_normalization/20240227-implementation_plan_catalog_data_cleaning.md +++ b/documentation/projects/proposals/data_normalization/20240227-implementation_plan_catalog_data_cleaning.md @@ -45,7 +45,7 @@ approach entails a problem of wasting resources both in time, which continues to increase, and in the machines (CPU) it uses, which could easily be avoided making the changes permanent by saving them in the upstream database. -[img-data-refresh]: ./../../../catalog/reference/DAGs.md#image_data_refresh +[img-data-refresh]: ./../../../catalog/reference/DAGs.md#media_type_data_refresh ## Expected Outcomes
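Since several existing links, including the implementation-plan reference above, now point at collapsed anchors such as `#media_type_data_refresh`, a rough link/heading consistency check over the regenerated file can be useful. This sketch assumes the file path and the link/heading formats shown in the diff; it is not part of the catalog tooling.

```python
# Rough consistency check for the regenerated DAGs.md (assumed path and formats).
import re
from pathlib import Path

text = Path("documentation/catalog/reference/DAGs.md").read_text()

# Per-DAG headings look like: ### `{media_type}_data_refresh`
headings = {
    heading.replace("{", "").replace("}", "")
    for heading in re.findall(r"^### `([^`]+)`", text, flags=re.MULTILINE)
}
# Backticked DAG links look like: [`image_data_refresh`](#media_type_data_refresh)
link_anchors = set(re.findall(r"\[`[^`]+`\]\(#([^)]+)\)", text))

missing = link_anchors - headings
print(f"{len(link_anchors)} DAG links, {len(missing)} without a matching heading")
```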