Skip to content

Commit

Permalink
Health check (#670)
Browse files Browse the repository at this point in the history
* Fixes http endpoint being overwritten by gRPC address argument in constructor (#621)

* Fixes bug of constructor argument being used as http endpoint

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Updates tests

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Updates the docs explaining how http service invocation should be configured

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Update daprdocs/content/en/python-sdk-docs/python-client.md

Signed-off-by: Bernd Verst <github@bernd.dev>

---------

Signed-off-by: Elena Kolevska <elena@kolevska.com>
Signed-off-by: Bernd Verst <github@bernd.dev>
Co-authored-by: Bernd Verst <github@bernd.dev>
Signed-off-by: Elena Kolevska <elena@kolevska.com>
# Conflicts:
#	dapr/clients/__init__.py
#	dapr/clients/http/client.py
#	dapr/clients/http/dapr_invocation_http_client.py
#	tests/clients/test_http_service_invocation_client.py
#	tests/clients/test_secure_http_service_invocation_client.py

* health decorator - first commit

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Fixes tests

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Removes unused imports

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Ruff format

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Adds unit test

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Repalces wait() with @healthcheck decorator in examples

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Ruff

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Linter

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* updates heathcheck decorator to use a global var

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Fixes tests

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Linter fixes

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Removes healthcheck from examples

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Ruff

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* wip

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* wip

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* wip

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* wip

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Set health timeout to 60 seconds

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* wip

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Unit tests passing

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Linter

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Refactor and cleanup

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Refactors client tests for speed and readability

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Small fix

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Add tests performance improvement for actor tests too

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Cosmetic touch up

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Documents the `DAPR_HEALTH_TIMEOUT` environment variable

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* make healthcheck a static method

Signed-off-by: Elena Kolevska <elena@kolevska.com>

---------

Signed-off-by: Elena Kolevska <elena@kolevska.com>
  • Loading branch information
elena-kolevska authored Feb 13, 2024
1 parent 23fc4b1 commit f4dc8ce
Show file tree
Hide file tree
Showing 34 changed files with 574 additions and 340 deletions.
14 changes: 12 additions & 2 deletions dapr/actor/client/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class ActorProxy:
communication.
"""

_default_proxy_factory = ActorProxyFactory()
_default_proxy_factory = None

def __init__(
self,
Expand Down Expand Up @@ -127,6 +127,13 @@ def actor_type(self) -> str:
"""Returns actor type."""
return self._actor_type

@classmethod
def _get_default_factory_instance(cls):
"""Lazily initializes and returns the default ActorProxyFactory instance."""
if cls._default_proxy_factory is None:
cls._default_proxy_factory = ActorProxyFactory()
return cls._default_proxy_factory

@classmethod
def create(
cls,
Expand All @@ -146,8 +153,11 @@ def create(
Returns:
:class:`ActorProxy': new Actor Proxy client.
@param actor_proxy_factory:
"""
factory = cls._default_proxy_factory if not actor_proxy_factory else actor_proxy_factory
factory = (
actor_proxy_factory if actor_proxy_factory else cls._get_default_factory_instance()
)
return factory.create(actor_type, actor_id, actor_interface)

async def invoke_method(self, method: str, raw_body: Optional[bytes] = None) -> bytes:
Expand Down
3 changes: 3 additions & 0 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from dapr.clients.exceptions import DaprInternalError, DaprGrpcError
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.health import DaprHealth
from dapr.conf.helpers import GrpcEndpoint
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
Expand Down Expand Up @@ -129,6 +130,8 @@ def __init__(
max_grpc_messsage_length (int, optional): The maximum grpc send and receive
message length in bytes.
"""
DaprHealth.wait_until_ready()

useragent = f'dapr-sdk-python/{__version__}'
if not max_grpc_message_length:
options = [
Expand Down
2 changes: 2 additions & 0 deletions dapr/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from dapr.conf import settings
from google.protobuf.message import Message as GrpcMessage


__all__ = [
'DaprClient',
'DaprActorClientBase',
Expand All @@ -32,6 +33,7 @@
'ERROR_CODE_UNKNOWN',
]


from grpc import ( # type: ignore
UnaryUnaryClientInterceptor,
UnaryStreamClientInterceptor,
Expand Down
3 changes: 3 additions & 0 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from dapr.clients.exceptions import DaprInternalError, DaprGrpcError
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.health import DaprHealth
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse
Expand Down Expand Up @@ -127,6 +128,8 @@ def __init__(
max_grpc_messsage_length (int, optional): The maximum grpc send and receive
message length in bytes.
"""
DaprHealth.wait_until_ready()

useragent = f'dapr-sdk-python/{__version__}'
if not max_grpc_message_length:
options = [
Expand Down
54 changes: 54 additions & 0 deletions dapr/clients/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-

"""
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import urllib.request
import urllib.error
import time

from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, USER_AGENT_HEADER, DAPR_USER_AGENT
from dapr.clients.http.helpers import get_api_url
from dapr.conf import settings


class DaprHealth:
@staticmethod
def wait_until_ready():
health_url = f'{get_api_url()}/healthz/outbound'
headers = {USER_AGENT_HEADER: DAPR_USER_AGENT}
if settings.DAPR_API_TOKEN is not None:
headers[DAPR_API_TOKEN_HEADER] = settings.DAPR_API_TOKEN
timeout = settings.DAPR_HEALTH_TIMEOUT

start = time.time()
while True:
try:
req = urllib.request.Request(health_url, headers=headers)
with urllib.request.urlopen(req, context=DaprHealth.get_ssl_context()) as response:
if 200 <= response.status < 300:
break
except urllib.error.URLError as e:
print(f'Health check on {health_url} failed: {e.reason}')
except Exception as e:
print(f'Unexpected error during health check: {e}')

remaining = (start + timeout) - time.time()
if remaining <= 0:
raise TimeoutError(f'Dapr health check timed out, after {timeout}.')
time.sleep(min(1, remaining))

@staticmethod
def get_ssl_context():
# This method is used (overwritten) from tests
# to return context for self-signed certificates
return None
24 changes: 10 additions & 14 deletions dapr/clients/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@

from typing import Callable, Mapping, Dict, Optional, Union, Tuple, TYPE_CHECKING

from dapr.clients.http.conf import (
DAPR_API_TOKEN_HEADER,
USER_AGENT_HEADER,
DAPR_USER_AGENT,
CONTENT_TYPE_HEADER,
)
from dapr.clients.health import DaprHealth

if TYPE_CHECKING:
from dapr.serializers import Serializer

from dapr.conf import settings
from dapr.clients.base import DEFAULT_JSON_CONTENT_TYPE
from dapr.clients.exceptions import DaprInternalError, ERROR_CODE_DOES_NOT_EXIST, ERROR_CODE_UNKNOWN
from dapr.version import __version__

CONTENT_TYPE_HEADER = 'content-type'
DAPR_API_TOKEN_HEADER = 'dapr-api-token'
USER_AGENT_HEADER = 'User-Agent'
DAPR_USER_AGENT = f'dapr-sdk-python/{__version__}'


class DaprHttpClient:
Expand All @@ -47,18 +49,12 @@ def __init__(
timeout (int, optional): Timeout in seconds, defaults to 60.
headers_callback (lambda: Dict[str, str]], optional): Generates header for each request.
"""
DaprHealth.wait_until_ready()

self._timeout = aiohttp.ClientTimeout(total=timeout)
self._serializer = message_serializer
self._headers_callback = headers_callback

def get_api_url(self) -> str:
if settings.DAPR_HTTP_ENDPOINT:
return '{}/{}'.format(settings.DAPR_HTTP_ENDPOINT, settings.DAPR_API_VERSION)

return 'http://{}:{}/{}'.format(
settings.DAPR_RUNTIME_HOST, settings.DAPR_HTTP_PORT, settings.DAPR_API_VERSION
)

async def send_bytes(
self,
method: str,
Expand Down
21 changes: 21 additions & 0 deletions dapr/clients/http/conf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from dapr.version import __version__

CONTENT_TYPE_HEADER = 'content-type'
DAPR_API_TOKEN_HEADER = 'dapr-api-token'
USER_AGENT_HEADER = 'User-Agent'
DAPR_USER_AGENT = f'dapr-sdk-python/{__version__}'
4 changes: 3 additions & 1 deletion dapr/clients/http/dapr_actor_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

from typing import Callable, Dict, Optional, Union, TYPE_CHECKING

from dapr.clients.http.helpers import get_api_url

if TYPE_CHECKING:
from dapr.serializers import Serializer

Expand Down Expand Up @@ -145,4 +147,4 @@ async def unregister_timer(self, actor_type: str, actor_id: str, name: str) -> N
await self._client.send_bytes(method='DELETE', url=url, data=None)

def _get_base_url(self, actor_type: str, actor_id: str) -> str:
return '{}/actors/{}/{}'.format(self._client.get_api_url(), actor_type, actor_id)
return '{}/actors/{}/{}'.format(get_api_url(), actor_type, actor_id)
6 changes: 4 additions & 2 deletions dapr/clients/http/dapr_invocation_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
from typing import Callable, Dict, Optional, Union
from multidict import MultiDict

from dapr.clients.http.client import DaprHttpClient, CONTENT_TYPE_HEADER
from dapr.clients.http.client import DaprHttpClient
from dapr.clients.grpc._helpers import MetadataTuple, GrpcMessage
from dapr.clients.grpc._response import InvokeMethodResponse
from dapr.clients.http.conf import CONTENT_TYPE_HEADER
from dapr.clients.http.helpers import get_api_url
from dapr.serializers import DefaultJSONSerializer
from dapr.version import __version__

Expand Down Expand Up @@ -88,7 +90,7 @@ async def invoke_method_async(

headers[USER_AGENT_HEADER] = DAPR_USER_AGENT

url = f'{self._client.get_api_url()}/invoke/{app_id}/method/{method_name}'
url = f'{get_api_url()}/invoke/{app_id}/method/{method_name}'

if isinstance(data, GrpcMessage):
body = data.SerializeToString()
Expand Down
25 changes: 25 additions & 0 deletions dapr/clients/http/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from dapr.conf import settings


def get_api_url() -> str:
if settings.DAPR_HTTP_ENDPOINT:
return '{}/{}'.format(settings.DAPR_HTTP_ENDPOINT, settings.DAPR_API_VERSION)

return 'http://{}:{}/{}'.format(
settings.DAPR_RUNTIME_HOST, settings.DAPR_HTTP_PORT, settings.DAPR_API_VERSION
)
1 change: 1 addition & 0 deletions dapr/conf/global_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
DAPR_HTTP_PORT = 3500
DAPR_GRPC_PORT = 50001
DAPR_API_VERSION = 'v1.0'
DAPR_HEALTH_TIMEOUT = 60 # seconds

DAPR_API_METHOD_INVOCATION_PROTOCOL = 'http'

Expand Down
8 changes: 8 additions & 0 deletions daprdocs/content/en/python-sdk-docs/python-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ If your Dapr instance is configured to require the `DAPR_API_TOKEN` environment
set it in the environment and the client will use it automatically.
You can read more about Dapr API token authentication [here](https://docs.dapr.io/operations/security/api-token/).

##### Health timeout
On client initialisation, a health check is performed against the Dapr sidecar (`/healthz/outboud`).
The client will wait for the sidecar to be up and running before proceeding.

The default timeout is 60 seconds, but it can be overridden by setting the `DAPR_HEALTH_TIMEOUT`
environment variable.


## Error handling
Initially, errors in Dapr followed the [Standard gRPC error model](https://grpc.io/docs/guides/error/#standard-error-model). However, to provide more detailed and informative error messages, in version 1.13 an enhanced error model has been introduced which aligns with the gRPC [Richer error model](https://grpc.io/docs/guides/error/#richer-error-model). In response, the Python SDK implemented `DaprGrpcError`, a custom exception class designed to improve the developer experience.
It's important to note that the transition to using `DaprGrpcError` for all gRPC status exceptions is a work in progress. As of now, not every API call in the SDK has been updated to leverage this custom exception. We are actively working on this enhancement and welcome contributions from the community.
Expand Down
23 changes: 12 additions & 11 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@

These examples demonstrate how to use the Dapr Python SDK:

| Example | Description |
|---------|-------------|
| [Service invocation](./invoke-simple) | Invoke service by passing bytes data
| Example | Description |
|-------------------------------------------------------|-------------|
| [Service invocation](./invoke-simple) | Invoke service by passing bytes data
| [Service invocation (advanced)](./invoke-custom-data) | Invoke service by using custom protobuf message
| [State management](./state_store) | Save and get state to/from the state store
| [Publish & subscribe](./pubsub-simple) | Publish and subscribe to events
| [Bindings](./invoke-binding) | Invoke an output binding to interact with external resources
| [Virtual actors](./demo_actor) | Try Dapr virtual actor features
| [Secrets](./secret_store) | Get secrets from a defined secret store
| [Distributed tracing](./w3c-tracing) | Leverage Dapr's built-in tracing support
| [Distributed lock](./distributed_lock) | Keep your application safe from race conditions by using distributed locks
| [Workflow](./demo_workflow) | Run a workflow to simulate an order processor
| [State management](./state_store) | Save and get state to/from the state store
| [Publish & subscribe](./pubsub-simple) | Publish and subscribe to events
| [Error handling](./error_handling) | Error handling
| [Bindings](./invoke-binding) | Invoke an output binding to interact with external resources
| [Virtual actors](./demo_actor) | Try Dapr virtual actor features
| [Secrets](./secret_store) | Get secrets from a defined secret store
| [Distributed tracing](./w3c-tracing) | Leverage Dapr's built-in tracing support
| [Distributed lock](./distributed_lock) | Keep your application safe from race conditions by using distributed locks
| [Workflow](./demo_workflow) | Run a workflow to simulate an order processor

## More information

Expand Down
3 changes: 0 additions & 3 deletions examples/configuration/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ async def executeConfiguration():

keys = ['orderId1', 'orderId2']

# Wait for sidecar to be up within 20 seconds.
d.wait(20)

global configuration

# Get one configuration by key.
Expand Down
4 changes: 1 addition & 3 deletions examples/error_handling/error_handling.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
from dapr.clients import DaprClient
from dapr.clients.exceptions import DaprGrpcError


with DaprClient() as d:
storeName = 'statestore'

key = 'key||'
value = 'value_1'

# Wait for sidecar to be up within 5 seconds.
d.wait(5)

# Save single state.
try:
d.save_state(store_name=storeName, key=key, value=value)
Expand Down
7 changes: 2 additions & 5 deletions examples/state_store/state_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType
from dapr.clients.grpc._state import StateItem


with DaprClient() as d:
storeName = 'statestore'

Expand All @@ -22,9 +23,6 @@
yet_another_key = 'key_3'
yet_another_value = 'value_3'

# Wait for sidecar to be up within 5 seconds.
d.wait(5)

# Save single state.
d.save_state(store_name=storeName, key=key, value=value)
print(f'State store has successfully saved {value} with {key} as key')
Expand Down Expand Up @@ -63,8 +61,7 @@
# StatusCode should be StatusCode.ABORTED.
print(f'Cannot save bulk due to bad etags. ErrorCode={err.code()}')

# For detailed error messages from the dapr runtime:
# print(f"Details={err.details()})
# For detailed error messages from the dapr runtime: # print(f"Details={err.details()})

# Get one state by key.
state = d.get_state(store_name=storeName, key=key, state_metadata={'metakey': 'metavalue'})
Expand Down
Loading

0 comments on commit f4dc8ce

Please sign in to comment.