Skip to content

Commit

Permalink
Feature/email notification (#470)
Browse files Browse the repository at this point in the history
* Adding the notification email on failure

* Adding new variable notify to ApplicationCore()

* Adding new function callback_email() to utilities

* Correcting format ( using black )

* Correcting format ( using black ), including imports inside function and adding msc environment variable

* Removing duplicated "import logging"

* Small adjustments to PR

---------

Co-authored-by: Guillermo Gonzalez-Santander <guillermo.gonzalez@baobabsoluciones.es>
  • Loading branch information
AlejandraGalan and ggsdc authored Oct 5, 2023
1 parent 82a0d66 commit b0f0195
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 4 deletions.
15 changes: 12 additions & 3 deletions cornflow-dags/DAG/activate_dags.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import cornflow_client.airflow.dag_utilities as utils
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.secrets.environment_variables import EnvironmentVariablesBackend
import cornflow_client.airflow.dag_utilities as utils

from update_all_schemas import get_new_apps
from airflow.operators.python import PythonOperator


def create_dag(app):
Expand All @@ -27,7 +28,15 @@ def solve(**kwargs):
**kwargs
)
with dag:
t1 = PythonOperator(task_id=app.name, python_callable=solve)
if not app.notify:
t1 = PythonOperator(task_id=app.name, python_callable=solve)
else:
t1 = PythonOperator(
task_id=app.name,
python_callable=solve,
on_failure_callback=utils.callback_email,
)

return dag


Expand Down
32 changes: 31 additions & 1 deletion libs/client/cornflow_client/airflow/dag_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from datetime import datetime, timedelta
from urllib.parse import urlparse, urljoin


# Imports from modules
from cornflow_client import CornFlow, CornFlowApiError

Expand Down Expand Up @@ -314,6 +313,37 @@ def cf_check(fun, dag_name, secrets, **kwargs):
)


def callback_email(context):
from airflow.utils.email import send_email
from airflow.secrets.environment_variables import EnvironmentVariablesBackend

path_to_log = (
f"./logs/{context['dag'].dag_id}/"
f"{context['ti'].task_id}/{context['ts']}/1.log"
)
environment = EnvironmentVariablesBackend().get_variable("ENVIRONMENT")
notification_email = EnvironmentVariablesBackend().get_variable(
"NOTIFICATION_EMAIL"
)
environment_name = EnvironmentVariablesBackend().get_variable("ENVIRONMENT_NAME")

title = f"Airflow. {environment_name} ({environment}). DAG/task error: {context['dag'].dag_id}/{context['ti'].task_id} Failed"
body = f"""
The DAG/task {context['dag'].dag_id}/{context['ti'].task_id} has failed.
<br>
The log is attached.
"""

send_email(
to=[
notification_email,
],
subject=title,
html_content=body,
files=[path_to_log],
)


class NoSolverException(Exception):
pass

Expand Down
7 changes: 7 additions & 0 deletions libs/client/cornflow_client/core/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ class ApplicationCore(ABC):
"""
The application template.
"""
# We create a new attribute controlling the use of the notification mail function
def __init__(self):
self._notify = False

@property
def notify(self) -> bool:
return self._notify

@property
@abstractmethod
Expand Down

0 comments on commit b0f0195

Please sign in to comment.