From 49478795616a53cf168c81e03c8c9848bf68c9f6 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 3 Feb 2025 22:57:11 -0500 Subject: [PATCH 1/4] unify OAP v1/v2 job status succeeded/successful (relates to https://github.com/opengeospatial/ogcapi-processes/pull/483) + update job subscribers accordingly with status 'category' to allow any status profile --- CHANGES.rst | 7 +- Makefile | 6 +- docs/examples/job_status_ogcapi.json | 4 +- docs/source/appendix.rst | 4 + docs/source/processes.rst | 2 +- tests/functional/test_builtin.py | 2 +- tests/functional/test_cli.py | 66 +++++---- tests/functional/test_job_provenance.py | 6 +- tests/functional/test_workflow.py | 4 +- tests/functional/test_wps_package.py | 32 ++--- tests/functional/utils.py | 6 +- tests/test_datatype.py | 4 +- tests/test_notify.py | 12 +- tests/test_utils.py | 41 ++++-- tests/wps_restapi/test_jobs.py | 28 ++-- tests/wps_restapi/test_processes.py | 2 +- weaver/cli.py | 4 +- weaver/datatype.py | 2 +- weaver/notify.py | 35 +++-- weaver/processes/execution.py | 14 +- weaver/processes/wps1_process.py | 2 +- weaver/processes/wps_process_base.py | 2 +- weaver/status.py | 125 +++++++++++------- .../examples/job_status_success.json | 4 +- weaver/wps_restapi/jobs/jobs.py | 2 +- weaver/wps_restapi/jobs/utils.py | 2 +- .../templates/notification_email_example.mako | 13 +- 27 files changed, 264 insertions(+), 167 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index dd76df4d8..ed9f15b68 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,7 +12,12 @@ Changes Changes: -------- -- No change. +- Replace ``succeeded`` status by ``successful`` everywhere where applicable (as originally defined by OGC API v1), + to align with reversal of the proposed draft name, aligning between both v1 and v2 of `OGC API - Processes` + (relates to `opengeospatial/ogcapi-processes#483 `_). +- Modify `Job` ``subscribers`` definition to employ the normalized ``weaver.status.StatusCategory`` instead + of ``weaver.status.Status`` as mapping keys, such that email and callback notifications are unified under + a common naming convention regardless of the resolved ``weaver.status.StatusCompliant`` representation. Fixes: ------ diff --git a/Makefile b/Makefile index bb51fd744..3abd8d86f 100644 --- a/Makefile +++ b/Makefile @@ -187,7 +187,7 @@ conda-base: ## obtain and install a missing conda distribution echo "Make sure to add '$(CONDA_BIN_DIR)' to your PATH variable in '~/.bashrc'.") .PHONY: conda-clean -clean-clean: ## remove the conda environment +conda-clean: ## remove the conda environment @echo "Removing conda env '$(CONDA_ENV)'" @-test -d "$(CONDA_ENV_PATH)" && "$(CONDA_BIN)" remove -n "$(CONDA_ENV)" --yes --all @@ -344,8 +344,8 @@ clean-docs-dirs: ## remove documentation artifacts (minimal) clean-src: ## remove all *.pyc files @echo "Removing python artifacts..." @-find "$(APP_ROOT)" -type f -name "*.pyc" -exec rm {} \; - @-rm -rf ./build - @-rm -rf ./src + @-rm -rf "$(APP_ROOT)/build" + @-rm -rf "$(APP_ROOT)/src" .PHONY: clean-test clean-test: ## remove files created by tests and coverage analysis diff --git a/docs/examples/job_status_ogcapi.json b/docs/examples/job_status_ogcapi.json index a6b03157d..cddf5d026 100644 --- a/docs/examples/job_status_ogcapi.json +++ b/docs/examples/job_status_ogcapi.json @@ -4,8 +4,8 @@ "processID": "example-process", "providerID": null, "type": "process", - "status": "succeeded", - "message": "Job succeeded.", + "status": "successful", + "message": "Job successful.", "created": "2024-10-02T14:21:12.380000+00:00", "started": "2024-10-02T14:21:12.990000+00:00", "finished": "2024-10-02T14:21:23.629000+00:00", diff --git a/docs/source/appendix.rst b/docs/source/appendix.rst index ea0f76d58..765c9bcc9 100644 --- a/docs/source/appendix.rst +++ b/docs/source/appendix.rst @@ -238,6 +238,10 @@ Glossary .. seealso:: :ref:`quotation_estimator_model` + openEO + | Open Earth Observation + | Cloud backend :term:`API` initiative for unified Earth Observation, as described by :ref:`openEO`. + OpenSearch Protocol of lookup and retrieval of remotely stored files. Please refer to :ref:`OpenSearch Data Source` for details. diff --git a/docs/source/processes.rst b/docs/source/processes.rst index e415e28d1..373427e8f 100644 --- a/docs/source/processes.rst +++ b/docs/source/processes.rst @@ -1087,7 +1087,7 @@ Once the :term:`Job` is submitted, its status should initially switch to ``accep status will change to ``started`` for preparation steps (i.e.: allocation resources, retrieving required parametrization details, etc.), followed by ``running`` when effectively reaching the execution step of the underlying :term:`Application Package` operation. This status will remain as such until the operation completes, either with -``succeeded`` or ``failed`` status. +``successful`` or ``failed`` status. At any moment during |asynchronous|_ execution, the :term:`Job` status can be requested using |status-req|_. Note that depending on the timing at which the user executes this request and the availability of task workers, it could be diff --git a/tests/functional/test_builtin.py b/tests/functional/test_builtin.py index dd41a5270..460fc2e6e 100644 --- a/tests/functional/test_builtin.py +++ b/tests/functional/test_builtin.py @@ -438,7 +438,7 @@ def test_jsonarray2netcdf_execute_sync(self): assert resp.content_type == ContentType.APP_JSON for field in ["status", "created", "finished", "duration", "progress"]: assert field in resp.json - assert resp.json["status"] == Status.SUCCEEDED + assert resp.json["status"] == Status.SUCCESSFUL assert resp.json["progress"] == 100 out_url = f"{job_url}/results" diff --git a/tests/functional/test_cli.py b/tests/functional/test_cli.py index 8c58e1616..17ac8d13d 100644 --- a/tests/functional/test_cli.py +++ b/tests/functional/test_cli.py @@ -593,7 +593,7 @@ def test_execute_manual_monitor_status_and_download_results(self): result = mocked_sub_requests(self.app, self.client.monitor, job_id, timeout=5, interval=1) assert result.success, result.text assert "undefined" not in result.message - assert result.body.get("status") == Status.SUCCEEDED + assert result.body.get("status") == Status.SUCCESSFUL links = result.body.get("links") assert isinstance(links, list) assert len([_link for _link in links if _link["rel"].endswith("results")]) == 1 @@ -684,8 +684,8 @@ def test_execute_subscribers(self): } } - # order important, expect status 'started' (in-progress) to occur before 'succeeded' - # call for 'failed' should never happen since 'succeeded' expected, as validated by above method + # order important, expect status 'started' (in-progress) to occur before 'successful' + # call for 'failed' should never happen since 'successful' expected, as validated by above method assert mocked_requests.call_count == 2, "Should not have called both failed/success callback requests" assert mocked_requests.call_args_list[0].args == ("POST", subscribers["inProgressUri"]) assert mocked_requests.call_args_list[0].kwargs["json"]["status"] in running_statuses # status JSON @@ -698,7 +698,7 @@ def test_execute_subscribers(self): assert mocked_emails.call_args_list[0].args[:2] == (None, subscribers["inProgressEmail"]) assert f"Job {test_proc_byte} Started".encode() in mocked_emails.call_args_list[0].args[-1] assert mocked_emails.call_args_list[1].args[:2] == (None, subscribers["successEmail"]) - assert f"Job {test_proc_byte} Succeeded".encode() in mocked_emails.call_args_list[1].args[-1] + assert f"Job {test_proc_byte} Successful".encode() in mocked_emails.call_args_list[1].args[-1] # NOTE: # For all below '<>_auto_resolve_vault' test cases, the local file referenced in the Execute request body @@ -834,17 +834,19 @@ def test_update_job(self): result = mocked_sub_requests(self.app, self.client.status, job_id) assert result.success + assert isinstance(result.body, dict) assert result.body["title"] == "Random Title" result = mocked_sub_requests(self.app, self.client.inputs, job_id) assert result.success + assert isinstance(result.body, dict) assert result.body["inputs"] == {"message": "new message"} assert result.body["outputs"] == {"output": {}} assert result.body["headers"]["Prefer"] == f"return={ExecuteReturnPreference.REPRESENTATION}; respond-async" @mocked_dismiss_process() def test_dismiss(self): - for status in [Status.ACCEPTED, Status.FAILED, Status.RUNNING, Status.SUCCEEDED]: + for status in [Status.ACCEPTED, Status.FAILED, Status.RUNNING, Status.SUCCESSFUL]: proc = self.test_process["Echo"] job = self.job_store.save_job(task_id="12345678-1111-2222-3333-111122223333", process=proc) job.status = status @@ -859,7 +861,7 @@ def test_jobs_search_multi_status(self): job1 = self.job_store.save_job(task_id=uuid.uuid4(), process=proc, access=Visibility.PUBLIC) job2 = self.job_store.save_job(task_id=uuid.uuid4(), process=proc, access=Visibility.PUBLIC) job3 = self.job_store.save_job(task_id=uuid.uuid4(), process=proc, access=Visibility.PUBLIC) - job1.status = Status.SUCCEEDED + job1.status = Status.SUCCESSFUL job2.status = Status.FAILED job3.status = Status.RUNNING job1 = self.job_store.update_job(job1) @@ -868,10 +870,10 @@ def test_jobs_search_multi_status(self): jobs = [job1, job2, job3] for test_status, job_expect in [ - (Status.SUCCEEDED, [job1]), - ([Status.SUCCEEDED], [job1]), - ([Status.SUCCEEDED, Status.RUNNING], [job1, job3]), - (f"{Status.SUCCEEDED},{Status.RUNNING}", [job1, job3]), + (Status.SUCCESSFUL, [job1]), + ([Status.SUCCESSFUL], [job1]), + ([Status.SUCCESSFUL, Status.RUNNING], [job1, job3]), + (f"{Status.SUCCESSFUL},{Status.RUNNING}", [job1, job3]), (StatusCategory.FINISHED, [job1, job2]), (StatusCategory.FINISHED.value, [job1, job2]), ([StatusCategory.FINISHED], [job1, job2]), @@ -893,7 +895,7 @@ def setUp(self): job = self.job_store.save_job( task_id="12345678-1111-2222-3333-111122223333", process="fake-process", access=Visibility.PUBLIC ) - job.status = Status.SUCCEEDED + job.status = Status.SUCCESSFUL self.test_job = self.job_store.update_job(job) def test_help_operations(self): @@ -1612,7 +1614,7 @@ def test_execute_inputs_capture(self): entrypoint=weaver_cli, only_local=True, ) - assert any(f"\"status\": \"{Status.SUCCEEDED}\"" in line for line in lines) + assert any(f"\"status\": \"{Status.SUCCESSFUL}\"" in line for line in lines) def test_execute_manual_monitor(self): proc = self.test_process["Echo"] @@ -1656,7 +1658,7 @@ def test_execute_manual_monitor(self): ) assert any(f"\"jobID\": \"{job_id}\"" in line for line in lines) - assert any(f"\"status\": \"{Status.SUCCEEDED}\"" in line for line in lines) + assert any(f"\"status\": \"{Status.SUCCESSFUL}\"" in line for line in lines) assert any(f"\"href\": \"{job_ref}/results\"" in line for line in lines) assert any("\"rel\": \"http://www.opengis.net/def/rel/ogc/1.0/results\"" in line for line in lines) @@ -1683,7 +1685,7 @@ def test_execute_auto_monitor(self): only_local=True, ) assert any("\"jobID\": \"" in line for line in lines) # don't care value, self-handled - assert any(f"\"status\": \"{Status.SUCCEEDED}\"" in line for line in lines) + assert any(f"\"status\": \"{Status.SUCCESSFUL}\"" in line for line in lines) assert any("\"rel\": \"http://www.opengis.net/def/rel/ogc/1.0/results\"" in line for line in lines) def test_execute_result_by_reference(self): @@ -1719,7 +1721,7 @@ def test_execute_result_by_reference(self): only_local=True, ) assert any(line.startswith("jobID: ") for line in lines[:2]) # don't care value, self-handled - assert any(f"status: {Status.SUCCEEDED}" in line for line in lines) + assert any(f"status: {Status.SUCCESSFUL}" in line for line in lines) for line in lines: if line.startswith("jobID: "): job_id = line.split(":")[-1].strip() @@ -1823,7 +1825,7 @@ def test_execute_output_context(self, cli_options, expect_output_context): entrypoint=weaver_cli, only_local=True, ) - assert any(f"status: {Status.SUCCEEDED}" in line for line in lines) + assert any(f"status: {Status.SUCCESSFUL}" in line for line in lines) job_id = None for line in lines: if line.startswith("jobID: "): @@ -1906,7 +1908,7 @@ def test_execute_subscriber_options(self): only_local=True, ) data = json.loads(lines[0]) - assert data["status"] == Status.SUCCEEDED + assert data["status"] == Status.SUCCESSFUL job = self.job_store.fetch_by_id(data["jobID"]) # to properly compare, we must decrypt emails (encrypt is not deterministic on multiple calls) @@ -1914,8 +1916,14 @@ def test_execute_subscriber_options(self): for sub, email in subs["emails"].items(): subs["emails"][sub] = decrypt_email(email, self.settings) assert subs == { - "callbacks": {Status.STARTED: test_callback_started, Status.SUCCEEDED: test_callback_success}, - "emails": {Status.STARTED: test_email_started, Status.FAILED: test_email_failed}, + "callbacks": { + StatusCategory.RUNNING.value.lower(): test_callback_started, + StatusCategory.SUCCESS.value.lower(): test_callback_success, + }, + "emails": { + StatusCategory.RUNNING.value.lower(): test_email_started, + StatusCategory.FAILED.value.lower(): test_email_failed, + }, }, "Job subscribers should be as submitted, after combining CLI options, without extra or missing ones." def test_execute_help_details(self): @@ -1991,7 +1999,7 @@ def test_jobs_no_links_limit_status_filters(self): # "weaver", "jobs", "-u", self.url, - "-S", Status.SUCCEEDED, + "-S", Status.SUCCESSFUL, "-N", 1, "-nL", ], @@ -2017,7 +2025,7 @@ def test_jobs_no_links_nested_detail(self): # "weaver", "jobs", "-u", self.url, - "-S", Status.SUCCEEDED, + "-S", Status.SUCCESSFUL, "-D", # when details active, each job lists its own links "-nL", # unless links are requested to be removed (top-most and nested ones) ], @@ -2037,7 +2045,7 @@ def test_jobs_no_links_nested_detail(self): def test_jobs_filter_status_multi(self): self.job_store.clear_jobs() job = self.job_store.save_job(task_id=uuid.uuid4(), process="test-process", access=Visibility.PUBLIC) - job.status = Status.SUCCEEDED + job.status = Status.SUCCESSFUL job_s = self.job_store.update_job(job) job = self.job_store.save_job(task_id=uuid.uuid4(), process="test-process", access=Visibility.PUBLIC) job.status = Status.FAILED @@ -2052,7 +2060,7 @@ def test_jobs_filter_status_multi(self): # "weaver", "jobs", "-u", self.url, - "-S", Status.SUCCEEDED, Status.ACCEPTED, + "-S", Status.SUCCESSFUL, Status.ACCEPTED, "-D", "-nL", # unless links are requested to be removed (top-most and nested ones) ], @@ -2067,9 +2075,9 @@ def test_jobs_filter_status_multi(self): assert isinstance(body["jobs"], list) assert len(body["jobs"]) assert not any(_job["jobID"] == str(job_f.uuid) for _job in body["jobs"]) - assert all(job["status"] in [Status.SUCCEEDED, Status.ACCEPTED] for job in body["jobs"]) + assert all(job["status"] in [Status.SUCCESSFUL, Status.ACCEPTED] for job in body["jobs"]) jobs_accept = list(filter(lambda _job: _job["status"] == Status.ACCEPTED, body["jobs"])) - jobs_success = list(filter(lambda _job: _job["status"] == Status.SUCCEEDED, body["jobs"])) + jobs_success = list(filter(lambda _job: _job["status"] == Status.SUCCESSFUL, body["jobs"])) assert len(jobs_accept) == 1 and jobs_accept[0]["jobID"] == str(job_a.uuid) assert len(jobs_success) == 1 and jobs_success[0]["jobID"] == str(job_s.uuid) @@ -2339,7 +2347,7 @@ def test_job_logs(self): job = self.job_store.save_job(task_id=uuid.uuid4(), process="test-process", access=Visibility.PUBLIC) job.save_log(message="test start", progress=0, status=Status.ACCEPTED) job.save_log(message="test run", progress=50, status=Status.RUNNING) - job.save_log(message="test done", progress=100, status=Status.SUCCEEDED) + job.save_log(message="test done", progress=100, status=Status.SUCCESSFUL) job = self.job_store.update_job(job) lines = mocked_sub_requests( @@ -2358,7 +2366,7 @@ def test_job_logs(self): assert lines[0] == "[" assert f"0% {Status.ACCEPTED}" in lines[1] assert f"50% {Status.RUNNING}" in lines[2] - assert f"100% {Status.SUCCEEDED}" in lines[3] + assert f"100% {Status.SUCCESSFUL}" in lines[3] assert lines[4] == "]" def test_job_exceptions(self): @@ -2394,7 +2402,7 @@ def test_job_exceptions(self): def test_job_statistics(self): job = self.job_store.save_job(task_id=uuid.uuid4(), process="test-process", access=Visibility.PUBLIC) job.statistics = resources.load_example("job_statistics.json") - job.status = Status.SUCCEEDED # error if not completed + job.status = Status.SUCCESSFUL # error if not completed job = self.job_store.update_job(job) lines = mocked_sub_requests( @@ -2488,7 +2496,7 @@ def test_execute_remote_input(self): entrypoint=weaver_cli, only_local=True, ) - assert any(f"\"status\": \"{Status.SUCCEEDED}\"" in line for line in lines) + assert any(f"\"status\": \"{Status.SUCCESSFUL}\"" in line for line in lines) class TestWeaverClientAuthBase(TestWeaverClientBase): diff --git a/tests/functional/test_job_provenance.py b/tests/functional/test_job_provenance.py index a3dc3c422..7d4019693 100644 --- a/tests/functional/test_job_provenance.py +++ b/tests/functional/test_job_provenance.py @@ -161,7 +161,7 @@ def test_job_prov_info_not_acceptable(self): job = self.job_store.save_job( "test", process=self.proc_id, - status=Status.SUCCEEDED + status=Status.SUCCESSFUL ) prov_url = job.prov_url(self.settings) headers = self.json_headers # note: this is the test, while only plain text is supported @@ -229,7 +229,7 @@ def test_job_prov_data_generated_missing(self): job = self.job_store.save_job( "test", process=self.proc_id, - status=Status.SUCCEEDED + status=Status.SUCCESSFUL ) prov_url = job.prov_url(self.settings) resp = self.app.get(prov_url, headers=self.json_headers, expect_errors=True) @@ -244,7 +244,7 @@ def test_job_prov_data_dynamic_missing(self): job = self.job_store.save_job( "test", process=self.proc_id, - status=Status.SUCCEEDED + status=Status.SUCCESSFUL ) prov_url = job.prov_url(self.settings) headers = {"Accept": ContentType.TEXT_PLAIN} diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index 72f2cb15b..12f101716 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -988,7 +988,7 @@ def validate_test_job_execution(self, job_location_url, user_headers=None, user_ lambda: timeout_running > 0, message=( "Maximum timeout reached for job execution test. " - f"Expected job status change from '{Status.RUNNING}' to '{Status.SUCCEEDED}' " + f"Expected job status change from '{Status.RUNNING}' to '{Status.SUCCESSFUL}' " f"within {self.WEAVER_TEST_JOB_RUNNING_MAX_TIMEOUT}s since first '{Status.RUNNING}'." ) ) @@ -1005,7 +1005,7 @@ def validate_test_job_execution(self, job_location_url, user_headers=None, user_ time.sleep(self.WEAVER_TEST_JOB_GET_STATUS_INTERVAL) continue if status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]: - failed = status != Status.SUCCEEDED + failed = status not in JOB_STATUS_CATEGORIES[StatusCategory.SUCCESS] logs, details = self.try_retrieve_logs(job_location_url, detailed_results=not failed) self.assert_test( lambda: not failed, diff --git a/tests/functional/test_wps_package.py b/tests/functional/test_wps_package.py index 0e9022e45..e8ab60465 100644 --- a/tests/functional/test_wps_package.py +++ b/tests/functional/test_wps_package.py @@ -3732,7 +3732,7 @@ def test_execute_single_output_prefer_header_return_representation_literal(self) # request status instead of results since not expecting 'document' JSON in this case status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL assert "Preference-Applied" in resp.headers assert resp.headers["Preference-Applied"] == prefer_header.replace(",", ";") @@ -3779,7 +3779,7 @@ def test_execute_single_output_prefer_header_return_representation_complex(self) # request status instead of results since not expecting 'document' JSON in this case status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL assert "Preference-Applied" in resp.headers assert resp.headers["Preference-Applied"] == prefer_header.replace(",", ";") @@ -4095,7 +4095,7 @@ def test_execute_single_output_response_raw_value_literal(self): # request status instead of results since not expecting 'document' JSON in this case status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL job_id = status["jobID"] results = self.app.get(f"/jobs/{job_id}/results") @@ -4149,7 +4149,7 @@ def test_execute_single_output_response_raw_value_complex(self): # request status instead of results since not expecting 'document' JSON in this case status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL out_url = get_wps_output_url(self.settings) job_id = status["jobID"] @@ -4199,7 +4199,7 @@ def test_execute_single_output_response_raw_reference_literal(self): # request status instead of results since not expecting 'document' JSON in this case status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL job_id = status["jobID"] out_url = get_wps_output_url(self.settings) @@ -4259,7 +4259,7 @@ def test_execute_single_output_response_raw_reference_complex(self): # request status instead of results since not expecting 'document' JSON in this case status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL job_id = status["jobID"] out_url = get_wps_output_url(self.settings) @@ -4965,7 +4965,7 @@ def test_execute_multi_output_prefer_header_return_representation(self): # request status instead of results since not expecting 'document' JSON in this case status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL job_id = status["jobID"] out_url = get_wps_output_url(self.settings) @@ -5039,7 +5039,7 @@ def test_execute_multi_output_response_raw_value(self): # request status instead of results since not expecting 'document' JSON in this case status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL job_id = status["jobID"] out_url = get_wps_output_url(self.settings) @@ -5121,7 +5121,7 @@ def test_execute_multi_output_response_raw_reference_default_links(self): # request status instead of results since not expecting 'document' JSON in this case status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL job_id = status["jobID"] out_url = get_wps_output_url(self.settings) @@ -5277,7 +5277,7 @@ def test_execute_multi_output_response_raw_mixed(self): # request status instead of results since not expecting 'document' JSON in this case status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL job_id = status["jobID"] out_url = get_wps_output_url(self.settings) @@ -5366,7 +5366,7 @@ def test_execute_multi_output_prefer_header_return_minimal_defaults(self): status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL job_id = status["jobID"] out_url = get_wps_output_url(self.settings) @@ -5436,7 +5436,7 @@ def test_execute_multi_output_prefer_header_return_minimal_override_transmission status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL job_id = status["jobID"] out_url = get_wps_output_url(self.settings) @@ -5512,7 +5512,7 @@ def test_execute_multi_output_response_document_defaults(self): status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL job_id = status["jobID"] out_url = get_wps_output_url(self.settings) @@ -5579,7 +5579,7 @@ def test_execute_multi_output_response_document_mixed(self): status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL job_id = status["jobID"] out_url = get_wps_output_url(self.settings) @@ -5737,7 +5737,7 @@ def test_execute_jobs_async(self): status_url = resp.json["location"] status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL job_id = status["jobID"] out_url = get_wps_output_url(self.settings) @@ -5815,7 +5815,7 @@ def test_execute_jobs_create_trigger(self): # retrieve the execution status status = self.monitor_job(status_url, return_status=True) - assert status["status"] == Status.SUCCEEDED + assert status["status"] == Status.SUCCESSFUL out_url = get_wps_output_url(self.settings) results = self.app.get(f"/jobs/{job_id}/results") diff --git a/tests/functional/utils.py b/tests/functional/utils.py index 4dd2f4022..94738b260 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -493,7 +493,7 @@ def monitor_job(cls, Monitor until the requested status is reached (default: when job is completed). If no value is specified and :paramref:`expect_failed` is enabled, completion status will be a failure. Otherwise, the successful status is used instead. Explicit intermediate status can be requested instead. - Whichever status is specified or defaulted, failed/succeeded statuses will break out of the monitoring loop, + Whichever status is specified or defaulted, failed/success statuses will break out of the monitoring loop, since no more status change is possible. :param expect_failed: If enabled, allow failing status to during status validation. @@ -502,7 +502,7 @@ def monitor_job(cls, :return: result of the successful job, or the status body if requested. :raises AssertionError: when job fails or took too long to complete. """ - final_status = Status.FAILED if expect_failed else (wait_for_status or Status.SUCCEEDED) + final_status = Status.FAILED if expect_failed else (wait_for_status or Status.SUCCESSFUL) def check_job_status(_resp, running=False): # type: (AnyResponseType, bool) -> bool @@ -511,7 +511,7 @@ def check_job_status(_resp, running=False): statuses = [Status.ACCEPTED, Status.RUNNING, final_status] if running else [final_status] assert _resp.status_code == 200, f"Execution failed:\n{pretty}\n{cls._try_get_logs(status_url)}" assert body["status"] in statuses, f"Error job info:\n{pretty}\n{cls._try_get_logs(status_url)}" - return body["status"] in {final_status, Status.SUCCEEDED, Status.FAILED} # break condition + return body["status"] in {final_status, Status.SUCCESSFUL, Status.FAILED} # break condition time.sleep(1) # small delay to ensure process execution had a chance to start before monitoring left = timeout or cls.monitor_timeout diff --git a/tests/test_datatype.py b/tests/test_datatype.py index 418d259a2..30b86b78f 100644 --- a/tests/test_datatype.py +++ b/tests/test_datatype.py @@ -298,7 +298,7 @@ def test_process_split_version(process_id, result): ("title", None, None), ("title", TypeError, TypeError), ("title", 1234, TypeError), - ("status", Status.SUCCEEDED, Status.SUCCEEDED), + ("status", Status.SUCCESSFUL, Status.SUCCESSFUL), ("status", 12345678, ValueError), ("status", "random", ValueError), ("status_message", None, "no message"), @@ -376,7 +376,7 @@ def test_job_updated_status(): assert job.updated == started job["updated"] = None # reset to test auto resolve job.finished = finished - job.status = Status.SUCCEEDED + job.status = Status.SUCCESSFUL assert job.updated == finished diff --git a/tests/test_notify.py b/tests/test_notify.py index 17c9d89ac..ce38a8f75 100644 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -81,7 +81,7 @@ def test_notify_email_job_complete(): with mock.patch("smtplib.SMTP_SSL", autospec=smtplib.SMTP_SSL) as mock_smtp: mock_smtp.return_value.sendmail.return_value = None # sending worked - test_job.status = Status.SUCCEEDED + test_job.status = Status.SUCCESSFUL notify_job_email(test_job, notify_email, settings) mock_smtp.assert_called_with("xyz.test.com", 12345, timeout=1) assert mock_smtp.return_value.sendmail.call_args[0][0] == "test-weaver@email.com" @@ -91,7 +91,7 @@ def test_notify_email_job_complete(): message = message_encoded.decode("utf8") assert "From: Weaver" in message assert f"To: {notify_email}" in message - assert f"Subject: Job {test_job.process} Succeeded" + assert f"Subject: Job {test_job.process} Successful" assert test_job_out_url in message assert test_job_log_url in message assert test_job_err_url not in message @@ -139,7 +139,7 @@ def test_notify_job_email_custom_template(): test_job = Job( task_id=uuid.uuid4(), process="test-process", - status=Status.SUCCEEDED, + status=Status.SUCCESSFUL, settings=settings, ) @@ -152,7 +152,7 @@ def test_notify_job_email_custom_template(): assert message == "\n".join([ "From: Weaver", f"To: {notify_email}", - f"Subject: Job {test_job.process} {Status.SUCCEEDED}", + f"Subject: Job {test_job.process} {Status.SUCCESSFUL}", "", f"Job: {test_url}/processes/{test_job.process}/jobs/{test_job.id}", ]) @@ -192,7 +192,7 @@ def test_notify_job_email_custom_template(): {"weaver.wps_email_notify_template_dir": "", "weaver.wps_email_notify_template_default": "test-default.mako"}, "random-process", - Status.SUCCEEDED, + Status.SUCCESSFUL, False, "test-default.mako", 2 @@ -219,7 +219,7 @@ def test_notify_job_email_custom_template(): {"weaver.wps_email_notify_template_dir": "", "weaver.wps_email_notify_template_default": "test-default.mako"}, "tmp-process", - Status.SUCCEEDED, + Status.SUCCESSFUL, False, "test-default.mako", 1 diff --git a/tests/test_utils.py b/tests/test_utils.py index df268c615..deb1b963e 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -412,16 +412,41 @@ def test_map_status_compliant(compliance, status): assert map_status(status, compliance) in JOB_STATUS_CATEGORIES[compliance] -def test_map_status_back_compatibility_and_special_cases(): - for c in (set(StatusCompliant.values()) - {StatusCompliant.OPENEO}): # type: ignore - assert map_status("successful", c) == Status.SUCCEEDED - assert map_status("successful", StatusCompliant.OPENEO) == Status.FINISHED +@pytest.mark.parametrize( + ["compliance", "status", "expected"], + list( + itertools.product( + [StatusCompliant.OGC], + [Status.SUCCESSFUL, Status.SUCCEEDED, Status.FINISHED], + [Status.SUCCESSFUL], + ) + ) + list( + itertools.product( + [StatusCompliant.OWSLIB, StatusCompliant.PYWPS], + [Status.SUCCESSFUL, Status.SUCCEEDED, Status.FINISHED], + [Status.SUCCEEDED], + ) + ) + list( + itertools.product( + [StatusCompliant.OPENEO], + [Status.SUCCESSFUL, Status.SUCCEEDED, Status.FINISHED], + [Status.FINISHED], + ) + ) +) +def test_map_status_back_compatibility_and_special_cases(compliance, status, expected): + result = map_status(status, compliance) + assert result == expected -def test_map_status_pywps_compliant_as_int_statuses(): - for s in range(len(WPS_STATUS)): - if STATUS_PYWPS_MAP[s] != Status.UNKNOWN: - assert map_status(s, StatusCompliant.PYWPS) in JOB_STATUS_CATEGORIES[StatusCompliant.PYWPS] +@pytest.mark.parametrize( + ["status_index"], + [[idx] for idx in range(len(WPS_STATUS)) if STATUS_PYWPS_MAP[idx] != Status.UNKNOWN], +) +def test_map_status_pywps_compliant_as_int_statuses(status_index): + status = map_status(status_index, StatusCompliant.PYWPS) + assert status in JOB_STATUS_CATEGORIES[StatusCompliant.PYWPS] + assert status == STATUS_PYWPS_MAP[status_index] def test_map_status_pywps_back_and_forth(): diff --git a/tests/wps_restapi/test_jobs.py b/tests/wps_restapi/test_jobs.py index b8f606ff4..b7235d753 100644 --- a/tests/wps_restapi/test_jobs.py +++ b/tests/wps_restapi/test_jobs.py @@ -118,12 +118,12 @@ def setUp(self): self.job_info = [] # type: List[Job] self.make_job(task_id="0000-0000-0000-0000", process=self.process_public.identifier, service=None, - user_id=self.user_editor1_id, status=Status.SUCCEEDED, progress=100, access=Visibility.PUBLIC, + user_id=self.user_editor1_id, status=Status.SUCCESSFUL, progress=100, access=Visibility.PUBLIC, tags=["unique"], logs=[ ("Start", logging.INFO, Status.ACCEPTED, 1), ("Process", logging.INFO, Status.RUNNING, 10), - ("Complete", logging.INFO, Status.SUCCEEDED, 100) + ("Complete", logging.INFO, Status.SUCCESSFUL, 100) ]) self.make_job(task_id="0000-0000-0000-1111", process=self.process_unknown, service=self.service_public.name, tags=["test-two", "other"], @@ -260,7 +260,7 @@ def check_job_format(job): for link_info in job["links"]: assert "href" in link_info and isinstance(link_info["href"], str) assert job["status"] in Status.values() - if job["status"] == Status.SUCCEEDED: + if job["status"] == Status.SUCCESSFUL: assert len([link for link in job["links"] if link["rel"].endswith("results")]) elif job["status"] == Status.FAILED: assert len([link for link in job["links"] if link["rel"].endswith("exceptions")]) @@ -1304,7 +1304,7 @@ def test_get_jobs_duration_min_max_invalid(self): @pytest.mark.oap_part1 def test_get_jobs_by_status_single(self): - test = {"status": Status.SUCCEEDED} + test = {"status": Status.SUCCESSFUL} path = get_path_kvp(sd.jobs_service.path, **test) resp = self.app.get(path, headers=self.json_headers) assert resp.status_code == 200 @@ -1322,7 +1322,7 @@ def test_get_jobs_by_status_single(self): @pytest.mark.oap_part1 def test_get_jobs_by_status_multi(self): - test = {"status": f"{Status.SUCCEEDED},{Status.RUNNING}"} + test = {"status": f"{Status.SUCCESSFUL},{Status.RUNNING}"} path = get_path_kvp(sd.jobs_service.path, **test) resp = self.app.get(path, headers=self.json_headers) assert resp.status_code == 200 @@ -1438,7 +1438,7 @@ def test_job_dismiss_complete_single(self): """ job_success = self.job_info[0] job_failed = self.job_info[1] - assert job_success.status == Status.SUCCEEDED, "Job must be in successful state for test" + assert job_success.status == Status.SUCCESSFUL, "Job must be in successful state for test" assert job_failed.status == Status.FAILED, "Job must be in failed state for test" # create dummy files to validate results flush of successful job @@ -1760,7 +1760,7 @@ def test_job_logs_formats_unsupported(self): def test_job_statistics_missing(self): job = self.job_info[0] - assert job.status == Status.SUCCEEDED, "invalid job status to run test" + assert job.status == Status.SUCCESSFUL, "invalid job status to run test" path = f"/jobs/{job.id}/statistics" resp = self.app.get(path, headers=self.json_headers, expect_errors=True) assert resp.status_code == 404, "even if job is successful, expects not found if no statistics are available" @@ -1770,7 +1770,7 @@ def test_job_statistics_response(self): job = self.make_job( add_info=False, task_id="2222-0000-0000-0000", process=self.process_public.identifier, service=None, - user_id=self.user_admin_id, status=Status.SUCCEEDED, progress=100, access=Visibility.PUBLIC, + user_id=self.user_admin_id, status=Status.SUCCESSFUL, progress=100, access=Visibility.PUBLIC, statistics=stats ) try: @@ -1811,7 +1811,7 @@ def test_job_inputs_response(self): def test_job_outputs_response(self): new_job = self.make_job( task_id=self.fully_qualified_test_name(), process=self.process_public.identifier, service=None, - status=Status.SUCCEEDED, progress=100, access=Visibility.PRIVATE, context="test/context", + status=Status.SUCCESSFUL, progress=100, access=Visibility.PRIVATE, context="test/context", results=[{"id": "test", "value": "data"}], ) @@ -1825,7 +1825,7 @@ def test_job_outputs_response(self): def test_job_run_response(self): raise NotImplementedError # FIXME (https://github.com/crim-ca/weaver/issues/673) - @parameterized.expand([Status.ACCEPTED, Status.RUNNING, Status.FAILED, Status.SUCCEEDED]) + @parameterized.expand([Status.ACCEPTED, Status.RUNNING, Status.FAILED, Status.SUCCESSFUL]) @pytest.mark.oap_part4 def test_job_update_locked(self, status): new_job = self.make_job( @@ -1919,8 +1919,8 @@ def test_job_update_response_contents(self): test_job = self.job_store.fetch_by_id(new_job.id) assert test_job.subscribers == { "callbacks": { - Status.SUCCEEDED: "https://example.com/success", - Status.FAILED: "https://example.com/failed", + StatusCategory.SUCCESS: "https://example.com/success", + StatusCategory.FAILED: "https://example.com/failed", } } @@ -2086,7 +2086,7 @@ def test_job_status_alt_openeo_accept_response(self): Validate retrieval of :term:`Job` status response with alternate value mapping by ``Accept`` header. """ job = self.job_info[0] - assert job.status == Status.SUCCEEDED, "Precondition invalid." + assert job.status == Status.SUCCESSFUL, "Precondition invalid." headers = {"Accept": f"{ContentType.APP_JSON}; profile={JobStatusSchema.OPENEO}"} path = f"/jobs/{job.id}" resp = self.app.get(path, headers=headers) @@ -2129,7 +2129,7 @@ def test_job_status_alt_openeo_profile_response(self): Validate retrieval of :term:`Job` status response with alternate value mapping by ``profile`` query parameter. """ job = self.job_info[0] - assert job.status == Status.SUCCEEDED, "Precondition invalid." + assert job.status == Status.SUCCESSFUL, "Precondition invalid." path = f"/jobs/{job.id}" resp = self.app.get(path, headers=self.json_headers, params={"schema": JobStatusSchema.OPENEO}) assert resp.status_code == 200 diff --git a/tests/wps_restapi/test_processes.py b/tests/wps_restapi/test_processes.py index e8ff8dce7..f927c393a 100644 --- a/tests/wps_restapi/test_processes.py +++ b/tests/wps_restapi/test_processes.py @@ -2343,7 +2343,7 @@ def test_execute_process_missing_required_params(self): execute_data_tests[6][1]["outputs"] = [{"id": "test_output", "transmissionMode": "random"}] def no_op(*_, **__): - return Status.SUCCEEDED + return Status.SUCCESSFUL path = f"/processes/{self.process_public.identifier}/jobs" with contextlib.ExitStack() as stack_exec: diff --git a/weaver/cli.py b/weaver/cli.py index 7ad33715a..547f18a69 100644 --- a/weaver/cli.py +++ b/weaver/cli.py @@ -2052,7 +2052,7 @@ def monitor( job_reference, # type: str timeout=None, # type: Optional[int] interval=None, # type: Optional[int] - wait_for_status=Status.SUCCEEDED, # type: str + wait_for_status=Status.SUCCESSFUL, # type: str url=None, # type: Optional[str] auth=None, # type: Optional[AuthBase] headers=None, # type: Optional[AnyHeadersContainer] @@ -2101,7 +2101,7 @@ def monitor( return OperationResult(False, "Could not find job with specified reference.", {"job": job_reference}) body = resp.json() status = body.get("status") - if status == wait_for_status: + if status == wait_for_status or map_status(status) == map_status(wait_for_status): msg = f"Requested job status reached [{wait_for_status}]." return self._parse_result(resp, success=True, message=msg, with_links=with_links, with_headers=with_headers, output_format=output_format) diff --git a/weaver/datatype.py b/weaver/datatype.py index 86a0d5e23..5839559b3 100644 --- a/weaver/datatype.py +++ b/weaver/datatype.py @@ -1585,7 +1585,7 @@ def links(self, container=None, self_link=None): if self.status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]: job_status = map_status(self.status) - if job_status == Status.SUCCEEDED: + if job_status == Status.SUCCESSFUL: job_links.extend([ {"href": f"{job_url}/outputs", "rel": "outputs", # unofficial "title": "Job outputs of successful process execution (extended outputs with metadata)."}, diff --git a/weaver/notify.py b/weaver/notify.py index 46d17f266..4d83ba3e5 100644 --- a/weaver/notify.py +++ b/weaver/notify.py @@ -15,7 +15,7 @@ from weaver import WEAVER_MODULE_DIR from weaver.datatype import Job from weaver.processes.constants import JobInputsOutputsSchema -from weaver.status import Status +from weaver.status import Status, StatusCategory, map_status from weaver.utils import bytes2str, fully_qualified_name, get_settings, request_extra, str2bytes from weaver.wps_restapi.jobs.utils import get_results @@ -186,21 +186,22 @@ def map_job_subscribers(job_body, settings): notification_email = job_body.get("notification_email") submit_subscribers = job_body.get("subscribers") or {} mapped_subscribers = {} - for status, name, sub_type, alt in [ - (Status.STARTED, "inProgressEmail", "emails", None), - (Status.FAILED, "failedEmail", "emails", notification_email), - (Status.SUCCEEDED, "successEmail", "emails", notification_email), - (Status.STARTED, "inProgressUri", "callbacks", None), - (Status.FAILED, "failedUri", "callbacks", None), - (Status.SUCCEEDED, "successUri", "callbacks", None), + for status_category, name, sub_type, alt in [ + (StatusCategory.RUNNING, "inProgressEmail", "emails", None), + (StatusCategory.FAILED, "failedEmail", "emails", notification_email), + (StatusCategory.SUCCESS, "successEmail", "emails", notification_email), + (StatusCategory.RUNNING, "inProgressUri", "callbacks", None), + (StatusCategory.FAILED, "failedUri", "callbacks", None), + (StatusCategory.SUCCESS, "successUri", "callbacks", None), ]: value = submit_subscribers.get(name) or alt if not value: continue if sub_type == "emails": value = encrypt_email(value, settings) + status_category = status_category.value.lower() mapped_subscribers.setdefault(sub_type, {}) - mapped_subscribers[sub_type][status] = value + mapped_subscribers[sub_type][status_category] = value return mapped_subscribers or None @@ -210,7 +211,12 @@ def send_job_notification_email(job, task_logger, settings): Sends a notification email about the execution status for the subscriber if requested during :term:`Job` submission. """ job_subs = job.subscribers or {} - notification_email = job_subs.get("emails", {}).get(job.status) + job_status_category = map_status(job.status, category=True) + if job_status_category == Status.UNKNOWN: # pragma: no cover + LOGGER.warning("Unknown status unmapped in subscribers notification email: [%s]", job.status) + return + category = job_status_category.value.lower() + notification_email = job_subs.get("emails", {}).get(category) if notification_email: try: email = decrypt_email(notification_email, settings) @@ -229,10 +235,15 @@ def send_job_callback_request(job, task_logger, settings): Send a callback request about the execution status for the subscriber if requested at :term:`Job` execution. """ job_subs = job.subscribers or {} - request_uri = job_subs.get("callbacks", {}).get(job.status) + job_status_category = map_status(job.status, category=True) + if job_status_category == Status.UNKNOWN: # pragma: no cover + LOGGER.warning("Unknown status unmapped in subscribers callback request: [%s]", job.status) + return + category = job_status_category.value.lower() + request_uri = job_subs.get("callbacks", {}).get(category) if request_uri: try: - if job.status != Status.SUCCEEDED: + if job_status_category != StatusCategory.SUCCESS: body = job.json(settings) else: # OGC-compliant request body needed to respect 'subscribers' callback definition diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 1e558f088..dbe7148a1 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -256,10 +256,10 @@ def execute_process(task, job_id, wps_url, headers=None): if execution.isComplete(): msg_progress = f" (status: {job_msg})" if job_msg else "" - if execution.isSucceded(): + if execution.isSucceeded(): wps_package.retrieve_package_job_log(execution, job, progress_min, progress_max) - job.status = map_status(Status.SUCCEEDED) - job.status_message = f"Job succeeded{msg_progress}." + job.status = map_status(Status.SUCCESSFUL) + job.status_message = f"Job {job.status}{msg_progress}." job.progress = progress_max job.save_log(logger=task_logger) job_results = [ @@ -551,6 +551,8 @@ def log_and_update_status(message, progress=None, status=None, *_, **kwargs): # status = update_status(status) if update_progress and progress is not None: progress = update_progress(progress) + if "error" in kwargs: + kwargs["errors"] = kwargs.pop("error") # align with 'save_log' parameters job.save_log(message=message, progress=progress, status=status, **kwargs) store.update_job(job) return log_and_update_status @@ -580,7 +582,9 @@ def parse_wps_inputs(wps_process, job, container=None): # successful execution of that nested process will log a 'succeeded' entry within this ongoing execution. # Because it is a nested process, it is expected that further operations from the 'parent' process using it will # log many more steps afterwards. Therefore, avoid the ambiguous entry within the context of the parent process. - update_status=lambda _status: Status.RUNNING if _status == Status.SUCCEEDED else _status, + update_status=lambda _status: ( + Status.RUNNING if map_status(_status, category=True) == StatusCategory.SUCCESS else _status + ), # Similarly, progress of the current job will be constraint within inputs retrieval and the following outputs # retrieval for the nested progress execution. Mapping the progress will ensure overall gradual percent values. update_progress=lambda _progress: map_progress(_progress, JobProgress.GET_INPUTS, JobProgress.GET_OUTPUTS), @@ -999,7 +1003,7 @@ def submit_job_dispatch_task( job = store.fetch_by_id(job.id) # when sync is successful, it must return the results direct instead of status info # see: https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execute_response - if job.status == Status.SUCCEEDED: + if job.status in JOB_STATUS_CATEGORIES[StatusCategory.SUCCESS]: _, _, sync_applied = parse_prefer_header_execute_mode(req_headers, [ExecuteControlOption.SYNC]) if sync_applied: resp_headers.update(sync_applied) diff --git a/weaver/processes/wps1_process.py b/weaver/processes/wps1_process.py index e3219a3e2..cdf644205 100644 --- a/weaver/processes/wps1_process.py +++ b/weaver/processes/wps1_process.py @@ -217,7 +217,7 @@ def monitor(self, monitor_reference): num_retries = 0 run_step += 1 - if not execution.isSucceded(): + if not execution.isSucceeded(): exec_msg = execution.statusMessage or "Job failed." exec_status = map_status(execution.getStatus()) exec_status_url = execution.statusLocation diff --git a/weaver/processes/wps_process_base.py b/weaver/processes/wps_process_base.py index 704f5b351..f0dedb5c3 100644 --- a/weaver/processes/wps_process_base.py +++ b/weaver/processes/wps_process_base.py @@ -551,7 +551,7 @@ def monitor(self, monitor_reference): RemoteJobProgress.MONITORING, RemoteJobProgress.STAGE_OUT), Status.RUNNING) - if job_status_value != Status.SUCCEEDED: + if job_status_value not in JOB_STATUS_CATEGORIES[StatusCategory.SUCCESS]: LOGGER.debug(get_log_monitor_msg(job_id, job_status_value, job_status_data.get("percentCompleted", 0), get_any_message(job_status_data), job_status_data.get("statusLocation"))) diff --git a/weaver/status.py b/weaver/status.py index 91f9347a3..d1d319a44 100644 --- a/weaver/status.py +++ b/weaver/status.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, overload from pywps.response.status import _WPS_STATUS, WPS_STATUS # noqa: W0212 @@ -17,6 +17,7 @@ class StatusCategory(ExtendedEnum): RUNNING = "RUNNING" PENDING = "PENDING" FAILED = "FAILED" + SUCCESS = "SUCCESS" class Status(Constants): @@ -39,7 +40,7 @@ class Status(Constants): JOB_STATUS_CATEGORIES = { # note: - # OGC compliant (old): [Accepted, Running, Succeeded, Failed] + # OGC compliant (old): [Accepted, Running, successful, Failed] # OGC compliant (new): [accepted, running, successful, failed, dismissed, created] ('created' in Part 4 only) # PyWPS uses: [Accepted, Started, Succeeded, Failed, Paused, Exception] # OWSLib uses: [Accepted, Running, Succeeded, Failed, Paused] (with 'Process' in front) @@ -52,9 +53,8 @@ class Status(Constants): Status.CREATED, # Part 4: Job Management Status.ACCEPTED, Status.RUNNING, - Status.SUCCEEDED, # new Status.FAILED, - Status.SUCCESSFUL, # old (keep it because it matches existing ADES/EMS and other providers) + Status.SUCCESSFUL, # v1/v2 official (alternative "SUCCEEDED" was in v2-draft for a while) Status.DISMISSED # new ]), StatusCompliant.PYWPS: frozenset([ @@ -103,6 +103,11 @@ class Status(Constants): Status.SUCCESSFUL, Status.FINISHED ]), + StatusCategory.SUCCESS: frozenset([ + Status.SUCCEEDED, + Status.SUCCESSFUL, + Status.FINISHED + ]), StatusCategory.FAILED: frozenset([ Status.FAILED, Status.DISMISSED, @@ -111,10 +116,8 @@ class Status(Constants): ]), } -# FIXME: see below detail in map_status about 'successful', partially compliant to OGC statuses -# https://github.com/opengeospatial/ogcapi-processes/blob/ca8e90/core/openapi/schemas/statusCode.yaml -JOB_STATUS_CODE_API = JOB_STATUS_CATEGORIES[StatusCompliant.OGC] - {Status.SUCCESSFUL} -JOB_STATUS_SEARCH_API = set(list(JOB_STATUS_CODE_API) + [Status.FINISHED]) +JOB_STATUS_CODE_API = JOB_STATUS_CATEGORIES[StatusCompliant.OGC] +JOB_STATUS_SEARCH_API = set(Status.values()) - {Status.UNKNOWN} # allow any variant for various profile support # id -> str STATUS_PYWPS_MAP = {s: _WPS_STATUS._fields[s].lower() for s in range(len(WPS_STATUS))} @@ -122,7 +125,7 @@ class Status(Constants): STATUS_PYWPS_IDS = {k.lower(): v for v, k in STATUS_PYWPS_MAP.items()} if TYPE_CHECKING: - from typing import Type, Union + from typing import Any, Type, Union from weaver.typedefs import Literal, TypeAlias @@ -133,6 +136,7 @@ class Status(Constants): Status.QUEUED, Status.PAUSED, Status.SUCCEEDED, + Status.SUCCESSFUL, Status.FINISHED, Status.FAILED, Status.RUNNING, @@ -164,23 +168,43 @@ class Status(Constants): ] -def map_status(wps_status, compliant=StatusCompliant.OGC): # pylint: disable=R1260 +@overload +def map_status(wps_status): + # type: (AnyStatusType) -> StatusType + ... + + +@overload +def map_status(wps_status, compliant): # type: (AnyStatusType, StatusCompliant) -> StatusType + ... + + +@overload +def map_status(wps_status, *, category): + # type: (AnyStatusType, Any, Literal[True]) -> StatusCategory + ... + + +def map_status(wps_status, compliant=StatusCompliant.OGC, category=False): # pylint: disable=R1260 + # type: (AnyStatusType, StatusCompliant, bool) -> Union[StatusType, StatusCategory] """ - Maps WPS execution statuses to between compatible values of different implementations. + Maps execution statuses between compatible values of different implementations. - Mapping is supported for values from :mod:`weaver.status`, :mod:`OWSLib`, :mod:`pywps` as well as some - specific one-of values of custom implementations. + Mapping is supported for values from :mod:`weaver.status`, :mod:`OWSLib`, :mod:`pywps`, :term:`openEO`, + as well as some specific one-of values of custom :term:`OAP` implementations. - For each compliant combination, unsupported statuses are changed to corresponding ones (with closest logical match). - Statuses are returned following :class:`Status` format. + For each compliant combination, unsupported statuses are changed to corresponding ones with the + closest logical match. Statuses are returned following :class:`Status` format. Specifically, this ensures statues are lowercase and not prefixed by ``Process`` - (as in XML response of OWS WPS like ``ProcessSucceeded`` for example). + (as in :term:`XML` response of :term:`OWS` :term:`WPS` like ``ProcessSucceeded`` for example). :param wps_status: One of :class:`Status` or its literal value to map to `compliant` standard. :param compliant: One of :class:`StatusCompliant` values. - :returns: mapped status complying to the requested compliant category, or :data:`Status.UNKNOWN` if no match found. + :param category: Request that the :class:`StatusCategory` corresponding to the supplied status is returned. + :returns: Mapped status complying to the requested compliant category, or :data:`Status.UNKNOWN` if no match found. """ + compliant = StatusCompliant.get(compliant) # case of raw PyWPS status if isinstance(wps_status, int): @@ -189,57 +213,68 @@ def map_status(wps_status, compliant=StatusCompliant.OGC): # pylint: disable=R1 # remove 'Process' from OWSLib statuses and lower for every compliant job_status = str(wps_status).lower().replace("process", "") - if compliant == StatusCompliant.OGC: + if category: + # order important to prioritize most restrictive + # categories with overlapping statuses first + for status_category in [ + StatusCategory.SUCCESS, + StatusCategory.FAILED, + StatusCategory.RUNNING, + StatusCategory.PENDING, + StatusCategory.FINISHED, + ]: + if job_status in JOB_STATUS_CATEGORIES[status_category]: + return status_category + return Status.UNKNOWN + + elif compliant == StatusCompliant.OGC: if job_status in JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]: if job_status in [Status.STARTED, Status.PAUSED]: - job_status = Status.RUNNING - elif job_status == Status.QUEUED: - job_status = Status.ACCEPTED + return Status.RUNNING + elif job_status in [Status.QUEUED]: + return Status.ACCEPTED elif job_status in [Status.CANCELED, Status.DISMISSED]: - job_status = Status.DISMISSED + return Status.DISMISSED elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FAILED]: - job_status = Status.FAILED - elif job_status == Status.FINISHED: - job_status = Status.SUCCEEDED + return Status.FAILED + elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.SUCCESS]: + return Status.SUCCESSFUL elif compliant == StatusCompliant.PYWPS: if job_status in [Status.RUNNING]: - job_status = Status.STARTED + return Status.STARTED + elif job_status in [Status.ACCEPTED]: + return Status.ACCEPTED elif job_status in [Status.DISMISSED, Status.CANCELED]: - job_status = Status.FAILED + return Status.FAILED elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FAILED]: - job_status = Status.FAILED + return Status.FAILED elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.PENDING]: - job_status = Status.PAUSED + return Status.PAUSED elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]: - job_status = Status.SUCCEEDED + return Status.SUCCEEDED elif compliant == StatusCompliant.OWSLIB: if job_status in JOB_STATUS_CATEGORIES[StatusCategory.PENDING]: - job_status = Status.PAUSED + return Status.PAUSED elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]: - job_status = Status.RUNNING + return Status.RUNNING elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FAILED]: - job_status = Status.FAILED + return Status.FAILED elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]: - job_status = Status.SUCCEEDED + return Status.SUCCEEDED elif compliant == StatusCompliant.OPENEO: if job_status in JOB_STATUS_CATEGORIES[StatusCategory.PENDING]: - job_status = Status.QUEUED - elif job_status == Status.DISMISSED: - job_status = Status.CANCELED + return Status.QUEUED + elif job_status in [Status.DISMISSED]: + return Status.CANCELED elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]: - job_status = Status.RUNNING + return Status.RUNNING elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FAILED]: - job_status = Status.ERROR + return Status.ERROR elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]: - job_status = Status.FINISHED - - # FIXME: new official status is 'successful', but this breaks everywhere (tests, local/remote execute, etc.) - # https://github.com/opengeospatial/ogcapi-processes/blob/master/openapi/schemas/processes-core/statusCode.yaml - if job_status == Status.SUCCESSFUL: - job_status = Status.SUCCEEDED + return Status.FINISHED if job_status in Status: return job_status diff --git a/weaver/wps_restapi/examples/job_status_success.json b/weaver/wps_restapi/examples/job_status_success.json index 17dfd676e..5492c4d0e 100644 --- a/weaver/wps_restapi/examples/job_status_success.json +++ b/weaver/wps_restapi/examples/job_status_success.json @@ -3,8 +3,8 @@ "processID": "Echo", "providerID": null, "type": "process", - "status": "succeeded", - "message": "Job succeeded.", + "status": "successful", + "message": "Job successful.", "created": "2022-06-03T23:46:45.049000+00:00", "started": "2022-06-03T23:46:45.077000+00:00", "finished": "2022-06-03T23:46:48.134000+00:00", diff --git a/weaver/wps_restapi/jobs/jobs.py b/weaver/wps_restapi/jobs/jobs.py index e507e2826..605a09971 100644 --- a/weaver/wps_restapi/jobs/jobs.py +++ b/weaver/wps_restapi/jobs/jobs.py @@ -741,7 +741,7 @@ def get_job_stats(request): """ job = get_job(request) raise_job_dismissed(job, request) - if job.status not in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED] or job.status != Status.SUCCEEDED: + if job.status not in JOB_STATUS_CATEGORIES[StatusCategory.SUCCESS]: raise JobStatisticsNotFound(json={ "title": "NoJobStatistics", "type": "no-job-statistics", # unofficial diff --git a/weaver/wps_restapi/jobs/utils.py b/weaver/wps_restapi/jobs/utils.py index 6cbf6fbea..b8d98d65a 100644 --- a/weaver/wps_restapi/jobs/utils.py +++ b/weaver/wps_restapi/jobs/utils.py @@ -1287,7 +1287,7 @@ def raise_job_bad_status_success(job, container=None): """ Raise the appropriate message for :term:`Job` not ready or unable to retrieve output results due to status. """ - if job.status != Status.SUCCEEDED: + if job.status not in JOB_STATUS_CATEGORIES[StatusCategory.SUCCESS]: links = job.links(container=container) headers = [("Link", make_link_header(link)) for link in links] if job.status == Status.FAILED: diff --git a/weaver/wps_restapi/templates/notification_email_example.mako b/weaver/wps_restapi/templates/notification_email_example.mako index 01dcfb8bf..91e292673 100644 --- a/weaver/wps_restapi/templates/notification_email_example.mako +++ b/weaver/wps_restapi/templates/notification_email_example.mako @@ -12,12 +12,12 @@ Below is a non-exhaustive list of example parameters from this method. Refer to the method for complete listing. - status: succeeded, failed, started + status: successful, failed, started logs: url to the logs jobID: example "617f23d3-f474-47f9-a8ec-55da9dd6ac71" result: url to the outputs duration: example "0:01:02" - message: example "Job succeeded." + message: example "Job successful." percentCompleted: example 100 From: Weaver @@ -27,9 +27,14 @@ Content-Type: text/plain; charset=UTF-8 Dear user, -Your job submitted on ${job.created.strftime("%Y/%m/%d %H:%M %Z")} to ${settings.get("weaver.url")} ${job.status}. +Your job submitted on ${job.created.strftime("%Y/%m/%d %H:%M %Z")} to ${settings.get("weaver.url")} \ +%if job.status == "successful": +was successful. +% elif job.status == "failed": +has failed. +%endif -% if job.status == "succeeded": +% if job.status == "successful": You can retrieve the output(s) at the following link: ${job.results_url(settings)} % elif job.status == "failed": You can retrieve potential error details from the following link: ${job.exceptions_url(settings)} From 248546f1979e4a12ed92a2929eb9fa545858707b Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 3 Feb 2025 23:27:38 -0500 Subject: [PATCH 2/4] fix linting --- .pylintrc | 1 + weaver/wps_restapi/jobs/jobs.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.pylintrc b/.pylintrc index a99c2cf65..5b5488bbc 100644 --- a/.pylintrc +++ b/.pylintrc @@ -97,6 +97,7 @@ disable=C0111,missing-docstring, R0901,too-many-ancestors, R0902,too-many-instance-attributes, R0904,too-many-public-methods, + R0911,too-many-return-statements, R0912,too-many-branches, R0913,too-many-arguments, R0914,too-many-locals, diff --git a/weaver/wps_restapi/jobs/jobs.py b/weaver/wps_restapi/jobs/jobs.py index 605a09971..6b53cb860 100644 --- a/weaver/wps_restapi/jobs/jobs.py +++ b/weaver/wps_restapi/jobs/jobs.py @@ -35,7 +35,7 @@ ) from weaver.processes.utils import get_process from weaver.processes.wps_package import mask_process_inputs -from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, StatusCompliant, map_status +from weaver.status import JOB_STATUS_CATEGORIES, StatusCategory, StatusCompliant, map_status from weaver.store.base import StoreJobs from weaver.utils import get_header, get_settings, make_link_header from weaver.wps_restapi import swagger_definitions as sd From cd59034907a77c06a6747d24945ec4d40c53c52a Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 4 Feb 2025 00:29:01 -0500 Subject: [PATCH 3/4] fix test --- tests/wps_restapi/test_jobs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/wps_restapi/test_jobs.py b/tests/wps_restapi/test_jobs.py index b7235d753..46b11cec8 100644 --- a/tests/wps_restapi/test_jobs.py +++ b/tests/wps_restapi/test_jobs.py @@ -1919,8 +1919,8 @@ def test_job_update_response_contents(self): test_job = self.job_store.fetch_by_id(new_job.id) assert test_job.subscribers == { "callbacks": { - StatusCategory.SUCCESS: "https://example.com/success", - StatusCategory.FAILED: "https://example.com/failed", + StatusCategory.SUCCESS.value.lower(): "https://example.com/success", + StatusCategory.FAILED.value.lower(): "https://example.com/failed", } } From a18fa36a0c9b6dbb29dd0f73db77d1d6b25dfb39 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 6 Feb 2025 10:29:40 -0500 Subject: [PATCH 4/4] =?UTF-8?q?Bump=20version:=206.1.1=20=E2=86=92=206.2.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .zenodo.json | 4 ++-- CHANGES.rst | 13 +++++++++++++ CITATION.cff | 4 ++-- Makefile | 2 +- README.rst | 20 ++++++++++---------- docker/Dockerfile-base | 2 +- setup.cfg | 2 +- weaver/__meta__.py | 2 +- 8 files changed, 31 insertions(+), 18 deletions(-) diff --git a/.zenodo.json b/.zenodo.json index ae2728fb1..64726e3c5 100644 --- a/.zenodo.json +++ b/.zenodo.json @@ -1,8 +1,8 @@ { "upload_type": "software", - "title": "crim-ca/weaver:6.1.1", + "title": "crim-ca/weaver:6.2.0", "description": "Weaver: Workflow Execution Management Service (EMS); Application, Deployment and Execution Service (ADES); OGC API - Processes; WPS; CWL Application Package", - "version": "6.1.1", + "version": "6.2.0", "creators": [ { "name": "Charette-Migneault, Francis", diff --git a/CHANGES.rst b/CHANGES.rst index ed9f15b68..6fd7bb8e9 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -10,6 +10,19 @@ Changes `Unreleased `_ (latest) ======================================================================== +Changes: +-------- +- No change. + +Fixes: +------ +- No change. + +.. _changes_6.2.0: + +`6.2.0 `_ (2025-02-06) +======================================================================== + Changes: -------- - Replace ``succeeded`` status by ``successful`` everywhere where applicable (as originally defined by OGC API v1), diff --git a/CITATION.cff b/CITATION.cff index 046c21b04..17fd686fd 100644 --- a/CITATION.cff +++ b/CITATION.cff @@ -1,10 +1,10 @@ cff-version: 1.2.0 -title: "crim-ca/weaver:6.1.1" +title: "crim-ca/weaver:6.2.0" message: | Weaver: Workflow Execution Management Service (EMS); Application, Deployment and Execution Service (ADES); OGC API - Processes; WPS; CWL Application Package -version: "6.1.1" +version: "6.2.0" authors: - name: &crim Computer Research Institute of Montréal diff --git a/Makefile b/Makefile index 3abd8d86f..407268c6f 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ MAKEFILE_NAME := $(word $(words $(MAKEFILE_LIST)),$(MAKEFILE_LIST)) # Application APP_ROOT := $(abspath $(lastword $(MAKEFILE_NAME))/..) APP_NAME := $(shell basename $(APP_ROOT)) -APP_VERSION ?= 6.1.1 +APP_VERSION ?= 6.2.0 APP_INI ?= $(APP_ROOT)/config/$(APP_NAME).ini DOCKER_REPO ?= pavics/weaver #DOCKER_REPO ?= docker-registry.crim.ca/ogc/weaver diff --git a/README.rst b/README.rst index 98dc3f7d3..f0d2d998c 100644 --- a/README.rst +++ b/README.rst @@ -58,13 +58,13 @@ for each process. :alt: Requires Python 3.9+ :target: https://www.python.org/getit -.. |commits-since| image:: https://img.shields.io/github/commits-since/crim-ca/weaver/6.1.1.svg?logo=github +.. |commits-since| image:: https://img.shields.io/github/commits-since/crim-ca/weaver/6.2.0.svg?logo=github :alt: Commits since latest release - :target: https://github.com/crim-ca/weaver/compare/6.1.1...master + :target: https://github.com/crim-ca/weaver/compare/6.2.0...master -.. |version| image:: https://img.shields.io/badge/latest%20version-6.1.1-blue?logo=github +.. |version| image:: https://img.shields.io/badge/latest%20version-6.2.0-blue?logo=github :alt: Latest Tagged Version - :target: https://github.com/crim-ca/weaver/tree/6.1.1 + :target: https://github.com/crim-ca/weaver/tree/6.2.0 .. |deps| image:: https://img.shields.io/librariesio/github/crim-ca/weaver?logo=librariesdotio&logoColor=white :alt: Libraries.io Dependencies Status @@ -78,9 +78,9 @@ for each process. :alt: Github Actions CI Build Status (master branch) :target: https://github.com/crim-ca/weaver/actions?query=workflow%3ATests+branch%3Amaster -.. |github_tagged| image:: https://img.shields.io/github/actions/workflow/status/crim-ca/weaver/tests.yml?label=6.1.1&branch=6.1.1&logo=github +.. |github_tagged| image:: https://img.shields.io/github/actions/workflow/status/crim-ca/weaver/tests.yml?label=6.2.0&branch=6.2.0&logo=github :alt: Github Actions CI Build Status (latest tag) - :target: https://github.com/crim-ca/weaver/actions?query=workflow%3ATests+branch%3A6.1.1 + :target: https://github.com/crim-ca/weaver/actions?query=workflow%3ATests+branch%3A6.2.0 .. |readthedocs| image:: https://img.shields.io/readthedocs/pavics-weaver?logo=readthedocs :alt: ReadTheDocs Build Status (master branch) @@ -92,7 +92,7 @@ for each process. .. below shield will either indicate the targeted version or 'tag not found' .. since docker tags are pushed following manual builds by CI, they are not automatic and no build artifact exists -.. |docker_build_status| image:: https://img.shields.io/docker/v/pavics/weaver/6.1.1?label=tag&logo=docker +.. |docker_build_status| image:: https://img.shields.io/docker/v/pavics/weaver/6.2.0?label=tag&logo=docker :alt: Docker Build Status (latest version) :target: https://hub.docker.com/r/pavics/weaver/tags @@ -276,12 +276,12 @@ For a prebuilt image, pull as follows: .. code-block:: shell - docker pull pavics/weaver:6.1.1 + docker pull pavics/weaver:6.2.0 For convenience, following tags are also available: -- ``weaver:6.1.1-manager``: `Weaver` image that will run the API for WPS process and job management. -- ``weaver:6.1.1-worker``: `Weaver` image that will run the process job runner application. +- ``weaver:6.2.0-manager``: `Weaver` image that will run the API for WPS process and job management. +- ``weaver:6.2.0-worker``: `Weaver` image that will run the process job runner application. Following links correspond to existing servers with `Weaver` configured as *EMS* or *ADES* instances respectively. diff --git a/docker/Dockerfile-base b/docker/Dockerfile-base index 8d5a2d731..ba2753da7 100644 --- a/docker/Dockerfile-base +++ b/docker/Dockerfile-base @@ -3,7 +3,7 @@ LABEL description.short="Weaver Base" LABEL description.long="Workflow Execution Management Service (EMS); Application, Deployment and Execution Service (ADES)" LABEL maintainer="Francis Charette-Migneault " LABEL vendor="CRIM" -LABEL version="6.1.1" +LABEL version="6.2.0" # setup paths ENV APP_DIR=/opt/local/src/weaver diff --git a/setup.cfg b/setup.cfg index 8f2a4cd0a..2aa116cb0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 6.1.1 +current_version = 6.2.0 commit = True tag = True tag_name = {new_version} diff --git a/weaver/__meta__.py b/weaver/__meta__.py index 1704e5e5b..bd552fa16 100644 --- a/weaver/__meta__.py +++ b/weaver/__meta__.py @@ -1,6 +1,6 @@ __name__ = "weaver" __title__ = "Weaver" -__version__ = "6.1.1" +__version__ = "6.2.0" __description__ = "Workflow Execution Management Service (EMS); Application, Deployment and Execution Service (ADES)." __source_repository__ = "https://github.com/crim-ca/weaver" __docker_repository__ = "https://hub.docker.com/r/pavics/weaver"