From b0f0195253cb70555f501f4948fa1e4cfc9710ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandra=20Gal=C3=A1n?= <99278460+AlejandraGalan@users.noreply.github.com> Date: Thu, 5 Oct 2023 11:01:04 +0400 Subject: [PATCH] Feature/email notification (#470) * Adding the notification email on failure * Adding new variable notify to ApplicationCore() * Adding new function callback_email() to utilities * Correcting format ( using black ) * Correcting format ( using black ), including imports inside function and adding msc environment variable * Removing duplicated "import logging" * Small adjustments to PR --------- Co-authored-by: Guillermo Gonzalez-Santander --- cornflow-dags/DAG/activate_dags.py | 15 +++++++-- .../cornflow_client/airflow/dag_utilities.py | 32 ++++++++++++++++++- .../cornflow_client/core/application.py | 7 ++++ 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/cornflow-dags/DAG/activate_dags.py b/cornflow-dags/DAG/activate_dags.py index ec07893df..df57f14f1 100644 --- a/cornflow-dags/DAG/activate_dags.py +++ b/cornflow-dags/DAG/activate_dags.py @@ -1,8 +1,9 @@ +import cornflow_client.airflow.dag_utilities as utils from airflow import DAG +from airflow.operators.python import PythonOperator from airflow.secrets.environment_variables import EnvironmentVariablesBackend -import cornflow_client.airflow.dag_utilities as utils + from update_all_schemas import get_new_apps -from airflow.operators.python import PythonOperator def create_dag(app): @@ -27,7 +28,15 @@ def solve(**kwargs): **kwargs ) with dag: - t1 = PythonOperator(task_id=app.name, python_callable=solve) + if not app.notify: + t1 = PythonOperator(task_id=app.name, python_callable=solve) + else: + t1 = PythonOperator( + task_id=app.name, + python_callable=solve, + on_failure_callback=utils.callback_email, + ) + return dag diff --git a/libs/client/cornflow_client/airflow/dag_utilities.py b/libs/client/cornflow_client/airflow/dag_utilities.py index 8b803dd4d..8cdb69814 100644 --- a/libs/client/cornflow_client/airflow/dag_utilities.py +++ b/libs/client/cornflow_client/airflow/dag_utilities.py @@ -10,7 +10,6 @@ from datetime import datetime, timedelta from urllib.parse import urlparse, urljoin - # Imports from modules from cornflow_client import CornFlow, CornFlowApiError @@ -314,6 +313,37 @@ def cf_check(fun, dag_name, secrets, **kwargs): ) +def callback_email(context): + from airflow.utils.email import send_email + from airflow.secrets.environment_variables import EnvironmentVariablesBackend + + path_to_log = ( + f"./logs/{context['dag'].dag_id}/" + f"{context['ti'].task_id}/{context['ts']}/1.log" + ) + environment = EnvironmentVariablesBackend().get_variable("ENVIRONMENT") + notification_email = EnvironmentVariablesBackend().get_variable( + "NOTIFICATION_EMAIL" + ) + environment_name = EnvironmentVariablesBackend().get_variable("ENVIRONMENT_NAME") + + title = f"Airflow. {environment_name} ({environment}). DAG/task error: {context['dag'].dag_id}/{context['ti'].task_id} Failed" + body = f""" + The DAG/task {context['dag'].dag_id}/{context['ti'].task_id} has failed. +
+ The log is attached. + """ + + send_email( + to=[ + notification_email, + ], + subject=title, + html_content=body, + files=[path_to_log], + ) + + class NoSolverException(Exception): pass diff --git a/libs/client/cornflow_client/core/application.py b/libs/client/cornflow_client/core/application.py index b076d66e8..6230b9d9d 100644 --- a/libs/client/cornflow_client/core/application.py +++ b/libs/client/cornflow_client/core/application.py @@ -33,6 +33,13 @@ class ApplicationCore(ABC): """ The application template. """ + # We create a new attribute controlling the use of the notification mail function + def __init__(self): + self._notify = False + + @property + def notify(self) -> bool: + return self._notify @property @abstractmethod