Skip to content

Commit c32f421

Browse files
authored
refactor: update python workflow example with daprWorkflowClient (#1101)
Signed-off-by: Eileen Yu <eileenylj@gmail.com>
1 parent 9e9cb5e commit c32f421

File tree

3 files changed

+48
-47
lines changed

3 files changed

+48
-47
lines changed

workflows/python/sdk/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ cd ..
3434
name: Running this example
3535
expected_stdout_lines:
3636
- "There are now 90 cars left in stock"
37-
- "Workflow completed! Result: Completed"
37+
- "Workflow completed!"
3838
output_match_mode: substring
3939
background: true
4040
timeout_seconds: 120

workflows/python/sdk/order-processor/app.py

Lines changed: 33 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,54 +4,48 @@
44

55
from dapr.clients import DaprClient
66
from dapr.conf import settings
7-
from dapr.ext.workflow import WorkflowRuntime
7+
from dapr.ext.workflow import DaprWorkflowClient, WorkflowStatus
88

9-
from workflow import order_processing_workflow, notify_activity, process_payment_activity, \
10-
verify_inventory_activity, update_inventory_activity, requst_approval_activity
9+
from workflow import wfr, order_processing_workflow
1110
from model import InventoryItem, OrderPayload
1211

1312
store_name = "statestore"
14-
workflow_component = "dapr"
1513
workflow_name = "order_processing_workflow"
1614
default_item_name = "cars"
1715

1816
class WorkflowConsoleApp:
1917
def main(self):
2018
print("*** Welcome to the Dapr Workflow console app sample!", flush=True)
2119
print("*** Using this app, you can place orders that start workflows.", flush=True)
20+
21+
wfr.start()
2222
# Wait for the sidecar to become available
2323
sleep(5)
2424

25-
workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT)
26-
workflowRuntime.register_workflow(order_processing_workflow)
27-
workflowRuntime.register_activity(notify_activity)
28-
workflowRuntime.register_activity(requst_approval_activity)
29-
workflowRuntime.register_activity(verify_inventory_activity)
30-
workflowRuntime.register_activity(process_payment_activity)
31-
workflowRuntime.register_activity(update_inventory_activity)
32-
workflowRuntime.start()
25+
wfClient = DaprWorkflowClient()
26+
27+
baseInventory = {
28+
"paperclip": InventoryItem("Paperclip", 5, 100),
29+
"cars": InventoryItem("Cars", 15000, 100),
30+
"computers": InventoryItem("Computers", 500, 100),
31+
}
3332

34-
daprClient = DaprClient(address=f'{settings.DAPR_RUNTIME_HOST}:{settings.DAPR_GRPC_PORT}')
35-
baseInventory = {}
36-
baseInventory["paperclip"] = InventoryItem("Paperclip", 5, 100)
37-
baseInventory["cars"] = InventoryItem("Cars", 15000, 100)
38-
baseInventory["computers"] = InventoryItem("Computers", 500, 100)
3933

34+
daprClient = DaprClient(address=f'{settings.DAPR_RUNTIME_HOST}:{settings.DAPR_GRPC_PORT}')
4035
self.restock_inventory(daprClient, baseInventory)
4136

4237
print("==========Begin the purchase of item:==========", flush=True)
4338
item_name = default_item_name
4439
order_quantity = 10
45-
4640
total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost
4741
order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost)
42+
4843
print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True)
49-
start_resp = daprClient.start_workflow(workflow_component=workflow_component,
50-
workflow_name=workflow_name,
51-
input=order)
52-
_id = start_resp.instance_id
44+
instance_id = wfClient.schedule_new_workflow(
45+
workflow=order_processing_workflow, input=order.to_json())
46+
_id = instance_id
5347

54-
def prompt_for_approval(daprClient: DaprClient):
48+
def prompt_for_approval(wfClient: DaprWorkflowClient):
5549
"""This is a helper function to prompt for approval.
5650
Not using the prompt here ACTUALLY, as quickstart validation is required to be automated.
5751
@@ -65,9 +59,9 @@ def prompt_for_approval(daprClient: DaprClient):
6559
if state.runtime_status.name == "COMPLETED":
6660
return
6761
if approved.lower() == "y":
68-
client.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': True})
62+
wfClient.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': True})
6963
else:
70-
client.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': False})
64+
wfClient.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': False})
7165
7266
## Additionally, you would need to import signal and define timeout_error:
7367
# import signal
@@ -76,32 +70,32 @@ def prompt_for_approval(daprClient: DaprClient):
7670
7771
# signal.signal(signal.SIGALRM, timeout_error)
7872
"""
79-
daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component,
80-
event_name="manager_approval", event_data={'approval': True})
73+
wfClient.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': True})
8174

