Skip to content

Commit

Permalink
Initial code for decorators and optional naming of workflows and acti…
Browse files Browse the repository at this point in the history
…vities (#651)

* initial code for decorators and optional naming of workflows and activities

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* add alternater_name decorator

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix linter

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* address review comments.

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* address review comments.

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

---------

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
  • Loading branch information
mukundansundar authored Jan 9, 2024
1 parent c08e714 commit 656207a
Show file tree
Hide file tree
Showing 14 changed files with 393 additions and 65 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ cd python-sdk
pip3 install -e .
pip3 install -e ./ext/dapr-ext-grpc/
pip3 install -e ./ext/dapr-ext-fastapi/
pip3 install -e ./ext/dapr-ext-workflow/
```

3. Install required packages
Expand Down
39 changes: 39 additions & 0 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,20 @@ Each of the examples in this directory can be run directly from the command line
### Task Chaining

This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command:
<!--STEP
name: Run the task chaining example
expected_stdout_lines:
- "== APP == Step 1: Received input: 42."
- "== APP == Step 2: Received input: 43."
- "== APP == Step 3: Received input: 86."
- "== APP == Workflow completed! Status: WorkflowStatus.COMPLETED"
timeout_seconds: 30
-->

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 task_chaining.py
```
<!--END_STEP-->

The output of this example should look like this:

Expand All @@ -41,9 +51,38 @@ The output of this example should look like this:

This example demonstrates how to fan-out a workflow into multiple parallel tasks, and then fan-in the results of those tasks. You can run this sample using the following command:

<!--STEP
name: Run the fan-out/fan-in example
match_order: none
expected_stdout_lines:
- "== APP == Processing work item: 1."
- "== APP == Processing work item: 2."
- "== APP == Processing work item: 3."
- "== APP == Processing work item: 4."
- "== APP == Processing work item: 5."
- "== APP == Processing work item: 6."
- "== APP == Processing work item: 7."
- "== APP == Processing work item: 8."
- "== APP == Processing work item: 9."
- "== APP == Processing work item: 10."
- "== APP == Work item 1 processed. Result: 2."
- "== APP == Work item 2 processed. Result: 4."
- "== APP == Work item 3 processed. Result: 6."
- "== APP == Work item 4 processed. Result: 8."
- "== APP == Work item 5 processed. Result: 10."
- "== APP == Work item 6 processed. Result: 12."
- "== APP == Work item 7 processed. Result: 14."
- "== APP == Work item 8 processed. Result: 16."
- "== APP == Work item 9 processed. Result: 18."
- "== APP == Work item 10 processed. Result: 20."
- "== APP == Final result: 110."
timeout_seconds: 30
-->

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 fan_out_fan_in.py
```
<!--END_STEP-->

The output of this sample should look like this:

Expand Down
23 changes: 12 additions & 11 deletions examples/workflow/child_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,33 @@
import dapr.ext.workflow as wf
import time

wfr = wf.WorkflowRuntime()

@wfr.workflow
def main_workflow(ctx: wf.DaprWorkflowContext):
try:
instance_id = ctx.instance_id
child_instance_id = instance_id + '-child'
print(f'*** Calling child workflow {child_instance_id}')
print(f'*** Calling child workflow {child_instance_id}', flush=True)
yield ctx.call_child_workflow(workflow=child_workflow,input=None,instance_id=child_instance_id)
except Exception as e:
print(f'*** Exception: {e}')

return

@wfr.workflow
def child_workflow(ctx: wf.DaprWorkflowContext):
instance_id = ctx.instance_id
print(f'*** Child workflow {instance_id} called')
print(f'*** Child workflow {instance_id} called', flush=True)

if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
workflowRuntime.register_workflow(main_workflow)
workflowRuntime.register_workflow(child_workflow)
workflowRuntime.start()
wfr.start()
time.sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(
workflow=main_workflow)
instance_id = wf_client.schedule_new_workflow(workflow=main_workflow)

# Wait for the workflow to complete
time.sleep(5)
workflowRuntime.shutdown()

wfr.shutdown()
18 changes: 8 additions & 10 deletions examples/workflow/fan_out_fan_in.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
from typing import List
import dapr.ext.workflow as wf

wfr = wf.WorkflowRuntime()

@wfr.workflow(name="batch_processing")
def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
# get a batch of N work items to process in parallel
work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)
Expand All @@ -27,30 +29,26 @@ def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
total = sum(outputs)
yield ctx.call_activity(process_results, input=total)


@wfr.activity(name="get_batch")
def get_work_batch(ctx, batch_size: int) -> List[int]:
return [i + 1 for i in range(batch_size)]


@wfr.activity
def process_work_item(ctx, work_item: int) -> int:
print(f'Processing work item: {work_item}.')
time.sleep(5)
result = work_item * 2
print(f'Work item {work_item} processed. Result: {result}.')
return result


@wfr.activity(name="final_process")
def process_results(ctx, final_result: int):
print(f'Final result: {final_result}.')


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
workflowRuntime.register_workflow(batch_processing_workflow)
workflowRuntime.register_activity(get_work_batch)
workflowRuntime.register_activity(process_work_item)
workflowRuntime.register_activity(process_results)
workflowRuntime.start()
wfr.start()
time.sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(
Expand All @@ -59,4 +57,4 @@ def process_results(ctx, final_result: int):
print(f'Workflow started. Instance ID: {instance_id}')
state = wf_client.wait_for_workflow_completion(instance_id)

workflowRuntime.shutdown()
wfr.shutdown()
15 changes: 7 additions & 8 deletions examples/workflow/human_approval.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from dapr.clients import DaprClient
import dapr.ext.workflow as wf

wfr = wf.WorkflowRuntime()

@dataclass
class Order:
Expand All @@ -37,7 +38,7 @@ class Approval:
def from_dict(dict):
return Approval(**dict)


@wfr.workflow(name="purchase_order_wf")
def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
# Orders under $1000 are auto-approved
if order.cost < 1000:
Expand All @@ -59,10 +60,12 @@ def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
return f"Approved by '{approval_details.approver}'"


@wfr.activity(name="send_approval")
def send_approval_request(_, order: Order) -> None:
print(f'*** Requesting approval from user for order: {order}')


@wfr.activity
def place_order(_, order: Order) -> None:
print(f'*** Placing order: {order}')

Expand All @@ -76,12 +79,8 @@ def place_order(_, order: Order) -> None:
parser.add_argument("--timeout", type=int, default=60, help="Timeout in seconds")
args = parser.parse_args()

# configure and start the workflow runtime
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
workflowRuntime.register_workflow(purchase_order_workflow)
workflowRuntime.register_activity(send_approval_request)
workflowRuntime.register_activity(place_order)
workflowRuntime.start()
# start the workflow runtime
wfr.start()

# Start a purchase order workflow using the user input
order = Order(args.cost, "MyProduct", 1)
Expand Down Expand Up @@ -119,4 +118,4 @@ def prompt_for_approval():
except TimeoutError:
print("*** Workflow timed out!")

workflowRuntime.shutdown()
wfr.shutdown()
15 changes: 8 additions & 7 deletions examples/workflow/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@
from dataclasses import dataclass
from datetime import timedelta
import random
from time import sleep
import dapr.ext.workflow as wf


wfr = wf.WorkflowRuntime()
@dataclass
class JobStatus:
job_id: str
is_healthy: bool


@wfr.workflow(name="status_monitor")
def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
# poll a status endpoint associated with this job
status = yield ctx.call_activity(check_status, input=job)
Expand All @@ -43,20 +45,19 @@ def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
ctx.continue_as_new(job)


@wfr.activity
def check_status(ctx, _) -> str:
return random.choice(["healthy", "unhealthy"])


@wfr.activity
def send_alert(ctx, message: str):
print(f'*** Alert: {message}')


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime()
workflowRuntime.register_workflow(status_monitor_workflow)
workflowRuntime.register_activity(check_status)
workflowRuntime.register_activity(send_alert)
workflowRuntime.start()
wfr.start()
sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
job_id = "job1"
Expand All @@ -75,4 +76,4 @@ def send_alert(ctx, message: str):
print(f'Workflow already running. Instance ID: {job_id}')

input("Press Enter to stop...\n")
workflowRuntime.shutdown()
wfr.shutdown()
20 changes: 12 additions & 8 deletions examples/workflow/task_chaining.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from time import sleep

import dapr.ext.workflow as wf


wfr = wf.WorkflowRuntime()

@wfr.workflow(name="random_workflow")
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
try:
result1 = yield ctx.call_activity(step1, input=wf_input)
Expand All @@ -24,37 +29,36 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
return [result1, result2, result3]


@wfr.activity(name="step10")
def step1(ctx, activity_input):
print(f'Step 1: Received input: {activity_input}.')
# Do some work
return activity_input + 1


@wfr.activity
def step2(ctx, activity_input):
print(f'Step 2: Received input: {activity_input}.')
# Do some work
return activity_input * 2


@wfr.activity
def step3(ctx, activity_input):
print(f'Step 3: Received input: {activity_input}.')
# Do some work
return activity_input ^ 2


@wfr.activity
def error_handler(ctx, error):
print(f'Executing error handler: {error}.')
# Do some compensating work


if __name__ == '__main__':
workflowRuntime = wf.WorkflowRuntime("localhost", "50001")
workflowRuntime.register_workflow(task_chain_workflow)
workflowRuntime.register_activity(step1)
workflowRuntime.register_activity(step2)
workflowRuntime.register_activity(step3)
workflowRuntime.register_activity(error_handler)
workflowRuntime.start()
wfr.start()
sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(
Expand All @@ -64,4 +68,4 @@ def error_handler(ctx, error):
state = wf_client.wait_for_workflow_completion(instance_id)
print(f'Workflow completed! Status: {state.runtime_status}')

workflowRuntime.shutdown()
wfr.shutdown()
5 changes: 3 additions & 2 deletions ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""

# Import your main classes here
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext, when_all, when_any
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
Expand All @@ -28,5 +28,6 @@
'WorkflowState',
'WorkflowStatus',
'when_all',
'when_any'
'when_any',
'alternate_name'
]
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,13 @@ def schedule_new_workflow(self, workflow: Workflow, *, input: Optional[TInput] =
Returns:
The ID of the scheduled workflow instance.
"""
if hasattr(workflow, '_dapr_alternate_name'):
return self.__obj.schedule_new_orchestration(workflow.__dict__['_dapr_alternate_name'],
input=input, instance_id=instance_id,
start_at=start_at)
return self.__obj.schedule_new_orchestration(workflow.__name__, input=input,
instance_id=instance_id, start_at=start_at)
instance_id=instance_id,
start_at=start_at)

def get_workflow_state(self, instance_id: str, *,
fetch_payloads: bool = True) -> Optional[WorkflowState]:
Expand Down
11 changes: 10 additions & 1 deletion ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:

def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TOutput], *,
input: TInput = None) -> task.Task[TOutput]:
if hasattr(activity, '_dapr_alternate_name'):
return self.__obj.call_activity(activity=activity.__dict__['_dapr_alternate_name'],
input=input)
# this return should ideally never execute
return self.__obj.call_activity(activity=activity.__name__, input=input)

def call_child_workflow(self, workflow: Workflow, *,
Expand All @@ -62,7 +66,12 @@ def wf(ctx: task.OrchestrationContext, inp: TInput):
daprWfContext = DaprWorkflowContext(ctx)
return workflow(daprWfContext, inp)
# copy workflow name so durabletask.worker can find the orchestrator in its registry
wf.__name__ = workflow.__name__

if hasattr(workflow, '_dapr_alternate_name'):
wf.__name__ = workflow.__dict__['_dapr_alternate_name']
else:
# this case should ideally never happen
wf.__name__ = workflow.__name__
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)

def wait_for_external_event(self, name: str) -> task.Task:
Expand Down
Loading

0 comments on commit 656207a

Please sign in to comment.