Skip to content

Commit

Permalink
[Workflow] More workflow SDK examples (#575)
Browse files Browse the repository at this point in the history
* [Workflow] More workflow SDK examples

Signed-off-by: Chris Gillum <cgillum@microsoft.com>

* Move examples to live under workflow SDK directory

Signed-off-by: Chris Gillum <cgillum@microsoft.com>

* PR feedback: Move samples back to where they were before

Signed-off-by: Chris Gillum <cgillum@microsoft.com>

---------

Signed-off-by: Chris Gillum <cgillum@microsoft.com>
Co-authored-by: Bernd Verst <github@bernd.dev>
  • Loading branch information
cgillum and berndverst authored Oct 2, 2023
1 parent 306f639 commit 8174667
Show file tree
Hide file tree
Showing 6 changed files with 475 additions and 0 deletions.
145 changes: 145 additions & 0 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# Workflow Examples

This directory contains examples of using the [Dapr Workflow](https://docs.dapr.io/developing-applications/building-blocks/workflow/) extension. You can find additional information about these examples in the [Dapr Workflow Application Patterns docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-patterns#tabs-0-python).

## Prerequisites

- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started)
- [Install Python 3.8+](https://www.python.org/downloads/)

### Install requirements

You can install dapr SDK package using pip command:

```sh
pip3 install -r requirements.txt
```

## Running the samples

Each of the examples in this directory can be run directly from the command line.

### Task Chaining

This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command:

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 task_chaining.py
```

The output of this example should look like this:

```
== APP == Workflow started. Instance ID: b716208586c24829806b44b62816b598
== APP == Step 1: Received input: 42.
== APP == Step 2: Received input: 43.
== APP == Step 3: Received input: 86.
== APP == Workflow completed! Status: WorkflowStatus.COMPLETED
```

### Fan-out/Fan-in

This example demonstrates how to fan-out a workflow into multiple parallel tasks, and then fan-in the results of those tasks. You can run this sample using the following command:

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 fan_out_fan_in.py
```

The output of this sample should look like this:

```
== APP == Workflow started. Instance ID: 2e656befbb304e758776e30642b75944
== APP == Processing work item: 1.
== APP == Processing work item: 2.
== APP == Processing work item: 3.
== APP == Processing work item: 4.
== APP == Processing work item: 5.
== APP == Processing work item: 6.
== APP == Processing work item: 7.
== APP == Processing work item: 8.
== APP == Processing work item: 9.
== APP == Processing work item: 10.
== APP == Work item 1 processed. Result: 2.
== APP == Work item 2 processed. Result: 4.
== APP == Work item 3 processed. Result: 6.
== APP == Work item 4 processed. Result: 8.
== APP == Work item 5 processed. Result: 10.
== APP == Work item 6 processed. Result: 12.
== APP == Work item 7 processed. Result: 14.
== APP == Work item 8 processed. Result: 16.
== APP == Work item 9 processed. Result: 18.
== APP == Work item 10 processed. Result: 20.
== APP == Final result: 110.
```

Note that the ordering of the work-items is non-deterministic since they are all running in parallel.

### Human Interaction

This example demonstrates how to use a workflow to interact with a human user. This example requires input from the user, so you'll need to have a separate command for the Dapr CLI and the Python app.

The Dapr CLI can be started using the following command:

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001
```

In a separate terminal window, run the following command to start the Python workflow app:

```sh
python3 human_approval.py
```

When you run the example, you will see output as well as a prompt like this:

```
*** Requesting approval from user for order: namespace(cost=2000, product='MyProduct', quantity=1)
Press [ENTER] to approve the order...
```

Press the `ENTER` key to continue the workflow. If `ENTER` is pressed before the hardcoded timeout expires, then the following output will be displayed:

```
*** Placing order: namespace(cost=2000, product='MyProduct', quantity=1)
Workflow completed! Result: "Approved by 'Me'"
```

However, if the timeout expires before `ENTER` is pressed, then the following output will be displayed:

```
*** Workflow timed out!
```

### Monitor

This example demonstrates how to eternally running workflow that polls an endpoint to detect service health events. This example requires input from the user, so you'll need to have a separate command for the Dapr CLI and the Python app.

The Dapr CLI can be started using the following command:

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001
```

In a separate terminal window, run the following command to start the Python workflow app:

```sh
python3 monitor.py
```

The workflow runs forever, or until the app is stopped. While it is running, it will periodically report information about whether a "job" is healthy or unhealthy. After several minutes, the output of this workflow will look something like this (note that the healthy and unhealthy message ordering is completely random):

```
Press Enter to stop...
Job 'job1' is healthy.
Job 'job1' is healthy.
Job 'job1' is unhealthy.
*** Alert: Job 'job1' is unhealthy!
Job 'job1' is healthy.
Job 'job1' is healthy.
Job 'job1' is healthy.
Job 'job1' is unhealthy.
*** Alert: Job 'job1' is unhealthy!
Job 'job1' is unhealthy.
```

This workflow runs forever or until you press `ENTER` to stop it. Starting the app again after stopping it will cause the same workflow instance to resume where it left off.
62 changes: 62 additions & 0 deletions examples/workflow/fan_out_fan_in.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
# Copyright 2023 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from typing import List
import dapr.ext.workflow as wf


def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
# get a batch of N work items to process in parallel
work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)

# schedule N parallel tasks to process the work items and wait for all to complete
parallel_tasks = [ctx.call_activity(process_work_item, input=work_item) for work_item in work_batch]
outputs = yield wf.when_all(parallel_tasks)

# aggregate the results and send them to another activity
total = sum(outputs)
yield ctx.call_activity(process_results, input=total)


def get_work_batch(ctx, batch_size: int) -> List[int]:
return [i + 1 for i in range(batch_size)]


def process_work_item(ctx, work_item: int) -> int:
print(f'Processing work item: {work_item}.')
time.sleep(5)
result = work_item * 2
print(f'Work item {work_item} processed. Result: {result}.')
return result


def process_results(ctx, final_result: int):
print(f'Final result: {final_result}.')


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
workflowRuntime.register_workflow(batch_processing_workflow)
workflowRuntime.register_activity(get_work_batch)
workflowRuntime.register_activity(process_work_item)
workflowRuntime.register_activity(process_results)
workflowRuntime.start()

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(
workflow=batch_processing_workflow,
input=10)
print(f'Workflow started. Instance ID: {instance_id}')
state = wf_client.wait_for_workflow_completion(instance_id)

workflowRuntime.shutdown()
122 changes: 122 additions & 0 deletions examples/workflow/human_approval.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# -*- coding: utf-8 -*-
# Copyright 2023 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
from dataclasses import asdict, dataclass
from datetime import timedelta
import time

from dapr.clients import DaprClient
import dapr.ext.workflow as wf


@dataclass
class Order:
cost: float
product: str
quantity: int

def __str__(self):
return f'{self.product} ({self.quantity})'


@dataclass
class Approval:
approver: str

@staticmethod
def from_dict(dict):
return Approval(**dict)


def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
# Orders under $1000 are auto-approved
if order.cost < 1000:
return "Auto-approved"

# Orders of $1000 or more require manager approval
yield ctx.call_activity(send_approval_request, input=order)

# Approvals must be received within 24 hours or they will be canceled.
approval_event = ctx.wait_for_external_event("approval_received")
timeout_event = ctx.create_timer(timedelta(hours=24))
winner = yield wf.when_any([approval_event, timeout_event])
if winner == timeout_event:
return "Cancelled"

# The order was approved
yield ctx.call_activity(place_order, input=order)
approval_details = Approval.from_dict(approval_event.get_result())
return f"Approved by '{approval_details.approver}'"


def send_approval_request(_, order: Order) -> None:
print(f'*** Requesting approval from user for order: {order}')


def place_order(_, order: Order) -> None:
print(f'*** Placing order: {order}')


if __name__ == "__main__":
import argparse

parser = argparse.ArgumentParser(description="Order purchasing workflow demo.")
parser.add_argument("--cost", type=int, default=2000, help="Cost of the order")
parser.add_argument("--approver", type=str, default="Me", help="Approver name")
parser.add_argument("--timeout", type=int, default=60, help="Timeout in seconds")
args = parser.parse_args()

# configure and start the workflow runtime
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
workflowRuntime.register_workflow(purchase_order_workflow)
workflowRuntime.register_activity(send_approval_request)
workflowRuntime.register_activity(place_order)
workflowRuntime.start()

# Start a purchase order workflow using the user input
order = Order(args.cost, "MyProduct", 1)

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(
workflow=purchase_order_workflow,
input=order)

def prompt_for_approval():
# Give the workflow time to start up and notify the user
time.sleep(2)
input("Press [ENTER] to approve the order...\n")
with DaprClient() as d:
d.raise_workflow_event(
instance_id=instance_id,
workflow_component="dapr",
event_name="approval_received",
event_data=asdict(Approval(args.approver)))

# Prompt the user for approval on a background thread
threading.Thread(target=prompt_for_approval, daemon=True).start()

# Wait for the orchestration to complete
try:
state = wf_client.wait_for_workflow_completion(
instance_id,
timeout_in_seconds=args.timeout + 2)
if not state:
print("Workflow not found!") # not expected
elif state.runtime_status.name == 'COMPLETED':
print(f'Workflow completed! Result: {state.serialized_output}')
else:
print(f'Workflow failed! Status: {state.runtime_status.name}') # not expected
except TimeoutError:
print("*** Workflow timed out!")

workflowRuntime.shutdown()
Loading

0 comments on commit 8174667

Please sign in to comment.