8275
approval_seeked = False
8376
start_time = datetime.now()
8477
while True:
8578
time_delta = datetime.now() - start_time
86-
state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
79+
state = wfClient.get_workflow_state(instance_id=_id)
80+
8781
if not state:
8882
print("Workflow not found!") # not expected
89-
elif state.runtime_status == "Completed" or\
90-
state.runtime_status == "Failed" or\
91-
state.runtime_status == "Terminated":
83+
break
84+
85+
if state.runtime_status in {WorkflowStatus.COMPLETED, WorkflowStatus.FAILED, WorkflowStatus.TERMINATED}:
9286
print(f'Workflow completed! Result: {state.runtime_status}', flush=True)
9387
break
88+
89+
9490
if time_delta.total_seconds() >= 10:
95-
state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
96-
if total_cost > 50000 and (
97-
state.runtime_status != "Completed" or
98-
state.runtime_status != "Failed" or
99-
state.runtime_status != "Terminated"
100-
) and not approval_seeked:
91+
state = wfClient.get_workflow_state(instance_id=_id)
92+
if total_cost > 50000 and state not in {WorkflowStatus.COMPLETED, WorkflowStatus.FAILED, WorkflowStatus.TERMINATED} and not approval_seeked:
10193
approval_seeked = True
102-
threading.Thread(target=prompt_for_approval(daprClient), daemon=True).start()
94+
threading.Thread(target=prompt_for_approval(wfClient), daemon=True).start()
95+
96+
wfr.shutdown()
97+
10398

104-
print("Purchase of item is ", state.runtime_status, flush=True)
10599

106100
def restock_inventory(self, daprClient: DaprClient, baseInventory):
107101
for key, item in baseInventory.items():

workflows/python/sdk/order-processor/workflow.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
21
from datetime import timedelta
32
import logging
43
import json
54

6-
from dapr.ext.workflow import DaprWorkflowContext, WorkflowActivityContext, when_any
5+
from dapr.ext.workflow import DaprWorkflowContext, WorkflowActivityContext, WorkflowRuntime, when_any
76
from dapr.clients import DaprClient
87
from dapr.conf import settings
98

@@ -12,10 +11,13 @@
1211

1312
store_name = "statestore"
1413

14+
wfr = WorkflowRuntime()
15+
1516
logging.basicConfig(level=logging.INFO)
1617

1718

18-
def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: OrderPayload):
19+
@wfr.workflow(name="order_processing_workflow")
20+
def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: str):
1921
"""Defines the order processing workflow.
2022
When the order is received, the inventory is checked to see if there is enough inventory to
2123
fulfill the order. If there is enough inventory, the payment is processed and the inventory is
@@ -39,7 +41,7 @@ def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: Order
3941
return OrderResult(processed=False)
4042

4143
if order_payload["total_cost"] > 50000:
42-
yield ctx.call_activity(requst_approval_activity, input=order_payload)
44+
yield ctx.call_activity(request_approval_activity, input=order_payload)
4345
approval_task = ctx.wait_for_external_event("manager_approval")
4446
timeout_event = ctx.create_timer(timedelta(seconds=200))
4547
winner = yield when_any([approval_task, timeout_event])
@@ -76,15 +78,15 @@ def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: Order
7678
message=f'Order {order_id} has completed!'))
7779
return OrderResult(processed=True)
7880

79-
81+
@wfr.activity(name="notify_activity")
8082
def notify_activity(ctx: WorkflowActivityContext, input: Notification):
8183
"""Defines Notify Activity. This is used by the workflow to send out a notification"""
8284
# Create a logger
8385
logger = logging.getLogger('NotifyActivity')
8486
logger.info(input.message)
8587

8688

87-
89+
@wfr.activity(name="process_payment_activity")
8890
def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest):
8991
"""Defines Process Payment Activity.This is used by the workflow to process a payment"""
9092
logger = logging.getLogger('ProcessPaymentActivity')
@@ -94,6 +96,7 @@ def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest
9496
logger.info(f'Payment for request ID {input.request_id} processed successfully')
9597

9698

99+
@wfr.activity(name="verify_inventory_activity")
97100
def verify_inventory_activity(ctx: WorkflowActivityContext,
98101
input: InventoryRequest) -> InventoryResult:
99102
"""Defines Verify Inventory Activity. This is used by the workflow to verify if inventory
@@ -117,6 +120,8 @@ def verify_inventory_activity(ctx: WorkflowActivityContext,
117120
return InventoryResult(False, None)
118121

119122

123+
124+
@wfr.activity(name="update_inventory_activity")
120125
def update_inventory_activity(ctx: WorkflowActivityContext,
121126
input: PaymentRequest) -> InventoryResult:
122127
"""Defines Update Inventory Activity. This is used by the workflow to check if inventory
@@ -139,7 +144,9 @@ def update_inventory_activity(ctx: WorkflowActivityContext,
139144
logger.info(f'There are now {new_quantity} {input.item_being_purchased} left in stock')
140145

141146

142-
def requst_approval_activity(ctx: WorkflowActivityContext,
147+
148+
@wfr.activity(name="request_approval_activity")
149+
def request_approval_activity(ctx: WorkflowActivityContext,
143150
input: OrderPayload):
144151
"""Defines Request Approval Activity. This is used by the workflow to request approval
145152
for payment of an order. This activity is used only if the order total cost is greater than

0 commit comments

Comments
 (0)