From d17b262cc9b10da047830942593fa3f12941af60 Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Tue, 4 Nov 2025 15:31:31 -0800 Subject: [PATCH 01/10] Added async workflow client implementation, leveraging new durabletask.aio.client implementation Signed-off-by: Patrick Assuied --- .../dapr/ext/workflow/aio/__init__.py | 11 + .../ext/workflow/aio/dapr_workflow_client.py | 280 ++++++++++++++++++ ext/dapr-ext-workflow/setup.cfg | 2 +- .../tests/test_workflow_client_aio.py | 175 +++++++++++ 4 files changed, 467 insertions(+), 1 deletion(-) create mode 100644 ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py create mode 100644 ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py create mode 100644 ext/dapr-ext-workflow/tests/test_workflow_client_aio.py diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py new file mode 100644 index 00000000..378b4fdd --- /dev/null +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- + +from .dapr_workflow_client import DaprWorkflowClientAsync + +# Public alias to mirror sync naming under aio namespace +DaprWorkflowClient = DaprWorkflowClientAsync + +__all__ = [ + 'DaprWorkflowClientAsync', + 'DaprWorkflowClient', +] diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py new file mode 100644 index 00000000..03b03eec --- /dev/null +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py @@ -0,0 +1,280 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any, Optional, TypeVar + +import durabletask.internal.orchestrator_service_pb2 as pb +from dapr.ext.workflow.logger import Logger, LoggerOptions +from dapr.ext.workflow.util import getAddress +from dapr.ext.workflow.workflow_context import Workflow +from dapr.ext.workflow.workflow_state import WorkflowState +from durabletask.aio import client as aioclient +from grpc.aio import AioRpcError + +from dapr.clients import DaprInternalError +from dapr.clients.http.client import DAPR_API_TOKEN_HEADER +from dapr.conf import settings +from dapr.conf.helpers import GrpcEndpoint + +T = TypeVar('T') +TInput = TypeVar('TInput') +TOutput = TypeVar('TOutput') + + +class DaprWorkflowClientAsync: + """Async client for managing Dapr Workflow instances. + + This uses a gRPC async connection to send commands directly to the workflow engine, + bypassing the Dapr API layer. Intended to be used by workflow applications. 
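+
+    Example (illustrative sketch; assumes a reachable Dapr sidecar, with ``my_workflow``
+    standing in for a workflow function registered on a WorkflowRuntime):
+
+        client = DaprWorkflowClientAsync()
+        instance_id = await client.schedule_new_workflow(my_workflow, input='Hello')
+        state = await client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)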
+ """ + + def __init__( + self, + host: Optional[str] = None, + port: Optional[str] = None, + logger_options: Optional[LoggerOptions] = None, + ): + address = getAddress(host, port) + + try: + uri = GrpcEndpoint(address) + except ValueError as error: + raise DaprInternalError(f'{error}') from error + + self._logger = Logger('DaprWorkflowClientAsync', logger_options) + + metadata = tuple() + if settings.DAPR_API_TOKEN: + metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),) + options = self._logger.get_options() + self.__obj = aioclient.AsyncTaskHubGrpcClient( + host_address=uri.endpoint, + metadata=metadata, + secure_channel=uri.tls, + log_handler=options.log_handler, + log_formatter=options.log_formatter, + ) + + async def schedule_new_workflow( + self, + workflow: Workflow, + *, + input: Optional[TInput] = None, + instance_id: Optional[str] = None, + start_at: Optional[datetime] = None, + reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None, + ) -> str: + """Schedules a new workflow instance for execution. + + Args: + workflow: The workflow to schedule. + input: The optional input to pass to the scheduled workflow instance. This must be a + serializable value. + instance_id: The unique ID of the workflow instance to schedule. If not specified, a + new GUID value is used. + start_at: The time when the workflow instance should start executing. + If not specified or if a date-time in the past is specified, the workflow instance will + be scheduled immediately. + reuse_id_policy: Optional policy to reuse the workflow id when there is a conflict with + an existing workflow instance. + + Returns: + The ID of the scheduled workflow instance. + """ + if hasattr(workflow, '_dapr_alternate_name'): + return await self.__obj.schedule_new_orchestration( + workflow.__dict__['_dapr_alternate_name'], + input=input, + instance_id=instance_id, + start_at=start_at, + reuse_id_policy=reuse_id_policy, + ) + return await self.__obj.schedule_new_orchestration( + workflow.__name__, + input=input, + instance_id=instance_id, + start_at=start_at, + reuse_id_policy=reuse_id_policy, + ) + + async def get_workflow_state( + self, instance_id: str, *, fetch_payloads: bool = True + ) -> Optional[WorkflowState]: + """Fetches runtime state for the specified workflow instance. + + Args: + instance_id: The unique ID of the workflow instance to fetch. + fetch_payloads: If true, fetches the input, output payloads and custom status + for the workflow instance. Defaults to true. + + Returns: + The current state of the workflow instance, or None if the workflow instance does not + exist. + + """ + try: + state = await self.__obj.get_orchestration_state( + instance_id, fetch_payloads=fetch_payloads + ) + return WorkflowState(state) if state else None + except AioRpcError as error: + if error.details() and 'no such instance exists' in error.details(): + self._logger.warning(f'Workflow instance not found: {instance_id}') + return None + self._logger.error( + f'Unhandled RPC error while fetching workflow state: {error.code()} - {error.details()}' + ) + raise + + async def wait_for_workflow_start( + self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 0 + ) -> Optional[WorkflowState]: + """Waits for a workflow to start running and returns a WorkflowState object that contains + metadata about the started workflow. + + A "started" workflow instance is any instance not in the WorkflowRuntimeStatus.Pending + state. 
This method will return a completed task if the workflow has already started
+        running or has already completed.
+
+        Args:
+            instance_id: The unique ID of the workflow instance to wait for.
+            fetch_payloads: If true, fetches the input, output payloads and custom status for
+                the workflow instance. Defaults to false.
+            timeout_in_seconds: The maximum time in seconds to wait for the workflow instance
+                to start running. Defaults to 0 seconds, meaning no timeout.
+
+        Returns:
+            WorkflowState record that describes the workflow instance and its execution status.
+            If the specified workflow isn't found, the WorkflowState.Exists value will be false.
+        """
+        state = await self.__obj.wait_for_orchestration_start(
+            instance_id, fetch_payloads=fetch_payloads, timeout=timeout_in_seconds
+        )
+        return WorkflowState(state) if state else None
+
+    async def wait_for_workflow_completion(
+        self, instance_id: str, *, fetch_payloads: bool = True, timeout_in_seconds: int = 0
+    ) -> Optional[WorkflowState]:
+        """Waits for a workflow to complete and returns a WorkflowState object that contains
+        metadata about the completed instance.
+
+        A "completed" workflow instance is any instance in one of the terminal states. For
+        example, the WorkflowRuntimeStatus.Completed, WorkflowRuntimeStatus.Failed or
+        WorkflowRuntimeStatus.Terminated states.
+
+        Workflows are long-running and could take hours, days, or months before completing.
+        Workflows can also be eternal, in which case they'll never complete unless terminated.
+        In such cases, this call may block indefinitely, so care must be taken to ensure
+        appropriate timeouts are enforced using the timeout_in_seconds parameter.
+
+        If a workflow instance is already complete when this method is called, the method
+        will return immediately.
+
+        Args:
+            instance_id: The unique ID of the workflow instance to wait for.
+            fetch_payloads: If true, fetches the input, output payloads and custom status
+                for the workflow instance. Defaults to true.
+            timeout_in_seconds: The maximum time in seconds to wait for the workflow instance to
+                complete. Defaults to 0 seconds, meaning no timeout.
+
+        Returns:
+            WorkflowState record that describes the workflow instance and its execution status.
+        """
+        state = await self.__obj.wait_for_orchestration_completion(
+            instance_id, fetch_payloads=fetch_payloads, timeout=timeout_in_seconds
+        )
+        return WorkflowState(state) if state else None
+
+    async def raise_workflow_event(
+        self, instance_id: str, event_name: str, *, data: Optional[Any] = None
+    ) -> None:
+        """Sends an event notification message to a waiting workflow instance.
+        In order to handle the event, the target workflow instance must be waiting for an
+        event with the name given in the event_name parameter, using the
+        wait_for_external_event API. If the target workflow instance is not yet waiting
+        for an event with that name, the event will be saved in the workflow instance
+        state and dispatched immediately when the workflow calls wait_for_external_event.
+        This event saving occurs even if the workflow has canceled its wait operation before
+        the event was received.
+
+        Workflows can wait for the same event name multiple times, so sending multiple events
+        with the same name is allowed. Each external event received by a workflow will complete
+        just one task returned by the wait_for_external_event method.
+
+        Raised events for a completed or non-existent workflow instance will be silently
+        discarded.
+
+        Args:
+            instance_id: The ID of the workflow instance that will handle the event.
+            event_name: The name of the event. Event names are case-insensitive.
+            data: The serializable data payload to include with the event.
+        """
+        return await self.__obj.raise_orchestration_event(instance_id, event_name, data=data)
+
+    async def terminate_workflow(
+        self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True
+    ) -> None:
+        """Terminates a running workflow instance and updates its runtime status to
+        WorkflowRuntimeStatus.Terminated. This method internally enqueues a "terminate" message
+        in the task hub. When the task hub worker processes this message, it will update the
+        runtime status of the target instance to WorkflowRuntimeStatus.Terminated. You can use
+        wait_for_workflow_completion to wait for the instance to reach the terminated state.
+
+        Terminating a workflow will terminate all child workflows that were started by
+        the workflow instance.
+
+        However, terminating a workflow has no effect on any in-flight activity function
+        executions that were started by the terminated workflow instance.
+
+        At the time of writing, there is no way to terminate an in-flight activity execution.
+
+        Args:
+            instance_id: The ID of the workflow instance to terminate.
+            output: The optional output to set for the terminated workflow instance.
+            recursive: The optional flag to terminate all child workflows.
+        """
+        return await self.__obj.terminate_orchestration(
+            instance_id, output=output, recursive=recursive
+        )
+
+    async def pause_workflow(self, instance_id: str) -> None:
+        """Suspends a workflow instance, halting processing of it until resume_workflow is used
+        to resume the workflow.
+
+        Args:
+            instance_id: The instance ID of the workflow to suspend.
+        """
+        return await self.__obj.suspend_orchestration(instance_id)
+
+    async def resume_workflow(self, instance_id: str) -> None:
+        """Resumes a workflow instance that was suspended via pause_workflow.
+
+        Args:
+            instance_id: The instance ID of the workflow to resume.
+        """
+        return await self.__obj.resume_orchestration(instance_id)
+
+    async def purge_workflow(self, instance_id: str, recursive: bool = True) -> None:
+        """Purges data from a workflow instance.
+
+        Args:
+            instance_id: The instance ID of the workflow to purge.
+            recursive: The optional flag to also purge data from all child workflows.
+        """
+        return await self.__obj.purge_orchestration(instance_id, recursive)
diff --git a/ext/dapr-ext-workflow/setup.cfg b/ext/dapr-ext-workflow/setup.cfg
index 83869566..2d182a76 100644
--- a/ext/dapr-ext-workflow/setup.cfg
+++ b/ext/dapr-ext-workflow/setup.cfg
@@ -25,7 +25,7 @@ packages = find_namespace:
 include_package_data = True
 install_requires =
     dapr >= 1.16.1rc1
-    durabletask-dapr >= 0.2.0a9
+    durabletask-dapr >= 0.2.0a11
 
 [options.packages.find]
 include =
diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py b/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py
new file mode 100644
index 00000000..3c1d2058
--- /dev/null
+++ b/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py
@@ -0,0 +1,175 @@
+# -*- coding: utf-8 -*-
+
+"""
+Copyright 2025 The Dapr Authors
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and +limitations under the License. +""" + +import unittest +from datetime import datetime +from typing import Any, Union +from unittest import mock + +import durabletask.internal.orchestrator_service_pb2 as pb +from dapr.ext.workflow.aio import DaprWorkflowClient +from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext +from durabletask import client +from grpc.aio import AioRpcError + +mock_schedule_result = 'workflow001' +mock_raise_event_result = 'event001' +mock_terminate_result = 'terminate001' +mock_suspend_result = 'suspend001' +mock_resume_result = 'resume001' +mock_purge_result = 'purge001' +mock_instance_id = 'instance001' +wf_status = 'not-found' + + +class SimulatedAioRpcError(AioRpcError): + def __init__(self, code, details): + self._code = code + self._details = details + + def code(self): + return self._code + + def details(self): + return self._details + + +class FakeAsyncTaskHubGrpcClient: + async def schedule_new_orchestration( + self, + workflow, + *, + input, + instance_id, + start_at, + reuse_id_policy: Union[pb.OrchestrationIdReusePolicy, None] = None, + ): + return mock_schedule_result + + async def get_orchestration_state(self, instance_id, *, fetch_payloads): + if wf_status == 'not-found': + raise SimulatedAioRpcError(code='UNKNOWN', details='no such instance exists') + elif wf_status == 'found': + return self._inner_get_orchestration_state( + instance_id, client.OrchestrationStatus.PENDING + ) + else: + raise SimulatedAioRpcError(code='UNKNOWN', details='unknown error') + + async def wait_for_orchestration_start(self, instance_id, *, fetch_payloads, timeout): + return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.RUNNING) + + async def wait_for_orchestration_completion(self, instance_id, *, fetch_payloads, timeout): + return self._inner_get_orchestration_state( + instance_id, client.OrchestrationStatus.COMPLETED + ) + + async def raise_orchestration_event( + self, instance_id: str, event_name: str, *, data: Union[Any, None] = None + ): + return mock_raise_event_result + + async def terminate_orchestration( + self, instance_id: str, *, output: Union[Any, None] = None, recursive: bool = True + ): + return mock_terminate_result + + async def suspend_orchestration(self, instance_id: str): + return mock_suspend_result + + async def resume_orchestration(self, instance_id: str): + return mock_resume_result + + async def purge_orchestration(self, instance_id: str, recursive: bool = True): + return mock_purge_result + + def _inner_get_orchestration_state(self, instance_id, state: client.OrchestrationStatus): + return client.OrchestrationState( + instance_id=instance_id, + name='', + runtime_status=state, + created_at=datetime.now(), + last_updated_at=datetime.now(), + serialized_input=None, + serialized_output=None, + serialized_custom_status=None, + failure_details=None, + ) + + +class WorkflowClientAioTest(unittest.IsolatedAsyncioTestCase): + def mock_client_wf(ctx: DaprWorkflowContext, input): + print(f'{input}') + + async def test_client_functions(self): + with mock.patch( + 'durabletask.aio.client.AsyncTaskHubGrpcClient', return_value=FakeAsyncTaskHubGrpcClient() + ): + wfClient = DaprWorkflowClient() + actual_schedule_result = await wfClient.schedule_new_workflow( + workflow=self.mock_client_wf, input='Hi Chef!' 
+ ) + assert actual_schedule_result == mock_schedule_result + + global wf_status + wf_status = 'not-found' + actual_get_result = await wfClient.get_workflow_state( + instance_id=mock_instance_id, fetch_payloads=True + ) + assert actual_get_result is None + + wf_status = 'error' + with self.assertRaises(AioRpcError): + await wfClient.get_workflow_state(instance_id=mock_instance_id, fetch_payloads=True) + + assert actual_get_result is None + + wf_status = 'found' + actual_get_result = await wfClient.get_workflow_state( + instance_id=mock_instance_id, fetch_payloads=True + ) + assert actual_get_result.runtime_status.name == 'PENDING' + assert actual_get_result.instance_id == mock_instance_id + + actual_wait_start_result = await wfClient.wait_for_workflow_start( + instance_id=mock_instance_id, timeout_in_seconds=30 + ) + assert actual_wait_start_result.runtime_status.name == 'RUNNING' + assert actual_wait_start_result.instance_id == mock_instance_id + + actual_wait_completion_result = await wfClient.wait_for_workflow_completion( + instance_id=mock_instance_id, timeout_in_seconds=30 + ) + assert actual_wait_completion_result.runtime_status.name == 'COMPLETED' + assert actual_wait_completion_result.instance_id == mock_instance_id + + actual_raise_event_result = await wfClient.raise_workflow_event( + instance_id=mock_instance_id, event_name='test_event', data='test_data' + ) + assert actual_raise_event_result == mock_raise_event_result + + actual_terminate_result = await wfClient.terminate_workflow( + instance_id=mock_instance_id, output='test_output' + ) + assert actual_terminate_result == mock_terminate_result + + actual_suspend_result = await wfClient.pause_workflow(instance_id=mock_instance_id) + assert actual_suspend_result == mock_suspend_result + + actual_resume_result = await wfClient.resume_workflow(instance_id=mock_instance_id) + assert actual_resume_result == mock_resume_result + + actual_purge_result = await wfClient.purge_workflow(instance_id=mock_instance_id) + assert actual_purge_result == mock_purge_result From 7109816d5c76c03cd0d8dc01fd6ff63d8b45d267 Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Tue, 4 Nov 2025 15:40:57 -0800 Subject: [PATCH 02/10] lint Signed-off-by: Patrick Assuied --- ext/dapr-ext-workflow/tests/test_workflow_client_aio.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py b/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py index 3c1d2058..c84fcbfe 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py @@ -115,7 +115,8 @@ def mock_client_wf(ctx: DaprWorkflowContext, input): async def test_client_functions(self): with mock.patch( - 'durabletask.aio.client.AsyncTaskHubGrpcClient', return_value=FakeAsyncTaskHubGrpcClient() + 'durabletask.aio.client.AsyncTaskHubGrpcClient', + return_value=FakeAsyncTaskHubGrpcClient(), ): wfClient = DaprWorkflowClient() actual_schedule_result = await wfClient.schedule_new_workflow( From 6b8d7c8a7b33f397cd7de03350914dc7543e6903 Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Tue, 11 Nov 2025 10:19:42 -0800 Subject: [PATCH 03/10] Refactor DaprWorkflowClientAsync to DaprWorkflowClient for consistency Signed-off-by: Patrick Assuied --- ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py | 6 +----- .../dapr/ext/workflow/aio/dapr_workflow_client.py | 4 ++-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git 
a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py index 378b4fdd..74b1e789 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py @@ -1,11 +1,7 @@ # -*- coding: utf-8 -*- -from .dapr_workflow_client import DaprWorkflowClientAsync - -# Public alias to mirror sync naming under aio namespace -DaprWorkflowClient = DaprWorkflowClientAsync +from .dapr_workflow_client import DaprWorkflowClient __all__ = [ - 'DaprWorkflowClientAsync', 'DaprWorkflowClient', ] diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py index 03b03eec..879b1de4 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py @@ -36,7 +36,7 @@ TOutput = TypeVar('TOutput') -class DaprWorkflowClientAsync: +class DaprWorkflowClient: """Async client for managing Dapr Workflow instances. This uses a gRPC async connection to send commands directly to the workflow engine, @@ -56,7 +56,7 @@ def __init__( except ValueError as error: raise DaprInternalError(f'{error}') from error - self._logger = Logger('DaprWorkflowClientAsync', logger_options) + self._logger = Logger('DaprWorkflowClient', logger_options) metadata = tuple() if settings.DAPR_API_TOKEN: From 13f66a31373892ec96d4da7518504235a8869b3e Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Fri, 21 Nov 2025 18:48:42 -0800 Subject: [PATCH 04/10] DRY Signed-off-by: Patrick Assuied --- .../dapr/ext/workflow/aio/dapr_workflow_client.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py index 879b1de4..cd5e632f 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py @@ -96,16 +96,13 @@ async def schedule_new_workflow( Returns: The ID of the scheduled workflow instance. """ - if hasattr(workflow, '_dapr_alternate_name'): - return await self.__obj.schedule_new_orchestration( - workflow.__dict__['_dapr_alternate_name'], - input=input, - instance_id=instance_id, - start_at=start_at, - reuse_id_policy=reuse_id_policy, - ) + workflow_name = ( + workflow.__dict__['_dapr_alternate_name'] + if hasattr(workflow, '_dapr_alternate_name') + else workflow.__name__ + ) return await self.__obj.schedule_new_orchestration( - workflow.__name__, + workflow_name, input=input, instance_id=instance_id, start_at=start_at, From 47edb1feec605e1c168cc20f982a6070da495d8c Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Wed, 26 Nov 2025 09:48:14 -0800 Subject: [PATCH 05/10] Added example for using the async workflow client adapted from the simple example. Signed-off-by: Patrick Assuied --- examples/workflow/README.md | 66 ++++++++++ examples/workflow/simple_aio_client.py | 173 +++++++++++++++++++++++++ 2 files changed, 239 insertions(+) create mode 100644 examples/workflow/simple_aio_client.py diff --git a/examples/workflow/README.md b/examples/workflow/README.md index 2e09eeef..ac70cfa8 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -85,6 +85,72 @@ The output of this example should look like this: - "== APP == Workflow completed! 
Result: Completed" ``` +### Simple Workflow with async workflow client +This example represents a workflow that manages counters through a series of activities and child workflows. It features using the async workflow client. +It shows several Dapr Workflow features including: +- Basic activity execution with counter increments +- Retryable activities with configurable retry policies +- Child workflow orchestration with retry logic +- External event handling with timeouts +- Workflow state management (pause/resume) +- Activity error handling and retry backoff +- Global state tracking across workflow components +- Workflow lifecycle management (start, pause, resume, purge) + + + +```sh +dapr run --app-id wf-simple-aio-example -- python3 simple_aio_client.py +``` + + +The output of this example should look like this: + +``` + - "== APP == Hi Counter!" + - "== APP == New counter value is: 1!" + - "== APP == New counter value is: 11!" + - "== APP == Retry count value is: 0!" + - "== APP == Retry count value is: 1! This print statement verifies retry" + - "== APP == Appending 1 to child_orchestrator_string!" + - "== APP == Appending a to child_orchestrator_string!" + - "== APP == Appending a to child_orchestrator_string!" + - "== APP == Appending 2 to child_orchestrator_string!" + - "== APP == Appending b to child_orchestrator_string!" + - "== APP == Appending b to child_orchestrator_string!" + - "== APP == Appending 3 to child_orchestrator_string!" + - "== APP == Appending c to child_orchestrator_string!" + - "== APP == Appending c to child_orchestrator_string!" + - "== APP == Get response from hello_world_wf after pause call: SUSPENDED" + - "== APP == Get response from hello_world_wf after resume call: RUNNING" + - "== APP == New counter value is: 111!" + - "== APP == New counter value is: 1111!" + - "== APP == Workflow completed! Result: Completed" +``` + ### Task Chaining This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command: diff --git a/examples/workflow/simple_aio_client.py b/examples/workflow/simple_aio_client.py new file mode 100644 index 00000000..43301b28 --- /dev/null +++ b/examples/workflow/simple_aio_client.py @@ -0,0 +1,173 @@ +# -*- coding: utf-8 -*- +# Copyright 2025 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +from datetime import timedelta +from time import sleep + +from dapr.ext.workflow import ( + DaprWorkflowContext, + RetryPolicy, + WorkflowActivityContext, + WorkflowRuntime, + when_any, +) +from dapr.ext.workflow.aio import DaprWorkflowClient + +from dapr.clients.exceptions import DaprInternalError +from dapr.conf import Settings + +settings = Settings() + +counter = 0 +retry_count = 0 +child_orchestrator_count = 0 +child_orchestrator_string = '' +child_act_retry_count = 0 +instance_id = 'exampleInstanceID' +child_instance_id = 'childInstanceID' +workflow_name = 'hello_world_wf' +child_workflow_name = 'child_wf' +input_data = 'Hi Counter!' 
+event_name = 'event1' +event_data = 'eventData' +non_existent_id_error = 'no such instance exists' + +retry_policy = RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=3, + backoff_coefficient=2, + max_retry_interval=timedelta(seconds=10), + retry_timeout=timedelta(seconds=100), +) + +wfr = WorkflowRuntime() + + +@wfr.workflow(name='hello_world_wf') +def hello_world_wf(ctx: DaprWorkflowContext, wf_input): + print(f'{wf_input}') + yield ctx.call_activity(hello_act, input=1) + yield ctx.call_activity(hello_act, input=10) + yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy) + yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy) + + # Change in event handling: Use when_any to handle both event and timeout + event = ctx.wait_for_external_event(event_name) + timeout = ctx.create_timer(timedelta(seconds=30)) + winner = yield when_any([event, timeout]) + + if winner == timeout: + print('Workflow timed out waiting for event') + return 'Timeout' + + yield ctx.call_activity(hello_act, input=100) + yield ctx.call_activity(hello_act, input=1000) + return 'Completed' + + +@wfr.activity(name='hello_act') +def hello_act(ctx: WorkflowActivityContext, wf_input): + global counter + counter += wf_input + print(f'New counter value is: {counter}!', flush=True) + + +@wfr.activity(name='hello_retryable_act') +def hello_retryable_act(ctx: WorkflowActivityContext): + global retry_count + if (retry_count % 2) == 0: + print(f'Retry count value is: {retry_count}!', flush=True) + retry_count += 1 + raise ValueError('Retryable Error') + print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True) + retry_count += 1 + + +@wfr.workflow(name='child_retryable_wf') +def child_retryable_wf(ctx: DaprWorkflowContext): + global child_orchestrator_string, child_orchestrator_count + if not ctx.is_replaying: + child_orchestrator_count += 1 + print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True) + child_orchestrator_string += str(child_orchestrator_count) + yield ctx.call_activity( + act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy + ) + if child_orchestrator_count < 3: + raise ValueError('Retryable Error') + + +@wfr.activity(name='act_for_child_wf') +def act_for_child_wf(ctx: WorkflowActivityContext, inp): + global child_orchestrator_string, child_act_retry_count + inp_char = chr(96 + inp) + print(f'Appending {inp_char} to child_orchestrator_string!', flush=True) + child_orchestrator_string += inp_char + if child_act_retry_count % 2 == 0: + child_act_retry_count += 1 + raise ValueError('Retryable Error') + child_act_retry_count += 1 + + +async def main(): + wfr.start() + wf_client = DaprWorkflowClient() + + print('==========Start Counter Increase as per Input:==========') + await wf_client.schedule_new_workflow( + workflow=hello_world_wf, input=input_data, instance_id=instance_id + ) + + await wf_client.wait_for_workflow_start(instance_id) + + # Sleep to let the workflow run initial activities + await asyncio.sleep(12) + + assert counter == 11 + assert retry_count == 2 + assert child_orchestrator_string == '1aa2bb3cc' + + # Pause Test + await wf_client.pause_workflow(instance_id=instance_id) + metadata = await wf_client.get_workflow_state(instance_id=instance_id) + print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}') + + # Resume Test + await wf_client.resume_workflow(instance_id=instance_id) + metadata =await 
wf_client.get_workflow_state(instance_id=instance_id) + print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}') + + await asyncio.sleep(2) # Give the workflow time to reach the event wait state + await wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data) + + print('========= Waiting for Workflow completion', flush=True) + try: + state = await wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) + if state.runtime_status.name == 'COMPLETED': + print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"'))) + else: + print(f'Workflow failed! Status: {state.runtime_status.name}') + except TimeoutError: + print('*** Workflow timed out!') + + await wf_client.purge_workflow(instance_id=instance_id) + try: + await wf_client.get_workflow_state(instance_id=instance_id) + except DaprInternalError as err: + if non_existent_id_error in err._message: + print('Instance Successfully Purged') + + wfr.shutdown() + + +if __name__ == '__main__': + asyncio.run(main()) From 754619016ae8f1307f430a8256c3b6cf6c430a85 Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Wed, 26 Nov 2025 10:28:33 -0800 Subject: [PATCH 06/10] ruff'ed Signed-off-by: Patrick Assuied --- examples/workflow/simple_aio_client.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/workflow/simple_aio_client.py b/examples/workflow/simple_aio_client.py index 43301b28..2a279c45 100644 --- a/examples/workflow/simple_aio_client.py +++ b/examples/workflow/simple_aio_client.py @@ -11,7 +11,6 @@ # limitations under the License. import asyncio from datetime import timedelta -from time import sleep from dapr.ext.workflow import ( DaprWorkflowContext, @@ -143,11 +142,13 @@ async def main(): # Resume Test await wf_client.resume_workflow(instance_id=instance_id) - metadata =await wf_client.get_workflow_state(instance_id=instance_id) + metadata = await wf_client.get_workflow_state(instance_id=instance_id) print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}') await asyncio.sleep(2) # Give the workflow time to reach the event wait state - await wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data) + await wf_client.raise_workflow_event( + instance_id=instance_id, event_name=event_name, data=event_data + ) print('========= Waiting for Workflow completion', flush=True) try: From ed50e9158467391737021908e99fc2adecaf612b Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Wed, 26 Nov 2025 12:29:11 -0800 Subject: [PATCH 07/10] PR feedback Signed-off-by: Patrick Assuied --- examples/workflow/simple_aio_client.py | 93 +++++++++++++------------- 1 file changed, 47 insertions(+), 46 deletions(-) diff --git a/examples/workflow/simple_aio_client.py b/examples/workflow/simple_aio_client.py index 2a279c45..087da6e5 100644 --- a/examples/workflow/simple_aio_client.py +++ b/examples/workflow/simple_aio_client.py @@ -121,53 +121,54 @@ async def main(): wfr.start() wf_client = DaprWorkflowClient() - print('==========Start Counter Increase as per Input:==========') - await wf_client.schedule_new_workflow( - workflow=hello_world_wf, input=input_data, instance_id=instance_id - ) - - await wf_client.wait_for_workflow_start(instance_id) - - # Sleep to let the workflow run initial activities - await asyncio.sleep(12) - - assert counter == 11 - assert retry_count == 2 - assert child_orchestrator_string == '1aa2bb3cc' - - # Pause Test - await 
wf_client.pause_workflow(instance_id=instance_id) - metadata = await wf_client.get_workflow_state(instance_id=instance_id) - print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}') - - # Resume Test - await wf_client.resume_workflow(instance_id=instance_id) - metadata = await wf_client.get_workflow_state(instance_id=instance_id) - print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}') - - await asyncio.sleep(2) # Give the workflow time to reach the event wait state - await wf_client.raise_workflow_event( - instance_id=instance_id, event_name=event_name, data=event_data - ) - - print('========= Waiting for Workflow completion', flush=True) try: - state = await wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) - if state.runtime_status.name == 'COMPLETED': - print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"'))) - else: - print(f'Workflow failed! Status: {state.runtime_status.name}') - except TimeoutError: - print('*** Workflow timed out!') - - await wf_client.purge_workflow(instance_id=instance_id) - try: - await wf_client.get_workflow_state(instance_id=instance_id) - except DaprInternalError as err: - if non_existent_id_error in err._message: - print('Instance Successfully Purged') - - wfr.shutdown() + print('==========Start Counter Increase as per Input:==========') + await wf_client.schedule_new_workflow( + workflow=hello_world_wf, input=input_data, instance_id=instance_id + ) + + await wf_client.wait_for_workflow_start(instance_id) + + # Sleep to let the workflow run initial activities + await asyncio.sleep(12) + + assert counter == 11 + assert retry_count == 2 + assert child_orchestrator_string == '1aa2bb3cc' + + # Pause Test + await wf_client.pause_workflow(instance_id=instance_id) + metadata = await wf_client.get_workflow_state(instance_id=instance_id) + print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}') + + # Resume Test + await wf_client.resume_workflow(instance_id=instance_id) + metadata = await wf_client.get_workflow_state(instance_id=instance_id) + print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}') + + await asyncio.sleep(2) # Give the workflow time to reach the event wait state + await wf_client.raise_workflow_event( + instance_id=instance_id, event_name=event_name, data=event_data + ) + + print('========= Waiting for Workflow completion', flush=True) + try: + state = await wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) + if state.runtime_status.name == 'COMPLETED': + print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"'))) + else: + print(f'Workflow failed! 
Status: {state.runtime_status.name}') + except TimeoutError: + print('*** Workflow timed out!') + + await wf_client.purge_workflow(instance_id=instance_id) + try: + await wf_client.get_workflow_state(instance_id=instance_id) + except DaprInternalError as err: + if non_existent_id_error in err._message: + print('Instance Successfully Purged') + finally: + wfr.shutdown() if __name__ == '__main__': From 4dc8df144c5ae92e0ce3b3f169d418053464adee Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Wed, 26 Nov 2025 12:42:31 -0800 Subject: [PATCH 08/10] added header Signed-off-by: Patrick Assuied --- .../dapr/ext/workflow/aio/__init__.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py index 74b1e789..ceb8672b 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py @@ -1,5 +1,18 @@ # -*- coding: utf-8 -*- +""" +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + from .dapr_workflow_client import DaprWorkflowClient __all__ = [ From 0cdfaaa6096dbc99a214a531c6a554f264f34372 Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Mon, 1 Dec 2025 09:15:41 -0800 Subject: [PATCH 09/10] lint Signed-off-by: Patrick Assuied --- examples/workflow/simple_aio_client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/workflow/simple_aio_client.py b/examples/workflow/simple_aio_client.py index 087da6e5..fd93a501 100644 --- a/examples/workflow/simple_aio_client.py +++ b/examples/workflow/simple_aio_client.py @@ -144,7 +144,9 @@ async def main(): # Resume Test await wf_client.resume_workflow(instance_id=instance_id) metadata = await wf_client.get_workflow_state(instance_id=instance_id) - print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}') + print( + f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}' + ) await asyncio.sleep(2) # Give the workflow time to reach the event wait state await wf_client.raise_workflow_event( From e40b35986f4870d9e67696fbd315ad3ab39171c2 Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Mon, 1 Dec 2025 23:33:31 -0800 Subject: [PATCH 10/10] Remove 'STEP' directives installing server version of package instead of local version (therefore overriding local changes) Signed-off-by: Patrick Assuied --- examples/demo_actor/README.md | 6 ------ examples/demo_workflow/README.md | 6 ------ 2 files changed, 12 deletions(-) diff --git a/examples/demo_actor/README.md b/examples/demo_actor/README.md index f1b1bbe2..6353a6e0 100644 --- a/examples/demo_actor/README.md +++ b/examples/demo_actor/README.md @@ -17,16 +17,10 @@ This document describes how to create an Actor(DemoActor) and invoke its methods You can install dapr SDK package using pip command: - - ```sh pip3 install -r demo_actor/requirements.txt ``` - - ## Run in self-hosted mode - ```sh pip3 install -r 
demo_workflow/requirements.txt ``` - -
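
Taken together, this series adds an asyncio-native `DaprWorkflowClient` under the `dapr.ext.workflow.aio` namespace, mirroring the synchronous client's API. The sketch below condenses the `simple_aio_client.py` example into a minimal end-to-end flow; the `greet_wf`/`greet_act` names are placeholders, and a running Dapr sidecar is assumed:

```python
import asyncio

from dapr.ext.workflow import DaprWorkflowContext, WorkflowActivityContext, WorkflowRuntime
from dapr.ext.workflow.aio import DaprWorkflowClient

wfr = WorkflowRuntime()


@wfr.workflow(name='greet_wf')
def greet_wf(ctx: DaprWorkflowContext, name):
    # Workflow code stays a synchronous generator; only the management client is async.
    yield ctx.call_activity(greet_act, input=name)
    return 'Completed'


@wfr.activity(name='greet_act')
def greet_act(ctx: WorkflowActivityContext, name):
    print(f'Hello, {name}!', flush=True)


async def main():
    wfr.start()
    try:
        client = DaprWorkflowClient()
        instance_id = await client.schedule_new_workflow(workflow=greet_wf, input='Dapr')
        state = await client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
        print(f'Status: {state.runtime_status.name}' if state else 'Instance not found')
    finally:
        wfr.shutdown()


if __name__ == '__main__':
    asyncio.run(main())
```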