diff --git a/cornflow-dags/DAG/activate_dags.py b/cornflow-dags/DAG/activate_dags.py index ec07893d..df57f14f 100644 --- a/cornflow-dags/DAG/activate_dags.py +++ b/cornflow-dags/DAG/activate_dags.py @@ -1,8 +1,9 @@ +import cornflow_client.airflow.dag_utilities as utils from airflow import DAG +from airflow.operators.python import PythonOperator from airflow.secrets.environment_variables import EnvironmentVariablesBackend -import cornflow_client.airflow.dag_utilities as utils + from update_all_schemas import get_new_apps -from airflow.operators.python import PythonOperator def create_dag(app): @@ -27,7 +28,15 @@ def solve(**kwargs): **kwargs ) with dag: - t1 = PythonOperator(task_id=app.name, python_callable=solve) + if not app.notify: + t1 = PythonOperator(task_id=app.name, python_callable=solve) + else: + t1 = PythonOperator( + task_id=app.name, + python_callable=solve, + on_failure_callback=utils.callback_email, + ) + return dag diff --git a/libs/client/cornflow_client/airflow/dag_utilities.py b/libs/client/cornflow_client/airflow/dag_utilities.py index 8b803dd4..8cdb6981 100644 --- a/libs/client/cornflow_client/airflow/dag_utilities.py +++ b/libs/client/cornflow_client/airflow/dag_utilities.py @@ -10,7 +10,6 @@ from datetime import datetime, timedelta from urllib.parse import urlparse, urljoin - # Imports from modules from cornflow_client import CornFlow, CornFlowApiError @@ -314,6 +313,37 @@ def cf_check(fun, dag_name, secrets, **kwargs): ) +def callback_email(context): + from airflow.utils.email import send_email + from airflow.secrets.environment_variables import EnvironmentVariablesBackend + + path_to_log = ( + f"./logs/{context['dag'].dag_id}/" + f"{context['ti'].task_id}/{context['ts']}/1.log" + ) + environment = EnvironmentVariablesBackend().get_variable("ENVIRONMENT") + notification_email = EnvironmentVariablesBackend().get_variable( + "NOTIFICATION_EMAIL" + ) + environment_name = EnvironmentVariablesBackend().get_variable("ENVIRONMENT_NAME") + + title = f"Airflow. {environment_name} ({environment}). DAG/task error: {context['dag'].dag_id}/{context['ti'].task_id} Failed" + body = f""" + The DAG/task {context['dag'].dag_id}/{context['ti'].task_id} has failed. +
+ The log is attached. + """ + + send_email( + to=[ + notification_email, + ], + subject=title, + html_content=body, + files=[path_to_log], + ) + + class NoSolverException(Exception): pass diff --git a/libs/client/cornflow_client/core/application.py b/libs/client/cornflow_client/core/application.py index b076d66e..6230b9d9 100644 --- a/libs/client/cornflow_client/core/application.py +++ b/libs/client/cornflow_client/core/application.py @@ -33,6 +33,13 @@ class ApplicationCore(ABC): """ The application template. """ + # We create a new attribute controlling the use of the notification mail function + def __init__(self): + self._notify = False + + @property + def notify(self) -> bool: + return self._notify @property @abstractmethod