diff --git a/examples/workflow/README.md b/examples/workflow/README.md new file mode 100644 index 00000000..c97d4562 --- /dev/null +++ b/examples/workflow/README.md @@ -0,0 +1,145 @@ +# Workflow Examples + +This directory contains examples of using the [Dapr Workflow](https://docs.dapr.io/developing-applications/building-blocks/workflow/) extension. You can find additional information about these examples in the [Dapr Workflow Application Patterns docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-patterns#tabs-0-python). + +## Prerequisites + +- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started) +- [Install Python 3.8+](https://www.python.org/downloads/) + +### Install requirements + +You can install dapr SDK package using pip command: + +```sh +pip3 install -r requirements.txt +``` + +## Running the samples + +Each of the examples in this directory can be run directly from the command line. + +### Task Chaining + +This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command: + +```sh +dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 task_chaining.py +``` + +The output of this example should look like this: + +``` +== APP == Workflow started. Instance ID: b716208586c24829806b44b62816b598 +== APP == Step 1: Received input: 42. +== APP == Step 2: Received input: 43. +== APP == Step 3: Received input: 86. +== APP == Workflow completed! Status: WorkflowStatus.COMPLETED +``` + +### Fan-out/Fan-in + +This example demonstrates how to fan-out a workflow into multiple parallel tasks, and then fan-in the results of those tasks. You can run this sample using the following command: + +```sh +dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 fan_out_fan_in.py +``` + +The output of this sample should look like this: + +``` +== APP == Workflow started. Instance ID: 2e656befbb304e758776e30642b75944 +== APP == Processing work item: 1. +== APP == Processing work item: 2. +== APP == Processing work item: 3. +== APP == Processing work item: 4. +== APP == Processing work item: 5. +== APP == Processing work item: 6. +== APP == Processing work item: 7. +== APP == Processing work item: 8. +== APP == Processing work item: 9. +== APP == Processing work item: 10. +== APP == Work item 1 processed. Result: 2. +== APP == Work item 2 processed. Result: 4. +== APP == Work item 3 processed. Result: 6. +== APP == Work item 4 processed. Result: 8. +== APP == Work item 5 processed. Result: 10. +== APP == Work item 6 processed. Result: 12. +== APP == Work item 7 processed. Result: 14. +== APP == Work item 8 processed. Result: 16. +== APP == Work item 9 processed. Result: 18. +== APP == Work item 10 processed. Result: 20. +== APP == Final result: 110. +``` + +Note that the ordering of the work-items is non-deterministic since they are all running in parallel. + +### Human Interaction + +This example demonstrates how to use a workflow to interact with a human user. This example requires input from the user, so you'll need to have a separate command for the Dapr CLI and the Python app. + +The Dapr CLI can be started using the following command: + +```sh +dapr run --app-id wfexample --dapr-grpc-port 50001 +``` + +In a separate terminal window, run the following command to start the Python workflow app: + +```sh + python3 human_approval.py + ``` + +When you run the example, you will see output as well as a prompt like this: + +``` +*** Requesting approval from user for order: namespace(cost=2000, product='MyProduct', quantity=1) +Press [ENTER] to approve the order... +``` + +Press the `ENTER` key to continue the workflow. If `ENTER` is pressed before the hardcoded timeout expires, then the following output will be displayed: + +``` +*** Placing order: namespace(cost=2000, product='MyProduct', quantity=1) +Workflow completed! Result: "Approved by 'Me'" +``` + +However, if the timeout expires before `ENTER` is pressed, then the following output will be displayed: + +``` +*** Workflow timed out! +``` + +### Monitor + +This example demonstrates how to eternally running workflow that polls an endpoint to detect service health events. This example requires input from the user, so you'll need to have a separate command for the Dapr CLI and the Python app. + +The Dapr CLI can be started using the following command: + +```sh +dapr run --app-id wfexample --dapr-grpc-port 50001 +``` + +In a separate terminal window, run the following command to start the Python workflow app: + +```sh +python3 monitor.py +``` + +The workflow runs forever, or until the app is stopped. While it is running, it will periodically report information about whether a "job" is healthy or unhealthy. After several minutes, the output of this workflow will look something like this (note that the healthy and unhealthy message ordering is completely random): + +``` +Press Enter to stop... +Job 'job1' is healthy. +Job 'job1' is healthy. +Job 'job1' is unhealthy. +*** Alert: Job 'job1' is unhealthy! +Job 'job1' is healthy. +Job 'job1' is healthy. +Job 'job1' is healthy. +Job 'job1' is unhealthy. +*** Alert: Job 'job1' is unhealthy! +Job 'job1' is unhealthy. +``` + +This workflow runs forever or until you press `ENTER` to stop it. Starting the app again after stopping it will cause the same workflow instance to resume where it left off. diff --git a/examples/workflow/fan_out_fan_in.py b/examples/workflow/fan_out_fan_in.py new file mode 100644 index 00000000..458737c6 --- /dev/null +++ b/examples/workflow/fan_out_fan_in.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# Copyright 2023 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from typing import List +import dapr.ext.workflow as wf + + +def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): + # get a batch of N work items to process in parallel + work_batch = yield ctx.call_activity(get_work_batch, input=wf_input) + + # schedule N parallel tasks to process the work items and wait for all to complete + parallel_tasks = [ctx.call_activity(process_work_item, input=work_item) for work_item in work_batch] + outputs = yield wf.when_all(parallel_tasks) + + # aggregate the results and send them to another activity + total = sum(outputs) + yield ctx.call_activity(process_results, input=total) + + +def get_work_batch(ctx, batch_size: int) -> List[int]: + return [i + 1 for i in range(batch_size)] + + +def process_work_item(ctx, work_item: int) -> int: + print(f'Processing work item: {work_item}.') + time.sleep(5) + result = work_item * 2 + print(f'Work item {work_item} processed. Result: {result}.') + return result + + +def process_results(ctx, final_result: int): + print(f'Final result: {final_result}.') + + +if __name__ == '__main__': + workflowRuntime = wf.WorkflowRuntime("localhost", "50001") + workflowRuntime.register_workflow(batch_processing_workflow) + workflowRuntime.register_activity(get_work_batch) + workflowRuntime.register_activity(process_work_item) + workflowRuntime.register_activity(process_results) + workflowRuntime.start() + + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow( + workflow=batch_processing_workflow, + input=10) + print(f'Workflow started. Instance ID: {instance_id}') + state = wf_client.wait_for_workflow_completion(instance_id) + + workflowRuntime.shutdown() diff --git a/examples/workflow/human_approval.py b/examples/workflow/human_approval.py new file mode 100644 index 00000000..78662dc5 --- /dev/null +++ b/examples/workflow/human_approval.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- +# Copyright 2023 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +from dataclasses import asdict, dataclass +from datetime import timedelta +import time + +from dapr.clients import DaprClient +import dapr.ext.workflow as wf + + +@dataclass +class Order: + cost: float + product: str + quantity: int + + def __str__(self): + return f'{self.product} ({self.quantity})' + + +@dataclass +class Approval: + approver: str + + @staticmethod + def from_dict(dict): + return Approval(**dict) + + +def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order): + # Orders under $1000 are auto-approved + if order.cost < 1000: + return "Auto-approved" + + # Orders of $1000 or more require manager approval + yield ctx.call_activity(send_approval_request, input=order) + + # Approvals must be received within 24 hours or they will be canceled. + approval_event = ctx.wait_for_external_event("approval_received") + timeout_event = ctx.create_timer(timedelta(hours=24)) + winner = yield wf.when_any([approval_event, timeout_event]) + if winner == timeout_event: + return "Cancelled" + + # The order was approved + yield ctx.call_activity(place_order, input=order) + approval_details = Approval.from_dict(approval_event.get_result()) + return f"Approved by '{approval_details.approver}'" + + +def send_approval_request(_, order: Order) -> None: + print(f'*** Requesting approval from user for order: {order}') + + +def place_order(_, order: Order) -> None: + print(f'*** Placing order: {order}') + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="Order purchasing workflow demo.") + parser.add_argument("--cost", type=int, default=2000, help="Cost of the order") + parser.add_argument("--approver", type=str, default="Me", help="Approver name") + parser.add_argument("--timeout", type=int, default=60, help="Timeout in seconds") + args = parser.parse_args() + + # configure and start the workflow runtime + workflowRuntime = wf.WorkflowRuntime("localhost", "50001") + workflowRuntime.register_workflow(purchase_order_workflow) + workflowRuntime.register_activity(send_approval_request) + workflowRuntime.register_activity(place_order) + workflowRuntime.start() + + # Start a purchase order workflow using the user input + order = Order(args.cost, "MyProduct", 1) + + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow( + workflow=purchase_order_workflow, + input=order) + + def prompt_for_approval(): + # Give the workflow time to start up and notify the user + time.sleep(2) + input("Press [ENTER] to approve the order...\n") + with DaprClient() as d: + d.raise_workflow_event( + instance_id=instance_id, + workflow_component="dapr", + event_name="approval_received", + event_data=asdict(Approval(args.approver))) + + # Prompt the user for approval on a background thread + threading.Thread(target=prompt_for_approval, daemon=True).start() + + # Wait for the orchestration to complete + try: + state = wf_client.wait_for_workflow_completion( + instance_id, + timeout_in_seconds=args.timeout + 2) + if not state: + print("Workflow not found!") # not expected + elif state.runtime_status.name == 'COMPLETED': + print(f'Workflow completed! Result: {state.serialized_output}') + else: + print(f'Workflow failed! Status: {state.runtime_status.name}') # not expected + except TimeoutError: + print("*** Workflow timed out!") + + workflowRuntime.shutdown() diff --git a/examples/workflow/monitor.py b/examples/workflow/monitor.py new file mode 100644 index 00000000..ff80d5d4 --- /dev/null +++ b/examples/workflow/monitor.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +# Copyright 2023 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +from datetime import timedelta +import random +import dapr.ext.workflow as wf + + +@dataclass +class JobStatus: + job_id: str + is_healthy: bool + + +def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus): + # poll a status endpoint associated with this job + status = yield ctx.call_activity(check_status, input=job) + if not ctx.is_replaying: + print(f"Job '{job.job_id}' is {status}.") + + if status == "healthy": + job.is_healthy = True + next_sleep_interval = 60 # check less frequently when healthy + else: + if job.is_healthy: + job.is_healthy = False + ctx.call_activity(send_alert, input=f"Job '{job.job_id}' is unhealthy!") + next_sleep_interval = 5 # check more frequently when unhealthy + + yield ctx.create_timer(fire_at=ctx.current_utc_datetime + timedelta(seconds=next_sleep_interval)) + + # restart from the beginning with a new JobStatus input + ctx.continue_as_new(job) + + +def check_status(ctx, _) -> str: + return random.choice(["healthy", "unhealthy"]) + + +def send_alert(ctx, message: str): + print(f'*** Alert: {message}') + + +if __name__ == '__main__': + workflowRuntime = wf.WorkflowRuntime("localhost", "50001") + workflowRuntime.register_workflow(status_monitor_workflow) + workflowRuntime.register_activity(check_status) + workflowRuntime.register_activity(send_alert) + workflowRuntime.start() + + wf_client = wf.DaprWorkflowClient() + job_id = "job1" + status = None + try: + status = wf_client.get_workflow_state(job_id) + except Exception: + pass + if not status or status.runtime_status.name != 'RUNNING': + instance_id = wf_client.schedule_new_workflow( + workflow=status_monitor_workflow, + input=JobStatus(job_id=job_id, is_healthy=True), + instance_id=job_id) + print(f'Workflow started. Instance ID: {instance_id}') + else: + print(f'Workflow already running. Instance ID: {job_id}') + + input("Press Enter to stop...\n") + workflowRuntime.shutdown() diff --git a/examples/workflow/requirements.txt b/examples/workflow/requirements.txt new file mode 100644 index 00000000..b7f8da74 --- /dev/null +++ b/examples/workflow/requirements.txt @@ -0,0 +1 @@ +dapr-ext-workflow>=0.1.0 diff --git a/examples/workflow/task_chaining.py b/examples/workflow/task_chaining.py new file mode 100644 index 00000000..767a1d9d --- /dev/null +++ b/examples/workflow/task_chaining.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# Copyright 2023 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import dapr.ext.workflow as wf + + +def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): + try: + result1 = yield ctx.call_activity(step1, input=wf_input) + result2 = yield ctx.call_activity(step2, input=result1) + result3 = yield ctx.call_activity(step3, input=result2) + except Exception as e: + yield ctx.call_activity(error_handler, input=str(e)) + raise + return [result1, result2, result3] + + +def step1(ctx, activity_input): + print(f'Step 1: Received input: {activity_input}.') + # Do some work + return activity_input + 1 + + +def step2(ctx, activity_input): + print(f'Step 2: Received input: {activity_input}.') + # Do some work + return activity_input * 2 + + +def step3(ctx, activity_input): + print(f'Step 3: Received input: {activity_input}.') + # Do some work + return activity_input ^ 2 + + +def error_handler(ctx, error): + print(f'Executing error handler: {error}.') + # Do some compensating work + + +if __name__ == '__main__': + workflowRuntime = wf.WorkflowRuntime("localhost", "50001") + workflowRuntime.register_workflow(task_chain_workflow) + workflowRuntime.register_activity(step1) + workflowRuntime.register_activity(step2) + workflowRuntime.register_activity(step3) + workflowRuntime.register_activity(error_handler) + workflowRuntime.start() + + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow( + workflow=task_chain_workflow, + input=42) + print(f'Workflow started. Instance ID: {instance_id}') + state = wf_client.wait_for_workflow_completion(instance_id) + print(f'Workflow completed! Status: {state.runtime_status}') + + workflowRuntime.shutdown()