Skip to content

Commit

Permalink
Retry and Timeout policies for grpc and http (#679)
Browse files Browse the repository at this point in the history
* Brings back the link we removed because it wasn’t online yet

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Retry decorator

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Use a retry function instead of a decorator

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* adds retries for async

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Fixes types

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Timeout policy

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Cleanup

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* retry for http wip.

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* tests for http retries

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* New ruff version has some formatting changes

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Revert "New ruff version has some formatting changes"

This reverts commit 8609b7f.

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Fixes setting values with a default of None

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Moves timeout

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Adds a deprecation warning for the wait() method (#682)

* Adds a deprecation warning for the wait() method

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Ruff

Signed-off-by: Elena Kolevska <elena@kolevska.com>

---------

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Pin ruff version

Signed-off-by: Bernd Verst <github@bernd.dev>
Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Pin ruff version in tox.ini

Signed-off-by: Bernd Verst <github@bernd.dev>
Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Lint fix

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Reorganises tests. Adds retry for actors.

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Linters

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Retry all calls

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Small fix and docs update

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Adds a note to docs, for clarity

Signed-off-by: Elena Kolevska <elena@kolevska.com>

---------

Signed-off-by: Elena Kolevska <elena@kolevska.com>
Signed-off-by: Bernd Verst <github@bernd.dev>
Co-authored-by: Bernd Verst <github@bernd.dev>
  • Loading branch information
elena-kolevska and berndverst authored Jun 27, 2024
1 parent e8256b5 commit f43c0aa
Show file tree
Hide file tree
Showing 21 changed files with 1,036 additions and 157 deletions.
6 changes: 5 additions & 1 deletion dapr/actor/client/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from dapr.actor.id import ActorId
from dapr.actor.runtime._type_utils import get_dispatchable_attrs_from_interface
from dapr.clients import DaprActorClientBase, DaprActorHttpClient
from dapr.clients.retry import RetryPolicy
from dapr.serializers import Serializer, DefaultJSONSerializer
from dapr.conf import settings

Expand Down Expand Up @@ -50,9 +51,12 @@ def __init__(
self,
message_serializer=DefaultJSONSerializer(),
http_timeout_seconds: int = settings.DAPR_HTTP_TIMEOUT_SECONDS,
retry_policy: Optional[RetryPolicy] = None,
):
# TODO: support serializer for state store later
self._dapr_client = DaprActorHttpClient(message_serializer, timeout=http_timeout_seconds)
self._dapr_client = DaprActorHttpClient(
message_serializer, timeout=http_timeout_seconds, retry_policy=retry_policy
)
self._message_serializer = message_serializer

def create(
Expand Down
43 changes: 28 additions & 15 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.health import DaprHealth
from dapr.clients.retry import RetryPolicy
from dapr.conf.helpers import GrpcEndpoint
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse
from dapr.version import __version__

from dapr.aio.clients.grpc._asynchelpers import DaprClientInterceptorAsync
from dapr.aio.clients.grpc.interceptors import (
DaprClientInterceptorAsync,
DaprClientTimeoutInterceptorAsync,
)
from dapr.clients.grpc._helpers import (
MetadataTuple,
to_bytes,
Expand Down Expand Up @@ -118,6 +122,7 @@ def __init__(
]
] = None,
max_grpc_message_length: Optional[int] = None,
retry_policy: Optional[RetryPolicy] = None,
):
"""Connects to Dapr Runtime and initialize gRPC client stub.
Expand All @@ -131,6 +136,7 @@ def __init__(
message length in bytes.
"""
DaprHealth.wait_until_ready()
self.retry_policy = retry_policy or RetryPolicy()

useragent = f'dapr-sdk-python/{__version__}'
if not max_grpc_message_length:
Expand All @@ -154,26 +160,32 @@ def __init__(
except ValueError as error:
raise DaprInternalError(f'{error}') from error

if self._uri.tls:
self._channel = grpc.aio.secure_channel(
self._uri.endpoint, credentials=self.get_credentials(), options=options
) # type: ignore
# Prepare interceptors
if interceptors is None:
interceptors = [DaprClientTimeoutInterceptorAsync()]
else:
self._channel = grpc.aio.insecure_channel(self._uri.endpoint, options) # type: ignore
interceptors.append(DaprClientTimeoutInterceptorAsync())

if settings.DAPR_API_TOKEN:
api_token_interceptor = DaprClientInterceptorAsync(
[
('dapr-api-token', settings.DAPR_API_TOKEN),
]
)
self._channel = grpc.aio.insecure_channel( # type: ignore
address, options=options, interceptors=(api_token_interceptor,)
)
if interceptors:
self._channel = grpc.aio.insecure_channel( # type: ignore
address, options=options, *interceptors
)
interceptors.append(api_token_interceptor)

# Create gRPC channel
if self._uri.tls:
self._channel = grpc.aio.secure_channel(
self._uri.endpoint,
credentials=self.get_credentials(),
options=options,
interceptors=interceptors,
) # type: ignore
else:
self._channel = grpc.aio.insecure_channel(
self._uri.endpoint, options, interceptors=interceptors
) # type: ignore

self._stub = api_service_v1.DaprStub(self._channel)

Expand Down Expand Up @@ -713,8 +725,9 @@ async def save_state(

req = api_v1.SaveStateRequest(store_name=store_name, states=[state])
try:
call = self._stub.SaveState(req, metadata=metadata)
await call
result, call = await self.retry_policy.run_rpc_async(
self._stub.SaveState, req, metadata=metadata
)
return DaprResponse(headers=await call.initial_metadata())
except AioRpcError as e:
raise DaprInternalError(e.details()) from e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

from grpc.aio import UnaryUnaryClientInterceptor, ClientCallDetails # type: ignore

from dapr.conf import settings


class _ClientCallDetailsAsync(
namedtuple(
Expand All @@ -33,6 +35,22 @@ class _ClientCallDetailsAsync(
pass


class DaprClientTimeoutInterceptorAsync(UnaryUnaryClientInterceptor):
def intercept_unary_unary(self, continuation, client_call_details, request):
# If a specific timeout is not set, create a new ClientCallDetails with the default timeout
if client_call_details.timeout is None:
new_client_call_details = _ClientCallDetailsAsync(
client_call_details.method,
settings.DAPR_API_TIMEOUT_SECONDS,
client_call_details.metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
)
return continuation(new_client_call_details, request)

return continuation(client_call_details, request)


class DaprClientInterceptorAsync(UnaryUnaryClientInterceptor):
"""The class implements a UnaryUnaryClientInterceptor from grpc to add an interceptor to add
additional headers to all calls as needed.
Expand Down
4 changes: 3 additions & 1 deletion dapr/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dapr.clients.grpc.client import DaprGrpcClient, MetadataTuple, InvokeMethodResponse
from dapr.clients.http.dapr_actor_http_client import DaprActorHttpClient
from dapr.clients.http.dapr_invocation_http_client import DaprInvocationHttpClient
from dapr.clients.retry import RetryPolicy
from dapr.conf import settings
from google.protobuf.message import Message as GrpcMessage

Expand Down Expand Up @@ -64,6 +65,7 @@ def __init__(
] = None,
http_timeout_seconds: Optional[int] = None,
max_grpc_message_length: Optional[int] = None,
retry_policy: Optional[RetryPolicy] = None,
):
"""Connects to Dapr Runtime via gRPC and HTTP.
Expand All @@ -78,7 +80,7 @@ def __init__(
max_grpc_message_length (int, optional): The maximum grpc send and receive
message length in bytes.
"""
super().__init__(address, interceptors, max_grpc_message_length)
super().__init__(address, interceptors, max_grpc_message_length, retry_policy)
self.invocation_client = None

invocation_protocol = settings.DAPR_API_METHOD_INVOCATION_PROTOCOL.upper()
Expand Down
91 changes: 0 additions & 91 deletions dapr/clients/grpc/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
from collections import namedtuple
from typing import Dict, List, Union, Tuple, Optional
from enum import Enum
from google.protobuf.any_pb2 import Any as GrpcAny
from google.protobuf.message import Message as GrpcMessage
from grpc import UnaryUnaryClientInterceptor, ClientCallDetails # type: ignore

MetadataDict = Dict[str, List[Union[bytes, str]]]
MetadataTuple = Tuple[Tuple[str, Union[bytes, str]], ...]
Expand Down Expand Up @@ -78,96 +76,7 @@ def to_str(data: Union[str, bytes]) -> str:
raise f'invalid data type {type(data)}'


class _ClientCallDetails(
namedtuple(
'_ClientCallDetails',
['method', 'timeout', 'metadata', 'credentials', 'wait_for_ready', 'compression'],
),
ClientCallDetails,
):
"""This is an implementation of the ClientCallDetails interface needed for interceptors.
This class takes six named values and inherits the ClientCallDetails from grpc package.
This class encloses the values that describe a RPC to be invoked.
"""

pass


class DaprClientInterceptor(UnaryUnaryClientInterceptor):
"""The class implements a UnaryUnaryClientInterceptor from grpc to add an interceptor to add
additional headers to all calls as needed.
Examples:
interceptor = HeaderInterceptor([('header', 'value', )])
intercepted_channel = grpc.intercept_channel(grpc_channel, interceptor)
With multiple header values:
interceptor = HeaderInterceptor([('header1', 'value1', ), ('header2', 'value2', )])
intercepted_channel = grpc.intercept_channel(grpc_channel, interceptor)
"""

def __init__(self, metadata: List[Tuple[str, str]]):
"""Initializes the metadata field for the class.
Args:
metadata list[tuple[str, str]]: list of tuple of (key, value) strings
representing header values
"""

self._metadata = metadata

def _intercept_call(self, client_call_details: ClientCallDetails) -> ClientCallDetails:
"""Internal intercept_call implementation which adds metadata to grpc metadata in the RPC
call details.
Args:
client_call_details :class: `ClientCallDetails`: object that describes a RPC
to be invoked
Returns:
:class: `ClientCallDetails` modified call details
"""

metadata = []
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
metadata.extend(self._metadata)

new_call_details = _ClientCallDetails(
client_call_details.method,
client_call_details.timeout,
metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
client_call_details.compression,
)
return new_call_details

def intercept_unary_unary(self, continuation, client_call_details, request):
"""This method intercepts a unary-unary gRPC call. This is the implementation of the
abstract method defined in UnaryUnaryClientInterceptor defined in grpc. This is invoked
automatically by grpc based on the order in which interceptors are added to the channel.
Args:
continuation: a callable to be invoked to continue with the RPC or next interceptor
client_call_details: a ClientCallDetails object describing the outgoing RPC
request: the request value for the RPC
Returns:
A response object after invoking the continuation callable
"""
# Pre-process or intercept call
new_call_details = self._intercept_call(client_call_details)
# Call continuation
response = continuation(new_call_details, request)
return response


# Data validation helpers


def validateNotNone(**kwargs: Optional[str]):
for field_name, value in kwargs.items():
if value is None:
Expand Down
Loading

0 comments on commit f43c0aa

Please sign in to comment.