Skip to content

Commit ad8aa1e

Browse files
initial code for decorators and optional naming of workflows and activities
Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
1 parent 8344420 commit ad8aa1e

File tree

10 files changed

+139
-49
lines changed

10 files changed

+139
-49
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ cd python-sdk
8181
pip3 install -e .
8282
pip3 install -e ./ext/dapr-ext-grpc/
8383
pip3 install -e ./ext/dapr-ext-fastapi/
84+
pip3 install -e ./ext/dapr-ext-workflow/
8485
```
8586

8687
3. Install required packages

examples/workflow/fan_out_fan_in.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,13 @@
1212

1313
import time
1414
from typing import List
15+
from dapr.ext.workflow import workflow, activity, WorkflowRuntime
1516
import dapr.ext.workflow as wf
1617

18+
# TODO: Modify code such that implicit creation of default WorkflowRuntime is possible
19+
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
1720

21+
@workflow(name="batch_processing")
1822
def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
1923
# get a batch of N work items to process in parallel
2024
work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)
@@ -27,30 +31,25 @@ def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
2731
total = sum(outputs)
2832
yield ctx.call_activity(process_results, input=total)
2933

30-
34+
@activity(name="get_batch")
3135
def get_work_batch(ctx, batch_size: int) -> List[int]:
3236
return [i + 1 for i in range(batch_size)]
3337

34-
38+
@activity
3539
def process_work_item(ctx, work_item: int) -> int:
3640
print(f'Processing work item: {work_item}.')
3741
time.sleep(5)
3842
result = work_item * 2
3943
print(f'Work item {work_item} processed. Result: {result}.')
4044
return result
4145

42-
46+
@activity(name="final_process")
4347
def process_results(ctx, final_result: int):
4448
print(f'Final result: {final_result}.')
4549

4650

4751
if __name__ == '__main__':
48-
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
49-
workflowRuntime.register_workflow(batch_processing_workflow)
50-
workflowRuntime.register_activity(get_work_batch)
51-
workflowRuntime.register_activity(process_work_item)
52-
workflowRuntime.register_activity(process_results)
53-
workflowRuntime.start()
52+
WorkflowRuntime.start()
5453

5554
wf_client = wf.DaprWorkflowClient()
5655
instance_id = wf_client.schedule_new_workflow(
@@ -59,4 +58,4 @@ def process_results(ctx, final_result: int):
5958
print(f'Workflow started. Instance ID: {instance_id}')
6059
state = wf_client.wait_for_workflow_completion(instance_id)
6160

62-
workflowRuntime.shutdown()
61+
WorkflowRuntime.shutdown()

examples/workflow/human_approval.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616
import time
1717

1818
from dapr.clients import DaprClient
19+
from dapr.ext.workflow import workflow, activity, WorkflowRuntime
1920
import dapr.ext.workflow as wf
2021

22+
# TODO: Modify code such that implicit creation of default WorkflowRuntime is possible
23+
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
2124

2225
@dataclass
2326
class Order:
@@ -37,7 +40,7 @@ class Approval:
3740
def from_dict(dict):
3841
return Approval(**dict)
3942

40-
43+
@workflow(name="purchase_order_wf")
4144
def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
4245
# Orders under $1000 are auto-approved
4346
if order.cost < 1000:
@@ -59,10 +62,12 @@ def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
5962
return f"Approved by '{approval_details.approver}'"
6063

6164

65+
@activity(name="send_approval")
6266
def send_approval_request(_, order: Order) -> None:
6367
print(f'*** Requesting approval from user for order: {order}')
6468

6569

70+
@activity
6671
def place_order(_, order: Order) -> None:
6772
print(f'*** Placing order: {order}')
6873

@@ -76,12 +81,8 @@ def place_order(_, order: Order) -> None:
7681
parser.add_argument("--timeout", type=int, default=60, help="Timeout in seconds")
7782
args = parser.parse_args()
7883

79-
# configure and start the workflow runtime
80-
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
81-
workflowRuntime.register_workflow(purchase_order_workflow)
82-
workflowRuntime.register_activity(send_approval_request)
83-
workflowRuntime.register_activity(place_order)
84-
workflowRuntime.start()
84+
# start the workflow runtime
85+
WorkflowRuntime.start()
8586

8687
# Start a purchase order workflow using the user input
8788
order = Order(args.cost, "MyProduct", 1)
@@ -119,4 +120,4 @@ def prompt_for_approval():
119120
except TimeoutError:
120121
print("*** Workflow timed out!")
121122

122-
workflowRuntime.shutdown()
123+
WorkflowRuntime.shutdown()

examples/workflow/monitor.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,19 @@
1313
from dataclasses import dataclass
1414
from datetime import timedelta
1515
import random
16+
from dapr.ext.workflow import workflow, activity, WorkflowRuntime
1617
import dapr.ext.workflow as wf
1718

19+
# TODO: Modify code such that implicit creation of default WorkflowRuntime is possible
20+
workflowRuntime = wf.WorkflowRuntime()
1821

1922
@dataclass
2023
class JobStatus:
2124
job_id: str
2225
is_healthy: bool
2326

2427

28+
@workflow(name="status_monitor")
2529
def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
2630
# poll a status endpoint associated with this job
2731
status = yield ctx.call_activity(check_status, input=job)
@@ -43,20 +47,18 @@ def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
4347
ctx.continue_as_new(job)
4448

4549

50+
@activity
4651
def check_status(ctx, _) -> str:
4752
return random.choice(["healthy", "unhealthy"])
4853

4954

55+
@activity
5056
def send_alert(ctx, message: str):
5157
print(f'*** Alert: {message}')
5258

5359

5460
if __name__ == '__main__':
55-
workflowRuntime = wf.WorkflowRuntime()
56-
workflowRuntime.register_workflow(status_monitor_workflow)
57-
workflowRuntime.register_activity(check_status)
58-
workflowRuntime.register_activity(send_alert)
59-
workflowRuntime.start()
61+
WorkflowRuntime.start()
6062

6163
wf_client = wf.DaprWorkflowClient()
6264
job_id = "job1"
@@ -75,4 +77,4 @@ def send_alert(ctx, message: str):
7577
print(f'Workflow already running. Instance ID: {job_id}')
7678

7779
input("Press Enter to stop...\n")
78-
workflowRuntime.shutdown()
80+
WorkflowRuntime.shutdown()

examples/workflow/task_chaining.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,14 @@
1010
# See the License for the specific language governing permissions and
1111
# limitations under the License.
1212

13+
from dapr.ext.workflow import workflow, activity, WorkflowRuntime
1314
import dapr.ext.workflow as wf
1415

1516

17+
wfr = WorkflowRuntime("localhost", "50001")
18+
19+
20+
@workflow(name="random_workflow")
1621
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
1722
try:
1823
result1 = yield ctx.call_activity(step1, input=wf_input)
@@ -24,38 +29,35 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
2429
return [result1, result2, result3]
2530

2631

32+
@activity()
2733
def step1(ctx, activity_input):
2834
print(f'Step 1: Received input: {activity_input}.')
2935
# Do some work
3036
return activity_input + 1
3137

3238

39+
@activity()
3340
def step2(ctx, activity_input):
3441
print(f'Step 2: Received input: {activity_input}.')
3542
# Do some work
3643
return activity_input * 2
3744

3845

46+
@activity()
3947
def step3(ctx, activity_input):
4048
print(f'Step 3: Received input: {activity_input}.')
4149
# Do some work
4250
return activity_input ^ 2
4351

4452

53+
@activity()
4554
def error_handler(ctx, error):
4655
print(f'Executing error handler: {error}.')
4756
# Do some compensating work
4857

4958

5059
if __name__ == '__main__':
51-
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
52-
workflowRuntime.register_workflow(task_chain_workflow)
53-
workflowRuntime.register_activity(step1)
54-
workflowRuntime.register_activity(step2)
55-
workflowRuntime.register_activity(step3)
56-
workflowRuntime.register_activity(error_handler)
57-
workflowRuntime.start()
58-
60+
WorkflowRuntime.start()
5961
wf_client = wf.DaprWorkflowClient()
6062
instance_id = wf_client.schedule_new_workflow(
6163
workflow=task_chain_workflow,
@@ -64,4 +66,4 @@ def error_handler(ctx, error):
6466
state = wf_client.wait_for_workflow_completion(instance_id)
6567
print(f'Workflow completed! Status: {state.runtime_status}')
6668

67-
workflowRuntime.shutdown()
69+
WorkflowRuntime.shutdown()

ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"""
1515

1616
# Import your main classes here
17-
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime
17+
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, workflow, activity
1818
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
1919
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext, when_all, when_any
2020
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
@@ -28,5 +28,7 @@
2828
'WorkflowState',
2929
'WorkflowStatus',
3030
'when_all',
31-
'when_any'
31+
'when_any',
32+
'workflow',
33+
'activity'
3234
]

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,15 @@ def schedule_new_workflow(self, workflow: Workflow, *, input: Optional[TInput] =
7575
Returns:
7676
The ID of the scheduled workflow instance.
7777
"""
78-
return self.__obj.schedule_new_orchestration(workflow.__name__, input=input,
79-
instance_id=instance_id, start_at=start_at)
78+
try:
79+
workflow.__getattribute__('_registered_name')
80+
return self.__obj.schedule_new_orchestration(workflow._registered_name, input=input,
81+
instance_id=instance_id,
82+
start_at=start_at)
83+
except AttributeError:
84+
return self.__obj.schedule_new_orchestration(workflow.__name__, input=input,
85+
instance_id=instance_id,
86+
start_at=start_at)
8087

8188
def get_workflow_state(self, instance_id: str, *,
8289
fetch_payloads: bool = True) -> Optional[WorkflowState]:

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,10 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
5353

5454
def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TOutput], *,
5555
input: TInput = None) -> task.Task[TOutput]:
56-
return self.__obj.call_activity(activity=activity.__name__, input=input)
56+
try:
57+
return self.__obj.call_activity(activity=activity._registered_name, input=input)
58+
except AttributeError:
59+
return self.__obj.call_activity(activity=activity.__name__, input=input)
5760

5861
def call_child_workflow(self, workflow: Workflow, *,
5962
input: Optional[TInput],
@@ -62,7 +65,12 @@ def wf(ctx: task.OrchestrationContext, inp: TInput):
6265
daprWfContext = DaprWorkflowContext(ctx)
6366
return workflow(daprWfContext, inp)
6467
# copy workflow name so durabletask.worker can find the orchestrator in its registry
65-
wf.__name__ = workflow.__name__
68+
69+
# Any workflow function using python decorator will have a _registered_name attribute
70+
try:
71+
wf.__name__ = workflow._registered_name
72+
except AttributeError:
73+
wf.__name__ = workflow.__name__
6674
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)
6775

6876
def wait_for_external_event(self, name: str) -> task.Task:

0 commit comments

Comments
 (0)