diff --git a/catalog/dags/elasticsearch_cluster/healthcheck_dag.py b/catalog/dags/elasticsearch_cluster/healthcheck_dag.py index 50807968d4c..8ebecd4ed18 100644 --- a/catalog/dags/elasticsearch_cluster/healthcheck_dag.py +++ b/catalog/dags/elasticsearch_cluster/healthcheck_dag.py @@ -18,6 +18,7 @@ import logging from datetime import datetime from textwrap import dedent, indent +from typing import Literal from airflow.decorators import dag, task from airflow.exceptions import AirflowSkipException @@ -39,6 +40,7 @@ EXPECTED_NODE_COUNT = 6 EXPECTED_DATA_NODE_COUNT = 3 EXPECTED_MASTER_NODE_COUNT = 3 +MessageType = Literal["alert", "notification"] def _format_response_body(response_body: dict) -> str: @@ -59,7 +61,7 @@ def _format_response_body(response_body: dict) -> str: """ -def _compose_red_status(env: Environment, response_body: dict): +def _compose_red_status(env: Environment, response_body: dict) -> str: message = f""" Elasticsearch {env} cluster status is **red**. @@ -70,7 +72,7 @@ def _compose_red_status(env: Environment, response_body: dict): return message -def _compose_unexpected_node_count(env: Environment, response_body: dict): +def _compose_unexpected_node_count(env: Environment, response_body: dict) -> str: node_count = response_body["number_of_nodes"] data_node_count = response_body["number_of_data_nodes"] master_node_count = node_count - data_node_count @@ -91,7 +93,7 @@ def _compose_unexpected_node_count(env: Environment, response_body: dict): return message -def _compose_yellow_cluster_health(env: Environment, response_body: dict): +def _compose_yellow_cluster_health(env: Environment, response_body: dict) -> str: message = f""" Elasticsearch {env} cluster health is **yellow**. @@ -104,7 +106,7 @@ def _compose_yellow_cluster_health(env: Environment, response_body: dict): @task -def ping_healthcheck(env: str, es_host: str): +def ping_healthcheck(env: str, es_host: str) -> dict: es_conn: Elasticsearch = ElasticsearchPythonHook(hosts=[es_host]).get_conn response = es_conn.cluster.health() @@ -115,7 +117,7 @@ def ping_healthcheck(env: str, es_host: str): @task def compose_notification( env: Environment, response_body: dict, is_data_refresh_running: bool -): +) -> tuple[MessageType, str]: status = response_body["status"] if status == "red": @@ -133,12 +135,11 @@ def compose_notification( return "notification", _compose_yellow_cluster_health(env, response_body) - logger.info(f"Cluster health was green; {json.dumps(response_body)}") - return None, None + raise AirflowSkipException(f"Cluster health is green; {json.dumps(response_body)}") @task -def notify(env: str, message_type_and_string: tuple[str, str]): +def notify(env: str, message_type_and_string: tuple[MessageType, str]): message_type, message = message_type_and_string if message_type == "alert": diff --git a/catalog/tests/dags/elasticsearch_cluster/test_healthcheck_dag.py b/catalog/tests/dags/elasticsearch_cluster/test_healthcheck_dag.py index 010e9e8a301..aabdb203cc1 100644 --- a/catalog/tests/dags/elasticsearch_cluster/test_healthcheck_dag.py +++ b/catalog/tests/dags/elasticsearch_cluster/test_healthcheck_dag.py @@ -87,6 +87,13 @@ def _missing_node_keys(master_nodes: int, data_nodes: int): ), id="yellow-status-all-nodes-present", ), + pytest.param( + None, + None, + _make_response_body(status="green"), + id="green-status", + marks=pytest.mark.raises(exception=AirflowSkipException), + ), ), ) def test_compose_notification(