-
Notifications
You must be signed in to change notification settings - Fork 138
Added async workflow client implementation, leveraging new durabletask.aio.client implementation #861
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…k.aio.client implementation Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
d32c9aa to
d17b262
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #861 +/- ##
==========================================
+ Coverage 86.63% 87.54% +0.90%
==========================================
Files 84 99 +15
Lines 4473 6461 +1988
==========================================
+ Hits 3875 5656 +1781
- Misses 598 805 +207 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
@passuied I think this other PR covers this same thing, and is a bit more complete because it also have async context, but it's WIP as it has lots of lint changes that are now applied to |
|
@acroca actually this is a bit different, as this tackles the client initiating actions against a workflow(schedule workflow, signal events), instead of the internals of the workflow that my PR tackles. He did the PR in durabletask that complements this dapr/durabletask-python#17. The PRs his/mine are complementary and not overlapping |
acroca
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, but I'd follow the same pattern as the DaprClient and not have a suffix Async for the async version.
ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
|
@passuied Can you add an example to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, but tests are failing, and copyright headers need to be fixed :)
ext/dapr-ext-workflow/dapr/ext/workflow/aio/workflow_runtime.py
Outdated
Show resolved
Hide resolved
ext/dapr-ext-workflow/dapr/ext/workflow/aio/workflow_runtime.py
Outdated
Show resolved
Hide resolved
ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py
Outdated
Show resolved
Hide resolved
ext/dapr-ext-workflow/dapr/ext/workflow/aio/workflow_runtime.py
Outdated
Show resolved
Hide resolved
75f3e7c to
6b8d7c8
Compare
Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
|
Can't we have an example using this client? I know you removed it because of the |
@acroca It's just hard to provide a meaningful example without other parts of the workflow to be async... It's kinda moot... |
|
@passuied it does make sense to break it, think of the runtime as the backend and the client and the front-end/api-gateway. you can have an async client that just return workflow instance ID (to check status) and a backend that process the work items (workflow runtime). You should be able to convert any example to asyncio client with: For example the fan out/fan in: async def main():
# async client
import dapr.ext.workflow.aio as aio_client
wf_client = aio_client.DaprWorkflowClient()
instance_id = await wf_client.schedule_new_workflow(workflow=batch_processing_workflow, input=10)
# ....
# other asyncio functions calls...
print(f'Workflow started. Instance ID: {instance_id}')
return await wf_client.wait_for_workflow_completion(instance_id)
if __name__ == '__main__':
wfr.start() # synchronous backend engine
time.sleep(10) # wait for workflow backend runtime to start
try:
wf_state = asyncio.run(main())
print(f'Workflow completed! Status: {wf_state}')
except Exception as e:
import traceback
traceback.print_exc()
exit(1)
finally:
wfr.shutdown()Inside the activity if you need to run asyncio calls just do something similar for now (until we get the other work in durabletask in) |
…mple example. Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
| if non_existent_id_error in err._message: | ||
| print('Instance Successfully Purged') | ||
|
|
||
| wfr.shutdown() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a try/finally to always execute wfr.shudown() on any issues, or the process might hang if any issue happens
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done!
Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
|
LGTM! The linter in the CI doesn't say what's wrong (we should change it to Also, the examples don't seem to install the workflow extension from the local directory, can you fix that in this PR please? I think we should install all the extensions locally before running validations. Thanks! |
Description
Added async workflow client implementation, leveraging new durabletask.aio.client implementation
Issue reference
#834
Checklist
Please make sure you've completed the relevant tasks for this PR, out of the following list: