Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds support for the DAPR_GRPC_ENDPOINT env variable for workflows #610

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,5 @@ venv.bak/
# mypy
.mypy_cache/

# OSX specific files
# macOS specific files
.DS_Store
103 changes: 57 additions & 46 deletions examples/demo_workflow/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,96 +19,107 @@
settings = Settings()

counter = 0
instanceId = "exampleInstanceID"
workflowComponent = "dapr"
workflowName = "hello_world_wf"
inputData = "Hi Counter!"
workflowOptions = dict()
workflowOptions["task_queue"] = "testQueue"
eventName = "event1"
eventData = "eventData"
nonExistentIDError = "no such instance exists"

def hello_world_wf(ctx: DaprWorkflowContext, input):
print(f'{input}')
yield ctx.call_activity(hello_act, input=1)
yield ctx.call_activity(hello_act, input=10)
instance_id = "exampleInstanceID"
workflow_component = "dapr"
workflow_name = "hello_world_wf"
input_data = "Hi Counter!"
workflow_options = dict()
workflow_options["task_queue"] = "testQueue"
event_name = "event1"
event_data = "eventData"
non_existent_id_error = "no such instance exists"


def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
print(f'{wf_input}')
yield ctx.call_activity(hello_act, wf_input=1)
yield ctx.call_activity(hello_act, wf_input=10)
yield ctx.wait_for_external_event("event1")
yield ctx.call_activity(hello_act, input=100)
yield ctx.call_activity(hello_act, input=1000)
yield ctx.call_activity(hello_act, wf_input=100)
yield ctx.call_activity(hello_act, wf_input=1000)

def hello_act(ctx: WorkflowActivityContext, input):

def hello_act(ctx: WorkflowActivityContext, wf_input):
global counter
counter += input
counter += wf_input
print(f'New counter value is: {counter}!', flush=True)


def main():
with DaprClient() as d:
host = settings.DAPR_RUNTIME_HOST
port = settings.DAPR_GRPC_PORT
workflowRuntime = WorkflowRuntime(host, port)
workflowRuntime.register_workflow(hello_world_wf)
workflowRuntime.register_activity(hello_act)
workflowRuntime.start()
workflow_runtime = WorkflowRuntime(host, port)
workflow_runtime.register_workflow(hello_world_wf)
workflow_runtime.register_activity(hello_act)
workflow_runtime.start()

sleep(2)

print("==========Start Counter Increase as per Input:==========")
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
start_resp = d.start_workflow(instance_id=instance_id,
workflow_component=workflow_component,
workflow_name=workflow_name, input=input_data,
workflow_options=workflow_options)
print(f"start_resp {start_resp.instance_id}")

# Sleep for a while to let the workflow run
sleep(1)
assert counter == 11

# Pause Test
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")
d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component)
get_response = d.get_workflow(instance_id=instance_id,
workflow_component=workflow_component)
print(f"Get response from {workflow_name} after pause call: {get_response.runtime_status}")

# Resume Test
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")
d.resume_workflow(instance_id=instance_id, workflow_component=workflow_component)
get_response = d.get_workflow(instance_id=instance_id,
workflow_component=workflow_component)
print(f"Get response from {workflow_name} after resume call: {get_response.runtime_status}")

sleep(1)
# Raise event
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
event_name=eventName, event_data=eventData)
d.raise_workflow_event(instance_id=instance_id, workflow_component=workflow_component,
event_name=event_name, event_data=event_data)

sleep(5)
# Purge Test
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
d.get_workflow(instance_id=instance_id, workflow_component=workflow_component)
except DaprInternalError as err:
if nonExistentIDError in err._message:
if non_existent_id_error in err._message:
print("Instance Successfully Purged")


# Kick off another workflow for termination purposes
# Kick off another workflow for termination purposes
# This will also test using the same instance ID on a new workflow after
# the old instance was purged
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
start_resp = d.start_workflow(instance_id=instance_id,
workflow_component=workflow_component,
workflow_name=workflow_name, input=input_data,
workflow_options=workflow_options)
print(f"start_resp {start_resp.instance_id}")

# Terminate Test
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
d.terminate_workflow(instance_id=instance_id, workflow_component=workflow_component)
sleep(1)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")
get_response = d.get_workflow(instance_id=instance_id,
workflow_component=workflow_component)
print(
f"Get response from {workflow_name} after terminate call: {get_response.runtime_status}")

# Purge Test
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
d.get_workflow(instance_id=instance_id, workflow_component=workflow_component)
except DaprInternalError as err:
if nonExistentIDError in err._message:
if non_existent_id_error in err._message:
print("Instance Successfully Purged")

workflowRuntime.shutdown()
workflow_runtime.shutdown()


if __name__ == '__main__':
main()
11 changes: 8 additions & 3 deletions ext/dapr-ext-workflow/dapr/ext/workflow/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@


def getAddress(host: Optional[str] = None, port: Optional[str] = None) -> str:
host = host or settings.DAPR_RUNTIME_HOST
port = port or settings.DAPR_GRPC_PORT
address = f"{host}:{port}"
if not host and not port:
address = settings.DAPR_GRPC_ENDPOINT or (f"{settings.DAPR_RUNTIME_HOST}:"
f"{settings.DAPR_GRPC_PORT}")
else:
host = host or settings.DAPR_RUNTIME_HOST
port = port or settings.DAPR_GRPC_PORT
address = f"{host}:{port}"

return address
30 changes: 30 additions & 0 deletions ext/dapr-ext-workflow/tests/test_workflow_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import unittest
from dapr.ext.workflow.util import getAddress
from unittest.mock import patch

from dapr.conf import settings


class DaprWorkflowUtilTest(unittest.TestCase):

def test_get_address_default(self):
expected = f"{settings.DAPR_RUNTIME_HOST}:{settings.DAPR_GRPC_PORT}"
self.assertEqual(expected, getAddress())

def test_get_address_with_constructor_arguments(self):
self.assertEqual("test.com:5000", getAddress("test.com", "5000"))

def test_get_address_with_partial_constructor_arguments(self):
expected = f"{settings.DAPR_RUNTIME_HOST}:5000"
self.assertEqual(expected, getAddress(port="5000"))

expected = f"test.com:{settings.DAPR_GRPC_PORT}"
self.assertEqual(expected, getAddress(host="test.com"))

@patch.object(settings, "DAPR_GRPC_ENDPOINT", "https://domain1.com:5000")
def test_get_address_with_constructor_arguments_and_env_variable(self):
self.assertEqual("test.com:5000", getAddress("test.com", "5000"))

@patch.object(settings, "DAPR_GRPC_ENDPOINT", "https://domain1.com:5000")
def test_get_address_with_env_variable(self):
self.assertEqual("https://domain1.com:5000", getAddress())
Loading