Skip to content

Commit

Permalink
Add init_pno_subscriptions flow
Browse files Browse the repository at this point in the history
  • Loading branch information
VincentAntoine committed May 2, 2024
1 parent 6389354 commit 3d047db
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ frontend/cypress/videos/

# Local data
/datascience/data
/datascience/src/pipeline/data/non_commited_data/pno_ports_subscriptions.csv
/datascience/src/pipeline/data/non_commited_data/pno_segments_subscriptions.csv
/datascience/src/pipeline/data/non_commited_data/pno_vessels_subscriptions.csv

# Python
*.pyc
Expand Down
2 changes: 2 additions & 0 deletions datascience/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
# Package structure
ROOT_DIRECTORY = Path(__file__).parent
LIBRARY_LOCATION = ROOT_DIRECTORY / Path("src")
PIPELINE_DATA_LOCATION = LIBRARY_LOCATION / Path("pipeline/data")
NON_COMMITED_DATA_LOCATION = PIPELINE_DATA_LOCATION / Path("non_commited_data")
QUERIES_LOCATION = LIBRARY_LOCATION / Path("pipeline/queries")
TEST_DATA_LOCATION = ROOT_DIRECTORY / Path("tests/test_data")
LOCAL_MIGRATIONS_FOLDER = str(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
control_unit_id,port_locode,receive_all_pnos
10052,FRALM,false
10052,FRPOV,false
10052,FRSRL,false
10052,FRCR2,false
10952,FRLCT,false
10952,FRPDB,true
10952,FRPSL,true
10952,MQKF4,true
10952,FRAMA,true
10952,FRCR2,true
10952,FRETB,true
10052,FRLCT,true
15652,FRMRS,true
15659,FRMTU,false
15659,FRPDB,false
15659,FRPSL,false
15659,FRRYR,false
15659,FRXSR,false
15657,YTDZA,false
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
control_unit_id,segment
10052,seg_1
10052,seg_2
15652,seg_1
15659,seg_5
15659,seg_4
15659,seg_9
15657,seg_1
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
control_unit_id,cfr
1,ABCDEFGHIJ1
1,ABCDEFGHIJ2
2,ABCDEFGHIJ3
5,ABCDEFGHIJ1
1,ABCDEFGHIJ7
102 changes: 102 additions & 0 deletions datascience/src/pipeline/flows/init_pno_subscriptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from pathlib import Path

import pandas as pd
import prefect
from prefect import Flow, Parameter, case, task
from prefect.executors import LocalDaskExecutor

from config import NON_COMMITED_DATA_LOCATION
from src.pipeline.generic_tasks import load
from src.pipeline.shared_tasks.control_flow import check_flow_not_running


@task(checkpoint=False)
def extract_pno_ports_subscriptions(filename: str) -> pd.DataFrame:
    """Read control units' port subscriptions from a CSV file.

    Args:
        filename (str): name of the CSV file, located in the non-committed
            data directory.

    Returns:
        pd.DataFrame: one row per (control_unit_id, port_locode) subscription,
        with ``receive_all_pnos`` cast to bool.
    """
    csv_path = NON_COMMITED_DATA_LOCATION / filename
    return pd.read_csv(csv_path, encoding="utf8", dtype={"receive_all_pnos": bool})


@task(checkpoint=False)
def extract_pno_segments_subscriptions(filename: str) -> pd.DataFrame:
    """Read control units' fleet segment subscriptions from a CSV file.

    Args:
        filename (str): name of the CSV file, located in the non-committed
            data directory.

    Returns:
        pd.DataFrame: one row per (control_unit_id, segment) subscription.
    """
    csv_path = NON_COMMITED_DATA_LOCATION / filename
    return pd.read_csv(csv_path, encoding="utf8")


@task(checkpoint=False)
def extract_pno_vessels_subscriptions(filename: str) -> pd.DataFrame:
    """Read control units' vessel subscriptions from a CSV file.

    Args:
        filename (str): name of the CSV file, located in the non-committed
            data directory.

    Returns:
        pd.DataFrame: one row per (control_unit_id, cfr) subscription.
    """
    csv_path = NON_COMMITED_DATA_LOCATION / filename
    return pd.read_csv(csv_path, encoding="utf8")


@task(checkpoint=False)
def load_pno_ports_subscriptions(pno_ports_subscriptions: pd.DataFrame):
    """Replace the ``public.pno_ports_subscriptions`` table with the given data.

    Args:
        pno_ports_subscriptions (pd.DataFrame): rows to load.
    """
    load(
        pno_ports_subscriptions,
        table_name="pno_ports_subscriptions",
        schema="public",
        db_name="monitorfish_remote",
        logger=prefect.context.get("logger"),
        how="replace",
    )


@task(checkpoint=False)
def load_pno_segments_subscriptions(pno_segments_subscriptions: pd.DataFrame):
    """Replace the ``public.pno_segments_subscriptions`` table with the given data.

    Args:
        pno_segments_subscriptions (pd.DataFrame): rows to load.
    """
    load(
        pno_segments_subscriptions,
        table_name="pno_segments_subscriptions",
        schema="public",
        db_name="monitorfish_remote",
        logger=prefect.context.get("logger"),
        how="replace",
    )


@task(checkpoint=False)
def load_pno_vessels_subscriptions(pno_vessels_subscriptions: pd.DataFrame):
    """Replace the ``public.pno_vessels_subscriptions`` table with the given data.

    Args:
        pno_vessels_subscriptions (pd.DataFrame): rows to load.
    """
    load(
        pno_vessels_subscriptions,
        table_name="pno_vessels_subscriptions",
        schema="public",
        db_name="monitorfish_remote",
        logger=prefect.context.get("logger"),
        how="replace",
    )


with Flow("Init pno subscriptions", executor=LocalDaskExecutor()) as flow:
    flow_not_running = check_flow_not_running()
    with case(flow_not_running, True):
        # Runtime parameters: names of the three CSV files to read from the
        # non-committed data directory.
        ports_file = Parameter("pno_ports_subscriptions_file_name")
        segments_file = Parameter("pno_segments_subscriptions_file_name")
        vessels_file = Parameter("pno_vessels_subscriptions_file_name")

        # Extract each CSV and replace the corresponding database table.
        load_pno_ports_subscriptions(extract_pno_ports_subscriptions(ports_file))
        load_pno_segments_subscriptions(
            extract_pno_segments_subscriptions(segments_file)
        )
        load_pno_vessels_subscriptions(extract_pno_vessels_subscriptions(vessels_file))

flow.file_name = Path(__file__).name
17 changes: 16 additions & 1 deletion datascience/src/pipeline/flows_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
MINIMUM_CONSECUTIVE_POSITIONS,
MINIMUM_MINUTES_OF_EMISSION_AT_SEA,
MONITORFISH_VERSION,
NON_COMMITED_DATA_LOCATION,
ROOT_DIRECTORY,
TEST_MODE,
)
Expand All @@ -36,6 +37,7 @@
fishing_gear_codes,
foreign_fmcs,
infractions,
init_pno_subscriptions,
init_pno_types,
init_species_groups,
last_positions,
Expand Down Expand Up @@ -276,6 +278,7 @@
foreign_fmcs.flow,
infractions.flow,
init_pno_types.flow,
init_pno_subscriptions.flow,
init_species_groups.flow,
last_positions.flow,
missing_far_alerts.flow,
Expand Down Expand Up @@ -310,7 +313,7 @@

