From 6b093c21add162d4b2f0a78fd8fe1170f761ea76 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Tue, 29 Oct 2024 10:43:02 +0100 Subject: [PATCH] workflows, remove deprecated functions Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- dapr/aio/clients/grpc/client.py | 7 ++ dapr/clients/grpc/client.py | 7 ++ examples/demo_workflow/README.md | 6 +- examples/demo_workflow/app.py | 75 +++++++++---------- .../demo_workflow/requirements.txt | 2 +- .../tests/test_workflow_client.py | 2 + tests/clients/test_dapr_grpc_client.py | 2 +- 7 files changed, 55 insertions(+), 46 deletions(-) diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index 2b40101cf..b2158ff8c 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -1408,6 +1408,7 @@ async def start_workflow( send_raw_bytes: bool = False, ) -> StartWorkflowResponse: """Starts a workflow. + Deprecated: use dapr-ext-workflow instead Args: workflow_component (str): the name of the workflow component @@ -1469,6 +1470,7 @@ async def start_workflow( async def get_workflow(self, instance_id: str, workflow_component: str) -> GetWorkflowResponse: """Gets information on a workflow. + Deprecated: use dapr-ext-workflow instead Args: instance_id (str): the ID of the workflow instance, @@ -1510,6 +1512,7 @@ async def get_workflow(self, instance_id: str, workflow_component: str) -> GetWo async def terminate_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse: """Terminates a workflow. + Deprecated: use dapr-ext-workflow instead Args: instance_id (str): the ID of the workflow instance, e.g. @@ -1547,6 +1550,7 @@ async def raise_workflow_event( send_raw_bytes: bool = False, ) -> DaprResponse: """Raises an event on a workflow. + Deprecated: use dapr-ext-workflow instead Args: instance_id (str): the ID of the workflow instance, @@ -1610,6 +1614,7 @@ async def raise_workflow_event( async def pause_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse: """Pause a workflow. + Deprecated: use dapr-ext-workflow instead Args: instance_id (str): the ID of the workflow instance, @@ -1642,6 +1647,7 @@ async def pause_workflow(self, instance_id: str, workflow_component: str) -> Dap async def resume_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse: """Resumes a workflow. + Deprecated: use dapr-ext-workflow instead Args: instance_id (str): the ID of the workflow instance, @@ -1673,6 +1679,7 @@ async def resume_workflow(self, instance_id: str, workflow_component: str) -> Da async def purge_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse: """Purges a workflow. + Deprecated: use dapr-ext-workflow instead Args: instance_id (str): the ID of the workflow instance, diff --git a/dapr/clients/grpc/client.py b/dapr/clients/grpc/client.py index 94793907d..8e1fd974a 100644 --- a/dapr/clients/grpc/client.py +++ b/dapr/clients/grpc/client.py @@ -1409,6 +1409,7 @@ def start_workflow( send_raw_bytes: bool = False, ) -> StartWorkflowResponse: """Starts a workflow. + Deprecated: use dapr-ext-workflow instead Args: workflow_component (str): the name of the workflow component @@ -1466,6 +1467,7 @@ def start_workflow( def get_workflow(self, instance_id: str, workflow_component: str) -> GetWorkflowResponse: """Gets information on a workflow. + Deprecated: use dapr-ext-workflow instead Args: instance_id (str): the ID of the workflow instance, @@ -1507,6 +1509,7 @@ def get_workflow(self, instance_id: str, workflow_component: str) -> GetWorkflow def terminate_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse: """Terminates a workflow. + Deprecated: use dapr-ext-workflow instead Args: instance_id (str): the ID of the workflow instance, e.g. @@ -1545,6 +1548,7 @@ def raise_workflow_event( send_raw_bytes: bool = False, ) -> DaprResponse: """Raises an event on a workflow. + Deprecated: use dapr-ext-workflow instead Args: instance_id (str): the ID of the workflow instance, @@ -1609,6 +1613,7 @@ def raise_workflow_event( def pause_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse: """Pause a workflow. + Deprecated: use dapr-ext-workflow instead Args: instance_id (str): the ID of the workflow instance, @@ -1641,6 +1646,7 @@ def pause_workflow(self, instance_id: str, workflow_component: str) -> DaprRespo def resume_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse: """Resumes a workflow. + Deprecated: use dapr-ext-workflow instead Args: instance_id (str): the ID of the workflow instance, @@ -1672,6 +1678,7 @@ def resume_workflow(self, instance_id: str, workflow_component: str) -> DaprResp def purge_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse: """Purges a workflow. + Deprecated: use dapr-ext-workflow instead Args: instance_id (str): the ID of the workflow instance, diff --git a/examples/demo_workflow/README.md b/examples/demo_workflow/README.md index 12b84ae2c..c83aa70fe 100644 --- a/examples/demo_workflow/README.md +++ b/examples/demo_workflow/README.md @@ -2,10 +2,10 @@ This document describes how to register a workflow and activities inside it and start running it. It demonstrates the following APIs: -- **start_workflow**: Start an instance of a workflow -- **get_workflow**: Get information on a single workflow +- **schedule_new_workflow**: Start an instance of a workflow +- **get_workflow_state**: Get information on a single workflow - **terminate_workflow**: Terminate or stop a particular instance of a workflow -- **raise_event**: Raise an event on a workflow +- **raise_workflow_event**: Raise an event on a workflow - **pause_workflow**: Pauses or suspends a workflow instance that can later be resumed - **resume_workflow**: Resumes a paused workflow instance - **purge_workflow**: Removes all metadata related to a specific workflow instance from the state store diff --git a/examples/demo_workflow/app.py b/examples/demo_workflow/app.py index 739655627..55d24e79c 100644 --- a/examples/demo_workflow/app.py +++ b/examples/demo_workflow/app.py @@ -13,6 +13,7 @@ from datetime import timedelta from time import sleep from dapr.ext.workflow import ( + DaprWorkflowClient, WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext, @@ -106,7 +107,7 @@ def act_for_child_wf(ctx: WorkflowActivityContext, inp): def main(): - with DaprClient() as d: + with DaprWorkflowClient() as wfc: workflow_runtime = WorkflowRuntime() workflow_runtime.register_workflow(hello_world_wf) workflow_runtime.register_workflow(child_retryable_wf) @@ -119,14 +120,11 @@ def main(): sleep(2) print('==========Start Counter Increase as per Input:==========') - start_resp = d.start_workflow( + instance_id = wfc.schedule_new_workflow( instance_id=instance_id, - workflow_component=workflow_component, - workflow_name=workflow_name, - input=input_data, - workflow_options=workflow_options, - ) - print(f'start_resp {start_resp.instance_id}') + workflow=hello_world_wf, + input=input_data) + print(f'start_resp {instance_id}') # Sleep for a while to let the workflow run sleep(12) @@ -135,62 +133,56 @@ def main(): assert child_orchestrator_string == '1aa2bb3cc' # Pause Test - d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component) - get_response = d.get_workflow( - instance_id=instance_id, workflow_component=workflow_component - ) + wfc.pause_workflow(instance_id=instance_id) + get_response = wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True) print(f'Get response from {workflow_name} after pause call: {get_response.runtime_status}') # Resume Test - d.resume_workflow(instance_id=instance_id, workflow_component=workflow_component) - get_response = d.get_workflow( - instance_id=instance_id, workflow_component=workflow_component - ) + wfc.resume_workflow(instance_id=instance_id) + get_response = wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True) print(f'Get response from {workflow_name} after resume call: {get_response.runtime_status}') sleep(1) # Raise event - d.raise_workflow_event( - instance_id=child_instance_id, - workflow_component=workflow_component, + wfc.raise_workflow_event( + instance_id=instance_id, event_name=event_name, - event_data=event_data, + data=event_data, ) sleep(5) # Purge Test - d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component) + # // TODO IMPLEMENT PURGE + # d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component) try: - d.get_workflow(instance_id=instance_id, workflow_component=workflow_component) - except DaprInternalError as err: + wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True) + except Exception as err: + # TODO temporary print + print(f'got error {err}') if non_existent_id_error in err._message: print('Instance Successfully Purged') # Kick off another workflow for termination purposes # This will also test using the same instance ID on a new workflow after # the old instance was purged - start_resp = d.start_workflow( + instance_id = wfc.schedule_new_workflow( instance_id=instance_id, - workflow_component=workflow_component, - workflow_name=workflow_name, - input=input_data, - workflow_options=workflow_options, + workflow=hello_world_wf, + input=input_data ) - print(f'start_resp {start_resp.instance_id}') + print(f'start_resp {instance_id}') sleep(5) # Terminate Test - d.terminate_workflow(instance_id=instance_id, workflow_component=workflow_component) + wfc.terminate_workflow(instance_id=instance_id) sleep(1) - get_response = d.get_workflow( - instance_id=instance_id, workflow_component=workflow_component - ) + get_response = wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True) print( f'Get response from {workflow_name} ' f'after terminate call: {get_response.runtime_status}' ) - child_get_response = d.get_workflow( - instance_id=child_instance_id, workflow_component=workflow_component + child_get_response = wfc.get_workflow_state( + instance_id=child_instance_id, fetch_payloads=True ) print( f'Get response from {child_workflow_name} ' @@ -198,12 +190,13 @@ def main(): ) # Purge Test - d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component) - try: - d.get_workflow(instance_id=instance_id, workflow_component=workflow_component) - except DaprInternalError as err: - if non_existent_id_error in err._message: - print('Instance Successfully Purged') + # TODO IMPLEMENT PURGE + # d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component) + # try: + # d.get_workflow(instance_id=instance_id, workflow_component=workflow_component) + # except DaprInternalError as err: + # if non_existent_id_error in err._message: + # print('Instance Successfully Purged') workflow_runtime.shutdown() diff --git a/examples/demo_workflow/demo_workflow/requirements.txt b/examples/demo_workflow/demo_workflow/requirements.txt index 61f7f5c82..4190656b8 100644 --- a/examples/demo_workflow/demo_workflow/requirements.txt +++ b/examples/demo_workflow/demo_workflow/requirements.txt @@ -1 +1 @@ -dapr-ext-workflow-dev>=0.0.1rc1.dev +dapr-ext-workflow-dev>=0.4.1rc1.dev diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client.py b/ext/dapr-ext-workflow/tests/test_workflow_client.py index 4a7f93b95..d4767a746 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client.py @@ -119,3 +119,5 @@ def test_client_functions(self): actual_resume_result = wfClient.resume_workflow(instance_id=mockInstanceId) assert actual_resume_result == mock_resume_result + + # TODO add purge support diff --git a/tests/clients/test_dapr_grpc_client.py b/tests/clients/test_dapr_grpc_client.py index d3eab2363..0a88727f5 100644 --- a/tests/clients/test_dapr_grpc_client.py +++ b/tests/clients/test_dapr_grpc_client.py @@ -864,7 +864,7 @@ def test_unlock_input_validation(self): # Tests for workflow # - def test_workflow(self): + def test_workflow_deprecated(self): dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') # Sane parameters workflow_name = 'test_workflow'