diff --git a/datascience/src/pipeline/emails/stylesheets/email_to_control_units.css b/datascience/src/pipeline/emails/stylesheets/email_to_control_units.css new file mode 100644 index 0000000000..9e3de230f0 --- /dev/null +++ b/datascience/src/pipeline/emails/stylesheets/email_to_control_units.css @@ -0,0 +1,16 @@ +h1 { + font-size: 1.999rem; +} + +h2 { +font-size: 1.414rem; +} + +body { + text-align: unset; + max-width: 54rem; +} + +strong { + font-weight: bold; +} \ No newline at end of file diff --git a/datascience/src/pipeline/emails/templates/email_actions_to_units/email_actions_to_units.jinja b/datascience/src/pipeline/emails/templates/email_actions_to_units/email_actions_to_units.jinja new file mode 100644 index 0000000000..52e936d210 --- /dev/null +++ b/datascience/src/pipeline/emails/templates/email_actions_to_units/email_actions_to_units.jinja @@ -0,0 +1,45 @@ + + + + Bilan hebdomadaire contrôle des pêches + + + + +
+

Bilan hebdomadaire contrôle des pêches

+
+
+
+
+

Bonjour,

+

Vous trouverez ci-dessous les données des actions de contrôle des pêches effectuées par votre unité + ({{ control_unit_name }}) entre le {{ from_date }} et le {{ to_date }} que vous avez rapportés au Centre National de Surveillance des Pêches (CNSP).

+

Seuls les contrôles et surveillances dont les données sont complètes sont transmis dans ce bilan hebdomadaire. Si certaines données n'ont pas encore été transmises (par ex. l'établissement d'un PV ou non), + il est normal que le contrôle ne figure pas encore dans le rapport.

+

Si des données sont manquantes, incorrectes ou incomplètes, ou pour toute remarque concernant ce bilan, n'hésitez pas à contacter le CNSP : {{ cnsp_france_email_address }}.

+
+
+
+

Contrôles à quai

+ {{ land_controls }} + +

Contrôles en mer

+ {{ sea_controls }} + +

Contrôles aériens

+ {{ air_controls }} + +

Surveillances aériennes

