Skip to content

Commit 848fbdc

Browse files
committed
workflow execution with return strategy
1 parent 9ea31b7 commit 848fbdc

File tree

5 files changed

+387
-4
lines changed

5 files changed

+387
-4
lines changed

src/conductor/client/http/api/workflow_resource_api.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3051,4 +3051,131 @@ def update_workflow_and_task_state_with_http_info(self, body, request_id, workfl
30513051
_return_http_data_only=params.get('_return_http_data_only'),
30523052
_preload_content=params.get('_preload_content', True),
30533053
_request_timeout=params.get('_request_timeout'),
3054+
collection_formats=collection_formats)
3055+
3056+
def execute_workflow_with_return_strategy(self, body, name, version, **kwargs): # noqa: E501
3057+
"""Execute a workflow synchronously with reactive response # noqa: E501
3058+
This method makes a synchronous HTTP request by default. To make an
3059+
asynchronous HTTP request, please pass async_req=True
3060+
>>> thread = api.execute_workflow_with_return_strategy(body,name,version)
3061+
>>> result = thread.get()
3062+
:param async_req bool
3063+
:param StartWorkflowRequest body: (required)
3064+
:param str name: (required)
3065+
:param int version: (required)
3066+
:param str request_id:
3067+
:param str wait_until_task_ref:
3068+
:param int wait_for_seconds:
3069+
:param str consistency: DURABLE or EVENTUAL
3070+
:param str return_strategy: TARGET_WORKFLOW or WAIT_WORKFLOW
3071+
:return: WorkflowRun
3072+
If the method is called asynchronously,
3073+
returns the request thread.
3074+
"""
3075+
kwargs['_return_http_data_only'] = True
3076+
if kwargs.get('async_req'):
3077+
return self.execute_workflow_with_return_strategy_with_http_info(body, name, version, **kwargs) # noqa: E501
3078+
else:
3079+
(data) = self.execute_workflow_with_return_strategy_with_http_info(body, name, version, **kwargs) # noqa: E501
3080+
return data
3081+
3082+
def execute_workflow_with_return_strategy_with_http_info(self, body, name, version, **kwargs): # noqa: E501
3083+
"""Execute a workflow synchronously with reactive response # noqa: E501
3084+
This method makes a synchronous HTTP request by default. To make an
3085+
asynchronous HTTP request, please pass async_req=True
3086+
>>> thread = api.execute_workflow_with_return_strategy_with_http_info(body, name, version, async_req=True)
3087+
>>> result = thread.get()
3088+
:param async_req bool
3089+
:param StartWorkflowRequest body: (required)
3090+
:param str name: (required)
3091+
:param int version: (required)
3092+
:param str request_id:
3093+
:param str wait_until_task_ref:
3094+
:param int wait_for_seconds:
3095+
:param str consistency: DURABLE or EVENTUAL
3096+
:param str return_strategy: TARGET_WORKFLOW or WAIT_WORKFLOW
3097+
:return: WorkflowRun
3098+
If the method is called asynchronously,
3099+
returns the request thread.
3100+
"""
3101+
3102+
all_params = ['body', 'name', 'version', 'request_id', 'wait_until_task_ref', 'wait_for_seconds', 'consistency',
3103+
'return_strategy', 'async_req', '_return_http_data_only', '_preload_content',
3104+
'_request_timeout'] # noqa: E501
3105+
3106+
params = locals()
3107+
for key, val in six.iteritems(params['kwargs']):
3108+
if key not in all_params:
3109+
raise TypeError(
3110+
"Got an unexpected keyword argument '%s'"
3111+
" to method execute_workflow" % key
3112+
)
3113+
params[key] = val
3114+
del params['kwargs']
3115+
# verify the required parameter 'body' is set
3116+
if ('body' not in params or
3117+
params['body'] is None):
3118+
raise ValueError("Missing the required parameter `body` when calling `execute_workflow`") # noqa: E501
3119+
# verify the required parameter 'name' is set
3120+
if ('name' not in params or
3121+
params['name'] is None):
3122+
raise ValueError("Missing the required parameter `name` when calling `execute_workflow`") # noqa: E501
3123+
# verify the required parameter 'version' is set
3124+
if ('version' not in params or
3125+
params['version'] is None):
3126+
raise ValueError("Missing the required parameter `version` when calling `execute_workflow`") # noqa: E501
3127+
3128+
collection_formats = {}
3129+
3130+
path_params = {}
3131+
if 'name' in params:
3132+
path_params['name'] = params['name'] # noqa: E501
3133+
if 'version' in params:
3134+
path_params['version'] = params['version'] # noqa: E501
3135+
3136+
query_params = []
3137+
if 'request_id' in params:
3138+
query_params.append(('requestId', params['request_id'])) # noqa: E501
3139+
if 'wait_until_task_ref' in params:
3140+
query_params.append(('waitUntilTaskRef', params['wait_until_task_ref'])) # noqa: E501
3141+
if 'wait_for_seconds' in params:
3142+
query_params.append(('waitForSeconds', params['wait_for_seconds'])) # noqa: E501
3143+
if 'consistency' in params:
3144+
query_params.append(('consistency', params['consistency'])) # noqa: E501
3145+
if 'return_strategy' in params:
3146+
query_params.append(('returnStrategy', params['return_strategy'])) # noqa: E501
3147+
3148+
header_params = {}
3149+
3150+
form_params = []
3151+
local_var_files = {}
3152+
3153+
body_params = None
3154+
if 'body' in params:
3155+
body_params = params['body']
3156+
# HTTP header `Accept`
3157+
header_params['Accept'] = self.api_client.select_header_accept(
3158+
['application/json']) # noqa: E501
3159+
3160+
# HTTP header `Content-Type`
3161+
header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501
3162+
['application/json']) # noqa: E501
3163+
3164+
# Authentication setting
3165+
auth_settings = ['api_key'] # noqa: E501
3166+
3167+
return self.api_client.call_api(
3168+
'/workflow/execute/{name}/{version}', 'POST',
3169+
path_params,
3170+
query_params,
3171+
header_params,
3172+
body=body_params,
3173+
post_params=form_params,
3174+
files=local_var_files,
3175+
response_type='SignalResponse', # noqa: E501
3176+
auth_settings=auth_settings,
3177+
async_req=params.get('async_req'),
3178+
_return_http_data_only=params.get('_return_http_data_only'),
3179+
_preload_content=params.get('_preload_content', True),
3180+
_request_timeout=params.get('_request_timeout'),
30543181
collection_formats=collection_formats)

