Skip to content

Commit

Permalink
revert changes to example
Browse files Browse the repository at this point in the history
Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com>
  • Loading branch information
famarting committed Nov 6, 2024
1 parent 6b093c2 commit 32e4735
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 36 deletions.
80 changes: 45 additions & 35 deletions examples/demo_workflow/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from datetime import timedelta
from time import sleep
from dapr.ext.workflow import (
DaprWorkflowClient,
WorkflowRuntime,
DaprWorkflowContext,
WorkflowActivityContext,
Expand Down Expand Up @@ -107,7 +106,10 @@ def act_for_child_wf(ctx: WorkflowActivityContext, inp):


def main():
with DaprWorkflowClient() as wfc:
# DEPRECATION NOTICE!
# The workflow methods in the DaprClient are deprecated, instead use the client provided in dapr-ext-workflow
# You can use the examples in https://github.com/dapr/python-sdk/tree/main/examples/workflow
with DaprClient() as d:
workflow_runtime = WorkflowRuntime()
workflow_runtime.register_workflow(hello_world_wf)
workflow_runtime.register_workflow(child_retryable_wf)
Expand All @@ -120,11 +122,14 @@ def main():
sleep(2)

print('==========Start Counter Increase as per Input:==========')
instance_id = wfc.schedule_new_workflow(
start_resp = d.start_workflow(
instance_id=instance_id,
workflow=hello_world_wf,
input=input_data)
print(f'start_resp {instance_id}')
workflow_component=workflow_component,
workflow_name=workflow_name,
input=input_data,
workflow_options=workflow_options,
)
print(f'start_resp {start_resp.instance_id}')

# Sleep for a while to let the workflow run
sleep(12)
Expand All @@ -133,73 +138,78 @@ def main():
assert child_orchestrator_string == '1aa2bb3cc'

# Pause Test
wfc.pause_workflow(instance_id=instance_id)
get_response = wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True)
d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component)
get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
print(f'Get response from {workflow_name} after pause call: {get_response.runtime_status}')

# Resume Test
wfc.resume_workflow(instance_id=instance_id)
get_response = wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True)
d.resume_workflow(instance_id=instance_id, workflow_component=workflow_component)
get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
print(f'Get response from {workflow_name} after resume call: {get_response.runtime_status}')

sleep(1)
# Raise event
wfc.raise_workflow_event(
instance_id=instance_id,
d.raise_workflow_event(
instance_id=child_instance_id,
workflow_component=workflow_component,
event_name=event_name,
data=event_data,
event_data=event_data,
)

sleep(5)
# Purge Test
# // TODO IMPLEMENT PURGE
# d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
try:
wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True)
except Exception as err:
# TODO temporary print
print(f'got error {err}')
d.get_workflow(instance_id=instance_id, workflow_component=workflow_component)
except DaprInternalError as err:
if non_existent_id_error in err._message:
print('Instance Successfully Purged')

# Kick off another workflow for termination purposes
# This will also test using the same instance ID on a new workflow after
# the old instance was purged
instance_id = wfc.schedule_new_workflow(
start_resp = d.start_workflow(
instance_id=instance_id,
workflow=hello_world_wf,
input=input_data
workflow_component=workflow_component,
workflow_name=workflow_name,
input=input_data,
workflow_options=workflow_options,
)
print(f'start_resp {instance_id}')
print(f'start_resp {start_resp.instance_id}')

sleep(5)
# Terminate Test
wfc.terminate_workflow(instance_id=instance_id)
d.terminate_workflow(instance_id=instance_id, workflow_component=workflow_component)
sleep(1)
get_response = wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True)
get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
print(
f'Get response from {workflow_name} '
f'after terminate call: {get_response.runtime_status}'
)
child_get_response = wfc.get_workflow_state(
instance_id=child_instance_id, fetch_payloads=True
child_get_response = d.get_workflow(
instance_id=child_instance_id, workflow_component=workflow_component
)
print(
f'Get response from {child_workflow_name} '
f'after terminate call: {child_get_response.runtime_status}'
)

# Purge Test
# TODO IMPLEMENT PURGE
# d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
# try:
# d.get_workflow(instance_id=instance_id, workflow_component=workflow_component)
# except DaprInternalError as err:
# if non_existent_id_error in err._message:
# print('Instance Successfully Purged')
d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
try:
d.get_workflow(instance_id=instance_id, workflow_component=workflow_component)
except DaprInternalError as err:
if non_existent_id_error in err._message:
print('Instance Successfully Purged')

workflow_runtime.shutdown()


if __name__ == '__main__':
main()
main()
2 changes: 1 addition & 1 deletion examples/demo_workflow/demo_workflow/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
dapr-ext-workflow-dev>=0.4.1rc1.dev
dapr-ext-workflow-dev>=0.0.1rc1.dev

0 comments on commit 32e4735

Please sign in to comment.