################### Define flows' run config ####################
for flow in flows_to_register:
if flow.name == "Logbook":
if flow.name == logbook.flow.name:
host_config = {
"group_add": [LOGBOOK_FILES_GID],
"mounts": [
Expand All @@ -321,6 +324,18 @@
)
],
}

elif flow.name in (init_pno_subscriptions.flow.name,):
host_config = {
"mounts": [
Mount(
target=NON_COMMITED_DATA_LOCATION.as_posix(),
source="/opt/pipeline-data",
type="bind",
)
],
}

else:
host_config = None

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import pandas as pd

from src.pipeline.flows.init_pno_subscriptions import flow
from src.read_query import read_query
from tests.mocks import mock_check_flow_not_running

flow.replace(flow.get_tasks("check_flow_not_running")[0], mock_check_flow_not_running)


def test_flow(reset_test_data):
    """Run the flow twice against dummy CSV fixtures and check that the three
    subscription tables are populated and that a re-run is idempotent."""
    queries = {
        "ports": (
            "SELECT * FROM pno_ports_subscriptions ORDER BY control_unit_id, port_locode"
        ),
        "segments": (
            "SELECT * FROM pno_segments_subscriptions ORDER BY control_unit_id, segment"
        ),
        "vessels": (
            "SELECT * FROM pno_vessels_subscriptions ORDER BY control_unit_id, cfr"
        ),
    }

    def fetch_all_tables():
        # Snapshot the three subscription tables in a deterministic row order.
        return {
            key: read_query(query, db="monitorfish_remote")
            for key, query in queries.items()
        }

    run_kwargs = {
        "pno_ports_subscriptions_file_name": "dummy_pno_ports_subscriptions.csv",
        "pno_segments_subscriptions_file_name": "dummy_pno_segments_subscriptions.csv",
        "pno_vessels_subscriptions_file_name": "dummy_pno_vessels_subscriptions.csv",
    }

    initial = fetch_all_tables()

    flow.schedule = None
    state = flow.run(**run_kwargs)
    assert state.is_successful()

    first_run = fetch_all_tables()

    # The tables start out empty in the test database.
    for key in queries:
        assert len(initial[key]) == 0

    # Row counts after the first run match the dummy CSV fixtures.
    expected_row_counts = {"ports": 19, "segments": 7, "vessels": 5}
    for key, expected_count in expected_row_counts.items():
        assert len(first_run[key]) == expected_count

    # Re-running should succeed and leave the subscriptions unchanged.
    state = flow.run(**run_kwargs)
    assert state.is_successful()

    second_run = fetch_all_tables()
    for key in queries:
        pd.testing.assert_frame_equal(first_run[key], second_run[key])

0 comments on commit 3d047db

Please sign in to comment.