Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support properties field modifiers #752

Draft
wants to merge 7 commits into
base: job-nested-process-workflow
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pygeofilter[fes,backend-native]
# - https://github.com/celery/kombu/pull/1536
# - https://github.com/celery/celery/pull/7834
pymongo>=4.6.3 # either (pymongo>=4, kombu>=5.3.0b2) or (pymongo<4, celery<5.2)
pyparsing
pyramid>=1.7.3
pyramid_beaker>=0.8
# see https://github.com/sontek/pyramid_celery/pull/102 to fix Python 3.12 support and other improvements
Expand Down
16 changes: 15 additions & 1 deletion tests/functional/test_builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@
)
from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteResponse, ExecuteTransmissionMode
from weaver.formats import ContentEncoding, ContentType, get_format, repr_json
from weaver.processes.builtin import file_index_selector, jsonarray2netcdf, metalink2netcdf, register_builtin_processes
from weaver.processes.builtin import (
field_modifier_processor,
file_index_selector,
jsonarray2netcdf,
metalink2netcdf,
register_builtin_processes
)
from weaver.processes.constants import JobInputsOutputsSchema
from weaver.status import Status
from weaver.utils import create_metalink, fully_qualified_name, get_path_kvp
Expand Down Expand Up @@ -1099,3 +1105,11 @@ def test_file_index_selector_invalid_out_dir():
with pytest.raises(ValueError) as err:
file_index_selector.main("-f", "", "-i", "1", "-o", tmp_out_dir)
assert "does not exist" in str(err.value)


def test_field_modifier_processor_expression_variables():
expr = field_modifier_processor.create_expression_parser()
calc = "properties.eo:cloud_cover + 1"
var = {"properties.eo:cloud_cover": 2}
val = expr.parse_string(calc)[0].eval(var, dict.__getitem__)
assert val == 3.0
183 changes: 182 additions & 1 deletion tests/functional/test_wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
from weaver.wps_restapi import swagger_definitions as sd

if TYPE_CHECKING:
from typing import List
from typing import Dict, List

from responses import RequestsMock

Expand Down Expand Up @@ -1185,6 +1185,7 @@ def test_deploy_block_builtin_processes_from_api(self):
process = self.process_store.fetch_by_id(self._testMethodName)
assert process.type == ProcessType.APPLICATION

