Skip to content

Commit

Permalink
workflows, remove deprecated functions
Browse files Browse the repository at this point in the history
Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com>
  • Loading branch information
famarting committed Oct 29, 2024
1 parent 6e90e84 commit 6b093c2
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 46 deletions.
7 changes: 7 additions & 0 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1408,6 +1408,7 @@ async def start_workflow(
send_raw_bytes: bool = False,
) -> StartWorkflowResponse:
"""Starts a workflow.
Deprecated: use dapr-ext-workflow instead
Args:
workflow_component (str): the name of the workflow component
Expand Down Expand Up @@ -1469,6 +1470,7 @@ async def start_workflow(

async def get_workflow(self, instance_id: str, workflow_component: str) -> GetWorkflowResponse:
"""Gets information on a workflow.
Deprecated: use dapr-ext-workflow instead
Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1510,6 +1512,7 @@ async def get_workflow(self, instance_id: str, workflow_component: str) -> GetWo

async def terminate_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Terminates a workflow.
Deprecated: use dapr-ext-workflow instead
Args:
instance_id (str): the ID of the workflow instance, e.g.
Expand Down Expand Up @@ -1547,6 +1550,7 @@ async def raise_workflow_event(
send_raw_bytes: bool = False,
) -> DaprResponse:
"""Raises an event on a workflow.
Deprecated: use dapr-ext-workflow instead
Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1610,6 +1614,7 @@ async def raise_workflow_event(

async def pause_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Pause a workflow.
Deprecated: use dapr-ext-workflow instead
Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1642,6 +1647,7 @@ async def pause_workflow(self, instance_id: str, workflow_component: str) -> Dap

async def resume_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Resumes a workflow.
Deprecated: use dapr-ext-workflow instead
Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1673,6 +1679,7 @@ async def resume_workflow(self, instance_id: str, workflow_component: str) -> Da

async def purge_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Purges a workflow.
Deprecated: use dapr-ext-workflow instead
Args:
instance_id (str): the ID of the workflow instance,
Expand Down
7 changes: 7 additions & 0 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,7 @@ def start_workflow(
send_raw_bytes: bool = False,
) -> StartWorkflowResponse:
"""Starts a workflow.
Deprecated: use dapr-ext-workflow instead
Args:
workflow_component (str): the name of the workflow component
Expand Down Expand Up @@ -1466,6 +1467,7 @@ def start_workflow(

def get_workflow(self, instance_id: str, workflow_component: str) -> GetWorkflowResponse:
"""Gets information on a workflow.
Deprecated: use dapr-ext-workflow instead
Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1507,6 +1509,7 @@ def get_workflow(self, instance_id: str, workflow_component: str) -> GetWorkflow

def terminate_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Terminates a workflow.
Deprecated: use dapr-ext-workflow instead
Args:
instance_id (str): the ID of the workflow instance, e.g.
Expand Down Expand Up @@ -1545,6 +1548,7 @@ def raise_workflow_event(
send_raw_bytes: bool = False,
) -> DaprResponse:
"""Raises an event on a workflow.
Deprecated: use dapr-ext-workflow instead
Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1609,6 +1613,7 @@ def raise_workflow_event(

def pause_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Pause a workflow.
Deprecated: use dapr-ext-workflow instead
Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1641,6 +1646,7 @@ def pause_workflow(self, instance_id: str, workflow_component: str) -> DaprRespo

def resume_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Resumes a workflow.
Deprecated: use dapr-ext-workflow instead
Args:
instance_id (str): the ID of the workflow instance,
Expand Down Expand Up @@ -1672,6 +1678,7 @@ def resume_workflow(self, instance_id: str, workflow_component: str) -> DaprResp

def purge_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
"""Purges a workflow.
Deprecated: use dapr-ext-workflow instead
Args:
instance_id (str): the ID of the workflow instance,
Expand Down
6 changes: 3 additions & 3 deletions examples/demo_workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

This document describes how to register a workflow and activities inside it and start running it.
It demonstrates the following APIs:
- **start_workflow**: Start an instance of a workflow
- **get_workflow**: Get information on a single workflow
- **schedule_new_workflow**: Start an instance of a workflow
- **get_workflow_state**: Get information on a single workflow
- **terminate_workflow**: Terminate or stop a particular instance of a workflow
- **raise_event**: Raise an event on a workflow
- **raise_workflow_event**: Raise an event on a workflow
- **pause_workflow**: Pauses or suspends a workflow instance that can later be resumed
- **resume_workflow**: Resumes a paused workflow instance
- **purge_workflow**: Removes all metadata related to a specific workflow instance from the state store
Expand Down
75 changes: 34 additions & 41 deletions examples/demo_workflow/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from datetime import timedelta
from time import sleep
from dapr.ext.workflow import (
DaprWorkflowClient,
WorkflowRuntime,
DaprWorkflowContext,
WorkflowActivityContext,
Expand Down Expand Up @@ -106,7 +107,7 @@ def act_for_child_wf(ctx: WorkflowActivityContext, inp):


def main():
with DaprClient() as d:
with DaprWorkflowClient() as wfc:
workflow_runtime = WorkflowRuntime()
workflow_runtime.register_workflow(hello_world_wf)
workflow_runtime.register_workflow(child_retryable_wf)
Expand All @@ -119,14 +120,11 @@ def main():
sleep(2)

print('==========Start Counter Increase as per Input:==========')
start_resp = d.start_workflow(
instance_id = wfc.schedule_new_workflow(
instance_id=instance_id,
workflow_component=workflow_component,
workflow_name=workflow_name,
input=input_data,
workflow_options=workflow_options,
)
print(f'start_resp {start_resp.instance_id}')
workflow=hello_world_wf,
input=input_data)
print(f'start_resp {instance_id}')

# Sleep for a while to let the workflow run
sleep(12)
Expand All @@ -135,75 +133,70 @@ def main():
assert child_orchestrator_string == '1aa2bb3cc'

# Pause Test
d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component)
get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
wfc.pause_workflow(instance_id=instance_id)
get_response = wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True)
print(f'Get response from {workflow_name} after pause call: {get_response.runtime_status}')

# Resume Test
d.resume_workflow(instance_id=instance_id, workflow_component=workflow_component)
get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
wfc.resume_workflow(instance_id=instance_id)
get_response = wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True)
print(f'Get response from {workflow_name} after resume call: {get_response.runtime_status}')

sleep(1)
# Raise event
d.raise_workflow_event(
instance_id=child_instance_id,
workflow_component=workflow_component,
wfc.raise_workflow_event(
instance_id=instance_id,
event_name=event_name,
event_data=event_data,
data=event_data,
)

sleep(5)
# Purge Test
d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
# // TODO IMPLEMENT PURGE
# d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
try:
d.get_workflow(instance_id=instance_id, workflow_component=workflow_component)
except DaprInternalError as err:
wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True)
except Exception as err:
# TODO temporary print
print(f'got error {err}')
if non_existent_id_error in err._message:
print('Instance Successfully Purged')

# Kick off another workflow for termination purposes
# This will also test using the same instance ID on a new workflow after
# the old instance was purged
start_resp = d.start_workflow(
instance_id = wfc.schedule_new_workflow(
instance_id=instance_id,
workflow_component=workflow_component,
workflow_name=workflow_name,
input=input_data,
workflow_options=workflow_options,
workflow=hello_world_wf,
input=input_data
)
print(f'start_resp {start_resp.instance_id}')
print(f'start_resp {instance_id}')

sleep(5)
# Terminate Test
d.terminate_workflow(instance_id=instance_id, workflow_component=workflow_component)
wfc.terminate_workflow(instance_id=instance_id)
sleep(1)
get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
get_response = wfc.get_workflow_state(instance_id=instance_id, fetch_payloads=True)
print(
f'Get response from {workflow_name} '
f'after terminate call: {get_response.runtime_status}'
)
child_get_response = d.get_workflow(
instance_id=child_instance_id, workflow_component=workflow_component
child_get_response = wfc.get_workflow_state(
instance_id=child_instance_id, fetch_payloads=True
)
print(
f'Get response from {child_workflow_name} '
f'after terminate call: {child_get_response.runtime_status}'
)

# Purge Test
d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
try:
d.get_workflow(instance_id=instance_id, workflow_component=workflow_component)
except DaprInternalError as err:
if non_existent_id_error in err._message:
print('Instance Successfully Purged')
# TODO IMPLEMENT PURGE
# d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
# try:
# d.get_workflow(instance_id=instance_id, workflow_component=workflow_component)
# except DaprInternalError as err:
# if non_existent_id_error in err._message:
# print('Instance Successfully Purged')

workflow_runtime.shutdown()

Expand Down
2 changes: 1 addition & 1 deletion examples/demo_workflow/demo_workflow/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
dapr-ext-workflow-dev>=0.0.1rc1.dev
dapr-ext-workflow-dev>=0.4.1rc1.dev
2 changes: 2 additions & 0 deletions ext/dapr-ext-workflow/tests/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,5 @@ def test_client_functions(self):

actual_resume_result = wfClient.resume_workflow(instance_id=mockInstanceId)
assert actual_resume_result == mock_resume_result

# TODO add purge support
2 changes: 1 addition & 1 deletion tests/clients/test_dapr_grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ def test_unlock_input_validation(self):
# Tests for workflow
#

def test_workflow(self):
def test_workflow_deprecated(self):
dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}')
# Sane parameters
workflow_name = 'test_workflow'
Expand Down

0 comments on commit 6b093c2

Please sign in to comment.