+ {{ air_surveillances }} +
+
+ + + \ No newline at end of file diff --git a/datascience/src/pipeline/entities/control_units.py b/datascience/src/pipeline/entities/control_units.py new file mode 100644 index 0000000000..225a05a613 --- /dev/null +++ b/datascience/src/pipeline/entities/control_units.py @@ -0,0 +1,42 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import List + +import pandas as pd + +from src.pipeline.helpers.dates import Period + + +@dataclass +class ControlUnitWithEmails: + control_unit_id: int + control_unit_name: str + emails: List[str] + + +@dataclass +class ControlUnitActions: + """ + Control unit and its fisheries control actions between two dates. + """ + + control_unit: ControlUnitWithEmails + period: Period + land_controls: pd.DataFrame + sea_controls: pd.DataFrame + air_controls: pd.DataFrame + air_surveillances: pd.DataFrame + + +@dataclass +class ControlUnitActionsSentMessage: + control_unit_id: int + control_unit_name: str + email_address: str + sending_datetime_utc: datetime + actions_min_datetime_utc: datetime + actions_max_datetime_utc: datetime + number_of_actions: int + success: bool + error_code: int + error_message: str diff --git a/datascience/src/pipeline/entities/missions.py b/datascience/src/pipeline/entities/missions.py index bb116f4498..82bcb54fea 100644 --- a/datascience/src/pipeline/entities/missions.py +++ b/datascience/src/pipeline/entities/missions.py @@ -80,3 +80,19 @@ def from_poseidon_infraction_field(infraction: int): class Infraction: natinf: int comments: str + + +class FlightGoal(Enum): + VMS_AIS_CHECK = "VMS_AIS_CHECK" + UNAUTHORIZED_FISHING = "UNAUTHORIZED_FISHING" + CLOSED_AREA = "CLOSED_AREA" + + @property + def label(self): + labels = { + "VMS_AIS_CHECK": "Vérif. AIS/VMS", + "UNAUTHORIZED_FISHING": "Pêche interdite", + "CLOSED_AREA": "Zone fermée", + } + + return labels[self.name] diff --git a/datascience/src/pipeline/flows/email_actions_to_units.py b/datascience/src/pipeline/flows/email_actions_to_units.py new file mode 100644 index 0000000000..43fca1a02e --- /dev/null +++ b/datascience/src/pipeline/flows/email_actions_to_units.py @@ -0,0 +1,484 @@ +from datetime import datetime, timedelta +from email.message import EmailMessage +from pathlib import Path +from typing import List + +import css_inline +import pandas as pd +import prefect +from jinja2 import Environment, FileSystemLoader, Template, select_autoescape +from prefect import Flow, Parameter, case, flatten, task, unmapped +from prefect.engine.signals import SKIP +from prefect.executors import LocalDaskExecutor + +from config import ( + CNSP_FRANCE_EMAIL_ADDRESS, + EMAIL_STYLESHEETS_LOCATION, + EMAIL_TEMPLATES_LOCATION, +) +from src.pipeline.entities.control_units import ( + ControlUnitActions, + ControlUnitActionsSentMessage, + ControlUnitWithEmails, +) +from src.pipeline.entities.missions import FlightGoal, MissionActionType +from src.pipeline.generic_tasks import extract, load +from src.pipeline.helpers.dates import Period +from src.pipeline.helpers.emails import ( + CommunicationMeans, + create_html_email, + send_email_or_sms_or_fax_message, +) +from src.pipeline.shared_tasks.control_flow import ( + check_flow_not_running, + filter_results, +) +from src.pipeline.shared_tasks.control_units import fetch_control_units_contacts +from src.pipeline.shared_tasks.dates import get_utcnow + + +@task(checkpoint=False) +def get_actions_period( + utcnow: datetime, + start_days_ago: int, + end_days_ago: int, +) -> Period: + assert isinstance(start_days_ago, int) + assert isinstance(end_days_ago, int) + assert start_days_ago >= end_days_ago + + today = utcnow.date() + + start_day = today - timedelta(days=start_days_ago) + end_day = today - timedelta(days=end_days_ago - 1) # -1 to include the last day + + return Period( + start=datetime(year=start_day.year, month=start_day.month, day=start_day.day), + end=datetime(year=end_day.year, month=end_day.month, day=end_day.day), + ) + + +@task(checkpoint=False) +def extract_mission_actions(period: Period) -> pd.DataFrame: + logger = prefect.context.get("logger") + + df = extract( + "monitorfish_remote", + "monitorfish/mission_actions_to_email.sql", + params={ + "min_datetime_utc": period.start, + "max_datetime_utc": period.end, + }, + ) + + def convert_flight_goal(s: str) -> FlightGoal: + try: + return FlightGoal(s).label + except ValueError: + logger.error(f"Unkown flight goal {s}.") + return s + + def convert_flight_goals(li: List[str]) -> List[str]: + return [convert_flight_goal(s) for s in li] + + df["flight_goals"] = df.flight_goals.map(convert_flight_goals) + + return df + + +@task(checkpoint=False) +def get_control_unit_ids(env_action: pd.DataFrame) -> List[int]: + # Warning : using `set` and not `.unique()` on `control_unit_id ` in order to + # return `int` and not `numpy.int64` values, which are not handled by psycopg2 when + # passed as query parameters. + return sorted(set(env_action.control_unit_id)) + + +@task(checkpoint=False) +def filter_control_units_contacts( + all_control_units_contacts: pd.DataFrame, control_unit_ids: List[str] +) -> List[ControlUnitWithEmails]: + if len(control_unit_ids) == 0: + raise SKIP("No control units to extract.") + + control_units = all_control_units_contacts.loc[ + ( + all_control_units_contacts.control_unit_id.isin(control_unit_ids) + & (all_control_units_contacts.emails.map(len) > 0) + ), + ["control_unit_id", "control_unit_name", "emails"], + ] + records = control_units.to_dict(orient="records") + return [ControlUnitWithEmails(**control_unit) for control_unit in records] + + +@task(checkpoint=False) +def to_control_unit_actions( + mission_actions: pd.DataFrame, + period: Period, + control_units: List[ControlUnitWithEmails], +) -> List[ControlUnitActions]: + return [ + ControlUnitActions( + control_unit=control_unit, + period=period, + land_controls=mission_actions[ + (mission_actions.control_type == MissionActionType.LAND_CONTROL.value) + & (mission_actions.control_unit_id == control_unit.control_unit_id) + ].reset_index(drop=True), + sea_controls=mission_actions[ + (mission_actions.control_type == MissionActionType.SEA_CONTROL.value) + & (mission_actions.control_unit_id == control_unit.control_unit_id) + ].reset_index(drop=True), + air_controls=mission_actions[ + (mission_actions.control_type == MissionActionType.AIR_CONTROL.value) + & (mission_actions.control_unit_id == control_unit.control_unit_id) + ].reset_index(drop=True), + air_surveillances=mission_actions[ + ( + mission_actions.control_type + == MissionActionType.AIR_SURVEILLANCE.value + ) + & (mission_actions.control_unit_id == control_unit.control_unit_id) + ].reset_index(drop=True), + ) + for control_unit in control_units + ] + + +@task(checkpoint=False) +def get_template() -> Template: + templates_locations = [ + EMAIL_TEMPLATES_LOCATION / "email_actions_to_units", + EMAIL_STYLESHEETS_LOCATION, + ] + + env = Environment( + loader=FileSystemLoader(templates_locations), + autoescape=select_autoescape(), + ) + + return env.get_template("email_actions_to_units.jinja") + + +@task(checkpoint=False) +def render(control_unit_actions: ControlUnitActions, template: Template) -> str: + def format_segments(segments: list) -> str: + return ", ".join(segments) + + def format_longitude_latitude(lon: float, lat: float) -> str: + lon_direction = "E" if lon > 0 else "W" + lat_direction = "N" if lat > 0 else "S" + return f"{abs(lat): .4f}{lat_direction} {abs(lon): .4f}{lon_direction}" + + # Sea controls + if len(control_unit_actions.sea_controls) > 0: + sea_controls = control_unit_actions.sea_controls.copy(deep=True) + + sea_controls["vessel"] = sea_controls.apply( + lambda row: f"{row['vessel_name']} ({row['flag_state']})", axis=1 + ) + + sea_controls["infraction"] = sea_controls.infraction.map( + {True: "Oui", False: "Non"}, na_action="ignore" + ).fillna("-") + + sea_controls["infraction_report"] = sea_controls.infraction_report.map( + {True: "Oui", False: "Non"}, na_action="ignore" + ).fillna("-") + + sea_controls["position"] = sea_controls.apply( + lambda row: format_longitude_latitude(row["longitude"], row["latitude"]), + axis=1, + ) + + sea_controls["control_datetime_utc"] = sea_controls.control_datetime_utc.map( + lambda d: d.strftime("%Y-%m-%d %H:%M") + ) + + sea_controls["segments"] = sea_controls.segments.map(format_segments) + + columns = { + "control_datetime_utc": "Date du contrôle", + "vessel": "Navire", + "position": "Position", + "infraction": "Infraction constatée", + "infraction_report": "PV dressé", + "segments": "Segment(s) de flotte", + } + + sea_controls = sea_controls[columns.keys()].rename(columns=columns) + sea_controls = sea_controls.to_html(index=False, border=1) + + else: + sea_controls = "Aucun" + + # Land controls + if len(control_unit_actions.land_controls) > 0: + land_controls = control_unit_actions.land_controls.copy(deep=True) + + land_controls["vessel"] = land_controls.apply( + lambda row: f"{row['vessel_name']} ({row['flag_state']})", axis=1 + ) + land_controls["infraction"] = land_controls.infraction.map( + {True: "Oui", False: "Non"}, na_action="ignore" + ).fillna("-") + + land_controls["infraction_report"] = land_controls.infraction_report.map( + {True: "Oui", False: "Non"}, na_action="ignore" + ).fillna("-") + + land_controls["control_datetime_utc"] = land_controls.control_datetime_utc.map( + lambda d: d.strftime("%Y-%m-%d %H:%M") + ) + + land_controls["port"] = land_controls.apply( + lambda row: f"{row['port_name']} ({row['port_locode']})", axis=1 + ) + + land_controls["segments"] = land_controls.segments.map(format_segments) + + columns = { + "control_datetime_utc": "Date du contrôle", + "vessel": "Navire", + "port": "Port", + "infraction": "Infraction constatée", + "infraction_report": "PV dressé", + "segments": "Segment(s) de flotte", + } + + land_controls = land_controls[columns.keys()].rename(columns=columns) + land_controls = land_controls.to_html(index=False, border=1) + else: + land_controls = "Aucun" + + # Air controls + if len(control_unit_actions.air_controls) > 0: + air_controls = control_unit_actions.air_controls.copy(deep=True) + + air_controls["vessel"] = air_controls.apply( + lambda row: f"{row['vessel_name']} ({row['flag_state']})", axis=1 + ) + air_controls["infraction"] = air_controls.infraction.map( + {True: "Oui", False: "Non"}, na_action="ignore" + ).fillna("-") + + air_controls["infraction_report"] = air_controls.infraction_report.map( + {True: "Oui", False: "Non"}, na_action="ignore" + ).fillna("-") + + air_controls["position"] = air_controls.apply( + lambda row: format_longitude_latitude(row["longitude"], row["latitude"]), + axis=1, + ) + + air_controls["control_datetime_utc"] = air_controls.control_datetime_utc.map( + lambda d: d.strftime("%Y-%m-%d %H:%M") + ) + + columns = { + "control_datetime_utc": "Date du contrôle", + "vessel": "Navire", + "position": "Position", + "infraction": "Infraction constatée", + "infraction_report": "PV dressé", + } + + air_controls = air_controls[columns.keys()].rename(columns=columns) + air_controls = air_controls.to_html(index=False, border=1) + else: + air_controls = "Aucun" + + # Air surveillances + if len(control_unit_actions.air_surveillances) > 0: + air_surveillances = control_unit_actions.air_surveillances.copy(deep=True) + + air_surveillances[ + "control_datetime_utc" + ] = air_surveillances.control_datetime_utc.map( + lambda d: d.strftime("%Y-%m-%d %H:%M") + ) + + air_surveillances["segments"] = air_surveillances.segments.map(format_segments) + air_surveillances["flight_goals"] = air_surveillances.flight_goals.map( + lambda li: ", ".join(li), na_action="ignore" + ) + + columns = { + "control_datetime_utc": "Date du vol", + "number_of_vessels_flown_over": "Navires survolés", + "flight_goals": "Objectifs du vols", + "segments": "Segments ciblés", + } + + air_surveillances = air_surveillances[columns.keys()].rename(columns=columns) + air_surveillances = air_surveillances.to_html(index=False, border=1) + else: + air_surveillances = "Aucune" + + html = template.render( + control_unit_name=control_unit_actions.control_unit.control_unit_name, + land_controls=land_controls, + sea_controls=sea_controls, + air_controls=air_controls, + air_surveillances=air_surveillances, + from_date=control_unit_actions.period.start.strftime("%d/%m/%Y %H:%M UTC"), + to_date=control_unit_actions.period.end.strftime("%d/%m/%Y %H:%M UTC"), + cnsp_france_email_address=CNSP_FRANCE_EMAIL_ADDRESS, + ) + + html = css_inline.inline(html) + return html + + +@task(checkpoint=False) +def create_email( + html: str, actions: ControlUnitActions, test_mode: bool +) -> EmailMessage: + to = CNSP_FRANCE_EMAIL_ADDRESS if test_mode else actions.control_unit.emails + + message = create_html_email( + to=to, + subject="Bilan hebdomadaire contrôle des pêches", + html=html, + reply_to=CNSP_FRANCE_EMAIL_ADDRESS, + ) + + return message + + +@task(checkpoint=False) +def send_mission_actions_email( + message: EmailMessage, actions: ControlUnitActions, is_integration: bool +) -> List[ControlUnitActionsSentMessage]: + """ + Sends input email using the contents of `From` header as sender and `To`, `Cc` + and `Bcc` headers as recipients. + + Args: + message (EmailMessage): email message to send + actions (ControlUnitActions): `ControlUnitActions` related to message + is_integration (bool): if ``False``, the message is not actually sent + + Returns: + List[ControlUnitActionsSentMessage]: List of sent messages and their error + codes, if any. + """ + + logger = prefect.context.get("logger") + addressees = actions.control_unit.emails + + send_errors = send_email_or_sms_or_fax_message( + msg=message, + communication_means=CommunicationMeans.EMAIL, + is_integration=is_integration, + logger=logger, + ) + + now = datetime.utcnow() + + sent_messages = [] + + for addressee in addressees: + if addressee in send_errors: + success = False + error_code = send_errors[addressee][0] + error_message = send_errors[addressee][1] + else: + success = True + error_code = None + error_message = None + + sent_messages.append( + ControlUnitActionsSentMessage( + control_unit_id=actions.control_unit.control_unit_id, + control_unit_name=actions.control_unit.control_unit_name, + email_address=addressee, + sending_datetime_utc=now, + actions_min_datetime_utc=actions.period.start, + actions_max_datetime_utc=actions.period.end, + number_of_actions=( + len(actions.sea_controls) + + len(actions.land_controls) + + len(actions.air_controls) + + len(actions.air_surveillances) + ), + success=success, + error_code=error_code, + error_message=error_message, + ) + ) + return sent_messages + + +@task(checkpoint=False) +def control_unit_actions_list_to_df( + messages: List[ControlUnitActionsSentMessage], +) -> pd.DataFrame: + messages = pd.DataFrame(messages) + return messages + + +@task(checkpoint=False) +def load_emails_sent_to_control_units( + emails_sent_to_control_units: pd.DataFrame, +): + load( + emails_sent_to_control_units, + table_name="emails_sent_to_control_units", + schema="public", + db_name="monitorfish_remote", + how="append", + nullable_integer_columns=["error_code"], + logger=prefect.context.get("logger"), + ) + + +with Flow("Email actions to units", executor=LocalDaskExecutor()) as flow: + flow_not_running = check_flow_not_running() + with case(flow_not_running, True): + test_mode = Parameter("test_mode") + is_integration = Parameter("is_integration") + start_days_ago = Parameter("start_days_ago") + end_days_ago = Parameter("end_days_ago") + + template = get_template() + utcnow = get_utcnow() + + period = get_actions_period( + utcnow=utcnow, + start_days_ago=start_days_ago, + end_days_ago=end_days_ago, + ) + mission_actions = extract_mission_actions(period=period) + all_control_units_contacts = fetch_control_units_contacts() + + control_unit_ids = get_control_unit_ids(mission_actions) + control_units_emails = filter_control_units_contacts( + all_control_units_contacts, control_unit_ids + ) + + control_unit_actions = to_control_unit_actions( + mission_actions, period, control_units_emails + ) + + html = render.map(control_unit_actions, template=unmapped(template)) + + message = create_email.map( + html=html, + actions=control_unit_actions, + test_mode=unmapped(test_mode), + ) + message = filter_results(message) + + sent_messages = send_mission_actions_email.map( + message, + control_unit_actions, + is_integration=unmapped(is_integration), + ) + + sent_messages = flatten(sent_messages) + sent_messages = control_unit_actions_list_to_df(sent_messages) + load_emails_sent_to_control_units(sent_messages) + +flow.file_name = Path(__file__).name diff --git a/datascience/src/pipeline/queries/monitorfish/mission_actions_to_email.sql b/datascience/src/pipeline/queries/monitorfish/mission_actions_to_email.sql new file mode 100644 index 0000000000..a113898347 --- /dev/null +++ b/datascience/src/pipeline/queries/monitorfish/mission_actions_to_email.sql @@ -0,0 +1,37 @@ +WITH actions_to_email AS ( + SELECT DISTINCT ON (id, control_unit_id) + id, + control_unit_id, + control_unit, + control_type, + control_datetime_utc AT TIME ZONE 'UTC' AS control_datetime_utc, + vessel_name, + flag_state, + longitude, + latitude, + port_name, + port_locode, + infraction, + infraction_report, + number_of_vessels_flown_over, + flight_goals + FROM analytics_controls_full_data + WHERE + control_datetime_utc >= :min_datetime_utc + AND control_datetime_utc < :max_datetime_utc +), + +actions_segments AS ( + SELECT + id, + ARRAY_AGG(DISTINCT segment ORDER BY segment) AS segments + FROM analytics_controls_full_data + WHERE id IN (SELECT id FROM actions_to_email) + GROUP BY id +) + +SELECT a.*, s.segments +FROM actions_to_email a +LEFT JOIN actions_segments s +ON a.id = s.id +ORDER BY a.id, a.control_unit_id \ No newline at end of file diff --git a/datascience/tests/test_data/emails/email_actions_to_units/expected_rendered_email.html b/datascience/tests/test_data/emails/email_actions_to_units/expected_rendered_email.html new file mode 100644 index 0000000000..2fd2771224 --- /dev/null +++ b/datascience/tests/test_data/emails/email_actions_to_units/expected_rendered_email.html @@ -0,0 +1,166 @@ + + Bilan hebdomadaire contrôle des pêches + + + + +
+

