diff --git a/catalog/dags/database/catalog_cleaner/catalog_cleaner.py b/catalog/dags/database/catalog_cleaner/catalog_cleaner.py index 7cd665895cf..913e8bf0d68 100644 --- a/catalog/dags/database/catalog_cleaner/catalog_cleaner.py +++ b/catalog/dags/database/catalog_cleaner/catalog_cleaner.py @@ -1,8 +1,18 @@ """ Catalog Data Cleaner DAG -Use TSV files created during the clean step of the ingestion process to bring the -changes into the catalog. +Use the TSV files created during the cleaning step of the ingestion process to bring +the changes into the catalog database and make the updates permanent. + +The DAG has a structure similar to the batched_update DAG, but with a few key +differences: + 1. Given the structure of the TSV, it updates a single column at a time. + 2. The batch updates are parallelized to speed up the process. The maximum number of + active tasks is initially limited to 3, both to try it out and to avoid + overwhelming the database. + 3. It needs slightly different SQL queries to update the data. For example, it only + works with the `image` table, since that is the only table the cleaning steps + are applied to in the ingestion server. """ import logging @@ -13,7 +23,9 @@ from airflow.models.abstractoperator import AbstractOperator from airflow.models.param import Param from airflow.operators.python import get_current_context +from airflow.utils.trigger_rule import TriggerRule +from common import slack from common.constants import DAG_DEFAULT_ARGS, POSTGRES_CONN_ID from common.sql import ( RETURN_ROW_COUNT, @@ -30,6 +42,7 @@ @task def count_dirty_rows(temp_table_name: str, task: AbstractOperator = None): + """Get the number of rows in the temp table before the updates.""" count = run_sql.function( dry_run=False, sql_template=f"SELECT COUNT(*) FROM {temp_table_name}", @@ -43,6 +56,7 @@ def count_dirty_rows(temp_table_name: str, task: AbstractOperator = None): @task def get_batches(total_row_count: int, batch_size: int) -> list[tuple[int, int]]: + """Return a list of tuples with the start and end row_id for each batch.""" return [(i, i + batch_size) for i in range(0, total_row_count, batch_size)] @@ -55,6 +69,9 @@ def update_batch( ): batch_start, batch_end = batch logger.info(f"Going through row_id {batch_start:,} to {batch_end:,}.") + + # Include the formatted batch range in the context to use it as the index + # template, for easier identification of the tasks in the UI.
context = get_current_context() context["index_template"] = f"{batch_start}__{batch_end}" @@ -72,6 +89,21 @@ def update_batch( return count +@task +def sum_up_counts(counts: list[int]) -> int: + return sum(counts) + + +@task +def notify_slack(text): + slack.send_message( + text=text, + username=constants.SLACK_USERNAME, + icon_emoji=constants.SLACK_ICON, + dag_id=constants.DAG_ID, + ) + + @dag( dag_id=constants.DAG_ID, default_args={ @@ -100,7 +132,6 @@ def update_batch( enum=["url", "creator_url", "foreign_landing_url"], description="The column of the table to apply the updates.", ), - # "table": Param(type="str", description="The media table to update."), "batch_size": Param( default=10000, type="integer", @@ -113,11 +144,12 @@ def catalog_cleaner(): column = "{{ params.column }}" temp_table_name = f"temp_cleaned_image_{column}" - create = PGExecuteQueryOperator( + create_table = PGExecuteQueryOperator( task_id="create_temp_table", postgres_conn_id=POSTGRES_CONN_ID, - sql=constants.CREATE_SQL.format(temp_table_name=temp_table_name, column=column), - execution_timeout=timedelta(minutes=1), + sql=constants.CREATE_TEMP_TABLE_SQL.format( + temp_table_name=temp_table_name, column=column + ), ) load = PGExecuteQueryOperator( @@ -130,16 +162,28 @@ def catalog_cleaner(): s3_path_to_file="{{ params.s3_path }}", aws_region=aws_region, ), - execution_timeout=timedelta(hours=1), + ) + + create_index = PGExecuteQueryOperator( + task_id="create_temp_table_index", + postgres_conn_id=POSTGRES_CONN_ID, + sql=constants.CREATE_INDEX_SQL.format(temp_table_name=temp_table_name), ) count = count_dirty_rows(temp_table_name) batches = get_batches(total_row_count=count, batch_size="{{ params.batch_size }}") - updates = update_batch.partial( - temp_table_name=temp_table_name, column=column - ).expand(batch=batches) + updates = ( + update_batch.override( + max_active_tis_per_dag=3, + retries=0, + ) + .partial(temp_table_name=temp_table_name, column=column) + .expand(batch=batches) + ) + + total = sum_up_counts(updates) drop = PGExecuteQueryOperator( task_id="drop_temp_tables", @@ -148,7 +192,21 @@ def catalog_cleaner(): execution_timeout=timedelta(minutes=1), ) - create >> load >> count >> updates >> drop + notify_success = notify_slack.override(task_id="notify_success")( + f"Upstream cleaning was completed successfully updating column `{column}` for" + f" {total} rows.", + ) + + notify_failure = notify_slack.override( + task_id="notify_failure", trigger_rule=TriggerRule.ONE_FAILED + )("Upstream cleaning failed. 
Check the logs for more information.") + + create_table >> load >> create_index >> count + + # Make the dependency on total (the sum_up_counts task) explicit so it shows in the graph + updates >> [drop, total] >> notify_success + + drop >> notify_failure catalog_cleaner() diff --git a/catalog/dags/database/catalog_cleaner/constants.py b/catalog/dags/database/catalog_cleaner/constants.py index e4027772966..59e9d70b2ad 100644 --- a/catalog/dags/database/catalog_cleaner/constants.py +++ b/catalog/dags/database/catalog_cleaner/constants.py @@ -1,11 +1,13 @@ DAG_ID = "catalog_cleaner" +SLACK_USERNAME = "Catalog Cleaner DAG" +SLACK_ICON = ":disk-cleanup:" -CREATE_SQL = """ +CREATE_TEMP_TABLE_SQL = """ DROP TABLE IF EXISTS {temp_table_name}; CREATE UNLOGGED TABLE {temp_table_name} ( row_id SERIAL, identifier uuid NOT NULL, - {column} TEXT + {column} TEXT NOT NULL ); """ @@ -17,6 +19,8 @@ ); """ +CREATE_INDEX_SQL = "CREATE INDEX ON {temp_table_name}(row_id);" + UPDATE_SQL = """ UPDATE image SET {column} = tmp.{column}, updated_on = NOW() FROM {temp_table_name} AS tmp diff --git a/documentation/catalog/reference/DAGs.md b/documentation/catalog/reference/DAGs.md index 15e2a08eb56..5bae7e3d086 100644 --- a/documentation/catalog/reference/DAGs.md +++ b/documentation/catalog/reference/DAGs.md @@ -48,6 +48,7 @@ The following are DAGs grouped by their primary tag: | DAG ID | Schedule Interval | | -------------------------------------------------------------------------------------- | ----------------- | | [`batched_update`](#batched_update) | `None` | +| [`catalog_cleaner`](#catalog_cleaner) | `None` | | [`decode_and_deduplicate_image_tags`](#decode_and_deduplicate_media_type_tags) | `None` | | [`delete_records`](#delete_records) | `None` | | [`recreate_full_staging_index`](#recreate_full_staging_index) | `None` | @@ -201,7 +202,7 @@ is constructed from the `license` and `license_version` columns. This is a maintenance DAG that should be run once. ---- +---- ### `airflow_log_cleanup` @@ -228,7 +229,7 @@ airflow dags trigger --conf - maxLogAgeInDays: - Optional - enableDelete: - Optional ---- +---- ### `auckland_museum_workflow` @@ -248,7 +249,7 @@ Resource: | /search, /id | 10 | 1000 | | /id/media | 10 | 1000 | ---- +---- ### `batched_update` @@ -327,7 +328,27 @@ already created: for example, if there was a problem with the `update_query` which caused DAG failures during the `update_batches` step. In this case, verify that the `BATCH_START` var is set appropriately for your needs. ---- +---- + +### `catalog_cleaner` + +Catalog Data Cleaner DAG + +Use the TSV files created during the cleaning step of the ingestion process to +bring the changes into the catalog database and make the updates permanent. + +The DAG has a structure similar to the batched_update DAG, but with a few key +differences: + +1. Given the structure of the TSV, it updates a single column at a time. +2. The batch updates are parallelized to speed up the process. The maximum + number of active tasks is initially limited to 3, both to try it out and to + avoid overwhelming the database. +3. It needs slightly different SQL queries to update the data. For example, it + only works with the `image` table, since that is the only table the cleaning + steps are applied to in the ingestion server. + +---- ### `cc_mixter_workflow` @@ -341,7 +362,7 @@ Notes: Documentation: ccMixter sends bad JSON and extremely huge headers, both of which need workarounds that are handled by this DAG.
---- +---- ### `check_silenced_dags` @@ -368,7 +389,7 @@ issue has been resolved. The DAG runs weekly. ---- +---- ### `create_filtered_{media_type}_index` @@ -426,7 +447,7 @@ There are two mechanisms that prevent this from happening: This ensures that neither are depending on or modifying the origin indexes critical for the creation of the filtered indexes. ---- +---- ### `create_new_{environment}_es_index` @@ -539,7 +560,7 @@ es-concurrency group for the DAG's environment is running. (E.g., the `create_new_staging_es_index` DAG fails immediately if any DAGs tagged with `staging-es-concurrency` are running.) ---- +---- ### `create_proportional_by_source_staging_index` @@ -574,7 +595,7 @@ interfere with the production `data_refresh` or `create_filtered_index` DAGs. However, it will fail immediately if any of the DAGs tagged as part of the `staging-es-concurrency` group are running. ---- +---- ### `decode_and_deduplicate_{media_type}_tags` @@ -646,7 +667,7 @@ that records selected for deletion in this DAG are not also being written to by a provider DAG, for instance. The simplest way to do this is to ensure that any affected provider DAGs are not currently running. ---- +---- ### `europeana_workflow` @@ -658,7 +679,7 @@ Output: TSV file containing the images and the respective meta-data. Notes: ---- +---- ### `finnish_museums_workflow` @@ -674,7 +695,7 @@ provider script is a dated DAG that ingests all records that were last updated in the previous day. Because of this, it is not necessary to run a separate reingestion DAG, as updated data will be processed during regular ingestion. ---- +---- ### `flickr_audit_sub_provider_workflow` @@ -684,7 +705,7 @@ Check the list of member institutions of the Flickr Commons for institutions that have cc-licensed images and are not already configured as sub-providers for the Flickr DAG. Report suggestions for new sub-providers to Slack. ---- +---- ### `flickr_workflow` @@ -698,7 +719,7 @@ Notes: Rate limit: 3600 requests per hour. ---- +---- ### `freesound_workflow` @@ -713,7 +734,7 @@ Notes: Rate limit: No limit for our API key. This script can be run either to ingest the full dataset or as a dated DAG. ---- +---- ### `inaturalist_workflow` @@ -733,7 +754,7 @@ We use the table structure defined here, except for adding ancestry tags to the taxa table. ---- +---- ### `jamendo_workflow` @@ -748,7 +769,7 @@ non-commercial apps Jamendo Music has more than 500,000 tracks shared by 40,000 artists from over 150 countries all over the world. Audio quality: uploaded as WAV/ FLAC/ AIFF bit depth: 16/24 sample rate: 44.1 or 48 kHz channels: 1/2 ---- +---- ### `justtakeitfree_workflow` @@ -761,7 +782,7 @@ Output: TSV file containing the media and the respective meta-data. Notes: This API requires an API key. For more details, see ---- +---- ### `metropolitan_museum_workflow` @@ -787,7 +808,7 @@ avoid of blocking during local development testing. connect with just date and license. ---- +---- ### `nappy_workflow` @@ -800,7 +821,7 @@ Output: TSV file containing the image meta-data. Notes: This api was written specially for Openverse. There are no known limits or restrictions. ---- +---- ### `oauth2_authorization` @@ -816,7 +837,7 @@ authorization will create an access/refresh token pair in the - Freesound ---- +---- ### `oauth2_token_refresh` @@ -830,7 +851,7 @@ will update the tokens stored in the Variable upon successful refresh. - Freesound ---- +---- ### `phylopic_workflow` @@ -842,7 +863,7 @@ Output: TSV file containing the image, their respective meta-data. 
Notes: No rate limit specified. ---- +---- ### `point_{environment}_es_alias` @@ -867,7 +888,7 @@ es-concurrency group for the DAG's environment is running. (E.g., the `point_staging_alias` DAG fails immediately if any DAGs tagged with `staging-es-concurrency` are running.) ---- +---- ### `pr_review_reminders` @@ -897,7 +918,7 @@ Unfortunately the DAG does not know when someone is on vacation. It is up to the author of the PR to re-assign review if one of the randomly selected reviewers is unavailable for the time period during which the PR should be reviewed. ---- +---- ### `rawpixel_workflow` @@ -913,7 +934,7 @@ issues. The public API max results range is limited to 100,000 results, although the API key we've been given can circumvent this limit. ---- +---- ### `recreate_full_staging_index` @@ -956,7 +977,7 @@ However, as the DAG operates on the staging API database and ES cluster it will exit immediately if any of the DAGs tagged as part of the `staging_es_concurrency` group are already running. ---- +---- ### `recreate_{media_type}_popularity_calculation` @@ -972,7 +993,7 @@ popularity constants and standardized popularity scores using the new functions. These DAGs are not on a schedule, and should only be run manually when new SQL code is deployed for the calculation. ---- +---- ### `report_pending_reported_media` @@ -987,7 +1008,7 @@ whether further action (such as deindexing the record) needs to be taken. If a record has been reported multiple times, it only needs to be reviewed once and so is only counted once in the reporting by this DAG. ---- +---- ### `rotate_db_snapshots` @@ -1006,7 +1027,7 @@ Requires two variables: `CATALOG_RDS_DB_IDENTIFIER`: The "DBIdentifier" of the RDS DB instance. `CATALOG_RDS_SNAPSHOTS_TO_RETAIN`: How many historical snapshots to retain. ---- +---- ### `science_museum_workflow` @@ -1020,7 +1041,7 @@ Notes: Rate limited, no specific rate given. ---- +---- ### `smithsonian_workflow` @@ -1032,7 +1053,7 @@ Output: TSV file containing the images and the respective meta-data. Notes: ---- +---- ### `smk_workflow` @@ -1044,7 +1065,7 @@ Output: TSV file containing the media metadata. Notes: ---- +---- ### `staging_database_restore` @@ -1070,7 +1091,7 @@ the RDS operations run using a different hook: - `AIRFLOW_CONN_`: The connection string to use for RDS operations (per the above example, it might be `AIRFLOW_CONN_AWS_RDS`) ---- +---- ### `stocksnap_workflow` @@ -1084,7 +1105,7 @@ Notes: All images are licensed under CC0. No rate limits or authorization required. API is undocumented. ---- +---- ### `wikimedia_commons_workflow` @@ -1205,7 +1226,7 @@ parameter to avoid this issue on subsequent iterations. For these requests, we can remove the `globalusage` property from the `prop` parameter entirely and eschew the popularity data for these items. ---- +---- ### `wordpress_workflow` @@ -1218,7 +1239,7 @@ Output: TSV file containing the media metadata. Notes: Provide photos, media, users and more related resources. No rate limit specified. ---- +---- ### `{environment}_elasticsearch_cluster_healthcheck` @@ -1237,7 +1258,7 @@ it is expected, and occurs whenever shards and replicas are being relocated assurance, but we could choose to add logic that ignores yellow cluster health during data refresh or other similar operations. 
---- +---- ### `{environment}_{media_type}_data_refresh` @@ -1282,7 +1303,7 @@ and related PRs: - [[Implementation Plan] Ingestion Server Removal](https://docs.openverse.org/projects/proposals/ingestion_server_removal/20240328-implementation_plan_ingestion_server_removal.html) - [[Feature] Merge popularity calculations and data refresh into a single DAG](https://github.com/WordPress/openverse-catalog/issues/453) ---- +---- ### `{media_type}_data_refresh` @@ -1308,7 +1329,7 @@ and related PRs: - [[Feature] Data refresh orchestration DAG](https://github.com/WordPress/openverse-catalog/issues/353) - [[Feature] Merge popularity calculations and data refresh into a single DAG](https://github.com/WordPress/openverse-catalog/issues/453) ---- +---- ### `{media_type}_popularity_refresh`
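To illustrate the batched update pattern that the new `catalog_cleaner` DAG introduces above: the temp table loaded from the TSV is split into `row_id` ranges, one `UPDATE ... FROM` join is run per range, and the per-batch row counts are summed for the final Slack report. The sketch below is a rough, standalone approximation of that flow; it is not the DAG's code. It assumes a plain DB-API connection (e.g. psycopg2), and since the full `WHERE` clause of `UPDATE_SQL` is not visible in this diff, the `row_id` range filter and `identifier` join are inferred from `get_batches` and the temp table schema rather than copied from the DAG.

```python
# Illustrative sketch only -- not part of the DAG. The exact WHERE clause of
# UPDATE_SQL is not shown in this diff; the range filter and join below are
# an assumption based on get_batches and the temp table's row_id/identifier columns.


def update_in_batches(conn, temp_table: str, column: str, batch_size: int = 10_000) -> int:
    """Apply cleaned values from the temp table to `image`, one row_id range at a time."""
    with conn.cursor() as cur:
        # count_dirty_rows: how many corrected rows were loaded from the TSV.
        cur.execute(f"SELECT COUNT(*) FROM {temp_table}")
        total_rows = cur.fetchone()[0]

    # get_batches: (start, end] row_id ranges covering the whole temp table.
    batches = [(i, i + batch_size) for i in range(0, total_rows, batch_size)]

    updated = 0
    for batch_start, batch_end in batches:
        with conn.cursor() as cur:
            cur.execute(
                f"""
                UPDATE image SET {column} = tmp.{column}, updated_on = NOW()
                FROM {temp_table} AS tmp
                WHERE image.identifier = tmp.identifier
                    AND tmp.row_id > %s AND tmp.row_id <= %s
                """,
                (batch_start, batch_end),
            )
            updated += cur.rowcount  # the DAG sums these counts via sum_up_counts
        conn.commit()
    return updated
```

In the DAG itself each range becomes a separately mapped `update_batch` task (at most 3 running concurrently), and the index created by `CREATE_INDEX_SQL` keeps the `row_id` range lookups on the unlogged temp table cheap.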