Commit 8e21893

Add create_new_es_index DAGs (#3537)

* Add create_new_es_index DAG
* Use new utility
* Add tests for merge_configurations
* Add merge index tests
* Add docs
* Configure timeouts
* Prevent concurrency with other DAGs
* Fix typo, trigger rule
* Set es_host in types
* Use existing env variables
* Fix warning
* Get es_host in Airflow task
* Add requests_per_second using Variable but configurable by environment
* Use more precise error
* Update docstring
* Move import to local imports section
* Add default value for variable
* Rename module to elasticsearch_cluster to disambiguate
* Deserialize variable to be safe
1 parent 990e8b4 commit 8e21893

File tree

17 files changed: +1040 −36 lines changed

catalog/dags/common/constants.py

Lines changed: 3 additions & 0 deletions
@@ -12,6 +12,9 @@
 
 MediaType = Literal["audio", "image"]
 
+STAGING = "staging"
+PRODUCTION = "production"
+
 CONTACT_EMAIL = os.getenv("CONTACT_EMAIL")
 
 DAG_DEFAULT_ARGS = {

catalog/dags/common/sensors/utils.py

Lines changed: 28 additions & 1 deletion
@@ -1,6 +1,6 @@
 from datetime import datetime
 
-from airflow.decorators import task
+from airflow.decorators import task, task_group
 from airflow.exceptions import AirflowSensorTimeout
 from airflow.models import DagRun
 from airflow.sensors.external_task import ExternalTaskSensor
@@ -39,6 +39,14 @@ def wait_for_external_dag(external_dag_id: str, task_id: str | None = None):
     """
     Return a Sensor task which will wait if the given external DAG is
     running.
+
+    To fully ensure that the waiting DAG and the external DAG do not run
+    concurrently, the external DAG should have a `prevent_concurrency_with_dag`
+    task which fails immediately if the waiting DAG is running.
+
+    If the external DAG should _not_ fail when the waiting DAG is running,
+    but instead wait its turn, use the SingleRunExternalDagSensor in both
+    DAGs to avoid deadlock.
     """
     if not task_id:
         task_id = f"wait_for_{external_dag_id}"
@@ -57,6 +65,16 @@ def wait_for_external_dag(external_dag_id: str, task_id: str | None = None):
     )
 
 
+@task_group(group_id="wait_for_external_dags")
+def wait_for_external_dags(external_dag_ids: list[str]):
+    """
+    Wait for all DAGs with the given external DAG ids to no longer be
+    in a running state before continuing.
+    """
+    for dag_id in external_dag_ids:
+        wait_for_external_dag(dag_id)
+
+
 @task(retries=0)
 def prevent_concurrency_with_dag(external_dag_id: str, **context):
     """
@@ -73,3 +91,12 @@ def prevent_concurrency_with_dag(external_dag_id: str, **context):
         wait_for_dag.execute(context)
     except AirflowSensorTimeout:
         raise ValueError(f"Concurrency check with {external_dag_id} failed.")
+
+
+@task_group(group_id="prevent_concurrency")
+def prevent_concurrency_with_dags(external_dag_ids: list[str]):
+    """Fail immediately if any of the given external dags are in progress."""
+    for dag_id in external_dag_ids:
+        prevent_concurrency_with_dag.override(
+            task_id=f"prevent_concurrency_with_{dag_id}"
+        )(dag_id)
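
A minimal usage sketch of the two new task groups, assuming hypothetical DAG ids (example_waiting_dag, example_failing_dag, other_dag_a, other_dag_b) that are not part of this commit:

from datetime import datetime

from airflow import DAG

from common.sensors.utils import prevent_concurrency_with_dags, wait_for_external_dags


with DAG(dag_id="example_waiting_dag", start_date=datetime(2024, 1, 1), schedule=None):
    # Pause this run until neither external DAG has an active run.
    wait_for_external_dags(external_dag_ids=["other_dag_a", "other_dag_b"])

with DAG(dag_id="example_failing_dag", start_date=datetime(2024, 1, 1), schedule=None):
    # Fail this run immediately if either external DAG has an active run.
    prevent_concurrency_with_dags(external_dag_ids=["other_dag_a", "other_dag_b"])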

catalog/dags/data_refresh/create_filtered_index_dag.py

Lines changed: 15 additions & 7 deletions
@@ -55,12 +55,15 @@
 from airflow import DAG
 from airflow.models.param import Param
 
-from common.constants import DAG_DEFAULT_ARGS
-from common.sensors.utils import prevent_concurrency_with_dag
+from common.constants import DAG_DEFAULT_ARGS, PRODUCTION
+from common.sensors.utils import prevent_concurrency_with_dags
 from data_refresh.create_filtered_index import (
     create_filtered_index_creation_task_groups,
 )
 from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS, DataRefresh
+from elasticsearch_cluster.create_new_es_index.create_new_es_index_types import (
+    CREATE_NEW_INDEX_CONFIGS,
+)
 
 
 # Note: We can't use the TaskFlow `@dag` DAG factory decorator
@@ -80,7 +83,7 @@ def create_filtered_index_creation_dag(data_refresh: DataRefresh):
     media_type = data_refresh.media_type
 
     with DAG(
-        dag_id=f"create_filtered_{media_type}_index",
+        dag_id=data_refresh.filtered_index_dag_id,
         default_args=DAG_DEFAULT_ARGS,
         schedule=None,
         start_date=datetime(2023, 4, 1),
@@ -113,10 +116,15 @@
         },
         render_template_as_native_obj=True,
     ) as dag:
-        # Immediately fail if the associated data refresh is running.
-        prevent_concurrency = prevent_concurrency_with_dag.override(
-            task_id=f"prevent_concurrency_with_{media_type}_data_refresh"
-        )(external_dag_id=f"{media_type}_data_refresh")
+        # Immediately fail if the associated data refresh is running, or the
+        # create_new_production_es_index DAG is running. This prevents multiple
+        # DAGs from reindexing from a single production index simultaneously.
+        prevent_concurrency = prevent_concurrency_with_dags(
+            external_dag_ids=[
+                data_refresh.dag_id,
+                CREATE_NEW_INDEX_CONFIGS[PRODUCTION].dag_id,
+            ]
+        )
 
         # Once the concurrency check has passed, actually create the filtered
         # index.
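
CREATE_NEW_INDEX_CONFIGS itself is defined in create_new_es_index_types.py, which is not shown in this excerpt. A rough sketch of the shape these imports assume, keyed by the STAGING and PRODUCTION constants added above (the CreateNewIndex class name and its fields are hypothetical; only the create_new_production_es_index DAG id pattern is corroborated by the comment in the diff):

from dataclasses import dataclass, field

from common.constants import PRODUCTION, STAGING


@dataclass
class CreateNewIndex:
    environment: str
    dag_id: str = field(init=False)

    def __post_init__(self):
        # e.g. "create_new_production_es_index"
        self.dag_id = f"create_new_{self.environment}_es_index"


CREATE_NEW_INDEX_CONFIGS = {
    STAGING: CreateNewIndex(environment=STAGING),
    PRODUCTION: CreateNewIndex(environment=PRODUCTION),
}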

catalog/dags/data_refresh/data_refresh_task_factory.py

Lines changed: 17 additions & 11 deletions
@@ -53,13 +53,16 @@
 from airflow.utils.trigger_rule import TriggerRule
 
 from common import ingestion_server
-from common.constants import XCOM_PULL_TEMPLATE
+from common.constants import PRODUCTION, XCOM_PULL_TEMPLATE
 from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor
-from common.sensors.utils import wait_for_external_dag
+from common.sensors.utils import wait_for_external_dags
 from data_refresh.create_filtered_index import (
     create_filtered_index_creation_task_groups,
 )
 from data_refresh.data_refresh_types import DataRefresh
+from elasticsearch_cluster.create_new_es_index.create_new_es_index_types import (
+    CREATE_NEW_INDEX_CONFIGS,
+)
 
 
 logger = logging.getLogger(__name__)
@@ -112,16 +115,19 @@ def create_data_refresh_task_group(
         pool=DATA_REFRESH_POOL,
     )
 
-    # If filtered index creation was manually triggered before the data refresh
-    # started, we need to wait for it to finish or the data refresh could destroy
-    # the origin index. Realistically the data refresh is too slow to beat the
-    # filtered index creation process, even if it was triggered immediately after
-    # filtered index creation. However, it is safer to avoid the possibility
-    # of the race condition altogether.
-    wait_for_filtered_index_creation = wait_for_external_dag(
-        external_dag_id=f"create_filtered_{data_refresh.media_type}_index",
+    # Wait for other DAGs that operate on the ES cluster. If a new or filtered index
+    # is being created by one of these DAGs, we need to wait for it to finish or else
+    # the data refresh might destroy the index being used as the source index.
+    # Realistically the data refresh is too slow to beat the index creation process,
+    # even if it was triggered immediately after one of these DAGs; however, it is
+    # always safer to avoid the possibility of the race condition altogether.
+    wait_for_es_dags = wait_for_external_dags.override(group_id="wait_for_es_dags")(
+        external_dag_ids=[
+            data_refresh.filtered_index_dag_id,
+            CREATE_NEW_INDEX_CONFIGS[PRODUCTION].dag_id,
+        ]
     )
-    tasks.append([wait_for_data_refresh, wait_for_filtered_index_creation])
+    tasks.append([wait_for_data_refresh, wait_for_es_dags])
 
     # Get the index currently mapped to our target alias, to delete later.
     get_current_index = ingestion_server.get_current_index(target_alias)

catalog/dags/data_refresh/data_refresh_types.py

Lines changed: 2 additions & 0 deletions
@@ -53,6 +53,7 @@ class DataRefresh:
     """
 
     dag_id: str = field(init=False)
+    filtered_index_dag_id: str = field(init=False)
     media_type: str
     start_date: datetime = datetime(2020, 1, 1)
     schedule: str | None = "0 0 * * 1"  # Mondays 00:00 UTC
@@ -69,6 +70,7 @@
 
     def __post_init__(self):
         self.dag_id = f"{self.media_type}_data_refresh"
+        self.filtered_index_dag_id = f"create_filtered_{self.media_type}_index"
 
 
 DATA_REFRESH_CONFIGS = {
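
To illustrate the derived ids, a small sketch assuming media_type is the only constructor argument without a default (the remaining DataRefresh fields are not shown in this diff):

audio_refresh = DataRefresh(media_type="audio")
audio_refresh.dag_id                 # "audio_data_refresh"
audio_refresh.filtered_index_dag_id  # "create_filtered_audio_index"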

catalog/dags/database/staging_database_restore/staging_database_restore_dag.py

Lines changed: 3 additions & 3 deletions
@@ -29,9 +29,6 @@
 from airflow.providers.amazon.aws.operators.rds import RdsDeleteDbInstanceOperator
 from airflow.providers.amazon.aws.sensors.rds import RdsSnapshotExistenceSensor
 from airflow.utils.trigger_rule import TriggerRule
-from es.recreate_staging_index.recreate_full_staging_index import (
-    DAG_ID as RECREATE_STAGING_INDEX_DAG_ID,
-)
 
 from common.constants import (
     AWS_RDS_CONN_ID,
@@ -51,6 +48,9 @@
     restore_staging_from_snapshot,
     skip_restore,
 )
+from elasticsearch_cluster.recreate_staging_index.recreate_full_staging_index import (
+    DAG_ID as RECREATE_STAGING_INDEX_DAG_ID,
+)
 
 
 log = logging.getLogger(__name__)

Lines changed: 195 additions & 0 deletions
@@ -0,0 +1,195 @@
+import logging
+from datetime import timedelta
+
+from airflow.decorators import task, task_group
+from airflow.models.connection import Connection
+from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
+from airflow.sensors.python import PythonSensor
+
+from common.constants import REFRESH_POKE_INTERVAL
+from elasticsearch_cluster.create_new_es_index.utils import merge_configurations
+
+
+logger = logging.getLogger(__name__)
+
+
+# Index settings that should not be copied over from the base configuration when
+# creating a new index.
+EXCLUDED_INDEX_SETTINGS = ["provided_name", "creation_date", "uuid", "version"]
+
+GET_FINAL_INDEX_CONFIG_TASK_NAME = "get_final_index_configuration"
+GET_CURRENT_INDEX_CONFIG_TASK_NAME = "get_current_index_configuration"
+
+
+@task
+def get_es_host(environment: str):
+    conn = Connection.get_connection_from_secrets(f"elasticsearch_http_{environment}")
+    return conn.host
+
+
+@task
+def get_index_name(media_type: str, index_suffix: str):
+    return f"{media_type}-{index_suffix}".lower()
+
+
+@task.branch
+def check_override_config(override):
+    if override:
+        # Skip the steps to fetch the current index configuration
+        # and merge changes in.
+        return GET_FINAL_INDEX_CONFIG_TASK_NAME
+
+    return GET_CURRENT_INDEX_CONFIG_TASK_NAME
+
+
+@task
+def get_current_index_configuration(
+    source_index: str,
+    es_host: str,
+):
+    """
+    Return the configuration for the current index, identified by the
+    `source_index` param. `source_index` may be either an index name
+    or an alias, but must uniquely identify one existing index or an
+    error will be raised.
+    """
+    es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn
+
+    response = es_conn.indices.get(
+        index=source_index,
+        # Return empty dict instead of throwing error if no index can be
+        # found. We raise our own error instead.
+        ignore_unavailable=True,
+    )
+
+    if len(response) != 1:
+        raise ValueError(f"Index {source_index} could not be uniquely identified.")
+
+    # The response has the form:
+    #   { index_name: index_configuration }
+    # However, since `source_index` can be an alias rather than the index name,
+    # we do not necessarily know the index_name so we cannot access the configuration
+    # directly by key. We instead get the first value from the dict, knowing that we
+    # have already ensured in a previous check that there is exactly one value in the
+    # response.
+    config = next(iter(response.values()))
+    return config
+
+
+@task
+def merge_index_configurations(new_index_config, current_index_config):
+    """
+    Merge the `new_index_config` into the `current_index_config`, and
+    return an index configuration in the appropriate format for being
+    passed to the `create_index` API.
+    """
+    # Do not automatically apply any aliases to the new index
+    current_index_config.pop("aliases")
+
+    # Remove fields from the current_index_config that should not be copied
+    # over into the new index (such as uuid)
+    for setting in EXCLUDED_INDEX_SETTINGS:
+        current_index_config.get("settings", {}).get("index", {}).pop(setting)
+
+    # Merge the new configuration values into the current configuration
+    return merge_configurations(current_index_config, new_index_config)
+
+
+@task
+def get_final_index_configuration(
+    override_config: bool,
+    index_config,
+    merged_config,
+    index_name: str,
+):
+    """
+    Resolve the final index configuration to be used in the `create_index`
+    task.
+
+    Required arguments:
+
+    override_config: Whether the index_config should be used instead of
+                     the merged_config
+    index_config:    The new index configuration which was passed in via
+                     DAG params
+    merged_config:   The result of merging the index_config with the current
+                     index configuration. This may be None if the merge
+                     tasks were skipped using the override param.
+    index_name:      Name of the index to update.
+    """
+    config = index_config if override_config else merged_config
+
+    # Apply the desired index name
+    config["index"] = index_name
+    return config
+
+
+@task
+def create_index(index_config, es_host: str):
+    es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn
+
+    new_index = es_conn.indices.create(**index_config)
+
+    return new_index
+
+
+@task_group(group_id="trigger_and_wait_for_reindex")
+def trigger_and_wait_for_reindex(
+    index_name: str,
+    source_index: str,
+    query: dict,
+    timeout: timedelta,
+    requests_per_second: int,
+    es_host: str,
+):
+    @task
+    def trigger_reindex(
+        index_name: str,
+        source_index: str,
+        query: dict,
+        requests_per_second: int,
+        es_host: str,
+    ):
+        es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn
+
+        source = {"index": source_index}
+        # An empty query is not accepted; only pass it
+        # if a query was actually supplied
+        if query:
+            source["query"] = query
+
+        response = es_conn.reindex(
+            source=source,
+            dest={"index": index_name},
+            # Parallelize indexing
+            slices="auto",
+            # Do not hold the slot while awaiting completion
+            wait_for_completion=False,
+            # Immediately refresh the index after completion to make
+            # the data available for search
+            refresh=True,
+            # Throttle
+            requests_per_second=requests_per_second,
+        )
+
+        return response["task"]
+
+    def _wait_for_reindex(task_id: str, es_host: str):
+        es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn
+
+        response = es_conn.tasks.get(task_id=task_id)
+        return response.get("completed")
+
+    trigger_reindex_task = trigger_reindex(
+        index_name, source_index, query, requests_per_second, es_host
+    )
+
+    wait_for_reindex = PythonSensor(
+        task_id="wait_for_reindex",
+        python_callable=_wait_for_reindex,
+        timeout=timeout,
+        poke_interval=REFRESH_POKE_INTERVAL,
+        op_kwargs={"task_id": trigger_reindex_task, "es_host": es_host},
+    )
+
+    trigger_reindex_task >> wait_for_reindex
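
The merge_configurations utility imported at the top of this file is added elsewhere in the commit and is not shown in this excerpt. A plausible sketch of a recursive dictionary merge with the same calling convention, offered as an assumption rather than the committed implementation:

def merge_configurations(base_config: dict, new_config: dict) -> dict:
    # Recursively overlay new_config onto base_config: values from new_config
    # win, and nested dicts are merged key-by-key rather than replaced wholesale.
    merged = dict(base_config)
    for key, value in new_config.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_configurations(merged[key], value)
        else:
            merged[key] = value
    return merged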