Bilan hebdomadaire contrôle des pêches

+
+
+
+
+

Bonjour,

+

Vous trouverez ci-dessous les données des actions de contrôle des pêches effectuées par votre unité + (Nom de l'unité) entre le 23/06/2020 00:00 UTC et le 06/05/2020 18:45 UTC que vous avez rapportés au Centre National de Surveillance des Pêches (CNSP).

+

Seuls les contrôles et surveillances dont les données sont complètes sont transmis dans ce bilan hebdomadaire. Si certaines données n'ont pas encore été transmises (par ex. l'établissement d'un PV ou non), + il est normal que le contrôle ne figure pas encore dans le rapport.

+

Si des données sont manquantes, incorrectes ou incomplètes, ou pour toute remarque concernant ce bilan, n'hésitez pas à contacter le CNSP : cnsp.france@test.email.

+
+
+
+

Contrôles à quai

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Date du contrôleNavirePortInfraction constatéePV dresséSegment(s) de flotte
2021-08-06 15:23None (FR)Somewhere over the rainbow (FRCQF)OuiNonNS13, NWW08
2022-04-22 15:23None (FR)Somewhere over the ocean (FRLEH)OuiOuiFR_SCE
2022-04-29 15:23None (NL)Somewhere over the swell (FRDKK)NonNonHors segment
2022-04-26 15:23None (FR)Somewhere over the top (FRZJZ)OuiOuiHors segment
2022-04-27 15:23None (FR)Somewhere over the top (FRZJZ)NonNonHors segment
+ +

Contrôles en mer

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Date du contrôleNavirePositionInfraction constatéePV dresséSegment(s) de flotte
2022-02-05 15:23None (UNDEFINED)6.8500N 53.1200ENonNonHors segment
2022-02-06 15:23None (UNDEFINED)6.8500N 53.1200ENonNonHors segment
2021-12-06 15:23None (FR)nanS nanWNonNonHors segment
+ +

Contrôles aériens

+ + + + + + + + + + + + + + + + + + + +
Date du contrôleNavirePositionInfraction constatéePV dressé
2022-05-03 15:23El barco (NL)6.8500S 45.1200ENonNon
+ +

Surveillances aériennes

+ + + + + + + + + + + + + + + + + +
Date du volNavires survolésObjectifs du volsSegments ciblés
2022-05-04 15:2327.0Vérif. AIS/VMS, Zone ferméeFR_SCE
+
+
+ + + \ No newline at end of file diff --git a/datascience/tests/test_data/emails/email_actions_to_units/expected_rendered_email_without_actions.html b/datascience/tests/test_data/emails/email_actions_to_units/expected_rendered_email_without_actions.html new file mode 100644 index 0000000000..6ebdf85be0 --- /dev/null +++ b/datascience/tests/test_data/emails/email_actions_to_units/expected_rendered_email_without_actions.html @@ -0,0 +1,40 @@ + + Bilan hebdomadaire contrôle des pêches + + + + +
+

Bilan hebdomadaire contrôle des pêches

+
+
+
+
+

Bonjour,

+

Vous trouverez ci-dessous les données des actions de contrôle des pêches effectuées par votre unité + (Nom de l'unité) entre le 23/06/2020 00:00 UTC et le 06/05/2020 18:45 UTC que vous avez rapportés au Centre National de Surveillance des Pêches (CNSP).

+

Seuls les contrôles et surveillances dont les données sont complètes sont transmis dans ce bilan hebdomadaire. Si certaines données n'ont pas encore été transmises (par ex. l'établissement d'un PV ou non), + il est normal que le contrôle ne figure pas encore dans le rapport.

+

Si des données sont manquantes, incorrectes ou incomplètes, ou pour toute remarque concernant ce bilan, n'hésitez pas à contacter le CNSP : cnsp.france@test.email.

+
+
+
+

Contrôles à quai

+ Aucun + +

Contrôles en mer

+ Aucun + +

Contrôles aériens

+ Aucun + +

Surveillances aériennes

+ Aucune +
+
+ + + \ No newline at end of file diff --git a/datascience/tests/test_data/remote_database/V666.39__Reset_emails_sent_to_control_units.sql b/datascience/tests/test_data/remote_database/V666.39__Reset_emails_sent_to_control_units.sql new file mode 100644 index 0000000000..ffdbe6e113 --- /dev/null +++ b/datascience/tests/test_data/remote_database/V666.39__Reset_emails_sent_to_control_units.sql @@ -0,0 +1 @@ +DELETE FROM emails_sent_to_control_units; \ No newline at end of file diff --git a/datascience/tests/test_pipeline/test_flows/test_email_actions_to_units.py b/datascience/tests/test_pipeline/test_flows/test_email_actions_to_units.py new file mode 100644 index 0000000000..0790009c55 --- /dev/null +++ b/datascience/tests/test_pipeline/test_flows/test_email_actions_to_units.py @@ -0,0 +1,768 @@ +import dataclasses +from datetime import datetime, timedelta +from email.message import EmailMessage +from smtplib import SMTPDataError +from typing import List +from unittest.mock import patch + +import pandas as pd +import pytest +from dateutil import relativedelta +from jinja2 import Template +from prefect import task + +from config import ( + CNSP_FRANCE_EMAIL_ADDRESS, + MONITORFISH_EMAIL_ADDRESS, + TEST_DATA_LOCATION, +) +from src.pipeline.entities.control_units import ( + ControlUnitActions, + ControlUnitActionsSentMessage, + ControlUnitWithEmails, +) +from src.pipeline.entities.missions import FlightGoal +from src.pipeline.flows.email_actions_to_units import ( + control_unit_actions_list_to_df, + create_email, + extract_mission_actions, + filter_control_units_contacts, + flow, + get_actions_period, + get_control_unit_ids, + get_template, + load_emails_sent_to_control_units, + render, + send_mission_actions_email, + to_control_unit_actions, +) +from src.pipeline.helpers.dates import Period +from src.read_query import read_query +from tests.mocks import mock_check_flow_not_running + +flow.replace(flow.get_tasks("check_flow_not_running")[0], mock_check_flow_not_running) + + +@task(checkpoint=False) +def mock_fetch_control_units_contacts(): + return pd.DataFrame( + { + "control_unit_id": [3, 5, 8], + "control_unit_name": ["Unité 3", "Unité 5", "Unité 8"], + "emails": [ + ["alternative@email", "some.email@control.unit.4"], + [], + ["email8@email.com"], + ], + "phone_numbers": [ + ["'00 11 22 33 44 55"], + ["44 44 44 44 44"], + [], + ], + } + ) + + +flow.replace( + flow.get_tasks("fetch_control_units_contacts")[0], mock_fetch_control_units_contacts +) + + +@pytest.fixture +def expected_mission_actions() -> pd.DataFrame: + now = datetime.utcnow() + + return pd.DataFrame( + { + "id": [-199999, -144762, 1, 6, 6, 10, 11, 21, 22, 23], + "control_unit_id": [8, 8, 5, 3, 8, 8, 8, 8, 8, 8], + "control_unit": [ + "Bobby McDewis", + "Bobby McDewis", + "Mike The Buster", + "Nozy Mary", + "Bobby McDewis", + "Bobby McDewis", + "Bobby McDewis", + "Bobby McDewis", + "Bobby McDewis", + "Bobby McDewis", + ], + "control_type": [ + "SEA_CONTROL", + "SEA_CONTROL", + "SEA_CONTROL", + "LAND_CONTROL", + "LAND_CONTROL", + "LAND_CONTROL", + "SEA_CONTROL", + "LAND_CONTROL", + "LAND_CONTROL", + "LAND_CONTROL", + ], + "control_datetime_utc": [ + now - relativedelta.relativedelta(months=3, days=1), + now - relativedelta.relativedelta(months=3), + now - relativedelta.relativedelta(weeks=3), + now - relativedelta.relativedelta(months=9), + now - relativedelta.relativedelta(months=9), + now - relativedelta.relativedelta(weeks=2), + now - relativedelta.relativedelta(months=5), + now - relativedelta.relativedelta(weeks=1), + now - relativedelta.relativedelta(weeks=1, days=3), + now - relativedelta.relativedelta(weeks=1, days=2), + ], + "vessel_name": [None, None, None, None, None, None, None, None, None, None], + "flag_state": [ + "UNDEFINED", + "UNDEFINED", + "FR", + "FR", + "FR", + "FR", + "FR", + "NL", + "FR", + "FR", + ], + "longitude": [ + 53.12, + 53.12, + -1.566, + None, + None, + None, + None, + None, + None, + None, + ], + "latitude": [6.85, 6.85, 46.0, None, None, None, None, None, None, None], + "port_name": [ + None, + None, + None, + "Somewhere over the rainbow", + "Somewhere over the rainbow", + "Somewhere over the ocean", + None, + "Somewhere over the swell", + "Somewhere over the top", + "Somewhere over the top", + ], + "port_locode": [ + None, + None, + None, + "FRCQF", + "FRCQF", + "FRLEH", + None, + "FRDKK", + "FRZJZ", + "FRZJZ", + ], + "infraction": [ + False, + False, + True, + True, + True, + True, + False, + False, + True, + False, + ], + "infraction_report": [ + False, + False, + True, + False, + False, + True, + False, + False, + True, + False, + ], + "number_of_vessels_flown_over": [ + None, + None, + None, + None, + None, + None, + None, + None, + None, + None, + ], + "flight_goals": [ + [], + [], + [], + [], + [], + [], + [], + [], + [], + [], + ], + "segments": [ + ["Hors segment"], + ["Hors segment"], + ["SWW01/02/03"], + ["NS13", "NWW08"], + ["NS13", "NWW08"], + ["FR_SCE"], + ["Hors segment"], + ["Hors segment"], + ["Hors segment"], + ["Hors segment"], + ], + } + ) + + +@pytest.fixture +def sample_mission_actions(expected_mission_actions) -> pd.DataFrame: + now = datetime.utcnow() + + df = pd.concat( + [ + expected_mission_actions, + pd.DataFrame( + { + "id": [24, 25], + "control_unit_id": [3, 3], + "control_unit": [ + "Bobby McDewis", + "Bobby McDewis", + ], + "control_type": [ + "AIR_CONTROL", + "AIR_SURVEILLANCE", + ], + "control_datetime_utc": [ + now - relativedelta.relativedelta(days=3), + now - relativedelta.relativedelta(days=2), + ], + "vessel_name": ["El barco", "La Caravella"], + "flag_state": [ + "NL", + "FR", + ], + "longitude": [45.12, None], + "latitude": [-6.85, None], + "port_name": [ + None, + None, + ], + "infraction": [ + False, + False, + ], + "infraction_report": [ + False, + False, + ], + "number_of_vessels_flown_over": [None, 27], + "flight_goals": [ + [], + [FlightGoal.VMS_AIS_CHECK.label, FlightGoal.CLOSED_AREA.label], + ], + "segments": [ + ["NS13", "NWW08"], + ["FR_SCE"], + ], + } + ), + ] + ).reset_index(drop=True) + + d = datetime(2022, 5, 6, 15, 23, 40) + + df.loc[:, "control_datetime_utc"] = [ + d - relativedelta.relativedelta(months=3, days=1), + d - relativedelta.relativedelta(months=3), + d - relativedelta.relativedelta(weeks=3), + d - relativedelta.relativedelta(months=9), + d - relativedelta.relativedelta(months=9), + d - relativedelta.relativedelta(weeks=2), + d - relativedelta.relativedelta(months=5), + d - relativedelta.relativedelta(weeks=1), + d - relativedelta.relativedelta(weeks=1, days=3), + d - relativedelta.relativedelta(weeks=1, days=2), + d - relativedelta.relativedelta(days=3), + d - relativedelta.relativedelta(days=2), + ] + + return df + + +@pytest.fixture +def expected_control_unit_ids() -> List[int]: + return [3, 5, 8] + + +@pytest.fixture +def sample_control_units() -> pd.DataFrame: + return [ + ControlUnitWithEmails( + control_unit_id=3, + control_unit_name="Unité 3", + emails=["unité_3@email.fr", "unité_3_bis@email.fr"], + ), + ControlUnitWithEmails( + control_unit_id=5, control_unit_name="Unité 5", emails=["unité_5@email.fr"] + ), + ControlUnitWithEmails( + control_unit_id=8, + control_unit_name="Unité 8", + emails=["unité_8@email.fr", "unité_8_bis@email.fr"], + ), + ] + + +@pytest.fixture +def expected_all_control_units() -> pd.DataFrame: + return pd.DataFrame( + { + "control_unit_id": [10002, 10018, 10019], + "control_unit_name": ["DML – DDTM 59", "P602 Verdon", "BN Toulon"], + "email_addresses": [ + ["dml59@surveillance.fr"], + ["diffusion.p602@email.fr", "diffusion_bis.p602@email.fr"], + ["bn_toulon@email.fr"], + ], + } + ) + + +@pytest.fixture +def sample_control_unit_actions(sample_mission_actions) -> ControlUnitActions: + return ControlUnitActions( + control_unit=ControlUnitWithEmails( + control_unit_id=13, + control_unit_name="Nom de l'unité", + emails=["email@email.com", "email2@email.com"], + ), + period=Period( + start=datetime(2020, 6, 23, 0, 0, 0), + end=datetime(2020, 5, 6, 18, 45, 6), + ), + sea_controls=sample_mission_actions.iloc[[0, 1, 6]].reset_index(drop=True), + land_controls=sample_mission_actions.iloc[[4, 5, 7, 8, 9]].reset_index( + drop=True + ), + air_controls=sample_mission_actions.iloc[[10]].reset_index(drop=True), + air_surveillances=sample_mission_actions.iloc[[11]].reset_index(drop=True), + ) + + +@pytest.fixture +def sample_control_unit_actions_without_actions( + sample_control_unit_actions, +) -> ControlUnitActions: + return dataclasses.replace( + sample_control_unit_actions, + sea_controls=sample_control_unit_actions.sea_controls.head(0), + land_controls=sample_control_unit_actions.land_controls.head(0), + air_controls=sample_control_unit_actions.air_controls.head(0), + air_surveillances=sample_control_unit_actions.air_surveillances.head(0), + ) + + +@pytest.fixture +def control_unit_actions_sent_messages() -> List[ControlUnitActionsSentMessage]: + return [ + ControlUnitActionsSentMessage( + control_unit_id=13, + control_unit_name="Nom de l'unité", + email_address="email@email.com", + sending_datetime_utc=datetime(2024, 3, 19, 14, 37, 24, 497093), + actions_min_datetime_utc=datetime(2020, 6, 23, 0, 0), + actions_max_datetime_utc=datetime(2020, 5, 6, 18, 45, 6), + number_of_actions=2, + success=True, + error_code=None, + error_message=None, + ), + ControlUnitActionsSentMessage( + control_unit_id=13, + control_unit_name="Nom de l'unité", + email_address="email2@email.com", + sending_datetime_utc=datetime(2024, 3, 19, 14, 37, 24, 497093), + actions_min_datetime_utc=datetime(2020, 6, 23, 0, 0), + actions_max_datetime_utc=datetime(2020, 5, 6, 18, 45, 6), + number_of_actions=2, + success=False, + error_code=550, + error_message="Email cound not be sent.", + ), + ] + + +@pytest.fixture +def control_unit_actions_sent_messages_df() -> pd.DataFrame: + return pd.DataFrame( + { + "control_unit_id": [13, 13], + "control_unit_name": ["Nom de l'unité", "Nom de l'unité"], + "email_address": ["email@email.com", "email2@email.com"], + "sending_datetime_utc": [ + datetime( + year=2024, + month=3, + day=19, + hour=14, + minute=37, + second=24, + microsecond=497093, + ), + datetime( + year=2024, + month=3, + day=19, + hour=14, + minute=37, + second=24, + microsecond=497093, + ), + ], + "actions_min_datetime_utc": [ + datetime(year=2020, month=6, day=23, hour=00, minute=00, second=00), + datetime(year=2020, month=6, day=23, hour=00, minute=00, second=00), + ], + "actions_max_datetime_utc": [ + datetime(year=2020, month=5, day=6, hour=18, minute=45, second=6), + datetime(year=2020, month=5, day=6, hour=18, minute=45, second=6), + ], + "number_of_actions": [2, 2], + "success": [True, False], + "error_code": [None, 550.0], + "error_message": [None, "Email cound not be sent."], + } + ) + + +@pytest.fixture +def expected_email(sample_control_unit_actions) -> EmailMessage: + email = EmailMessage() + email["Subject"] = "Bilan hebdomadaire contrôle des pêches" + email["From"] = MONITORFISH_EMAIL_ADDRESS + email["To"] = ", ".join(sample_control_unit_actions.control_unit.emails) + email["Reply-To"] = CNSP_FRANCE_EMAIL_ADDRESS + email.set_content("Bonjour ceci est un email test.\n", subtype="html") + + return email + + +def test_get_actions_period(): + period = get_actions_period.run( + utcnow=datetime(2021, 2, 21, 16, 10, 0), + start_days_ago=5, + end_days_ago=2, + ) + assert period == Period( + start=datetime(2021, 2, 16, 0, 0), end=datetime(2021, 2, 20, 0, 0) + ) + + +def test_extract_mission_actions(reset_test_data, expected_mission_actions): + # Dates with some data + now = datetime.utcnow() + y = relativedelta.relativedelta(years=1) + actions = extract_mission_actions.run(period=Period(start=now - y, end=now)) + pd.testing.assert_frame_equal( + actions.drop(columns=["control_datetime_utc"]), + expected_mission_actions.drop(columns=["control_datetime_utc"]), + ) + assert ( + ( + actions.control_datetime_utc - expected_mission_actions.control_datetime_utc + ).abs() + < timedelta(seconds=10) + ).all() + + +def test_get_control_unit_ids(expected_mission_actions, expected_control_unit_ids): + ids = get_control_unit_ids.run(expected_mission_actions) + assert ids == expected_control_unit_ids + + +def test_filter_control_units_contacts(control_units_contacts): + res = filter_control_units_contacts.run( + all_control_units_contacts=control_units_contacts, control_unit_ids=[2, 3] + ) + + assert res == [ + ControlUnitWithEmails( + control_unit_id=2, + control_unit_name="Unité 2", + emails=["alternative@email", "some.email@control.unit.4"], + ) + ] + + +def test_to_control_unit_actions(sample_mission_actions, sample_control_units): + period = Period( + start=datetime(1996, 6, 11, 2, 52, 36), + end=datetime(1996, 6, 13, 6, 17, 18), + ) + + control_unit_actions = to_control_unit_actions.run( + mission_actions=sample_mission_actions, + period=period, + control_units=sample_control_units, + ) + + assert len(control_unit_actions) == 3 + + assert isinstance(control_unit_actions[0], ControlUnitActions) + assert control_unit_actions[0].control_unit.control_unit_id == 3 + assert control_unit_actions[0].period == period + assert len(control_unit_actions[0].sea_controls) == 0 + + pd.testing.assert_frame_equal( + control_unit_actions[0].land_controls, + sample_mission_actions.iloc[[3]].reset_index(drop=True), + ) + pd.testing.assert_frame_equal( + control_unit_actions[0].air_controls, + sample_mission_actions.iloc[[10]].reset_index(drop=True), + ) + pd.testing.assert_frame_equal( + control_unit_actions[0].air_surveillances, + sample_mission_actions.iloc[[11]].reset_index(drop=True), + ) + + assert control_unit_actions[1].control_unit.control_unit_id == 5 + assert control_unit_actions[1].period == period + pd.testing.assert_frame_equal( + control_unit_actions[1].sea_controls, + sample_mission_actions.iloc[[2]].reset_index(drop=True), + ) + assert len(control_unit_actions[1].land_controls) == 0 + assert len(control_unit_actions[1].air_controls) == 0 + assert len(control_unit_actions[1].air_surveillances) == 0 + + assert control_unit_actions[2].control_unit.control_unit_id == 8 + assert control_unit_actions[2].period == period + pd.testing.assert_frame_equal( + control_unit_actions[2].sea_controls, + sample_mission_actions.iloc[[0, 1, 6]].reset_index(drop=True), + ) + pd.testing.assert_frame_equal( + control_unit_actions[2].land_controls, + sample_mission_actions.iloc[[4, 5, 7, 8, 9]].reset_index(drop=True), + ) + + assert len(control_unit_actions[2].air_controls) == 0 + assert len(control_unit_actions[2].air_surveillances) == 0 + + +def test_get_template(): + template = get_template.run() + assert isinstance(template, Template) + + +def test_render_html(sample_control_unit_actions): + template = get_template.run() + html = render.run( + control_unit_actions=sample_control_unit_actions, template=template + ) + + # Uncomment to update the expected html file + # with open(( + # TEST_DATA_LOCATION / + # "emails/email_actions_to_units/expected_rendered_email.html" + # ), "w") as f: + # f.write(html) + + with open( + ( + TEST_DATA_LOCATION + / "emails/email_actions_to_units/expected_rendered_email.html" + ), + "r", + ) as f: + expected_html = f.read() + + assert html == expected_html + + +def test_render_html_when_unit_has_no_actions( + sample_control_unit_actions_without_actions, +): + template = get_template.run() + html = render.run( + control_unit_actions=sample_control_unit_actions_without_actions, + template=template, + ) + + # Uncomment to update the expected html file + # with open(( + # TEST_DATA_LOCATION / + # "emails/email_actions_to_units/expected_rendered_email_without_actions.html"), + # "w") as f: + # f.write(html) + + with open( + ( + TEST_DATA_LOCATION + / "emails/email_actions_to_units/expected_rendered_email_without_actions.html" + ), + "r", + ) as f: + expected_html = f.read() + + assert html == expected_html + + +@pytest.mark.parametrize("test_mode", [False, True]) +def test_create_email(sample_control_unit_actions, expected_email, test_mode): + email = create_email.run( + html="Bonjour ceci est un email test.", + actions=sample_control_unit_actions, + test_mode=test_mode, + ) + + assert email["Subject"] == expected_email["Subject"] + assert email["From"] == expected_email["From"] + assert ( + email["To"] == CNSP_FRANCE_EMAIL_ADDRESS if test_mode else expected_email["To"] + ) + assert email["Reply-To"] == expected_email["Reply-To"] + assert email.get_content_type() == expected_email.get_content_type() + + body = email.get_body() + expected_body = expected_email.get_body() + assert body.get_content_type() == expected_body.get_content_type() + + assert body.get_charsets() == expected_body.get_charsets() + assert body.get_content() == expected_body.get_content() + + +@pytest.mark.parametrize( + "is_integration,send_email_outcome", + [ + (False, SMTPDataError(100, "Erreur SMTP")), + (False, dict()), + (False, {"email2@email.com": (550, "Email cound not be sent.")}), + (True, Exception("Autre erreur")), + ], +) +@patch("src.pipeline.helpers.emails.send_email") +@patch("src.pipeline.helpers.emails.sleep") +def test_send_mission_actions_email( + mock_sleep, + mock_send_email, + expected_email, + sample_control_unit_actions, + is_integration, + send_email_outcome, +): + def send_email_side_effect(message): + if isinstance(send_email_outcome, Exception): + raise send_email_outcome + else: + return send_email_outcome + + mock_send_email.side_effect = send_email_side_effect + + sent_messages = send_mission_actions_email.run( + message=expected_email, + actions=sample_control_unit_actions, + is_integration=is_integration, + ) + assert len(sent_messages) == 2 + for msg in sent_messages: + success = True + error_code = None + error_message = None + addressee = msg.email_address + if not is_integration: + if isinstance(send_email_outcome, SMTPDataError): + success = False + error_message = ( + "The server replied with an unexpected error code " + "(other than a refusal of a recipient)." + ) + else: + if msg.email_address in send_email_outcome: + success = False + error_code, error_message = send_email_outcome[addressee] + assert isinstance(msg, ControlUnitActionsSentMessage) + assert ( + msg.control_unit_id + == sample_control_unit_actions.control_unit.control_unit_id + ) + assert ( + msg.control_unit_name + == sample_control_unit_actions.control_unit.control_unit_name + ) + assert msg.email_address == addressee + assert msg.actions_min_datetime_utc == sample_control_unit_actions.period.start + assert msg.actions_max_datetime_utc == sample_control_unit_actions.period.end + assert msg.number_of_actions == ( + len(sample_control_unit_actions.sea_controls) + + len(sample_control_unit_actions.land_controls) + + len(sample_control_unit_actions.air_controls) + + len(sample_control_unit_actions.air_surveillances) + ) + assert msg.success == success + assert msg.error_code == error_code + assert msg.error_message == error_message + + +def test_control_unit_actions_list_to_df( + control_unit_actions_sent_messages, control_unit_actions_sent_messages_df +): + df = control_unit_actions_list_to_df.run(control_unit_actions_sent_messages) + pd.testing.assert_frame_equal(df, control_unit_actions_sent_messages_df) + + +def test_load_emails_sent_to_control_units( + reset_test_data, control_unit_actions_sent_messages_df +): + query = "SELECT * FROM emails_sent_to_control_units ORDER BY email_address" + + initial_emails = read_query(db="monitorfish_remote", query=query) + + load_emails_sent_to_control_units.run(control_unit_actions_sent_messages_df) + emails_after_one_run = read_query(db="monitorfish_remote", query=query) + + load_emails_sent_to_control_units.run(control_unit_actions_sent_messages_df) + emails_after_two_runs = read_query(db="monitorfish_remote", query=query) + + assert len(initial_emails) == 0 + assert 2 * len(emails_after_one_run) == len(emails_after_two_runs) == 4 + + +def test_flow(reset_test_data): + start_days_ago = 365 + end_days_ago = 0 + + query = "SELECT * FROM emails_sent_to_control_units ORDER BY email_address" + initial_emails = read_query(db="monitorfish_remote", query=query) + + flow.schedule = None + state = flow.run( + test_mode=False, + is_integration=True, + start_days_ago=start_days_ago, + end_days_ago=end_days_ago, + ) + assert state.is_successful() + + final_emails = read_query(db="monitorfish_remote", query=query) + assert len(initial_emails) == 0 + assert len(final_emails) == 3 + assert (final_emails.number_of_actions == [1, 8, 1]).all() + assert ( + final_emails.email_address + == ["alternative@email", "email8@email.com", "some.email@control.unit.4"] + ).all()