diff --git a/catalog/dags/common/elasticsearch.py b/catalog/dags/common/elasticsearch.py index 92acc57b6fc..16ef8332d8c 100644 --- a/catalog/dags/common/elasticsearch.py +++ b/catalog/dags/common/elasticsearch.py @@ -3,10 +3,11 @@ from typing import Literal, Union from airflow.decorators import task, task_group +from airflow.exceptions import AirflowSkipException from airflow.models.connection import Connection from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook from airflow.sensors.base import PokeReturnValue -from airflow.utils.trigger_rule import TriggerRule +from elasticsearch.exceptions import NotFoundError from common.constants import REFRESH_POKE_INTERVAL @@ -175,51 +176,90 @@ def refresh_index(es_host: str, index_name: str): @task_group(group_id="point_alias") -def point_alias(index_name: str, alias: str, es_host: str): +def point_alias( + es_host: str, + target_index: str, + target_alias: str, + should_delete_old_index: bool = False, +): """ Point the target alias to the given index. If the alias is already being - used by one or more indices, it will first be removed from all of them. - """ + used by another index, it will be removed from this index first. Optionally, + that index may also be automatically deleted. - @task.branch - def check_if_alias_exists(alias: str, es_host: str): - """Check if the alias already exists.""" - es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn - return ( - "point_alias.remove_existing_alias" - if es_conn.indices.exists_alias(name=alias) - else "point_alias.point_new_alias" - ) + Required Arguments: + + es_host: Connection string for elasticsearch + target_index: Str identifier for the target index. May be either the index name + or an existing alias. + target_alias: The new alias to be applied to the target index + + Optional Arguments: + + should_delete_old_index: If True, the index previously pointed to by the target + alias (if one exists) will be deleted. + """ @task - def remove_existing_alias(alias: str, es_host: str): - """Remove the given alias from any indices to which it points.""" + def get_existing_index(es_host: str, target_alias: str): + """Get the index to which the target alias currently points, if it exists.""" + if not target_alias: + raise AirflowSkipException("No target alias was provided.") + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn - response = es_conn.indices.delete_alias( - name=alias, - # Remove the alias from _all_ indices to which it currently - # applies - index="_all", - ) - return response.get("acknowledged") + + try: + response = es_conn.indices.get_alias(name=target_alias) + if len(response) > 1: + raise ValueError( + "Expected at most one existing index with target alias" + f"{target_alias}, but {len(response)} were found." + ) + return list(response.keys())[0] + except NotFoundError: + logger.info(f"Target alias {target_alias} does not exist.") + return None @task def point_new_alias( es_host: str, - index_name: str, - alias: str, + target_index: str, + existing_index: str, + target_alias: str, ): + """ + Remove the target_alias from the existing index to which it applies, if + applicable, and point it to the target_index in one atomic operation. + """ + if not target_alias: + raise AirflowSkipException("No target alias was provided.") + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn - response = es_conn.indices.put_alias(index=index_name, name=alias) + + actions = [] + if existing_index: + actions.append({"remove": {"index": existing_index, "alias": target_alias}}) + actions.append({"add": {"index": target_index, "alias": target_alias}}) + logger.info(f"Applying actions: {actions}") + + response = es_conn.indices.update_aliases(body={"actions": actions}) return response.get("acknowledged") - exists_alias = check_if_alias_exists(alias, es_host) - remove_alias = remove_existing_alias(alias, es_host) + @task + def delete_old_index(es_host: str, index_name: str, should_delete_old_index: bool): + if not should_delete_old_index: + raise AirflowSkipException("`should_delete_old_index` is set to `False`.") + if not index_name: + raise AirflowSkipException("No applicable index to delete.") + + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn + response = es_conn.indices.delete(index=index_name) + return response.get("acknowledged") + + existing_index = get_existing_index(es_host, target_alias) + + point_alias = point_new_alias(es_host, target_index, existing_index, target_alias) - point_alias = point_new_alias.override( - # The remove_alias task may be skipped. - trigger_rule=TriggerRule.NONE_FAILED, - )(es_host, index_name, alias) + delete_index = delete_old_index(es_host, existing_index, should_delete_old_index) - exists_alias >> [remove_alias, point_alias] - remove_alias >> point_alias + existing_index >> point_alias >> delete_index diff --git a/catalog/dags/common/sensors/constants.py b/catalog/dags/common/sensors/constants.py new file mode 100644 index 00000000000..2d3bf1d6958 --- /dev/null +++ b/catalog/dags/common/sensors/constants.py @@ -0,0 +1,11 @@ +from common.constants import PRODUCTION, STAGING + + +# DagTags used to establish a concurrency group for each environment +PRODUCTION_ES_CONCURRENCY_TAG = "production_es_concurrency" +STAGING_ES_CONCURRENCY_TAG = "staging_es_concurrency" + +ES_CONCURRENCY_TAGS = { + PRODUCTION: PRODUCTION_ES_CONCURRENCY_TAG, + STAGING: STAGING_ES_CONCURRENCY_TAG, +} diff --git a/catalog/dags/common/sensors/utils.py b/catalog/dags/common/sensors/utils.py index 08c3bb9a6ac..0d0085295ee 100644 --- a/catalog/dags/common/sensors/utils.py +++ b/catalog/dags/common/sensors/utils.py @@ -2,14 +2,18 @@ from airflow.decorators import task, task_group from airflow.exceptions import AirflowSensorTimeout -from airflow.models import DagRun +from airflow.models import DagModel, DagRun, DagTag from airflow.sensors.external_task import ExternalTaskSensor +from airflow.utils.session import provide_session from airflow.utils.state import State from common.constants import REFRESH_POKE_INTERVAL -def get_most_recent_dag_run(dag_id) -> list[datetime] | datetime: +THREE_DAYS = 60 * 60 * 24 * 3 + + +def _get_most_recent_dag_run(dag_id) -> list[datetime] | datetime: """ Retrieve the most recent DAG run's execution date. @@ -35,9 +39,40 @@ def get_most_recent_dag_run(dag_id) -> list[datetime] | datetime: return [] -def wait_for_external_dag(external_dag_id: str, task_id: str | None = None): +@task +def get_dags_with_concurrency_tag( + tag: str, excluded_dag_ids: list[str], session=None, **context +): + """ + Get a list of DAG ids with the given tag. The id of the running DAG is excluded, + as well as any ids in the `excluded_dag_ids` list. """ - Return a Sensor task which will wait if the given external DAG is + dags = session.query(DagModel).filter(DagModel.tags.any(DagTag.name == tag)).all() + dag_ids = [dag.dag_id for dag in dags] + + running_dag_id = context["dag"].dag_id + if running_dag_id not in dag_ids: + raise ValueError( + f"The `{running_dag_id}` DAG tried preventing concurrency with the `{tag}`," + " tag, but does not have the tag itself. To ensure that other DAGs with this" + f" tag will also avoid running concurrently with `{running_dag_id}`, it must" + f"have the `{tag}` tag applied." + ) + + # Return just the ids of DAGs to prevent concurrency with. This excludes the running dag id, + # and any supplied `excluded_dag_ids` + return [id for id in dag_ids if id not in [*excluded_dag_ids, running_dag_id]] + + +@task +def wait_for_external_dag( + external_dag_id: str, + task_id: str | None = None, + timeout: int | None = THREE_DAYS, + **context, +): + """ + Execute a Sensor task which will wait if the given external DAG is running. To fully ensure that the waiting DAG and the external DAG do not run @@ -51,28 +86,39 @@ def wait_for_external_dag(external_dag_id: str, task_id: str | None = None): if not task_id: task_id = f"wait_for_{external_dag_id}" - return ExternalTaskSensor( + sensor = ExternalTaskSensor( task_id=task_id, poke_interval=REFRESH_POKE_INTERVAL, external_dag_id=external_dag_id, # Wait for the whole DAG, not just a part of it external_task_id=None, check_existence=False, - execution_date_fn=lambda _: get_most_recent_dag_run(external_dag_id), + execution_date_fn=lambda _: _get_most_recent_dag_run(external_dag_id), mode="reschedule", # Any "finished" state is sufficient for us to continue allowed_states=[State.SUCCESS, State.FAILED], + # execution_timeout for the task does not include time that the sensor + # was up for reschedule but not actually running. `timeout` does + timeout=timeout, ) + sensor.execute(context) + @task_group(group_id="wait_for_external_dags") -def wait_for_external_dags(external_dag_ids: list[str]): +@provide_session +def wait_for_external_dags_with_tag( + tag: str, excluded_dag_ids: list[str] = None, session=None +): """ - Wait for all DAGs with the given external DAG ids to no longer be - in a running state before continuing. + Wait until all DAGs with the given `tag`, excluding those identified by the + `excluded_dag_ids`, are no longer in the running state before continuing. """ - for dag_id in external_dag_ids: - wait_for_external_dag(dag_id) + external_dag_ids = get_dags_with_concurrency_tag.override( + task_id=f"get_dags_in_{tag}_group" + )(tag=tag, excluded_dag_ids=excluded_dag_ids or [], session=session) + + wait_for_external_dag.expand(external_dag_id=external_dag_ids) @task(retries=0) @@ -81,18 +127,35 @@ def prevent_concurrency_with_dag(external_dag_id: str, **context): Prevent concurrency with the given external DAG, by failing immediately if that DAG is running. """ - - wait_for_dag = wait_for_external_dag( - external_dag_id=external_dag_id, - task_id=f"check_for_running_{external_dag_id}", - ) - wait_for_dag.timeout = 0 try: - wait_for_dag.execute(context) + wait_for_external_dag.function( + external_dag_id=external_dag_id, + task_id=f"check_for_running_{external_dag_id}", + timeout=0, + **context, + ) except AirflowSensorTimeout: raise ValueError(f"Concurrency check with {external_dag_id} failed.") +@task_group(group_id="prevent_concurrency_with_dags") +@provide_session +def prevent_concurrency_with_dags_with_tag( + tag: str, excluded_dag_ids: list[str] = None, session=None +): + """ + Prevent concurrency with any DAGs that have the given `tag`, excluding + those identified by the `excluded_dag_ids`. Concurrency is prevented by + failing the task immediately if any of the tagged DAGs are in the running + state. + """ + external_dag_ids = get_dags_with_concurrency_tag.override( + task_id=f"get_dags_in_{tag}_group" + )(tag=tag, excluded_dag_ids=excluded_dag_ids or [], session=session) + + prevent_concurrency_with_dag.expand(external_dag_id=external_dag_ids) + + @task(retries=0) def is_concurrent_with_any(external_dag_ids: list[str], **context): """ @@ -109,12 +172,3 @@ def is_concurrent_with_any(external_dag_ids: list[str], **context): # Explicit return None to clarify expectations return None - - -@task_group(group_id="prevent_concurrency") -def prevent_concurrency_with_dags(external_dag_ids: list[str]): - """Fail immediately if any of the given external dags are in progress.""" - for dag_id in external_dag_ids: - prevent_concurrency_with_dag.override( - task_id=f"prevent_concurrency_with_{dag_id}" - )(dag_id) diff --git a/catalog/dags/data_refresh/create_filtered_index_dag.py b/catalog/dags/data_refresh/create_filtered_index_dag.py index 20f365f982a..0cfea3fe7db 100644 --- a/catalog/dags/data_refresh/create_filtered_index_dag.py +++ b/catalog/dags/data_refresh/create_filtered_index_dag.py @@ -42,8 +42,9 @@ There are two mechanisms that prevent this from happening: -1. The filtered index creation DAGs are not allowed to run if a data refresh -for the media type is already running. +1. The filtered index creation DAGs fail immediately if any of the DAGs that are +tagged as prt of the `production-es-concurrency` group (including the data +refreshes) are currently running. 2. The data refresh DAGs will wait for any pre-existing filtered index creation DAG runs for the media type to finish before continuing. @@ -56,15 +57,13 @@ from airflow import DAG from airflow.models.param import Param -from common.constants import DAG_DEFAULT_ARGS, PRODUCTION -from common.sensors.utils import prevent_concurrency_with_dags +from common.constants import DAG_DEFAULT_ARGS +from common.sensors.constants import PRODUCTION_ES_CONCURRENCY_TAG +from common.sensors.utils import prevent_concurrency_with_dags_with_tag from data_refresh.create_filtered_index import ( create_filtered_index_creation_task_groups, ) from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS, DataRefresh -from elasticsearch_cluster.create_new_es_index.create_new_es_index_types import ( - CREATE_NEW_INDEX_CONFIGS, -) # Note: We can't use the TaskFlow `@dag` DAG factory decorator @@ -88,7 +87,7 @@ def create_filtered_index_creation_dag(data_refresh: DataRefresh): default_args=DAG_DEFAULT_ARGS, schedule=None, start_date=datetime(2023, 4, 1), - tags=["data_refresh"], + tags=["data_refresh", PRODUCTION_ES_CONCURRENCY_TAG], max_active_runs=1, catchup=False, doc_md=__doc__, @@ -117,14 +116,11 @@ def create_filtered_index_creation_dag(data_refresh: DataRefresh): }, render_template_as_native_obj=True, ) as dag: - # Immediately fail if the associated data refresh is running, or the - # create_new_production_es_index DAG is running. This prevents multiple - # DAGs from reindexing from a single production index simultaneously. - prevent_concurrency = prevent_concurrency_with_dags( - external_dag_ids=[ - data_refresh.dag_id, - CREATE_NEW_INDEX_CONFIGS[PRODUCTION].dag_id, - ] + # Immediately fail if any DAG that operates on the production elasticsearch + # cluster is running. This prevents multiple DAGs from reindexing from a + # single production index simultaneously. + prevent_concurrency = prevent_concurrency_with_dags_with_tag( + tag=PRODUCTION_ES_CONCURRENCY_TAG, ) # Once the concurrency check has passed, actually create the filtered diff --git a/catalog/dags/data_refresh/dag_factory.py b/catalog/dags/data_refresh/dag_factory.py index 358a0b3e212..c9d7026377d 100644 --- a/catalog/dags/data_refresh/dag_factory.py +++ b/catalog/dags/data_refresh/dag_factory.py @@ -34,6 +34,7 @@ OPENLEDGER_API_CONN_ID, XCOM_PULL_TEMPLATE, ) +from common.sensors.constants import PRODUCTION_ES_CONCURRENCY_TAG from common.sql import PGExecuteQueryOperator, single_value from data_refresh.data_refresh_task_factory import create_data_refresh_task_group from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS, DataRefresh @@ -70,7 +71,7 @@ def create_data_refresh_dag(data_refresh: DataRefresh, external_dag_ids: Sequenc max_active_runs=1, catchup=False, doc_md=__doc__, - tags=["data_refresh"], + tags=["data_refresh", PRODUCTION_ES_CONCURRENCY_TAG], ) with dag: diff --git a/catalog/dags/data_refresh/data_refresh_task_factory.py b/catalog/dags/data_refresh/data_refresh_task_factory.py index 3ad20a806d3..c1e09066be8 100644 --- a/catalog/dags/data_refresh/data_refresh_task_factory.py +++ b/catalog/dags/data_refresh/data_refresh_task_factory.py @@ -55,16 +55,14 @@ from airflow.utils.trigger_rule import TriggerRule from common import cloudwatch, ingestion_server -from common.constants import PRODUCTION, XCOM_PULL_TEMPLATE +from common.constants import XCOM_PULL_TEMPLATE +from common.sensors.constants import PRODUCTION_ES_CONCURRENCY_TAG from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor -from common.sensors.utils import wait_for_external_dags +from common.sensors.utils import wait_for_external_dags_with_tag from data_refresh.create_filtered_index import ( create_filtered_index_creation_task_groups, ) from data_refresh.data_refresh_types import DataRefresh -from elasticsearch_cluster.create_new_es_index.create_new_es_index_types import ( - CREATE_NEW_INDEX_CONFIGS, -) logger = logging.getLogger(__name__) @@ -123,11 +121,13 @@ def create_data_refresh_task_group( # Realistically the data refresh is too slow to beat the index creation process, # even if it was triggered immediately after one of these DAGs; however, it is # always safer to avoid the possibility of the race condition altogether. - wait_for_es_dags = wait_for_external_dags.override(group_id="wait_for_es_dags")( - external_dag_ids=[ - data_refresh.filtered_index_dag_id, - CREATE_NEW_INDEX_CONFIGS[PRODUCTION].dag_id, - ] + wait_for_es_dags = wait_for_external_dags_with_tag.override( + group_id="wait_for_es_dags" + )( + tag=PRODUCTION_ES_CONCURRENCY_TAG, + # Exclude the other data refresh DAG ids, as waiting on these was handled in + # the previous task. + excluded_dag_ids=external_dag_ids, ) tasks.append([wait_for_data_refresh, wait_for_es_dags]) diff --git a/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py b/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py index 85bed3561b8..e03c16d2d85 100644 --- a/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py +++ b/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py @@ -1,5 +1,5 @@ """ -# Update the staging database +# Staging Database Restore DAG This DAG is responsible for updating the staging database using the most recent snapshot of the production database. @@ -19,6 +19,11 @@ (e.g. `aws_rds`) - `AIRFLOW_CONN_`: The connection string to use for RDS operations (per the above example, it might be `AIRFLOW_CONN_AWS_RDS`) + +## Race conditions + +Because this DAG completely replaces the staging database, it first waits on any +running DAGs that are tagged as part of the `staging_es_concurrency` group. """ import logging @@ -35,7 +40,8 @@ DAG_DEFAULT_ARGS, POSTGRES_API_STAGING_CONN_ID, ) -from common.sensors.utils import wait_for_external_dag +from common.sensors.constants import STAGING_ES_CONCURRENCY_TAG +from common.sensors.utils import wait_for_external_dags_with_tag from common.sql import PGExecuteQueryOperator from database.staging_database_restore import constants from database.staging_database_restore.staging_database_restore import ( @@ -48,9 +54,6 @@ restore_staging_from_snapshot, skip_restore, ) -from elasticsearch_cluster.recreate_staging_index.recreate_full_staging_index import ( - DAG_ID as RECREATE_STAGING_INDEX_DAG_ID, -) log = logging.getLogger(__name__) @@ -60,7 +63,7 @@ dag_id=constants.DAG_ID, schedule="@monthly", start_date=datetime(2023, 5, 1), - tags=["database"], + tags=["database", STAGING_ES_CONCURRENCY_TAG], max_active_runs=1, dagrun_timeout=timedelta(days=1), catchup=False, @@ -74,11 +77,9 @@ render_template_as_native_obj=True, ) def restore_staging_database(): - # If the `recreate_full_staging_index` DAG was manually triggered prior - # to the database restoration starting, we should wait for it to - # finish. - wait_for_recreate_full_staging_index = wait_for_external_dag( - external_dag_id=RECREATE_STAGING_INDEX_DAG_ID, + # Wait for any DAGs that operate on the staging elasticsearch cluster + wait_for_recreate_full_staging_index = wait_for_external_dags_with_tag( + tag=STAGING_ES_CONCURRENCY_TAG, ) should_skip = skip_restore() latest_snapshot = get_latest_prod_snapshot() diff --git a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py index 407af9a5fb9..ab73f0c5201 100644 --- a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py @@ -22,6 +22,11 @@ * `override_config`: boolean override; when True, the `index_config` will be used for the new index configuration _without_ merging any values from the source index config. +* `target_alias` : optional alias to be applied to the new index after reindexing. + If the alias already applies to an existing index, it will be + removed first. +* `should_delete_old_index`: whether to remove the index previously pointed to by + the target_alias, if it exists. Defaults to False. ## Merging policy @@ -95,6 +100,13 @@ } } ``` + +## Race conditions + +Each DAG will fail immediately if any of the DAGs tagged as part of the +es-concurrency group for the DAG's environment is running. (E.g., the +`create_new_staging_es_index` DAG fails immediately if any DAGs tagged with +`staging-es-concurrency` are running.) """ import logging @@ -104,8 +116,9 @@ from airflow.utils.trigger_rule import TriggerRule from common import elasticsearch as es +from common import slack from common.constants import AUDIO, DAG_DEFAULT_ARGS, MEDIA_TYPES -from common.sensors.utils import prevent_concurrency_with_dags +from common.sensors.utils import prevent_concurrency_with_dags_with_tag from elasticsearch_cluster.create_new_es_index.create_new_es_index import ( GET_CURRENT_INDEX_CONFIG_TASK_NAME, GET_FINAL_INDEX_CONFIG_TASK_NAME, @@ -131,7 +144,7 @@ def create_new_es_index_dag(config: CreateNewIndex): max_active_runs=1, catchup=False, doc_md=__doc__, - tags=["elasticsearch"], + tags=["elasticsearch", config.concurrency_tag], render_template_as_native_obj=True, params={ "media_type": Param( @@ -191,11 +204,32 @@ def create_new_es_index_dag(config: CreateNewIndex): " configuration." ), ), + "target_alias": Param( + default=None, + type=["string", "null"], + description=( + "Optional alias which will be applied to the newly created index. If" + " the alias already exists, it will first be removed from the" + " index to which it previously pointed." + ), + ), + "should_delete_old_index": Param( + default=False, + type="boolean", + description=( + "Whether to delete the index previously pointed to by the" + " `target_alias`." + ), + ), }, ) with dag: - prevent_concurrency = prevent_concurrency_with_dags(config.blocking_dags) + # Fail early if any other DAG that operates on the relevant elasticsearch cluster + # is running + prevent_concurrency = prevent_concurrency_with_dags_with_tag( + tag=config.concurrency_tag, + ) es_host = es.get_es_host(environment=config.environment) @@ -242,11 +276,31 @@ def create_new_es_index_dag(config: CreateNewIndex): es_host=es_host, ) + point_alias = es.point_alias( + es_host=es_host, + target_index=index_name, + target_alias="{{ params.target_alias }}", + should_delete_old_index="{{ params.should_delete_old_index }}", + ) + + notify_completion = slack.notify_slack.override( + trigger_rule=TriggerRule.NONE_FAILED + )( + text=( + f"New index { index_name } was successfully created with alias" + "{{ params.target_alias }}." + ), + dag_id=dag.dag_id, + username="Create New ES Index", + icon_emoji=":elasticsearch:", + ) + # Set up dependencies prevent_concurrency >> [es_host, index_name] index_name >> check_override >> [current_index_config, final_index_config] current_index_config >> merged_index_config >> final_index_config - final_index_config >> create_new_index >> reindex + final_index_config >> create_new_index >> reindex >> point_alias + point_alias >> notify_completion return dag diff --git a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_types.py b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_types.py index cf631e4cb0f..62c3ddbb267 100644 --- a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_types.py +++ b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_types.py @@ -4,13 +4,7 @@ from airflow.models import Variable from common.constants import PRODUCTION, STAGING -from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS -from database.staging_database_restore.constants import ( - DAG_ID as STAGING_DB_RESTORE_DAG_ID, -) -from elasticsearch_cluster.recreate_staging_index.recreate_full_staging_index import ( - DAG_ID as RECREATE_STAGING_INDEX_DAG_ID, -) +from common.sensors.constants import ES_CONCURRENCY_TAGS @dataclass @@ -22,8 +16,8 @@ class CreateNewIndex: environment: str representation of the environment in which to create the new index - blocking_dags: list of dags with which to prevent concurrency; the - generated create_new_es_index dag will fail + concurrency_tag: tag used to identify dags with which to prevent + concurrency immediately if any of these dags are running. reindex_timeout: timedelta expressing maximum amount of time the reindexing step may take @@ -33,13 +27,14 @@ class CreateNewIndex: dag_id: str = field(init=False) es_host: str = field(init=False) + concurrency_tag: str = field(init=False) environment: str - blocking_dags: list requests_per_second: int | None = None reindex_timeout: timedelta = timedelta(hours=12) def __post_init__(self): self.dag_id = f"create_new_{self.environment}_es_index" + self.concurrency_tag = ES_CONCURRENCY_TAGS[self.environment] if not self.requests_per_second: self.requests_per_second = Variable.get( @@ -50,17 +45,8 @@ def __post_init__(self): CREATE_NEW_INDEX_CONFIGS = { STAGING: CreateNewIndex( environment=STAGING, - blocking_dags=[RECREATE_STAGING_INDEX_DAG_ID, STAGING_DB_RESTORE_DAG_ID], ), PRODUCTION: CreateNewIndex( environment=PRODUCTION, - blocking_dags=( - # Block on all the data refreshes - [data_refresh.dag_id for data_refresh in DATA_REFRESH_CONFIGS.values()] - + [ # Block on the filtered index creation DAGs - data_refresh.filtered_index_dag_id - for data_refresh in DATA_REFRESH_CONFIGS.values() - ] - ), ), } diff --git a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py index 1903d5f0ce8..aeecd9381ab 100644 --- a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py @@ -19,6 +19,8 @@ * source_index: An existing staging Elasticsearch index to use as the basis for the new index. If not provided, the index aliased to `-filtered` will be used. +* should_delete_old_index: If True, the index previously pointed to by the target + alias (if one exists) will be deleted. ## When this DAG runs @@ -26,21 +28,17 @@ ## Race conditions -Because this DAG runs on the staging ingestion server and staging elasticsearch -cluster, it does _not_ interfere with the `data_refresh` or -`create_filtered_index` DAGs. - -However, as the DAG operates on the staging API database it will exit -immediately if any of the following DAGs are running: -* `staging_database_restore` -* `recreate_full_staging_index` -* `create_new_staging_es_index` +Because this DAG runs on the staging elasticsearch cluster, it does _not_ interfere + with the production `data_refresh` or `create_filtered_index` DAGs. However, it will + fail immediately if any of the DAGs tagged as part of the `staging-es-concurrency` + group are running. """ from datetime import datetime, timedelta from airflow.decorators import dag from airflow.models.param import Param +from airflow.utils.trigger_rule import TriggerRule from common import elasticsearch as es from common import slack @@ -50,19 +48,11 @@ MEDIA_TYPES, STAGING, ) -from common.sensors.utils import prevent_concurrency_with_dags -from database.staging_database_restore.constants import ( - DAG_ID as STAGING_DB_RESTORE_DAG_ID, -) -from elasticsearch_cluster.create_new_es_index.create_new_es_index_types import ( - CREATE_NEW_INDEX_CONFIGS, -) +from common.sensors.constants import STAGING_ES_CONCURRENCY_TAG +from common.sensors.utils import prevent_concurrency_with_dags_with_tag from elasticsearch_cluster.create_proportional_by_source_staging_index import ( create_proportional_by_source_staging_index as create_index, ) -from elasticsearch_cluster.recreate_staging_index.recreate_full_staging_index import ( - DAG_ID as RECREATE_STAGING_INDEX_DAG_ID, -) DAG_ID = "create_proportional_by_source_staging_index" @@ -73,7 +63,7 @@ default_args=DAG_DEFAULT_ARGS, schedule=None, start_date=datetime(2024, 1, 31), - tags=["database", "elasticsearch"], + tags=["elasticsearch", STAGING_ES_CONCURRENCY_TAG], max_active_runs=1, catchup=False, doc_md=__doc__, @@ -102,17 +92,21 @@ " the index aliased to `-filtered` will be used." ), ), + "should_delete_old_index": Param( + default=False, + type="boolean", + description=( + "Whether to delete the index previously pointed to by the" + " `{media_type}-subset-by-source` alias." + ), + ), }, render_template_as_native_obj=True, ) def create_proportional_by_source_staging_index(): # Fail early if any conflicting DAGs are running - prevent_concurrency = prevent_concurrency_with_dags( - external_dag_ids=[ - STAGING_DB_RESTORE_DAG_ID, - RECREATE_STAGING_INDEX_DAG_ID, - CREATE_NEW_INDEX_CONFIGS[STAGING].dag_id, - ] + prevent_concurrency = prevent_concurrency_with_dags_with_tag( + tag=STAGING_ES_CONCURRENCY_TAG, ) es_host = es.get_es_host(environment=STAGING) @@ -171,10 +165,15 @@ def create_proportional_by_source_staging_index(): ) point_alias = es.point_alias( - index_name=destination_index_name, alias=destination_alias, es_host=es_host + es_host=es_host, + target_index=destination_index_name, + target_alias=destination_alias, + should_delete_old_index="{{ params.should_delete_old_index }}", ) - notify_completion = slack.notify_slack( + notify_completion = slack.notify_slack.override( + trigger_rule=TriggerRule.NONE_FAILED + )( text=f"Reindexing complete for {destination_index_name}.", dag_id=DAG_ID, username="Proportional by Source Staging Index Creation", diff --git a/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py b/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py new file mode 100644 index 00000000000..d14bc0af5e1 --- /dev/null +++ b/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py @@ -0,0 +1,107 @@ +""" +# Point ES Alias DAG + +This file generates our Point ES Alias DAGs using a factory function. A +separate DAG is generated for the staging and production environments. + +The DAGs are used to point a `target_alias` to a `target_index` in the +given environment's elasticsearch cluster. When the alias is applied, it +is first removed from any existing index to which it already applies; +optionally, it can also delete that index afterward. + +## When this DAG runs + +This DAG is on a `None` schedule and is run manually. + +## Race conditions + +Each DAG will fail immediately if any of the DAGs tagged as part of the +es-concurrency group for the DAG's environment is running. (E.g., the +`point_staging_alias` DAG fails immediately if any DAGs tagged with +`staging-es-concurrency` are running.) +""" + +from datetime import datetime + +from airflow import DAG +from airflow.models.param import Param +from airflow.utils.trigger_rule import TriggerRule + +from common import elasticsearch as es +from common import slack +from common.constants import ( + DAG_DEFAULT_ARGS, + ENVIRONMENTS, +) +from common.sensors.constants import ES_CONCURRENCY_TAGS +from common.sensors.utils import prevent_concurrency_with_dags_with_tag + + +def point_es_alias_dag(environment: str): + dag = DAG( + dag_id=f"point_{environment}_alias", + default_args=DAG_DEFAULT_ARGS, + schedule=None, + start_date=datetime(2024, 1, 31), + tags=["elasticsearch", ES_CONCURRENCY_TAGS[environment]], + max_active_runs=1, + catchup=False, + doc_md=__doc__, + params={ + "target_index": Param( + type="string", + description=( + "The existing Elasticsearch index to which the target alias" + " should be applied." + ), + ), + "target_alias": Param( + type="string", + description=( + "The alias which will be applied to the index. If" + " the alias already exists, it will first be removed from the" + " index to which it previously pointed." + ), + ), + "should_delete_old_index": Param( + default=False, + type="boolean", + description=( + "Whether to delete the index previously pointed to by the" + " `target_alias`." + ), + ), + }, + render_template_as_native_obj=True, + ) + + with dag: + # Fail early if any other DAG that operates on the elasticsearch cluster for + # this environment is running + prevent_concurrency = prevent_concurrency_with_dags_with_tag( + tag=ES_CONCURRENCY_TAGS[environment], + ) + + es_host = es.get_es_host(environment=environment) + + point_alias = es.point_alias( + es_host=es_host, + target_index="{{ params.target_index }}", + target_alias="{{ params.target_alias }}", + should_delete_old_index="{{ params.should_delete_old_index }}", + ) + + notify_completion = slack.notify_slack.override( + trigger_rule=TriggerRule.NONE_FAILED + )( + text="Alias {{ params.target_alias }} applied to index {{ params.target_index }}.", + dag_id=dag.dag_id, + username="Point Alias", + icon_emoji=":elasticsearch:", + ) + + prevent_concurrency >> es_host >> point_alias >> notify_completion + + +for environment in ENVIRONMENTS: + point_es_alias_dag(environment) diff --git a/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py index 0be1bc848dc..ddcd94b5b10 100644 --- a/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py @@ -34,11 +34,9 @@ cluster, it does _not_ interfere with the `data_refresh` or `create_filtered_index` DAGs. -However, as the DAG operates on the staging API database it will exit -immediately if any of the following DAGs are running: -* `staging_database_restore` -* `create_proportional_by_provider_staging_index` -* `create_new_staging_es_index` +However, as the DAG operates on the staging API database and ES cluster it will exit +immediately if any of the DAGs tagged as part of the `staging_es_concurrency` group +are already running. """ from datetime import datetime @@ -52,16 +50,10 @@ AUDIO, DAG_DEFAULT_ARGS, MEDIA_TYPES, - STAGING, XCOM_PULL_TEMPLATE, ) -from common.sensors.utils import prevent_concurrency_with_dags -from database.staging_database_restore.constants import ( - DAG_ID as STAGING_DB_RESTORE_DAG_ID, -) -from elasticsearch_cluster.create_new_es_index.create_new_es_index_types import ( - CREATE_NEW_INDEX_CONFIGS, -) +from common.sensors.constants import STAGING_ES_CONCURRENCY_TAG +from common.sensors.utils import prevent_concurrency_with_dags_with_tag from elasticsearch_cluster.recreate_staging_index.recreate_full_staging_index import ( DAG_ID, create_index, @@ -76,7 +68,7 @@ default_args=DAG_DEFAULT_ARGS, schedule=None, start_date=datetime(2023, 4, 1), - tags=["database", "elasticsearch"], + tags=["database", "elasticsearch", STAGING_ES_CONCURRENCY_TAG], max_active_runs=1, catchup=False, doc_md=__doc__, @@ -108,13 +100,10 @@ render_template_as_native_obj=True, ) def recreate_full_staging_index(): - # Fail early if the staging_db_restore DAG or the create_new_staging_es_index DAG + # Fail early if any other DAG that operates on the staging elasticsearch cluster # is running - prevent_concurrency = prevent_concurrency_with_dags( - external_dag_ids=[ - STAGING_DB_RESTORE_DAG_ID, - CREATE_NEW_INDEX_CONFIGS[STAGING].dag_id, - ] + prevent_concurrency = prevent_concurrency_with_dags_with_tag( + tag=STAGING_ES_CONCURRENCY_TAG, ) target_alias = get_target_alias( diff --git a/catalog/tests/dags/common/sensors/test_utils.py b/catalog/tests/dags/common/sensors/test_utils.py index 67e599ddf27..be28e65d92d 100644 --- a/catalog/tests/dags/common/sensors/test_utils.py +++ b/catalog/tests/dags/common/sensors/test_utils.py @@ -5,7 +5,7 @@ from airflow.utils.timezone import datetime from airflow.utils.types import DagRunType -from common.sensors.utils import get_most_recent_dag_run +from common.sensors.utils import _get_most_recent_dag_run TEST_DAG_ID = "data_refresh_dag_factory_test_dag" @@ -29,11 +29,11 @@ def test_get_most_recent_dag_run_returns_most_recent_execution_date( most_recent = datetime(2023, 5, 10) for i in range(3): _create_dagrun(most_recent - timedelta(days=i), sample_dag_id_fixture) - assert get_most_recent_dag_run(sample_dag_id_fixture) == most_recent + assert _get_most_recent_dag_run(sample_dag_id_fixture) == most_recent def test_get_most_recent_dag_run_returns_empty_list_when_no_runs( sample_dag_id_fixture, clean_db ): # Relies on ``clean_db`` cleaning up DagRuns from other tests - assert get_most_recent_dag_run(sample_dag_id_fixture) == [] + assert _get_most_recent_dag_run(sample_dag_id_fixture) == [] diff --git a/documentation/catalog/reference/DAGs.md b/documentation/catalog/reference/DAGs.md index 15a6875e0a8..8c63f072f89 100644 --- a/documentation/catalog/reference/DAGs.md +++ b/documentation/catalog/reference/DAGs.md @@ -42,16 +42,15 @@ The following are DAGs grouped by their primary tag: ### Database -| DAG ID | Schedule Interval | -| --------------------------------------------------------------------------------------------- | ----------------- | -| [`batched_update`](#batched_update) | `None` | -| [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index) | `None` | -| [`delete_records`](#delete_records) | `None` | -| [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) | `None` | -| [`recreate_full_staging_index`](#recreate_full_staging_index) | `None` | -| [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) | `None` | -| [`report_pending_reported_media`](#report_pending_reported_media) | `@weekly` | -| [`staging_database_restore`](#staging_database_restore) | `@monthly` | +| DAG ID | Schedule Interval | +| --------------------------------------------------------------------------------- | ----------------- | +| [`batched_update`](#batched_update) | `None` | +| [`delete_records`](#delete_records) | `None` | +| [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) | `None` | +| [`recreate_full_staging_index`](#recreate_full_staging_index) | `None` | +| [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) | `None` | +| [`report_pending_reported_media`](#report_pending_reported_media) | `@weekly` | +| [`staging_database_restore`](#staging_database_restore) | `@monthly` | ### Elasticsearch @@ -59,6 +58,9 @@ The following are DAGs grouped by their primary tag: | ----------------------------------------------------------------------------------------------- | ----------------- | | [`create_new_production_es_index`](#create_new_production_es_index) | `None` | | [`create_new_staging_es_index`](#create_new_staging_es_index) | `None` | +| [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index) | `None` | +| [`point_production_es_alias`](#point_production_es_alias) | `None` | +| [`point_staging_es_alias`](#point_staging_es_alias) | `None` | | [`production_elasticsearch_cluster_healthcheck`](#production_elasticsearch_cluster_healthcheck) | `*/15 * * * *` | | [`staging_elasticsearch_cluster_healthcheck`](#staging_elasticsearch_cluster_healthcheck) | `*/15 * * * *` | @@ -166,6 +168,8 @@ The following is documentation associated with each DAG (where available): 1. [`oauth2_token_refresh`](#oauth2_token_refresh) 1. [`phylopic_reingestion_workflow`](#phylopic_reingestion_workflow) 1. [`phylopic_workflow`](#phylopic_workflow) +1. [`point_production_es_alias`](#point_production_es_alias) +1. [`point_staging_es_alias`](#point_staging_es_alias) 1. [`pr_review_reminders`](#pr_review_reminders) 1. [`production_elasticsearch_cluster_healthcheck`](#production_elasticsearch_cluster_healthcheck) 1. [`rawpixel_workflow`](#rawpixel_workflow) @@ -437,8 +441,9 @@ source from which to pull documents. There are two mechanisms that prevent this from happening: -1. The filtered index creation DAGs are not allowed to run if a data refresh for - the media type is already running. +1. The filtered index creation DAGs fail immediately if any of the DAGs that are + tagged as prt of the `production-es-concurrency` group (including the data + refreshes) are currently running. 2. The data refresh DAGs will wait for any pre-existing filtered index creation DAG runs for the media type to finish before continuing. @@ -492,8 +497,9 @@ source from which to pull documents. There are two mechanisms that prevent this from happening: -1. The filtered index creation DAGs are not allowed to run if a data refresh for - the media type is already running. +1. The filtered index creation DAGs fail immediately if any of the DAGs that are + tagged as prt of the `production-es-concurrency` group (including the data + refreshes) are currently running. 2. The data refresh DAGs will wait for any pre-existing filtered index creation DAG runs for the media type to finish before continuing. @@ -525,6 +531,11 @@ available: - `override_config`: boolean override; when True, the `index_config` will be used for the new index configuration _without_ merging any values from the source index config. +- `target_alias` : optional alias to be applied to the new index after + reindexing. If the alias already applies to an existing index, it will be + removed first. +- `should_delete_old_index`: whether to remove the index previously pointed to + by the target_alias, if it exists. Defaults to False. ##### Merging policy @@ -599,6 +610,13 @@ The resulting, merged configuration will be: } ``` +##### Race conditions + +Each DAG will fail immediately if any of the DAGs tagged as part of the +es-concurrency group for the DAG's environment is running. (E.g., the +`create_new_staging_es_index` DAG fails immediately if any DAGs tagged with +`staging-es-concurrency` are running.) + ### `create_new_staging_es_index` #### Create New ES Index DAG @@ -624,6 +642,11 @@ available: - `override_config`: boolean override; when True, the `index_config` will be used for the new index configuration _without_ merging any values from the source index config. +- `target_alias` : optional alias to be applied to the new index after + reindexing. If the alias already applies to an existing index, it will be + removed first. +- `should_delete_old_index`: whether to remove the index previously pointed to + by the target_alias, if it exists. Defaults to False. ##### Merging policy @@ -698,6 +721,13 @@ The resulting, merged configuration will be: } ``` +##### Race conditions + +Each DAG will fail immediately if any of the DAGs tagged as part of the +es-concurrency group for the DAG's environment is running. (E.g., the +`create_new_staging_es_index` DAG fails immediately if any DAGs tagged with +`staging-es-concurrency` are running.) + ### `create_proportional_by_source_staging_index` #### Create Proportional By Source Staging Index DAG @@ -717,6 +747,8 @@ Optional params: - source_index: An existing staging Elasticsearch index to use as the basis for the new index. If not provided, the index aliased to `-filtered` will be used. +- should_delete_old_index: If True, the index previously pointed to by the + target alias (if one exists) will be deleted. ##### When this DAG runs @@ -724,16 +756,10 @@ This DAG is on a `None` schedule and is run manually. ##### Race conditions -Because this DAG runs on the staging ingestion server and staging elasticsearch -cluster, it does _not_ interfere with the `data_refresh` or -`create_filtered_index` DAGs. - -However, as the DAG operates on the staging API database it will exit -immediately if any of the following DAGs are running: - -- `staging_database_restore` -- `recreate_full_staging_index` -- `create_new_staging_es_index` +Because this DAG runs on the staging elasticsearch cluster, it does _not_ +interfere with the production `data_refresh` or `create_filtered_index` DAGs. +However, it will fail immediately if any of the DAGs tagged as part of the +`staging-es-concurrency` group are running. ### `delete_records` @@ -1052,6 +1078,52 @@ Output: TSV file containing the image, their respective meta-data. Notes: http://api-docs.phylopic.org/v2/ No rate limit specified. +### `point_production_es_alias` + +#### Point ES Alias DAG + +This file generates our Point ES Alias DAGs using a factory function. A separate +DAG is generated for the staging and production environments. + +The DAGs are used to point a `target_alias` to a `target_index` in the given +environment's elasticsearch cluster. When the alias is applied, it is first +removed from any existing index to which it already applies; optionally, it can +also delete that index afterward. + +##### When this DAG runs + +This DAG is on a `None` schedule and is run manually. + +##### Race conditions + +Each DAG will fail immediately if any of the DAGs tagged as part of the +es-concurrency group for the DAG's environment is running. (E.g., the +`point_staging_alias` DAG fails immediately if any DAGs tagged with +`staging-es-concurrency` are running.) + +### `point_staging_es_alias` + +#### Point ES Alias DAG + +This file generates our Point ES Alias DAGs using a factory function. A separate +DAG is generated for the staging and production environments. + +The DAGs are used to point a `target_alias` to a `target_index` in the given +environment's elasticsearch cluster. When the alias is applied, it is first +removed from any existing index to which it already applies; optionally, it can +also delete that index afterward. + +##### When this DAG runs + +This DAG is on a `None` schedule and is run manually. + +##### Race conditions + +Each DAG will fail immediately if any of the DAGs tagged as part of the +es-concurrency group for the DAG's environment is running. (E.g., the +`point_staging_alias` DAG fails immediately if any DAGs tagged with +`staging-es-concurrency` are running.) + ### `pr_review_reminders` #### PR Review Reminders @@ -1157,12 +1229,9 @@ Because this DAG runs on the staging ingestion server and staging elasticsearch cluster, it does _not_ interfere with the `data_refresh` or `create_filtered_index` DAGs. -However, as the DAG operates on the staging API database it will exit -immediately if any of the following DAGs are running: - -- `staging_database_restore` -- `create_proportional_by_provider_staging_index` -- `create_new_staging_es_index` +However, as the DAG operates on the staging API database and ES cluster it will +exit immediately if any of the DAGs tagged as part of the +`staging_es_concurrency` group are already running. ### `recreate_image_popularity_calculation` @@ -1242,7 +1311,7 @@ Notes: https://www.smk.dk/en/article/smk-api/ ### `staging_database_restore` -#### Update the staging database +#### Staging Database Restore DAG This DAG is responsible for updating the staging database using the most recent snapshot of the production database. @@ -1264,6 +1333,11 @@ the RDS operations run using a different hook: - `AIRFLOW_CONN_`: The connection string to use for RDS operations (per the above example, it might be `AIRFLOW_CONN_AWS_RDS`) +##### Race conditions + +Because this DAG completely replaces the staging database, it first waits on any +running DAGs that are tagged as part of the `staging_es_concurrency` group. + ### `staging_elasticsearch_cluster_healthcheck` Monitor staging and production Elasticsearch cluster health endpoint.