src/conductor/client/orkes/orkes_workflow_client.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from conductor.client.configuration.configuration import Configuration
44
from conductor.client.http.models import SkipTaskRequest, WorkflowStatus, \
5-
ScrollableSearchResultWorkflowSummary
5+
ScrollableSearchResultWorkflowSummary, SignalResponse
66
from conductor.client.http.models.correlation_ids_search_request import CorrelationIdsSearchRequest
77
from conductor.client.http.models.rerun_workflow_request import RerunWorkflowRequest
88
from conductor.client.http.models.start_workflow_request import StartWorkflowRequest
@@ -59,6 +59,40 @@ def execute_workflow(
5959
wait_for_seconds=wait_for_seconds,
6060
)
6161

62+
def execute_workflow_with_return_strategy(
63+
self,
64+
start_workflow_request: StartWorkflowRequest,
65+
request_id: str = None,
66+
wait_until_task_ref: Optional[str] = None,
67+
wait_for_seconds: int = 30,
68+
consistency: Optional[str] = None,
69+
return_strategy: Optional[str] = None
70+
) -> SignalResponse:
71+
"""Execute a workflow synchronously with optional reactive features
72+
Args:
73+
start_workflow_request: StartWorkflowRequest containing workflow details
74+
request_id: Optional request ID for tracking
75+
wait_until_task_ref: Wait until this task reference is reached
76+
wait_for_seconds: How long to wait for completion (default 30)
77+
consistency: Workflow consistency level - 'DURABLE' or 'SYNCHRONOUS' or 'REGION_DURABLE'
78+
return_strategy: Return strategy - 'TARGET_WORKFLOW' or 'BLOCKING_WORKFLOW' or 'BLOCKING_TASK' or 'BLOCKING_TASK_INPUT'
79+
Returns:
80+
WorkflowRun: The workflow execution result
81+
"""
82+
if consistency is None:
83+
consistency = 'DURABLE'
84+
if return_strategy is None:
85+
return_strategy = 'TARGET_WORKFLOW'
86+
87+
return self.workflowResourceApi.execute_workflow_with_return_strategy(body=start_workflow_request,
88+
name=start_workflow_request.name,
89+
version=start_workflow_request.version,
90+
request_id=request_id,
91+
wait_until_task_ref=wait_until_task_ref,
92+
wait_for_seconds=wait_for_seconds,
93+
consistency=consistency,
94+
return_strategy=return_strategy)
95+
6296
def pause_workflow(self, workflow_id: str):
6397
self.workflowResourceApi.pause_workflow(workflow_id)
6498

