Skip to content

Commit

Permalink
feat: Adds support for the DAPR_GRPC_ENDPOINT env variable (#610)
Browse files Browse the repository at this point in the history
Signed-off-by: Elena Kolevska <elena@kolevska.com>
Co-authored-by: Bernd Verst <github@bernd.dev>
  • Loading branch information
elena-kolevska and berndverst authored Oct 31, 2023
1 parent 690ac99 commit 7e3704d
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 50 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,5 @@ venv.bak/
# mypy
.mypy_cache/

# OSX specific files
# macOS specific files
.DS_Store
103 changes: 57 additions & 46 deletions examples/demo_workflow/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,96 +19,107 @@
settings = Settings()

counter = 0
instanceId = "exampleInstanceID"
workflowComponent = "dapr"
workflowName = "hello_world_wf"
inputData = "Hi Counter!"
workflowOptions = dict()
workflowOptions["task_queue"] = "testQueue"
eventName = "event1"
eventData = "eventData"
nonExistentIDError = "no such instance exists"

def hello_world_wf(ctx: DaprWorkflowContext, input):
print(f'{input}')
yield ctx.call_activity(hello_act, input=1)
yield ctx.call_activity(hello_act, input=10)
instance_id = "exampleInstanceID"
workflow_component = "dapr"
workflow_name = "hello_world_wf"
input_data = "Hi Counter!"
workflow_options = dict()
workflow_options["task_queue"] = "testQueue"
event_name = "event1"
event_data = "eventData"
non_existent_id_error = "no such instance exists"


def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
print(f'{wf_input}')
yield ctx.call_activity(hello_act, wf_input=1)
yield ctx.call_activity(hello_act, wf_input=10)
yield ctx.wait_for_external_event("event1")
yield ctx.call_activity(hello_act, input=100)
yield ctx.call_activity(hello_act, input=1000)
yield ctx.call_activity(hello_act, wf_input=100)
yield ctx.call_activity(hello_act, wf_input=1000)

def hello_act(ctx: WorkflowActivityContext, input):

def hello_act(ctx: WorkflowActivityContext, wf_input):
global counter
counter += input
counter += wf_input
print(f'New counter value is: {counter}!', flush=True)


def main():
with DaprClient() as d:
host = settings.DAPR_RUNTIME_HOST
port = settings.DAPR_GRPC_PORT
workflowRuntime = WorkflowRuntime(host, port)
workflowRuntime.register_workflow(hello_world_wf)
workflowRuntime.register_activity(hello_act)
workflowRuntime.start()
workflow_runtime = WorkflowRuntime(host, port)
workflow_runtime.register_workflow(hello_world_wf)
workflow_runtime.register_activity(hello_act)
workflow_runtime.start()

sleep(2)

print("==========Start Counter Increase as per Input:==========")
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
start_resp = d.start_workflow(instance_id=instance_id,
workflow_component=workflow_component,
workflow_name=workflow_name, input=input_data,
workflow_options=workflow_options)
print(f"start_resp {start_resp.instance_id}")

# Sleep for a while to let the workflow run
sleep(1)
assert counter == 11

# Pause Test
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")
d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component)
get_response = d.get_workflow(instance_id=instance_id,
workflow_component=workflow_component)
print(f"Get response from {workflow_name} after pause call: {get_response.runtime_status}")

# Resume Test
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")
d.resume_workflow(instance_id=instance_id, workflow_component=workflow_component)
get_response = d.get_workflow(instance_id=instance_id,
workflow_component=workflow_component)
print(f"Get response from {workflow_name} after resume call: {get_response.runtime_status}")

sleep(1)
# Raise event
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
event_name=eventName, event_data=eventData)
d.raise_workflow_event(instance_id=instance_id, workflow_component=workflow_component,
event_name=event_name, event_data=event_data)

sleep(5)
# Purge Test
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
d.get_workflow(instance_id=instance_id, workflow_component=workflow_component)
except DaprInternalError as err:
if nonExistentIDError in err._message:
if non_existent_id_error in err._message:
print("Instance Successfully Purged")


# Kick off another workflow for termination purposes
# Kick off another workflow for termination purposes
# This will also test using the same instance ID on a new workflow after
# the old instance was purged
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
start_resp = d.start_workflow(instance_id=instance_id,
workflow_component=workflow_component,
workflow_name=workflow_name, input=input_data,
workflow_options=workflow_options)
print(f"start_resp {start_resp.instance_id}")

# Terminate Test
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
d.terminate_workflow(instance_id=instance_id, workflow_component=workflow_component)
sleep(1)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")
get_response = d.get_workflow(instance_id=instance_id,
workflow_component=workflow_component)
print(
f"Get response from {workflow_name} after terminate call: {get_response.runtime_status}")

# Purge Test
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
d.purge_workflow(instance_id=instance_id, workflow_component=workflow_component)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
d.get_workflow(instance_id=instance_id, workflow_component=workflow_component)
except DaprInternalError as err:
if nonExistentIDError in err._message:
if non_existent_id_error in err._message:
print("Instance Successfully Purged")

workflowRuntime.shutdown()
workflow_runtime.shutdown()


if __name__ == '__main__':
main()
11 changes: 8 additions & 3 deletions ext/dapr-ext-workflow/dapr/ext/workflow/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@


def getAddress(host: Optional[str] = None, port: Optional[str] = None) -> str:
host = host or settings.DAPR_RUNTIME_HOST
port = port or settings.DAPR_GRPC_PORT
address = f"{host}:{port}"
if not host and not port:
address = settings.DAPR_GRPC_ENDPOINT or (f"{settings.DAPR_RUNTIME_HOST}:"
f"{settings.DAPR_GRPC_PORT}")
else:
host = host or settings.DAPR_RUNTIME_HOST
port = port or settings.DAPR_GRPC_PORT
address = f"{host}:{port}"

return address
30 changes: 30 additions & 0 deletions ext/dapr-ext-workflow/tests/test_workflow_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import unittest
from dapr.ext.workflow.util import getAddress
from unittest.mock import patch

from dapr.conf import settings


class DaprWorkflowUtilTest(unittest.TestCase):

def test_get_address_default(self):
expected = f"{settings.DAPR_RUNTIME_HOST}:{settings.DAPR_GRPC_PORT}"
self.assertEqual(expected, getAddress())

def test_get_address_with_constructor_arguments(self):
self.assertEqual("test.com:5000", getAddress("test.com", "5000"))

def test_get_address_with_partial_constructor_arguments(self):
expected = f"{settings.DAPR_RUNTIME_HOST}:5000"
self.assertEqual(expected, getAddress(port="5000"))

expected = f"test.com:{settings.DAPR_GRPC_PORT}"
self.assertEqual(expected, getAddress(host="test.com"))

@patch.object(settings, "DAPR_GRPC_ENDPOINT", "https://domain1.com:5000")
def test_get_address_with_constructor_arguments_and_env_variable(self):
self.assertEqual("test.com:5000", getAddress("test.com", "5000"))

@patch.object(settings, "DAPR_GRPC_ENDPOINT", "https://domain1.com:5000")
def test_get_address_with_env_variable(self):
self.assertEqual("https://domain1.com:5000", getAddress())

0 comments on commit 7e3704d

Please sign in to comment.