diff --git a/catalog/dags/common/sensors/constants.py b/catalog/dags/common/sensors/constants.py index 3041a39880b..2d3bf1d6958 100644 --- a/catalog/dags/common/sensors/constants.py +++ b/catalog/dags/common/sensors/constants.py @@ -2,8 +2,8 @@ # DagTags used to establish a concurrency group for each environment -PRODUCTION_ES_CONCURRENCY_TAG = "production_concurrency" -STAGING_ES_CONCURRENCY_TAG = "staging_concurrency" +PRODUCTION_ES_CONCURRENCY_TAG = "production_es_concurrency" +STAGING_ES_CONCURRENCY_TAG = "staging_es_concurrency" ES_CONCURRENCY_TAGS = { PRODUCTION: PRODUCTION_ES_CONCURRENCY_TAG, diff --git a/catalog/dags/data_refresh/create_filtered_index_dag.py b/catalog/dags/data_refresh/create_filtered_index_dag.py index f68798e1687..9dccaa58774 100644 --- a/catalog/dags/data_refresh/create_filtered_index_dag.py +++ b/catalog/dags/data_refresh/create_filtered_index_dag.py @@ -42,8 +42,9 @@ There are two mechanisms that prevent this from happening: -1. The filtered index creation DAGs are not allowed to run if a data refresh -for the media type is already running. +1. The filtered index creation DAGs fail immediately if any of the DAGs that are +tagged as prt of the `production-es-concurrency` group (including the data +refreshes) are currently running. 2. The data refresh DAGs will wait for any pre-existing filtered index creation DAG runs for the media type to finish before continuing. diff --git a/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py b/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py index 984ee5e4de2..e60a049399d 100644 --- a/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py +++ b/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py @@ -1,5 +1,5 @@ """ -# Update the staging database +# Staging Database Restore DAG This DAG is responsible for updating the staging database using the most recent snapshot of the production database. @@ -19,6 +19,11 @@ (e.g. `aws_rds`) - `AIRFLOW_CONN_`: The connection string to use for RDS operations (per the above example, it might be `AIRFLOW_CONN_AWS_RDS`) + +## Race conditions + +Because this DAG completely replaces the staging database, it first waits on any +running DAGs that are tagged as part of the `staging_es_concurrency` group. """ import logging diff --git a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py index 17771501348..a35c4f02b7d 100644 --- a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py @@ -100,6 +100,13 @@ } } ``` + +## Race conditions + +Each DAG will fail immediately if any of the DAGs tagged as part of the +es-concurrency group for the DAG's environment is running. (E.g., the +`create_new_staging_es_index` DAG fails immediately if any DAGs tagged with +`staging-es-concurrency` are running.) """ import logging diff --git a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py index fb349b0872f..3d7fe127f9d 100644 --- a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py @@ -29,13 +29,9 @@ ## Race conditions Because this DAG runs on the staging elasticsearch cluster, it does _not_ interfere - with the `data_refresh` or `create_filtered_index` DAGs. - -However, as the DAG operates on the staging API database it will exit -immediately if any of the following DAGs are running: -* `staging_database_restore` -* `recreate_full_staging_index` -* `create_new_staging_es_index` + with the production `data_refresh` or `create_filtered_index` DAGs. However, it will + fail immediately if any of the DAGs tagged as part of the `staging-es-concurrency` + group are running. """ from datetime import datetime, timedelta diff --git a/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py b/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py index 0f1be96d116..c7a452db61b 100644 --- a/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py +++ b/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py @@ -12,6 +12,13 @@ ## When this DAG runs This DAG is on a `None` schedule and is run manually. + +## Race conditions + +Each DAG will fail immediately if any of the DAGs tagged as part of the +es-concurrency group for the DAG's environment is running. (E.g., the +`point_staging_alias` DAG fails immediately if any DAGs tagged with +`staging-es-concurrency` are running.) """ from datetime import datetime diff --git a/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py index 00496b9e89f..8283da49c38 100644 --- a/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py @@ -34,11 +34,9 @@ cluster, it does _not_ interfere with the `data_refresh` or `create_filtered_index` DAGs. -However, as the DAG operates on the staging API database it will exit -immediately if any of the following DAGs are running: -* `staging_database_restore` -* `create_proportional_by_provider_staging_index` -* `create_new_staging_es_index` +However, as the DAG operates on the staging API database and ES cluster it will exit +immediately if any of the DAGs tagged as part of the `staging_es_concurrency` group +are already running. """ from datetime import datetime diff --git a/documentation/catalog/reference/DAGs.md b/documentation/catalog/reference/DAGs.md index 2dd9b88a559..42d2dce60f6 100644 --- a/documentation/catalog/reference/DAGs.md +++ b/documentation/catalog/reference/DAGs.md @@ -42,18 +42,15 @@ The following are DAGs grouped by their primary tag: ### Database -| DAG ID | Schedule Interval | -| --------------------------------------------------------------------------------------------- | ----------------- | -| [`batched_update`](#batched_update) | `None` | -| [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index) | `None` | -| [`delete_records`](#delete_records) | `None` | -| [`point_production_es_alias`](#point_production_es_alias) | `None` | -| [`point_staging_es_alias`](#point_staging_es_alias) | `None` | -| [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) | `None` | -| [`recreate_full_staging_index`](#recreate_full_staging_index) | `None` | -| [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) | `None` | -| [`report_pending_reported_media`](#report_pending_reported_media) | `@weekly` | -| [`staging_database_restore`](#staging_database_restore) | `@monthly` | +| DAG ID | Schedule Interval | +| --------------------------------------------------------------------------------- | ----------------- | +| [`batched_update`](#batched_update) | `None` | +| [`delete_records`](#delete_records) | `None` | +| [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) | `None` | +| [`recreate_full_staging_index`](#recreate_full_staging_index) | `None` | +| [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) | `None` | +| [`report_pending_reported_media`](#report_pending_reported_media) | `@weekly` | +| [`staging_database_restore`](#staging_database_restore) | `@monthly` | ### Elasticsearch @@ -61,6 +58,9 @@ The following are DAGs grouped by their primary tag: | ----------------------------------------------------------------------------------------------- | ----------------- | | [`create_new_production_es_index`](#create_new_production_es_index) | `None` | | [`create_new_staging_es_index`](#create_new_staging_es_index) | `None` | +| [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index) | `None` | +| [`point_production_es_alias`](#point_production_es_alias) | `None` | +| [`point_staging_es_alias`](#point_staging_es_alias) | `None` | | [`production_elasticsearch_cluster_healthcheck`](#production_elasticsearch_cluster_healthcheck) | `*/15 * * * *` | | [`staging_elasticsearch_cluster_healthcheck`](#staging_elasticsearch_cluster_healthcheck) | `*/15 * * * *` | @@ -441,8 +441,9 @@ source from which to pull documents. There are two mechanisms that prevent this from happening: -1. The filtered index creation DAGs are not allowed to run if a data refresh for - the media type is already running. +1. The filtered index creation DAGs fail immediately if any of the DAGs that are + tagged as prt of the `production-es-concurrency` group (including the data + refreshes) are currently running. 2. The data refresh DAGs will wait for any pre-existing filtered index creation DAG runs for the media type to finish before continuing. @@ -496,8 +497,9 @@ source from which to pull documents. There are two mechanisms that prevent this from happening: -1. The filtered index creation DAGs are not allowed to run if a data refresh for - the media type is already running. +1. The filtered index creation DAGs fail immediately if any of the DAGs that are + tagged as prt of the `production-es-concurrency` group (including the data + refreshes) are currently running. 2. The data refresh DAGs will wait for any pre-existing filtered index creation DAG runs for the media type to finish before continuing. @@ -608,6 +610,13 @@ The resulting, merged configuration will be: } ``` +##### Race conditions + +Each DAG will fail immediately if any of the DAGs tagged as part of the +es-concurrency group for the DAG's environment is running. (E.g., the +`create_new_staging_es_index` DAG fails immediately if any DAGs tagged with +`staging-es-concurrency` are running.) + ### `create_new_staging_es_index` #### Create New ES Index DAG @@ -712,6 +721,13 @@ The resulting, merged configuration will be: } ``` +##### Race conditions + +Each DAG will fail immediately if any of the DAGs tagged as part of the +es-concurrency group for the DAG's environment is running. (E.g., the +`create_new_staging_es_index` DAG fails immediately if any DAGs tagged with +`staging-es-concurrency` are running.) + ### `create_proportional_by_source_staging_index` #### Create Proportional By Source Staging Index DAG @@ -740,16 +756,10 @@ This DAG is on a `None` schedule and is run manually. ##### Race conditions -Because this DAG runs on the staging ingestion server and staging elasticsearch -cluster, it does _not_ interfere with the `data_refresh` or -`create_filtered_index` DAGs. - -However, as the DAG operates on the staging API database it will exit -immediately if any of the following DAGs are running: - -- `staging_database_restore` -- `recreate_full_staging_index` -- `create_new_staging_es_index` +Because this DAG runs on the staging elasticsearch cluster, it does _not_ +interfere with the production `data_refresh` or `create_filtered_index` DAGs. +However, it will fail immediately if any of the DAGs tagged as part of the +`staging-es-concurrency` group are running. ### `delete_records` @@ -1084,7 +1094,18 @@ also delete that index afterward. This DAG is on a `None` schedule and is run manually. +<<<<<<< HEAD ### `point_staging_es_alias` +======= +##### Race conditions + +Each DAG will fail immediately if any of the DAGs tagged as part of the +es-concurrency group for the DAG's environment is running. (E.g., the +`point_staging_alias` DAG fails immediately if any DAGs tagged with +`staging-es-concurrency` are running.) + +### `point_staging_alias` +>>>>>>> be549cdee (Update dag docs) #### Point ES Alias DAG @@ -1100,6 +1121,13 @@ also delete that index afterward. This DAG is on a `None` schedule and is run manually. +##### Race conditions + +Each DAG will fail immediately if any of the DAGs tagged as part of the +es-concurrency group for the DAG's environment is running. (E.g., the +`point_staging_alias` DAG fails immediately if any DAGs tagged with +`staging-es-concurrency` are running.) + ### `pr_review_reminders` #### PR Review Reminders @@ -1205,12 +1233,9 @@ Because this DAG runs on the staging ingestion server and staging elasticsearch cluster, it does _not_ interfere with the `data_refresh` or `create_filtered_index` DAGs. -However, as the DAG operates on the staging API database it will exit -immediately if any of the following DAGs are running: - -- `staging_database_restore` -- `create_proportional_by_provider_staging_index` -- `create_new_staging_es_index` +However, as the DAG operates on the staging API database and ES cluster it will +exit immediately if any of the DAGs tagged as part of the +`staging_es_concurrency` group are already running. ### `recreate_image_popularity_calculation` @@ -1290,7 +1315,7 @@ Notes: https://www.smk.dk/en/article/smk-api/ ### `staging_database_restore` -#### Update the staging database +#### Staging Database Restore DAG This DAG is responsible for updating the staging database using the most recent snapshot of the production database. @@ -1312,6 +1337,11 @@ the RDS operations run using a different hook: - `AIRFLOW_CONN_`: The connection string to use for RDS operations (per the above example, it might be `AIRFLOW_CONN_AWS_RDS`) +##### Race conditions + +Because this DAG completely replaces the staging database, it first waits on any +running DAGs that are tagged as part of the `staging_es_concurrency` group. + ### `staging_elasticsearch_cluster_healthcheck` Monitor staging and production Elasticsearch cluster health endpoint.