src/conductor/client/workflow/executor/workflow_executor.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,21 @@ def execute_workflow(self, request: StartWorkflowRequest, wait_until_task_ref: s
5959
wait_for_seconds=wait_for_seconds,
6060
)
6161

62+
def execute_workflow_with_return_strategy(self, request: StartWorkflowRequest, wait_until_task_ref: str = None,
63+
wait_for_seconds: int = 10, request_id: str = None,
64+
consistency: str = None,
65+
return_strategy: str = None) -> SignalResponse:
66+
"""Execute a workflow synchronously with optional reactive features"""
67+
if request_id is None:
68+
request_id = str(uuid.uuid4())
69+
70+
return self.workflow_client.execute_workflow_with_return_strategy(start_workflow_request=request,
71+
request_id=request_id,
72+
wait_until_task_ref=wait_until_task_ref,
73+
wait_for_seconds=wait_for_seconds,
74+
consistency=consistency,
75+
return_strategy=return_strategy)
76+
6277
def execute(self, name: str, version: Optional[int] = None, workflow_input: Any = {},
6378
wait_until_task_ref: str = None, wait_for_seconds: int = 10,
6479
request_id: str = None, correlation_id: str = None, domain: str = None) -> WorkflowRun:

src/conductor/client/workflow_client.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from typing import Optional, List, Dict
33

44
from conductor.client.http.models import WorkflowRun, SkipTaskRequest, WorkflowStatus, \
5-
ScrollableSearchResultWorkflowSummary
5+
ScrollableSearchResultWorkflowSummary, SignalResponse
66
from conductor.client.http.models.correlation_ids_search_request import CorrelationIdsSearchRequest
77
from conductor.client.http.models.rerun_workflow_request import RerunWorkflowRequest
88
from conductor.client.http.models.start_workflow_request import StartWorkflowRequest
@@ -44,6 +44,18 @@ def execute_workflow(
4444
) -> WorkflowRun:
4545
pass
4646

47+
@abstractmethod
48+
def execute_workflow_with_return_strategy(
49+
self,
50+
start_workflow_request: StartWorkflowRequest,
51+
request_id: str = None,
52+
wait_until_task_ref: Optional[str] = None,
53+
wait_for_seconds: int = 30,
54+
consistency: Optional[str] = None,
55+
return_strategy: Optional[str] = None
56+
) -> SignalResponse:
57+
pass
58+
4759
@abstractmethod
4860
def pause_workflow(self, workflow_id: str):
4961
pass

0 commit comments

Comments
 (0)