Skip to content

Commit c22b2ee

Browse files
Retry all calls
Signed-off-by: Elena Kolevska <elena@kolevska.com>
1 parent b0ed644 commit c22b2ee

File tree

5 files changed

+49
-28
lines changed

5 files changed

+49
-28
lines changed

dapr/actor/client/proxy.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
from dapr.actor.actor_interface import ActorInterface
2020
from dapr.actor.id import ActorId
2121
from dapr.actor.runtime._type_utils import get_dispatchable_attrs_from_interface
22-
from dapr.clients import DaprActorClientBase, DaprActorHttpClient, RetryPolicy
22+
from dapr.clients import DaprActorClientBase, DaprActorHttpClient
23+
from dapr.clients.retry import RetryPolicy
2324
from dapr.serializers import Serializer, DefaultJSONSerializer
2425
from dapr.conf import settings
2526

dapr/clients/grpc/client.py

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,9 @@ def invoke_method(
329329
),
330330
)
331331

332-
response, call = self._stub.InvokeService.with_call(req, metadata=metadata, timeout=timeout)
332+
response, call = self.retry_policy.run_rpc(
333+
self._stub.InvokeService.with_call, req, metadata=metadata, timeout=timeout
334+
)
333335

334336
resp_data = InvokeMethodResponse(response.data, response.content_type)
335337
resp_data.headers = call.initial_metadata() # type: ignore
@@ -390,7 +392,9 @@ def invoke_binding(
390392
operation=operation,
391393
)
392394

393-
response, call = self._stub.InvokeBinding.with_call(req, metadata=metadata)
395+
response, call = self.retry_policy.run_rpc(
396+
self._stub.InvokeBinding.with_call, req, metadata=metadata
397+
)
394398
return BindingResponse(response.data, dict(response.metadata), call.initial_metadata())
395399

