Merge pull request #23 from eodcgmbh/fix-dag-id-parallel
Add proper naming for parallel dags
lforesta authored Sep 10, 2020
2 parents 7bdb523 + cb8ec63 commit d1b0266
Showing 25 changed files with 1,479 additions and 1,351 deletions.
3 changes: 2 additions & 1 deletion requirements-test.txt
@@ -1,3 +1,4 @@
pytest
pytest-cov
coverage==4.5.4
pytest-mock
coverage==4.5.4
44 changes: 32 additions & 12 deletions src/eodc_openeo_bindings/job_writer/dag_writer.py
@@ -1,29 +1,51 @@
import os
import json
from shutil import copyfile
from typing import Tuple, List, Optional, Union
from typing import Tuple, List, Optional, Union, Dict

from eodc_openeo_bindings.job_writer.job_domain import AirflowDagDomain
from eodc_openeo_bindings.job_writer.job_writer import JobWriter
from eodc_openeo_bindings.job_writer.utils import JobIdExtension
from eodc_openeo_bindings.openeo_to_eodatareaders import openeo_to_eodatareaders


class AirflowDagWriter(JobWriter):

def __init__(self):
def __init__(self, job_id_extensions: Optional[Dict[str, str]] = None):
self.job_id_extensions = JobIdExtension(job_id_extensions)
self.not_parallelizable_func = (
'filter_bands',
'filter_bbox',
'filter_temporal',
'array_element'
)

def get_domain(self, job_id: str, user_name: str, process_graph_json: Union[str, dict], job_data: str,
process_defs: Union[dict, list, str],
user_email: str = None, job_description: str = None, parallelize_tasks: bool = False,
vrt_only: bool = False, add_delete_sensor: bool = False, add_parallel_sensor: bool = False) -> AirflowDagDomain:
return AirflowDagDomain(job_id, user_name, process_graph_json, job_data, process_defs, user_email, job_description,
parallelize_tasks, vrt_only, add_delete_sensor, add_parallel_sensor)
def get_domain(self,
job_id: str,
user_name: str,
process_graph_json: Union[str, dict],
job_data: str,
process_defs: Union[dict, list, str],
user_email: str = None,
job_description: str = None,
parallelize_tasks: bool = False,
vrt_only: bool = False,
add_delete_sensor: bool = False,
add_parallel_sensor: bool = False
) -> AirflowDagDomain:
return AirflowDagDomain(job_id=job_id,
job_id_extension=self.job_id_extensions,
user_name=user_name,
process_graph_json=process_graph_json,
job_data=job_data,
process_defs=process_defs,
user_email=user_email,
job_description=job_description,
parallelize_tasks=parallelize_tasks,
vrt_only=vrt_only,
add_delete_sensor=add_delete_sensor,
add_parallel_sensor=add_parallel_sensor,
)

def write_job(self, job_id: str, user_name: str, process_graph_json: Union[str, dict], job_data: str,
process_defs: Union[dict, list, str],
@@ -82,7 +104,7 @@ def get_additional_header(self, domain: AirflowDagDomain):
'email_on_retry': False,
}}
dag = DAG(dag_id="{domain.job_id}",
dag = DAG(dag_id="{domain.dag_id}",
description="{domain.job_description}",
catchup=True,
max_active_runs=1,
@@ -141,8 +163,6 @@ def _check_key_is_parallelizable(self, params: List[dict]):

return True

return parallelizable

def expand_node_dependencies(self, node_dependencies, dep_subnodes, split_dependencies=False):
"""
Expand dependencies, in case a node had been split because of parallelization.
@@ -299,7 +319,7 @@ def parallelise_dag(job_id, user_name, process_graph_json, job_data, process_def
add_delete_sensor=True,
vrt_only=False,
parallelize_tasks=True)
domain.job_id = domain.job_id + "_2"
domain.job_id = "{self.job_id_extensions.get_parallel(domain.job_id)}"
writer.rewrite_and_move_job(domain)
sleep(10) # give a few seconds to Airflow to add DAG to its internal DB
''',
@@ -314,7 +334,7 @@ def parallelise_dag(job_id, user_name, process_graph_json, job_data, process_def
"trigger_new_dag": f'''
trigger_dag = TriggerDagRunOperator(task_id='trigger_dag',
dag=dag,
trigger_dag_id='{domain.job_id}_2',
trigger_dag_id='{self.job_id_extensions.get_parallel(domain.job_id)}',
queue='process')
''',
"dep_trigger_new_dag": self.get_dependencies_txt("trigger_dag", ["parallelise_dag"]),
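For illustration, a minimal usage sketch of the updated writer (not part of this commit; the class, method and attribute names follow the diff above, while the concrete job_id, user and paths are hypothetical):

from eodc_openeo_bindings.job_writer.dag_writer import AirflowDagWriter

# With no mapping passed, the default extensions "prep" and "parallel" are used.
writer = AirflowDagWriter()

# Hypothetical job values, for illustration only.
domain = writer.get_domain(job_id="jb-1234",
                           user_name="alice",
                           process_graph_json={"process_graph": {}},
                           job_data="/tmp/jb-1234",
                           process_defs=[],
                           parallelize_tasks=False,
                           add_parallel_sensor=True)

# The DAG header is now rendered with domain.dag_id instead of the raw job_id,
# and the parallel follow-up DAG is triggered under the derived id:
print(writer.job_id_extensions.get_parallel(domain.job_id))  # -> "jb-1234_parallel"

The generated file itself should then be named after domain.get_filepath(), i.e. dag_jb-1234_prep.py for the preparation run under the default extensions.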
55 changes: 47 additions & 8 deletions src/eodc_openeo_bindings/job_writer/job_domain.py
@@ -1,5 +1,7 @@
from typing import Union

from eodc_openeo_bindings.job_writer.utils import JobIdExtension


class JobDomain:

@@ -26,14 +28,42 @@ def get_filepath(self) -> str:

class AirflowDagDomain(JobDomain):

def __init__(self, job_id: str, user_name: str, process_graph_json: Union[str, dict], job_data: str,
process_defs: Union[dict, list, str],
user_email: str = None, job_description: str = None, parallelize_tasks: bool = False,
vrt_only: bool = False, add_delete_sensor: bool = False, add_parallel_sensor: bool = False):
def __init__(self,
job_id: str,
job_id_extension: JobIdExtension,
user_name: str,
process_graph_json: Union[str, dict],
job_data: str,
process_defs: Union[dict, list, str],
user_email: str = None,
job_description: str = None,
parallelize_tasks: bool = False,
vrt_only: bool = False,
add_delete_sensor: bool = False,
add_parallel_sensor: bool = False,
):
"""
Args:
job_id: The identifier of the job. NOT equivalent to the dag_id.
job_id_extension: Class storing and adding job_id extensions.
user_name: The user name.
user_email: The user email address.
process_graph_json: The process graph.
job_data:
process_defs:
job_description: A short description about the job.
parallelize_tasks: Use existing vrt files and create nodes for parallel tasks.
vrt_only: Whether the dag should ONLY create vrt files. If activated, nothing is processed; only the
vrt files describing how to do so are created. The created vrt files can be used to run the job in
parallel in a following dag.
add_delete_sensor: Adds an airflow sensor task to ensure dags can be stopped.
add_parallel_sensor: Adds a node which will create and trigger a second parallelized dag based on vrt files.
"""
self.process_graph_json = process_graph_json
self.job_data = job_data
self.process_defs = process_defs
self.job_id = job_id
self.job_id_extension = job_id_extension
self.user_name = user_name
self.user_email = user_email
self.job_description = job_description if job_description else 'No description provided'
@@ -44,8 +74,17 @@ def __init__(self, job_id: str, user_name: str, process_graph_json: Union[str, d
self.nodes = None # filled if task is parallelised
super(AirflowDagDomain, self).__init__()

def get_filepath(self, **kwargs) -> str:
dag_name = f'dag_{self.job_id}'
if self.parallelize_task:
dag_name += '_parallelize'
return dag_name + '.py'
@property
def dag_id(self):
"""Get the dag-id based on given parameters."""
if self.parallelize_task:
return f'{self.job_id_extension.get_parallel(self.job_id)}'
else:
return f'{self.job_id_extension.get_preparation(self.job_id)}'

def get_filepath(self, **kwargs) -> str:
"""Create the name of the dag-file.
Here dag-filename and dag-id are equal.
"""
return f'dag_{self.dag_id}.py'
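A short sketch of the naming scheme this property introduces (not part of this commit; the job_id is hypothetical and the default extensions are assumed):

from eodc_openeo_bindings.job_writer.utils import JobIdExtension

ext = JobIdExtension()  # defaults: "prep" / "parallel"

# dag_id resolves to the preparation id unless the domain is parallelised,
# and get_filepath() prefixes it with "dag_" and appends ".py":
assert ext.get_preparation("jb-1234") == "jb-1234_prep"      # -> dag file "dag_jb-1234_prep.py"
assert ext.get_parallel("jb-1234") == "jb-1234_parallel"     # -> dag file "dag_jb-1234_parallel.py"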
20 changes: 19 additions & 1 deletion src/eodc_openeo_bindings/job_writer/utils.py
@@ -1,6 +1,6 @@
import glob
import os
from typing import Union, List
from typing import Union, List, Optional, Dict


class JobWriterUtils:
@@ -93,3 +93,21 @@ def get_dc_filepaths_from_dependencies(self, node_dependencies, job_data):
dc_filepaths = None

return dc_filepaths


class JobIdExtension:

def __init__(self, extensions: Optional[Dict[str, str]] = None) -> None:
self.preparation = "prep"
self.parallel = "parallel"
if isinstance(extensions, dict):
if "preparation" in extensions:
self.preparation = extensions["preparation"]
if "parallel" in extensions:
self.parallel = extensions["parallel"]

def get_preparation(self, job_id: str) -> str:
return f"{job_id}_{self.preparation}"

def get_parallel(self, job_id: str) -> str:
return f"{job_id}_{self.parallel}"
55 changes: 40 additions & 15 deletions tests/conftest.py
@@ -24,16 +24,6 @@ def test_folder():
return get_test_folder()


@pytest.fixture()
def csw_server():
os.environ['OEO_CSW_SERVER'] = 'http://pycsw:8000'


@pytest.fixture()
def acube_csw_server():
os.environ['OEO_CSW_SERVER_DC'] = 'https://csw-acube.eodc.eu/'


@pytest.fixture()
def evi_file():
test_folder = get_test_folder()
@@ -73,7 +63,7 @@ def fin():
os.remove(path)
request.addfinalizer(fin)
return path


@pytest.fixture()
def out_filepath_basic_apply(request):
@@ -92,7 +82,6 @@ def backend_processes():
return json.load(open(os.path.join(test_folder, 'backend_processes.json')))['processes']



@pytest.fixture()
def setup_airflow_dag_folder(request):
test_folder = get_test_folder()
@@ -108,11 +97,47 @@ def fin():


@pytest.fixture()
def setup_ref_job_folder(request):
test_folder = get_test_folder()
os.environ['REF_JOBS'] = os.path.join(test_folder, 'ref_jobs')
def setup_ref_job_folder():
os.environ['REF_JOBS'] = os.path.join(get_test_folder(), 'ref_jobs')


@pytest.fixture()
def airflow_job_folder():
return os.path.join(get_test_folder(), 'ref_airflow_job')


@pytest.fixture()
def csw_server_default(mocker):
csw_server_response = [
'/s2a_prd_msil1c/2018/06/08/S2A_MSIL1C_20180608T101021_N0206_R022_T32TPS_20180608T135059.zip',
'/s2a_prd_msil1c/2018/06/11/S2A_MSIL1C_20180611T102021_N0206_R065_T32TPS_20180611T123241.zip',
'/s2a_prd_msil1c/2018/06/18/S2A_MSIL1C_20180618T101021_N0206_R022_T32TPS_20180618T135619.zip',
'/s2a_prd_msil1c/2018/06/21/S2A_MSIL1C_20180621T102021_N0206_R065_T32TPS_20180621T140615.zip',
'/s2b_prd_msil1c/2018/06/06/S2B_MSIL1C_20180606T102019_N0206_R065_T32TPS_20180606T172808.zip',
'/s2b_prd_msil1c/2018/06/13/S2B_MSIL1C_20180613T101019_N0206_R022_T32TPS_20180613T122213.zip',
'/s2b_prd_msil1c/2018/06/16/S2B_MSIL1C_20180616T102019_N0206_R065_T32TPS_20180616T154713.zip',
]
mocker.patch('eodc_openeo_bindings.map_cubes_processes.csw_query', return_value=csw_server_response)


@pytest.fixture()
def acube_csw_server_default(mocker):
csw_server_response = [
'sig0/SIG0-----_SGRTA01_S1A_IWGRDH1VHD_20170301_050935--_EU010M_E052N015T1.tif',
'sig0/SIG0-----_SGRTA01_S1A_IWGRDH1VHD_20170301_051000--_EU010M_E052N015T1.tif',
'sig0/SIG0-----_SGRTA01_S1A_IWGRDH1VVD_20170301_050935--_EU010M_E052N015T1.tif',
'sig0/SIG0-----_SGRTA01_S1A_IWGRDH1VVD_20170301_051000--_EU010M_E052N015T1.tif',
'sig0/SIG0-----_SGRTA01_S1B_IWGRDH1VHD_20170302_050053--_EU010M_E052N015T1.tif',
'sig0/SIG0-----_SGRTA01_S1B_IWGRDH1VHD_20170302_050118--_EU010M_E052N015T1.tif',
'sig0/SIG0-----_SGRTA01_S1B_IWGRDH1VVD_20170302_050053--_EU010M_E052N015T1.tif',
'sig0/SIG0-----_SGRTA01_S1B_IWGRDH1VVD_20170302_050118--_EU010M_E052N015T1.tif',
'sig0/SIG0-----_SGRTA01_S1A_IWGRDH1VHD_20170301_050935--_EU010M_E052N016T1.tif',
'sig0/SIG0-----_SGRTA01_S1A_IWGRDH1VHD_20170301_051000--_EU010M_E052N016T1.tif',
'sig0/SIG0-----_SGRTA01_S1A_IWGRDH1VVD_20170301_050935--_EU010M_E052N016T1.tif',
'sig0/SIG0-----_SGRTA01_S1A_IWGRDH1VVD_20170301_051000--_EU010M_E052N016T1.tif',
'sig0/SIG0-----_SGRTA01_S1B_IWGRDH1VHD_20170302_050028--_EU010M_E052N016T1.tif',
'sig0/SIG0-----_SGRTA01_S1B_IWGRDH1VHD_20170302_050053--_EU010M_E052N016T1.tif',
'sig0/SIG0-----_SGRTA01_S1B_IWGRDH1VVD_20170302_050028--_EU010M_E052N016T1.tif',
'sig0/SIG0-----_SGRTA01_S1B_IWGRDH1VVD_20170302_050053--_EU010M_E052N016T1.tif',
]
mocker.patch('eodc_openeo_bindings.map_cubes_processes.csw_query', return_value=csw_server_response)
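A hypothetical test sketch using the mocked CSW fixture (not part of this commit; the fixture name comes from the diff above, the test body is illustrative only):

def test_csw_query_is_mocked(csw_server_default):
    # The fixture patches csw_query in map_cubes_processes, so any call to it
    # returns the canned list of Sentinel-2 file paths regardless of arguments.
    from eodc_openeo_bindings import map_cubes_processes

    filepaths = map_cubes_processes.csw_query()
    assert len(filepaths) == 7
    assert all(path.endswith('.zip') for path in filepaths)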
