Merge pull request #25 from eodcgmbh/remove-csw-dependency
Remove csw dependency
lforesta authored Sep 25, 2020
2 parents 5da3bf3 + 5bce119 commit 892b32f
Showing 23 changed files with 347 additions and 410 deletions.
1 change: 0 additions & 1 deletion setup.cfg
@@ -28,7 +28,6 @@ package_dir =
setup_requires = pyscaffold>=3.1a0,<3.2a0
# Add here dependencies of your project (semicolon/line-separated), e.g.
install_requires =
owslib
openeo-pg-parser
regex
requests
25 changes: 16 additions & 9 deletions src/eodc_openeo_bindings/job_writer/basic_writer.py
@@ -1,5 +1,5 @@
import os
from typing import Union, Tuple
from typing import List, Union, Tuple

from eodc_openeo_bindings.job_writer.job_domain import BasicJobDomain
from eodc_openeo_bindings.job_writer.job_writer import JobWriter
@@ -9,13 +9,16 @@
class BasicJobWriter(JobWriter):

def write_job(self, process_graph_json: Union[str, dict], job_data: str,
process_defs: Union[dict, list, str], output_filepath: str = None):
process_defs: Union[dict, list, str], in_filepaths: List[str],
output_filepath: str = None):
return super().write_job(process_graph_json=process_graph_json, job_data=job_data,
process_defs=process_defs, output_filepath=output_filepath)
process_defs=process_defs, in_filepaths=in_filepaths,
output_filepath=output_filepath)

def get_domain(self, process_graph_json: Union[str, dict], job_data: str,
process_defs: Union[dict, list, str], output_filepath: str = None):
return BasicJobDomain(process_graph_json, job_data, process_defs, output_filepath)
process_defs: Union[dict, list, str], in_filepaths: List[str],
output_filepath: str = None):
return BasicJobDomain(process_graph_json, job_data, process_defs, in_filepaths, output_filepath)

def get_imports(self, domain) -> str:
return '''\
@@ -47,9 +50,13 @@ def get_nodes(self, domain: BasicJobDomain) -> Tuple[dict, list]:
for node in nodes:
node_id = node[0]
params = node[1]
filepaths = node[2]
node_dependencies = node[3]
node_operator = node[4]
node_dependencies = node[2]
node_operator = node[3]

if not node_dependencies:
filepaths = domain.in_filepaths
else:
filepaths = None

if filepaths:
filepaths0 = 'filepaths = '
@@ -78,7 +85,7 @@ def get_nodes(self, domain: BasicJobDomain) -> Tuple[dict, list]:

for node in nodes:
node_id = node[0]
node_dependencies = node[3]
node_dependencies = node[2]

current_index = translated_nodes_keys.index(node_id)
dep_indices = []
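With this change, BasicJobWriter no longer discovers input files itself: the caller resolves them and passes them in as in_filepaths, and get_nodes() assigns them to the nodes that have no dependencies (the load_collection nodes). A minimal sketch of the new call shape follows; the process graph, process definitions, paths and output filename are placeholders, and the import path assumes the package layout under src/.

from eodc_openeo_bindings.job_writer.basic_writer import BasicJobWriter

# Placeholder inputs: in practice the process graph and process definitions come
# from the openEO backend, and the file list from the backend's own catalogue
# (replacing the CSW lookup removed in this pull request).
process_graph = "process_graph.json"
process_defs = "processes.json"
in_filepaths = [
    "/s2a_prd_msil1c/2018/06/08/S2A_MSIL1C_20180608T101021_N0206_R022_T32TPS_20180608T135059.zip",
    "/s2a_prd_msil1c/2018/06/11/S2A_MSIL1C_20180611T102021_N0206_R065_T32TPS_20180611T123241.zip",
]

BasicJobWriter().write_job(
    process_graph_json=process_graph,
    job_data="/tmp/job_data",
    process_defs=process_defs,
    in_filepaths=in_filepaths,        # new parameter introduced by this pull request
    output_filepath="basic_job.py",
)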
35 changes: 22 additions & 13 deletions src/eodc_openeo_bindings/job_writer/dag_writer.py
@@ -23,9 +23,11 @@ def __init__(self, job_id_extensions: Optional[Dict[str, str]] = None):
def get_domain(self,
job_id: str,
user_name: str,
dags_folder: str,
process_graph_json: Union[str, dict],
job_data: str,
process_defs: Union[dict, list, str],
in_filepaths: List[str],
user_email: str = None,
job_description: str = None,
parallelize_tasks: bool = False,
@@ -36,9 +38,11 @@ def get_domain(self,
return AirflowDagDomain(job_id=job_id,
job_id_extension=self.job_id_extensions,
user_name=user_name,
dags_folder=dags_folder,
process_graph_json=process_graph_json,
job_data=job_data,
process_defs=process_defs,
in_filepaths=in_filepaths,
user_email=user_email,
job_description=job_description,
parallelize_tasks=parallelize_tasks,
@@ -47,31 +51,32 @@ def get_domain(self,
add_parallel_sensor=add_parallel_sensor,
)

def write_job(self, job_id: str, user_name: str, process_graph_json: Union[str, dict], job_data: str,
process_defs: Union[dict, list, str],
def write_job(self, job_id: str, user_name: str, dags_folder: str, process_graph_json: Union[str, dict], job_data: str,
process_defs: Union[dict, list, str], in_filepaths: List[str],
user_email: str = None, job_description: str = None, parallelize_tasks: bool = False,
vrt_only: bool = False, add_delete_sensor: bool = False, add_parallel_sensor: bool = False):
return super().write_job(job_id=job_id, user_name=user_name, process_graph_json=process_graph_json,
job_data=job_data, process_defs=process_defs, user_email=user_email, job_description=job_description,
return super().write_job(job_id=job_id, user_name=user_name, dags_folder=dags_folder, process_graph_json=process_graph_json,
job_data=job_data, process_defs=process_defs, in_filepaths=in_filepaths, user_email=user_email, job_description=job_description,
parallelize_tasks=parallelize_tasks, vrt_only=vrt_only,
add_delete_sensor=add_delete_sensor, add_parallel_sensor=add_parallel_sensor)

def move_dag(self, filepath: str):
def move_dag(self, filepath: str, dags_folder: str):
# Move file to DAGs folder (must copy/delete because of different volume mounts)
copyfile(filepath, os.environ.get('AIRFLOW_DAGS') + "/" + filepath)
#copyfile(filepath, os.environ.get('AIRFLOW_DAGS') + "/" + filepath)
copyfile(filepath, dags_folder + "/" + filepath)
os.remove(filepath)

def write_and_move_job(self, job_id: str, user_name: str, process_graph_json: Union[str, dict], job_data: str,
process_defs: Union[dict, list, str],
def write_and_move_job(self, job_id: str, user_name: str, dags_folder: str, process_graph_json: Union[str, dict], job_data: str,
process_defs: Union[dict, list, str], in_filepaths: List[str],
user_email: str = None, job_description: str = None, parallelize_tasks: bool = False,
vrt_only: bool = False, add_delete_sensor: bool = False, add_parallel_sensor: bool = False):
_, domain = self.write_job(job_id, user_name, process_graph_json, job_data, process_defs, user_email, job_description,
_, domain = self.write_job(job_id, user_name, dags_folder, process_graph_json, job_data, process_defs, in_filepaths, user_email, job_description,
parallelize_tasks, vrt_only, add_delete_sensor, add_parallel_sensor)
self.move_dag(domain.filepath)
self.move_dag(domain.filepath, domain.dags_folder)

def rewrite_and_move_job(self, domain: AirflowDagDomain):
_, domain = self.rewrite_job(domain)
self.move_dag(domain.filepath)
self.move_dag(domain.filepath, domain.dags_folder)

def get_imports(self, domain: AirflowDagDomain) -> str:
imports = '''\
@@ -139,11 +144,15 @@ def get_dependencies_txt(self, node_id, dependencies):
def _get_node_info(self, node, domain):
node_id = node[0]
params = node[1]
filepaths = node[2]
node_dependencies = node[3]
node_dependencies = node[2]

if node_dependencies:
node_dependencies = self.utils.get_existing_node(domain.job_data, node_dependencies)
# Assumption: dependency means that this is *not* a 'load_collection' node, hence do not write input filepaths into its mapping
filepaths = None
else:
# Assumption: no dependency means that this is a 'load_collection' node
filepaths = domain.in_filepaths

return node_id, params, filepaths, node_dependencies

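The DAG writer now receives the Airflow DAG folder explicitly as dags_folder (stored on the domain and read by move_dag) instead of looking up the AIRFLOW_DAGS environment variable, and it forwards in_filepaths the same way. A hedged sketch of write_and_move_job under the new signature; every value below is a placeholder.

from eodc_openeo_bindings.job_writer.dag_writer import AirflowDagWriter

writer = AirflowDagWriter()
writer.write_and_move_job(
    job_id="jb-20200925-0001",                 # placeholder job id
    user_name="jdoe",
    dags_folder="/usr/local/airflow/dags",     # new: explicit target folder, no AIRFLOW_DAGS lookup
    process_graph_json="process_graph.json",
    job_data="/tmp/job_data",
    process_defs="processes.json",
    in_filepaths=["/s2a_prd_msil1c/2018/06/08/S2A_MSIL1C_20180608T101021_N0206_R022_T32TPS_20180608T135059.zip"],
    user_email="jdoe@example.com",
    job_description="Example NDVI job",
    add_delete_sensor=True,
)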
12 changes: 9 additions & 3 deletions src/eodc_openeo_bindings/job_writer/job_domain.py
@@ -1,4 +1,4 @@
from typing import Union
from typing import List, Union

from eodc_openeo_bindings.job_writer.utils import JobIdExtension

@@ -15,10 +15,12 @@ def get_filepath(self) -> str:
class BasicJobDomain(JobDomain):

def __init__(self, process_graph_json: Union[str, dict], job_data: str,
process_defs: Union[dict, list, str], output_filepath: str = None):
process_defs: Union[dict, list, str], in_filepaths: List[str],
output_filepath: str = None):
self.process_graph_json = process_graph_json
self.job_data = job_data
self.process_defs= process_defs
self.in_filepaths = in_filepaths
self.output_filepath = output_filepath
super(BasicJobDomain, self).__init__()

@@ -31,10 +33,12 @@ class AirflowDagDomain(JobDomain):
def __init__(self,
job_id: str,
job_id_extension: JobIdExtension,
dags_folder: str,
user_name: str,
process_graph_json: Union[str, dict],
job_data: str,
process_defs: Union[dict, list, str],
in_filepaths: List[str],
user_email: str = None,
job_description: str = None,
parallelize_tasks: bool = False,
@@ -61,9 +65,11 @@ def __init__(self,
"""
self.process_graph_json = process_graph_json
self.job_data = job_data
self.process_defs= process_defs
self.process_defs = process_defs
self.in_filepaths = in_filepaths
self.job_id = job_id
self.job_id_extension = job_id_extension
self.dags_folder = dags_folder
self.user_name = user_name
self.user_email = user_email
self.job_description = job_description if job_description else 'No description provided'
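Both domain classes now carry the values that used to be looked up at write time: BasicJobDomain stores in_filepaths, and AirflowDagDomain additionally stores dags_folder so that move_dag can read it from the domain. A small sketch of constructing a BasicJobDomain directly, with placeholder values:

from eodc_openeo_bindings.job_writer.job_domain import BasicJobDomain

domain = BasicJobDomain(
    process_graph_json="process_graph.json",
    job_data="/tmp/job_data",
    process_defs="processes.json",
    in_filepaths=["/s2a_prd_msil1c/2018/06/08/S2A_MSIL1C_20180608T101021_N0206_R022_T32TPS_20180608T135059.zip"],
    output_filepath="basic_job.py",
)

# The writers read this attribute later: get_nodes() falls back to
# domain.in_filepaths for nodes without dependencies (load_collection nodes).
assert domain.in_filepaths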
55 changes: 1 addition & 54 deletions src/eodc_openeo_bindings/map_cubes_processes.py
@@ -12,17 +12,6 @@ def map_load_collection(process):
Maps bbox and band filters to eoDataReaders.
"""

# Get list of filepaths from csw server
filepaths = csw_query(collection=process['arguments']["id"],
spatial_extent=(
process['arguments']['spatial_extent']['south'],
process['arguments']['spatial_extent']['west'],
process['arguments']['spatial_extent']['north'],
process['arguments']['spatial_extent']['east']
),
temporal_extent=process['arguments']["temporal_extent"]
)

dict_item_list = []

# Map band filter
@@ -36,7 +25,7 @@
dict_item = map_filter_bbox(process)[0]
dict_item_list.append(dict_item)

return dict_item_list, filepaths
return dict_item_list


def map_filter_bands(process):
@@ -211,48 +200,6 @@ def map_add_dimension(process):
return dict_item_list


def csw_query(collection, spatial_extent, temporal_extent):
"""
Retrieves a file list from the EODC CSW server according to the specified parameters.
"""

if collection == 'SIG0':
csw = CatalogueServiceWeb(environ.get('OEO_CSW_SERVER_DC'), timeout=300)
else:
csw = CatalogueServiceWeb(environ.get('OEO_CSW_SERVER'), timeout=300)
constraints = []

# Collection filter
if collection == 'SIG0':
constraints.append(PropertyIsEqualTo('eodc:variable_name', collection))
else:
constraints.append(PropertyIsLike('apiso:ParentIdentifier', collection))
# Spatial filter
constraints.append(BBox(spatial_extent))
# Temporal filter
constraints.append(PropertyIsGreaterThan('apiso:TempExtent_begin', temporal_extent[0]))
constraints.append(PropertyIsLessThan('apiso:TempExtent_end', temporal_extent[1]))

# Run the query
constraints = [constraints]
csw.getrecords2(constraints=constraints, maxrecords=100)

# Put found records in a variable (dictionary)
records0 = csw.records

# Put statistics about results in a variable
#results = csw.results

# Sort records
records = []
for record in records0:
records.append(records0[record].references[0]['url'])
records = sorted(records)

return records


def check_dim_name(dimension_name):
"""
Map common dimension names for spectral and time to fieldnames used in eodatareaders.
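map_load_collection no longer queries the EODC CSW server: it only maps the band and bbox filters and returns the dict_item_list, while resolving the collection to concrete files is now the caller's responsibility (that result is what ends up in in_filepaths). A hedged sketch of the new division of labour — resolve_filepaths is a hypothetical stand-in for the backend's own catalogue lookup and is not provided by this package:

from eodc_openeo_bindings.map_cubes_processes import map_load_collection

def prepare_load_collection(process: dict, resolve_filepaths) -> tuple:
    """Illustrative only: pair the mapped filters with caller-resolved filepaths."""
    # Before this pull request: dict_items, filepaths = map_load_collection(process)
    dict_items = map_load_collection(process)      # maps filters only, no CSW query
    in_filepaths = resolve_filepaths(              # hypothetical catalogue lookup
        collection=process["arguments"]["id"],
        spatial_extent=process["arguments"]["spatial_extent"],
        temporal_extent=process["arguments"]["temporal_extent"],
    )
    return dict_items, in_filepaths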
24 changes: 9 additions & 15 deletions src/eodc_openeo_bindings/map_processes.py
@@ -37,20 +37,14 @@ def map_process(process, node_id, is_result, root_folder,

process_params = eval("map_" + process['process_id'] + "(process)")

if isinstance(process_params, tuple):
# This should happen only for "load_collection"
filepaths = process_params[1]
process_params = process_params[0]

if process['arguments']["id"][:2] in ('s1', 's3', 's5'):
# Workaround to use S1 and S3 Level-1 data, or S5p Level-2 data, which are not georeferenced
# TODO for the moment this is a workaround (29.06.2020)
job_params = set_output_folder(root_folder, node_id + '_0')
# Add a 'quick_geocode' step before cropping/clipping
process_params.insert(-1, {'name': 'quick_geocode', 'scale_sampling': '1;int'})
process_params.insert(-1, set_output_folder(root_folder, node_id)[0])
else:
filepaths = None
if process['process_id'] == 'load_collection' and process['arguments']["id"][:2] in ('s1', 's3', 's5'):
# Workaround to use S1 and S3 Level-1 data, or S5p Level-2 data, which are not georeferenced
# TODO for the moment this is a workaround (29.06.2020)
job_params = set_output_folder(root_folder, node_id + '_0')
# Add a 'quick_geocode' step before cropping/clipping
process_params.insert(-1, {'name': 'quick_geocode', 'scale_sampling': '1;int'})
process_params.insert(-1, set_output_folder(root_folder, node_id)[0])

for param in process_params:
job_params.append(param)

@@ -73,4 +67,4 @@
pickled_filepath = job_params[0]['out_dirpath'] + node_id + '.dc'
job_params.append({'name': 'to_pickle', 'filepath': f'{pickled_filepath};str'})

return job_params, filepaths
return job_params
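Because map_load_collection no longer returns a (params, filepaths) tuple, the Sentinel geocoding workaround can no longer rely on the tuple check to detect a load_collection node; it is now guarded by an explicit process_id check. The illustrative helper below (not code from this module) mirrors that guard to show which nodes still get the quick_geocode step:

def needs_quick_geocode(process: dict) -> bool:
    """Illustrative only: mirrors the guard above for non-georeferenced
    S1/S3 Level-1 and S5p Level-2 collections."""
    return (
        process["process_id"] == "load_collection"
        and process["arguments"]["id"][:2] in ("s1", "s3", "s5")
    )

# A Sentinel-2 load_collection node does not need the workaround:
print(needs_quick_geocode({"process_id": "load_collection",
                           "arguments": {"id": "s2a_prd_msil1c"}}))  # False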
5 changes: 2 additions & 3 deletions src/eodc_openeo_bindings/openeo_to_eodatareaders.py
@@ -74,10 +74,9 @@ def openeo_to_eodatareaders(process_graph_json_in: Union[dict, str], job_data: s
if cur_node.content['process_id'] == 'run_udf':
operator = "UdfExec"
params = map_udf(cur_node.content, job_data, cur_node.id)
filepaths = None
else:
operator = "EODataProcessor"
params, filepaths = map_process(
params = map_process(
cur_node.content,
cur_node.id,
cur_node.is_result,
@@ -97,7 +96,7 @@ def openeo_to_eodatareaders(process_graph_json_in: Union[dict, str], job_data: s
node_dependencies = list(cur_node.dependencies.ids)

# Add to nodes list
nodes.append((cur_node.id, params, filepaths, node_dependencies, operator))
nodes.append((cur_node.id, params, node_dependencies, operator))

return nodes, graph

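Node tuples returned by openeo_to_eodatareaders shrink from five elements to four — (node_id, params, node_dependencies, operator) — which is what the index shifts in basic_writer.py and dag_writer.py above compensate for. A hedged sketch of consuming the new shape, assuming nodes came from this function after the change:

from typing import List, Tuple

def load_collection_node_ids(nodes: List[Tuple]) -> List[str]:
    """Illustrative helper (not part of the package): collect the ids of nodes
    that will receive the caller-supplied in_filepaths, i.e. the nodes without
    dependencies, using the new 4-tuple layout."""
    node_ids = []
    for node_id, params, node_dependencies, operator in nodes:
        if not node_dependencies:
            node_ids.append(node_id)
    return node_ids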
25 changes: 13 additions & 12 deletions tests/conftest.py
@@ -83,18 +83,20 @@ def backend_processes():


@pytest.fixture()
def setup_airflow_dag_folder(request):
def airflow_dag_folder(request):
test_folder = get_test_folder()
os.environ['AIRFLOW_DAGS'] = os.path.join(test_folder, 'airflow_dag')
if os.path.isdir(os.environ['AIRFLOW_DAGS']):
shutil.rmtree(os.environ['AIRFLOW_DAGS'])
os.makedirs(os.environ['AIRFLOW_DAGS'])
dags_folder = os.path.join(test_folder, 'airflow_dag')
if os.path.isdir(dags_folder):
shutil.rmtree(dags_folder)
os.makedirs(dags_folder)

def fin():
shutil.rmtree(os.environ['AIRFLOW_DAGS'])
shutil.rmtree(dags_folder)

request.addfinalizer(fin)

return dags_folder


@pytest.fixture()
def setup_ref_job_folder():
@@ -107,8 +109,8 @@ def airflow_job_folder():


@pytest.fixture()
def csw_server_default(mocker):
csw_server_response = [
def S2_filepaths():
return [
'/s2a_prd_msil1c/2018/06/08/S2A_MSIL1C_20180608T101021_N0206_R022_T32TPS_20180608T135059.zip',
'/s2a_prd_msil1c/2018/06/11/S2A_MSIL1C_20180611T102021_N0206_R065_T32TPS_20180611T123241.zip',
'/s2a_prd_msil1c/2018/06/18/S2A_MSIL1C_20180618T101021_N0206_R022_T32TPS_20180618T135619.zip',
Expand All @@ -117,12 +119,11 @@ def csw_server_default(mocker):
'/s2b_prd_msil1c/2018/06/13/S2B_MSIL1C_20180613T101019_N0206_R022_T32TPS_20180613T122213.zip',
'/s2b_prd_msil1c/2018/06/16/S2B_MSIL1C_20180616T102019_N0206_R065_T32TPS_20180616T154713.zip',
]
mocker.patch('eodc_openeo_bindings.map_cubes_processes.csw_query', return_value=csw_server_response)


@pytest.fixture()
def acube_csw_server_default(mocker):
csw_server_response = [
def ACube_filepaths():
return [
'sig0/SIG0-----_SGRTA01_S1A_IWGRDH1VHD_20170301_050935--_EU010M_E052N015T1.tif',
'sig0/SIG0-----_SGRTA01_S1A_IWGRDH1VHD_20170301_051000--_EU010M_E052N015T1.tif',
'sig0/SIG0-----_SGRTA01_S1A_IWGRDH1VVD_20170301_050935--_EU010M_E052N015T1.tif',
Expand All @@ -140,4 +141,4 @@ def acube_csw_server_default(mocker):
'sig0/SIG0-----_SGRTA01_S1B_IWGRDH1VVD_20170302_050028--_EU010M_E052N016T1.tif',
'sig0/SIG0-----_SGRTA01_S1B_IWGRDH1VVD_20170302_050053--_EU010M_E052N016T1.tif',
]
mocker.patch('eodc_openeo_bindings.map_cubes_processes.csw_query', return_value=csw_server_response)
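The CSW-mocking fixtures are gone: tests now receive plain filepath lists (S2_filepaths, ACube_filepaths) and a temporary DAG folder (airflow_dag_folder) and pass both straight into the writers. A hedged sketch of the test shape this enables; the repository's real tests differ in their assertions and reference comparisons:

def test_airflow_dag_is_written(S2_filepaths, airflow_dag_folder):
    """Illustrative test shape only, using the renamed fixtures above."""
    from eodc_openeo_bindings.job_writer.dag_writer import AirflowDagWriter

    writer = AirflowDagWriter()
    writer.write_and_move_job(
        job_id="jb-test",
        user_name="test-user",
        dags_folder=airflow_dag_folder,    # fixture returns the temporary DAG folder
        process_graph_json="process_graph.json",
        job_data="/tmp/job_data",
        process_defs="processes.json",
        in_filepaths=S2_filepaths,         # fixture returns the file list directly
    )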

(Diffs for the remaining 15 changed files are not shown here.)