@pytest.mark.oap_part2
@parameterized.expand([
# not allowed even if combined with another known and valid definition
({"UnknownRequirement": {}, CWL_REQUIREMENT_APP_DOCKER: {"dockerPull": "python:3.7-alpine"}}, ),
Expand Down Expand Up @@ -1239,6 +1240,7 @@ def test_deploy_requirement_inline_javascript(self):
_, cwl = self.deploy_process(body, describe_schema=ProcessSchema.OGC)
assert CWL_REQUIREMENT_INLINE_JAVASCRIPT in cwl["requirements"]

@pytest.mark.oap_part2
def test_deploy_merge_complex_io_with_multiple_formats_and_defaults(self):
"""
Test validates that different format types are set on different input variations simultaneously:
Expand Down Expand Up @@ -1586,6 +1588,7 @@ def test_deploy_merge_complex_io_with_multiple_formats_and_defaults(self):
assert pkg["outputs"][3]["type"] == {"type": "array", "items": "File"}
assert "format" not in pkg["outputs"][3], "CWL format array not allowed for outputs."

@pytest.mark.oap_part2
def test_deploy_merge_resolution_io_min_max_occurs(self):
"""
Test validates that various merging/resolution strategies of I/O definitions are properly applied for
Expand Down Expand Up @@ -1755,6 +1758,7 @@ def test_deploy_merge_resolution_io_min_max_occurs(self):
assert pkg["inputs"][13]["id"] == "optional_array_max_fixed_by_wps"
# assert pkg["inputs"][13]["type"] == "string[]?"

@pytest.mark.oap_part2
def test_deploy_merge_valid_io_min_max_occurs_as_str_or_int(self):
"""
Test validates that I/O definitions with ``minOccurs`` and/or ``maxOccurs`` are permitted as both integer and
Expand Down Expand Up @@ -1935,6 +1939,7 @@ def test_execute_job_with_accept_languages(self):
"Expected error description to indicate bad language"
)

@pytest.mark.oap_part1
@mocked_aws_config
@mocked_aws_s3
@mocked_http_file
Expand Down Expand Up @@ -2111,6 +2116,7 @@ def tmp(value):
assert processed_values["test_reference_http_value"] == "THIS IS A GENERATED FILE FOR HTTP TEST"
assert processed_values["test_reference_file_value"] == "THIS IS A GENERATED FILE FOR FILE TEST"

@pytest.mark.oap_part1
def test_execute_job_with_inline_input_values(self):
"""
Validates that the job can receive an object and array types inputs and process them as expected.
Expand Down Expand Up @@ -2246,6 +2252,7 @@ def test_execute_job_with_inline_input_values(self):
assert processed_values["measureFloatInput"] == 10.2
assert processed_values["measureFileInput"] == {"VALUE": {"REF": 1, "MEASUREMENT": 10.3, "UOM": "M"}}

@pytest.mark.oap_part1
def test_execute_job_with_bbox(self):
body = self.retrieve_payload("EchoBoundingBox", "deploy", local=True)
proc = self.fully_qualified_test_name(self._testMethodName)
Expand Down Expand Up @@ -2281,7 +2288,13 @@ def test_execute_job_with_bbox(self):
"Expected the BBOX CRS URI to be interpreted and validated by known WPS definitions."
)

@pytest.mark.oap_part3
def test_execute_job_with_collection_input_geojson_feature_collection(self):
"""
Validate parsing and handling of ``collection`` specified in an input reference to a :term:`GeoJSON` file.

.. versionadded:: 5.8
"""
name = "EchoFeatures"
body = self.retrieve_payload(name, "deploy", local=True)
proc = self.fully_qualified_test_name(self._testMethodName)
Expand Down Expand Up @@ -2331,12 +2344,18 @@ def test_execute_job_with_collection_input_geojson_feature_collection(self):
out_data = json.load(out_fd)
assert out_data["features"] == col_feats["features"]

@pytest.mark.oap_part3
@parameterized.expand([
# note: the following are not *actually* filtering, but just validating formats are respected across code paths
("POST", "cql2-json", {"op": "=", "args": [{"property": "name"}, "test"]}),
("GET", "cql2-text", "property.name = 'test'"),
])
def test_execute_job_with_collection_input_ogc_features(self, filter_method, filter_lang, filter_value):
"""
Validate parsing and handling of ``collection`` specified in an input with an `OGC API - Features` reference.

.. versionadded:: 5.8
"""
name = "EchoFeatures"
body = self.retrieve_payload(name, "deploy", local=True)
proc = self.fully_qualified_test_name(self._testMethodName)
Expand Down Expand Up @@ -2394,6 +2413,168 @@ def test_execute_job_with_collection_input_ogc_features(self, filter_method, fil
out_data = json.load(out_fd)
assert out_data["features"] == col_feats["features"]

@pytest.mark.oap_part3
def test_execute_job_with_collection_input_stac(self):
"""
Validate parsing and handling of ``collection`` specified in an input with :term:`STAC` :term:`API` endpoint.

.. versionadded:: 5.8
.. versionchanged:: 6.0
Evaluate the dynamic insertion of a ``properties`` definition into the retrieved collection features.
"""
name = "EchoFeatures"
body = self.retrieve_payload(name, "deploy", local=True)
proc = self.fully_qualified_test_name(self._testMethodName)
self.deploy_process(body, describe_schema=ProcessSchema.OGC, process_id=proc)

with contextlib.ExitStack() as stack:
tmp_host = "https://mocked-file-server.com" # must match collection prefix hostnames
tmp_dir = stack.enter_context(tempfile.TemporaryDirectory()) # pylint: disable=R1732
tmp_svr = stack.enter_context(
mocked_file_server(tmp_dir, tmp_host, settings=self.settings, mock_browse_index=True)
)
exec_body_val = self.retrieve_payload(name, "execute", local=True)
col_feats = exec_body_val["inputs"]["features"]["value"] # type: JSON
col_feats["features"][0]["properties"]["data"] = 10
field_props = {
"properties.result": "A * (B + C) / properties.data", # = 21
# intermediate variables, should not be set
"B": 123,
"C": -50,
"A": 3,
}
filter_lang = "cql2-json"
filter_value = {"op": "=", "args": [{"property": "name"}, "test"]}
search_body = {
# note: 'properties' are not in search request, separate post-operation
"collections": ["test"],
"filter": filter_value,
"filter-lang": filter_lang,
}
search_match = responses.matchers.json_params_matcher(search_body)
tmp_svr.add("POST", f"{tmp_host}/search", json=col_feats, match=[search_match])

col_exec_body = {
"mode": ExecuteMode.ASYNC,
"response": ExecuteResponse.DOCUMENT,
"inputs": {
"features": {
"collection": f"{tmp_host}/collections/test",
"format": ExecuteCollectionFormat.STAC,
"type": ContentType.APP_GEOJSON,
"filter-lang": filter_lang,
"filter": filter_value,
"properties": field_props,
}
}
}

for mock_exec in mocked_execute_celery():
stack.enter_context(mock_exec)
proc_url = f"/processes/{proc}/execution"
resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5,
data=col_exec_body, headers=self.json_headers, only_local=True)
assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}"

