Skip to content

Commit 30abcc4

Browse files
Add Elasticsearch cluster health monitor DAGs (#3748)
* Extract shared Elasticsearch cluster connection utilities * Add Elasticsearch healthcheck dag * Refactor to make it easier to test message composition * Add note to remind about --pdb for catalog tests * Add healthcheck dag tests * Update dag docs with new DAG names * Use dynamic dags to generate pre-env dag * Raise value error if message_type is unknown * Fix message indentation * Only alert for non-yellow statuses in prod when data refresh is running * Fix typo in dag tag * Fix missing context
1 parent 3e57bfa commit 30abcc4

File tree

10 files changed

+387
-13
lines changed

10 files changed

+387
-13
lines changed

catalog/dags/common/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
STAGING = "staging"
1616
PRODUCTION = "production"
1717

18+
Environment = Literal["staging", "production"]
19+
ENVIRONMENTS = [STAGING, PRODUCTION]
20+
1821
CONTACT_EMAIL = os.getenv("CONTACT_EMAIL")
1922

2023
DAG_DEFAULT_ARGS = {

catalog/dags/common/sensors/utils.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,24 @@ def prevent_concurrency_with_dag(external_dag_id: str, **context):
9393
raise ValueError(f"Concurrency check with {external_dag_id} failed.")
9494

9595

96+
@task(retries=0)
97+
def is_concurrent_with_any(external_dag_ids: list[str], **context):
98+
"""
99+
Detect whether any of the external DAG are running.
100+
101+
Returns the ID of the first DAG found to be running. Otherwise,
102+
returns None.
103+
"""
104+
for dag_id in external_dag_ids:
105+
try:
106+
prevent_concurrency_with_dag.function(dag_id, **context)
107+
except ValueError:
108+
return dag_id
109+
110+
# Explicit return None to clarify expectations
111+
return None
112+
113+
96114
@task_group(group_id="prevent_concurrency")
97115
def prevent_concurrency_with_dags(external_dag_ids: list[str]):
98116
"""Fail immediately if any of the given external dags are in progress."""

catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from datetime import timedelta
33

44
from airflow.decorators import task, task_group
5-
from airflow.models.connection import Connection
65
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
76
from airflow.sensors.python import PythonSensor
87

@@ -21,12 +20,6 @@
2120
GET_CURRENT_INDEX_CONFIG_TASK_NAME = "get_current_index_configuration"
2221

2322

24-
@task
25-
def get_es_host(environment: str):
26-
conn = Connection.get_connection_from_secrets(f"elasticsearch_http_{environment}")
27-
return conn.host
28-
29-
3023
@task
3124
def get_index_name(media_type: str, index_suffix: str):
3225
return f"{media_type}-{index_suffix}".lower()

catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@
109109
CREATE_NEW_INDEX_CONFIGS,
110110
CreateNewIndex,
111111
)
112+
from elasticsearch_cluster.shared import get_es_host
112113

113114

114115
logger = logging.getLogger(__name__)
@@ -188,7 +189,7 @@ def create_new_es_index_dag(config: CreateNewIndex):
188189
with dag:
189190
prevent_concurrency = prevent_concurrency_with_dags(config.blocking_dags)
190191

191-
es_host = es.get_es_host(environment=config.environment)
192+
es_host = get_es_host(environment=config.environment)
192193

193194
index_name = es.get_index_name(
194195
media_type="{{ params.media_type }}",
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
"""
2+
Monitor staging and production Elasticsearch cluster health endpoint.
3+
4+
Requests the cluster health and alerts under the following conditions:
5+
6+
- Red cluster health
7+
- Unexpected number of nodes
8+
- Unresponsive cluster
9+
10+
Additionally, the DAG will notify (rather than alert) when the cluster health is yellow.
11+
Yellow cluster health may or may not be an issue, depending on whether it is expected,
12+
and occurs whenever shards and replicas are being relocated (e.g., during reindexes).
13+
It is worthwhile to notify in these cases, as an assurance, but we could choose to add
14+
logic that ignores yellow cluster health during data refresh or other similar operations.
15+
"""
16+
17+
import json
18+
import logging
19+
from datetime import datetime
20+
from textwrap import dedent, indent
21+
22+
from airflow.decorators import dag, task
23+
from airflow.exceptions import AirflowSkipException
24+
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
25+
from elasticsearch import Elasticsearch
26+
27+
from common.constants import ENVIRONMENTS, PRODUCTION, Environment
28+
from common.sensors.utils import is_concurrent_with_any
29+
from common.slack import send_alert, send_message
30+
from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS
31+
from elasticsearch_cluster.shared import get_es_host
32+
33+
34+
logger = logging.getLogger(__name__)
35+
36+
37+
_DAG_ID = "{env}_elasticsearch_cluster_healthcheck"
38+
39+
EXPECTED_NODE_COUNT = 6
40+
EXPECTED_DATA_NODE_COUNT = 3
41+
EXPECTED_MASTER_NODE_COUNT = 3
42+
43+
44+
def _format_response_body(response_body: dict) -> str:
45+
body_str = indent(json.dumps(response_body, indent=4), prefix=" " * 4)
46+
# body_str is indented in, because the f string added an indentation to
47+
# the front, causing the first curly brace to be incorrectly indented
48+
# and interpolating a multi-line string into the f string led subsequent lines
49+
# to have incorrect indentation (they did not incorporate the f-strings
50+
# own indentation.
51+
# Adding our own indentation using `indent` to match the f-strings
52+
# allows us to correctly dedent later on without issue, with a uniform indentation
53+
# on every line.
54+
return f"""
55+
Full healthcheck response body:
56+
```
57+
{body_str}
58+
```
59+
"""
60+
61+
62+
def _compose_red_status(env: Environment, response_body: dict):
63+
message = f"""
64+
Elasticsearch {env} cluster status is **red**.
65+
66+
This is a critical status change, **investigate ASAP**.
67+
68+
{_format_response_body(response_body)}
69+
"""
70+
return message
71+
72+
73+
def _compose_unexpected_node_count(env: Environment, response_body: dict):
74+
node_count = response_body["number_of_nodes"]
75+
data_node_count = response_body["number_of_data_nodes"]
76+
master_node_count = node_count - data_node_count
77+
78+
message = f"""
79+
Elasticsearch {env} cluster node count is **{node_count}**.
80+
Expected {EXPECTED_NODE_COUNT} total nodes.
81+
82+
Master nodes: **{master_node_count}** of expected {EXPECTED_MASTER_NODE_COUNT}
83+
Data nodes: **{data_node_count}** of expected {EXPECTED_DATA_NODE_COUNT}
84+
85+
This is a critical status change, **investigate ASAP**.
86+
If this is expected (e.g., during controlled node or cluster changes), acknowledge immediately with explanation.
87+
88+
{_format_response_body(response_body)}
89+
"""
90+
logger.error(f"Unexpected node count; {json.dumps(response_body)}")
91+
return message
92+
93+
94+
def _compose_yellow_cluster_health(env: Environment, response_body: dict):
95+
message = f"""
96+
Elasticsearch {env} cluster health is **yellow**.
97+
98+
This does not mean something is necessarily wrong, but if this is not expected (e.g., data refresh) then investigate cluster health now.
99+
100+
{_format_response_body(response_body)}
101+
"""
102+
logger.info(f"Cluster health was yellow; {json.dumps(response_body)}")
103+
return message
104+
105+
106+
@task
107+
def ping_healthcheck(env: str, es_host: str):
108+
es_conn: Elasticsearch = ElasticsearchPythonHook(hosts=[es_host]).get_conn
109+
110+
response = es_conn.cluster.health()
111+
112+
return response.body
113+
114+
115+
@task
116+
def compose_notification(
117+
env: Environment, response_body: dict, is_data_refresh_running: bool
118+
):
119+
status = response_body["status"]
120+
121+
if status == "red":
122+
return "alert", _compose_red_status(env, response_body)
123+
124+
if response_body["number_of_nodes"] != EXPECTED_NODE_COUNT:
125+
return "alert", _compose_unexpected_node_count(env, response_body)
126+
127+
if status == "yellow":
128+
if is_data_refresh_running and env == PRODUCTION:
129+
raise AirflowSkipException(
130+
"Production cluster health status is yellow during data refresh. "
131+
"This is an expected state, so no alert is sent."
132+
)
133+
134+
return "notification", _compose_yellow_cluster_health(env, response_body)
135+
136+
logger.info(f"Cluster health was green; {json.dumps(response_body)}")
137+
return None, None
138+
139+
140+
@task
141+
def notify(env: str, message_type_and_string: tuple[str, str]):
142+
message_type, message = message_type_and_string
143+
144+
if message_type == "alert":
145+
send_alert(dedent(message), dag_id=_DAG_ID.format(env=env))
146+
elif message_type == "notification":
147+
send_message(dedent(message), dag_id=_DAG_ID.format(env=env))
148+
else:
149+
raise ValueError(
150+
f"Invalid message_type. Expected 'alert' or 'notification', "
151+
f"received {message_type}"
152+
)
153+
154+
155+
_SHARED_DAG_ARGS = {
156+
# Every 15 minutes
157+
"schedule": "*/15 * * * *",
158+
"start_date": datetime(2024, 2, 4),
159+
"catchup": False,
160+
"max_active_runs": 1,
161+
"doc_md": __doc__,
162+
"tags": ["elasticsearch", "monitoring"],
163+
}
164+
165+
166+
_DATA_REFRESH_DAG_IDS = []
167+
for config in DATA_REFRESH_CONFIGS.values():
168+
_DATA_REFRESH_DAG_IDS += [config.dag_id, config.filtered_index_dag_id]
169+
170+
171+
for env in ENVIRONMENTS:
172+
173+
@dag(dag_id=_DAG_ID.format(env=env), **_SHARED_DAG_ARGS)
174+
def cluster_healthcheck_dag():
175+
is_data_refresh_running = is_concurrent_with_any(_DATA_REFRESH_DAG_IDS)
176+
177+
es_host = get_es_host(env)
178+
healthcheck_response = ping_healthcheck(env, es_host)
179+
notification = compose_notification(
180+
env, healthcheck_response, is_data_refresh_running
181+
)
182+
es_host >> healthcheck_response >> notification >> notify(env, notification)
183+
184+
cluster_healthcheck_dag()
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from airflow.decorators import task
2+
from airflow.models.connection import Connection
3+
from airflow.models.xcom_arg import XComArg
4+
5+
from common.constants import Environment
6+
7+
8+
@task
9+
def get_es_host(environment: Environment) -> XComArg:
10+
conn = Connection.get_connection_from_secrets(f"elasticsearch_http_{environment}")
11+
return conn.host

catalog/justfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ _mount-test command: up-deps
108108
{{ command }}
109109

110110
# Launch a Bash shell in a test container under `SERVICE`
111+
# Run pytest with `--pdb` to workaround xdist breaking pdb.set_trace()
111112
test-session:
112113
just _mount-test bash
113114

0 commit comments

Comments
 (0)