396400
def publish_event(
@@ -462,7 +466,9 @@ def publish_event(
462466

463467
try:
464468
# response is google.protobuf.Empty
465-
_, call = self._stub.PublishEvent.with_call(req, metadata=metadata)
469+
_, call = self.retry_policy.run_rpc(
470+
self._stub.PublishEvent.with_call, req, metadata=metadata
471+
)
466472
except RpcError as err:
467473
raise DaprGrpcError(err) from err
468474

@@ -509,7 +515,9 @@ def get_state(
509515
raise ValueError('State store name cannot be empty')
510516
req = api_v1.GetStateRequest(store_name=store_name, key=key, metadata=state_metadata)
511517
try:
512-
response, call = self._stub.GetState.with_call(req, metadata=metadata)
518+
response, call = self.retry_policy.run_rpc(
519+
self._stub.GetState.with_call, req, metadata=metadata
520+
)
513521
return StateResponse(
514522
data=response.data, etag=response.etag, headers=call.initial_metadata()
515523
)
@@ -562,7 +570,9 @@ def get_bulk_state(
562570
)
563571

564572
try:
565-
response, call = self._stub.GetBulkState.with_call(req, metadata=metadata)
573+
response, call = self.retry_policy.run_rpc(
574+
self._stub.GetBulkState.with_call, req, metadata=metadata
575+
)
566576
except RpcError as err:
567577
raise DaprGrpcError(err) from err
568578

@@ -624,7 +634,7 @@ def query_state(
624634
req = api_v1.QueryStateRequest(store_name=store_name, query=query, metadata=states_metadata)
625635

626636
try:
627-
response, call = self._stub.QueryStateAlpha1.with_call(req)
637+
response, call = self.retry_policy.run_rpc(self._stub.QueryStateAlpha1.with_call, req)
628638
except RpcError as err:
629639
raise DaprGrpcError(err) from err
630640

@@ -779,7 +789,9 @@ def save_bulk_state(
779789
req = api_v1.SaveStateRequest(store_name=store_name, states=req_states)
780790

781791
try:
782-
_, call = self._stub.SaveState.with_call(req, metadata=metadata)
792+
_, call = self.retry_policy.run_rpc(
793+
self._stub.SaveState.with_call, req, metadata=metadata
794+
)
783795
return DaprResponse(headers=call.initial_metadata())
784796
except RpcError as err:
785797
raise DaprGrpcError(err) from err
@@ -848,7 +860,9 @@ def execute_state_transaction(
848860
)
849861

850862
try:
851-
_, call = self._stub.ExecuteStateTransaction.with_call(req, metadata=metadata)
863+
_, call = self.retry_policy.run_rpc(
864+
self._stub.ExecuteStateTransaction.with_call, req, metadata=metadata
865+
)
852866
return DaprResponse(headers=call.initial_metadata())
853867
except RpcError as err:
854868
raise DaprGrpcError(err) from err
@@ -916,7 +930,9 @@ def delete_state(
916930
)
917931

918932
try:
919-
_, call = self._stub.DeleteState.with_call(req, metadata=metadata)
933+
_, call = self.retry_policy.run_rpc(
934+
self._stub.DeleteState.with_call, req, metadata=metadata
935+
)
920936
return DaprResponse(headers=call.initial_metadata())
921937
except RpcError as err:
922938
raise DaprGrpcError(err) from err
@@ -968,7 +984,9 @@ def get_secret(
968984

969985
req = api_v1.GetSecretRequest(store_name=store_name, key=key, metadata=secret_metadata)
970986

971-
response, call = self._stub.GetSecret.with_call(req, metadata=metadata)
987+
response, call = self.retry_policy.run_rpc(
988+
self._stub.GetSecret.with_call, req, metadata=metadata
989+
)
972990

973991
return GetSecretResponse(secret=response.data, headers=call.initial_metadata())
974992

@@ -1015,7 +1033,9 @@ def get_bulk_secret(
10151033

10161034
req = api_v1.GetBulkSecretRequest(store_name=store_name, metadata=secret_metadata)
10171035

1018-
response, call = self._stub.GetBulkSecret.with_call(req, metadata=metadata)
1036+
response, call = self.retry_policy.run_rpc(
1037+
self._stub.GetBulkSecret.with_call, req, metadata=metadata
1038+
)
10191039

10201040
secrets_map = {}
10211041
for key in response.data.keys():
@@ -1055,7 +1075,7 @@ def get_configuration(
10551075
req = api_v1.GetConfigurationRequest(
10561076
store_name=store_name, keys=keys, metadata=config_metadata
10571077
)
1058-
response, call = self._stub.GetConfiguration.with_call(req)
1078+
response, call = self.retry_policy.run_rpc(self._stub.GetConfiguration.with_call, req)
10591079
return ConfigurationResponse(items=response.items, headers=call.initial_metadata())
10601080

10611081
def subscribe_configuration(
@@ -1163,7 +1183,7 @@ def try_lock(
11631183
lock_owner=lock_owner,
11641184
expiry_in_seconds=expiry_in_seconds,
11651185
)
1166-
response, call = self._stub.TryLockAlpha1.with_call(req)
1186+
response, call = self.retry_policy.run_rpc(self._stub.TryLockAlpha1.with_call, req)
11671187
return TryLockResponse(
11681188
success=response.success,
11691189
client=self,
@@ -1201,7 +1221,7 @@ def unlock(self, store_name: str, resource_id: str, lock_owner: str) -> UnlockRe
12011221
req = api_v1.UnlockRequest(
12021222
store_name=store_name, resource_id=resource_id, lock_owner=lock_owner
12031223
)
1204-
response, call = self._stub.UnlockAlpha1.with_call(req)
1224+
response, call = self.retry_policy.run_rpc(self._stub.UnlockAlpha1.with_call, req)
12051225

12061226
return UnlockResponse(
12071227
status=UnlockResponseStatus(response.status), headers=call.initial_metadata()
@@ -1267,7 +1287,7 @@ def start_workflow(
12671287
)
12681288

12691289
try:
1270-
response = self._stub.StartWorkflowBeta1(req)
1290+
response, call = self.retry_policy.run_rpc(self._stub.StartWorkflowBeta1.with_call, req)
12711291
return StartWorkflowResponse(instance_id=response.instance_id)
12721292
except RpcError as err:
12731293
raise DaprInternalError(err.details())
@@ -1297,7 +1317,7 @@ def get_workflow(self, instance_id: str, workflow_component: str) -> GetWorkflow
12971317
)
12981318

12991319
try:
1300-
resp = self._stub.GetWorkflowBeta1(req)
1320+
resp = self.retry_policy.run_rpc(self._stub.GetWorkflowBeta1, req)
13011321
if resp.created_at is None:
13021322
resp.created_at = datetime.now()
13031323
if resp.last_updated_at is None:
@@ -1339,7 +1359,7 @@ def terminate_workflow(self, instance_id: str, workflow_component: str) -> DaprR
13391359
)
13401360

13411361
try:
1342-
_, call = self._stub.TerminateWorkflowBeta1.with_call(req)
1362+
_, call = self.retry_policy.run_rpc(self._stub.TerminateWorkflowBeta1.with_call, req)
13431363
return DaprResponse(headers=call.initial_metadata())
13441364
except RpcError as err:
13451365
raise DaprInternalError(err.details())
@@ -1410,7 +1430,7 @@ def raise_workflow_event(
14101430
)
14111431

14121432
try:
1413-
_, call = self._stub.RaiseEventWorkflowBeta1.with_call(req)
1433+
_, call = self.retry_policy.run_rpc(self._stub.RaiseEventWorkflowBeta1.with_call, req)
14141434
return DaprResponse(headers=call.initial_metadata())
14151435
except RpcError as err:
14161436
raise DaprInternalError(err.details())
@@ -1441,7 +1461,7 @@ def pause_workflow(self, instance_id: str, workflow_component: str) -> DaprRespo
14411461
)
14421462

14431463
try:
1444-
_, call = self._stub.PauseWorkflowBeta1.with_call(req)
1464+
_, call = self.retry_policy.run_rpc(self._stub.PauseWorkflowBeta1.with_call, req)
14451465

14461466
return DaprResponse(headers=call.initial_metadata())
14471467
except RpcError as err:
@@ -1472,7 +1492,7 @@ def resume_workflow(self, instance_id: str, workflow_component: str) -> DaprResp
14721492
)
14731493

14741494
try:
1475-
_, call = self._stub.ResumeWorkflowBeta1.with_call(req)
1495+
_, call = self.retry_policy.run_rpc(self._stub.ResumeWorkflowBeta1.with_call, req)
14761496

14771497
return DaprResponse(headers=call.initial_metadata())
14781498
except RpcError as err:
@@ -1503,7 +1523,7 @@ def purge_workflow(self, instance_id: str, workflow_component: str) -> DaprRespo
15031523
)
15041524

15051525
try:
1506-
_, call = self._stub.PurgeWorkflowBeta1.with_call(req)
1526+
response, call = self.retry_policy.run_rpc(self._stub.PurgeWorkflowBeta1.with_call, req)
15071527

15081528
return DaprResponse(headers=call.initial_metadata())
15091529

@@ -1559,7 +1579,7 @@ def get_metadata(self) -> GetMetadataResponse:
15591579
capabilities.
15601580
"""
15611581
try:
1562-
_resp, call = self._stub.GetMetadata.with_call(GrpcEmpty())
1582+
_resp, call = self.retry_policy.run_rpc(self._stub.GetMetadata.with_call, GrpcEmpty())
15631583
except RpcError as err:
15641584
raise DaprGrpcError(err) from err
15651585

@@ -1605,7 +1625,7 @@ def set_metadata(self, attributeName: str, attributeValue: str) -> DaprResponse:
16051625
validateNotNone(attributeValue=attributeValue)
16061626
# Actual invocation
16071627
req = api_v1.SetMetadataRequest(key=attributeName, value=attributeValue)
1608-
_, call = self._stub.SetMetadata.with_call(req)
1628+
_, call = self.retry_policy.run_rpc(self._stub.SetMetadata.with_call, req)
16091629

16101630
return DaprResponse(call.initial_metadata())
16111631

@@ -1625,6 +1645,6 @@ def shutdown(self) -> DaprResponse:
16251645
:class:`DaprResponse` gRPC metadata returned from callee
16261646
"""
16271647

1628-
_, call = self._stub.Shutdown.with_call(GrpcEmpty())
1648+
_, call = self.retry_policy.run_rpc(self._stub.Shutdown.with_call, GrpcEmpty())
16291649

16301650
return DaprResponse(call.initial_metadata())

examples/demo_actor/demo_actor/demo_actor_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import asyncio
1414

1515
from dapr.actor import ActorProxy, ActorId, ActorProxyFactory
16-
from dapr.clients import RetryPolicy
16+
from dapr.clients.retry import RetryPolicy
1717
from demo_actor_interface import DemoActorInterface
1818

1919

ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext, when_all, when_any
2020
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
2121
from dapr.ext.workflow.workflow_state import WorkflowState, WorkflowStatus
22-
from dapr.ext.workflow.retry_policy import RetryPolicy
22+
from dapr.clients.retry import RetryPolicy
2323

2424
__all__ = [
2525
'WorkflowRuntime',

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from dapr.ext.workflow.workflow_context import WorkflowContext, Workflow
2222
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
2323
from dapr.ext.workflow.logger import LoggerOptions, Logger
24-
from dapr.ext.workflow.retry_policy import RetryPolicy
24+
from dapr.clients.retry import RetryPolicy
2525

2626
T = TypeVar('T')
2727
TInput = TypeVar('TInput')

0 commit comments

Comments
 (0)