status_url = resp.json["location"]
results = self.monitor_job(status_url)
assert "features" in results

job_id = status_url.rsplit("/", 1)[-1]
wps_dir = get_wps_output_dir(self.settings)
job_dir = os.path.join(wps_dir, job_id)
job_out = os.path.join(job_dir, "features", "features.geojson")
assert os.path.isfile(job_out), f"Invalid output file not found: [{job_out}]"
with open(job_out, mode="r", encoding="utf-8") as out_fd:
out_data = json.load(out_fd)

assert "features" in out_data and isinstance(out_data["features"], list)
assert all("properties" in feat for feat in out_data["features"])
out_prop = {"name": "test"}
out_prop.update(field_props)
for feat_src, feat_out in zip(col_feats["features"], out_data["features"]):
assert feat_src["properties"] == {"name": "test"}
assert feat_out["properties"] == out_prop

@pytest.mark.oap_part3
def test_execute_job_with_nested_process_properties_field_modifier(self):
"""
Validate parsing and handling of ``properties`` specified for a nested process execution.

.. versionadded:: 6.0
"""
name = "EchoFeatures"
body = self.retrieve_payload(name, "deploy", local=True)
proc = self.fully_qualified_test_name(self._testMethodName)
self.deploy_process(body, describe_schema=ProcessSchema.OGC, process_id=proc)

with contextlib.ExitStack() as stack:
tmp_host = "https://mocked-file-server.com" # must match collection prefix hostnames
tmp_dir = stack.enter_context(tempfile.TemporaryDirectory()) # pylint: disable=R1732
tmp_svr = stack.enter_context(
mocked_file_server(tmp_dir, tmp_host, settings=self.settings, mock_browse_index=True)
)

exec_body_val = self.retrieve_payload(name, "execute", local=True)
col_file = os.path.join(tmp_dir, "test.json")
col_feats = exec_body_val["inputs"]["features"]["value"] # type: JSON
with open(col_file, mode="w", encoding="utf-8") as tmp_feature_collection_geojson:
json.dump(col_feats, tmp_feature_collection_geojson)

filter_value = {
"filter": "property.name = 'test'"
}
filter_match = responses.matchers.json_params_matcher(filter_value)
tmp_svr.add("POST", f"{tmp_host}/collections/test/items", json=col_feats, match=[filter_match])

proc_url = f"/processes/{proc}"
exec_url = f"{proc_url}/execution"
col_exec_body = {
"mode": ExecuteMode.ASYNC,
"response": ExecuteResponse.DOCUMENT,
"process": proc_url,
"inputs": {
"features": {
"process": proc_url,
"inputs": {
"features": {
# accessed directly as a static GeoJSON FeatureCollection
"collection": "https://mocked-file-server.com/test.json",
"format": ExecuteCollectionFormat.GEOJSON,
"schema": "http://www.opengis.net/def/glossary/term/FeatureCollection",
"filter": "property.name = 'test'",
"properties": {
"nested": "1 / 2"
}
}
},
"properties": {
"extra": 3.1416
}
}
}
}

for mock_exec in mocked_execute_celery():
stack.enter_context(mock_exec)
resp = mocked_sub_requests(self.app, "post_json", exec_url, timeout=5,
data=col_exec_body, headers=self.json_headers, only_local=True)
assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}"

status_url = resp.json["location"]
results = self.monitor_job(status_url)
assert "features" in results

job_id = status_url.rsplit("/", 1)[-1]
wps_dir = get_wps_output_dir(self.settings)
job_dir = os.path.join(wps_dir, job_id)
job_out = os.path.join(job_dir, "features", "features.geojson")
assert os.path.isfile(job_out), f"Invalid output file not found: [{job_out}]"
with open(job_out, mode="r", encoding="utf-8") as out_fd:
out_data = json.load(out_fd)

assert out_data["features"] == col_feats["features"]

def test_execute_job_with_context_output_dir(self):
cwl = {
"cwlVersion": "v1.0",
Expand Down
Loading
Loading