From 477acd3e1aa5557ae7da9b28e29bb5f1dfbaaa8f Mon Sep 17 00:00:00 2001 From: Yevgen Polyak Date: Wed, 26 Jun 2024 19:04:58 +0200 Subject: [PATCH] Add health check support to gRPC app (#723) Signed-off-by: Yevgen Polyak Co-authored-by: Marc Duiker Co-authored-by: Bernd Verst --- .github/scripts/automerge.py | 5 +- examples/pubsub-simple/README.md | 2 +- examples/pubsub-simple/subscriber.py | 7 +++ .../dapr/ext/grpc/_health_servicer.py | 32 +++++++++++++ .../ext/grpc/{_servicier.py => _servicer.py} | 0 ext/dapr-ext-grpc/dapr/ext/grpc/app.py | 16 ++++++- ext/dapr-ext-grpc/tests/test_app.py | 14 ++++++ .../tests/test_health_servicer.py | 20 ++++++++ ext/dapr-ext-grpc/tests/test_servicier.py | 48 +++++++++---------- 9 files changed, 115 insertions(+), 29 deletions(-) create mode 100644 ext/dapr-ext-grpc/dapr/ext/grpc/_health_servicer.py rename ext/dapr-ext-grpc/dapr/ext/grpc/{_servicier.py => _servicer.py} (100%) create mode 100644 ext/dapr-ext-grpc/tests/test_health_servicer.py diff --git a/.github/scripts/automerge.py b/.github/scripts/automerge.py index f54537054..176962f93 100644 --- a/.github/scripts/automerge.py +++ b/.github/scripts/automerge.py @@ -25,9 +25,8 @@ def fetch_pulls(mergeable_state, labels={'automerge'}): return [pr for pr in repo.get_pulls(state='open', sort='created') - # noqa: E502 - if (not pr.draft) and (pr.mergeable_state == mergeable_state) and \ - (not labels or len(labels.intersection({label.name for label in pr.labels})) > 0)] + if (not pr.draft and pr.mergeable_state == mergeable_state + and (not labels or len(labels.intersection({label.name for label in pr.labels})) > 0))] def is_approved(pr): diff --git a/examples/pubsub-simple/README.md b/examples/pubsub-simple/README.md index 0ebb358f3..97af9a4a9 100644 --- a/examples/pubsub-simple/README.md +++ b/examples/pubsub-simple/README.md @@ -66,7 +66,7 @@ sleep: 15 ```bash # 2. Start Publisher -dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 python3 publisher.py +dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py ``` diff --git a/examples/pubsub-simple/subscriber.py b/examples/pubsub-simple/subscriber.py index e74192f3b..b905aaa6f 100644 --- a/examples/pubsub-simple/subscriber.py +++ b/examples/pubsub-simple/subscriber.py @@ -83,4 +83,11 @@ def mytopic_wildcard(event: v1.Event) -> TopicEventResponse: return TopicEventResponse('success') +# Example of an unhealthy status +# def unhealthy(): +# raise ValueError("Not healthy") +# app.register_health_check(unhealthy) + +app.register_health_check(lambda: print('Healthy')) + app.run(50051) diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/_health_servicer.py b/ext/dapr-ext-grpc/dapr/ext/grpc/_health_servicer.py new file mode 100644 index 000000000..029dff745 --- /dev/null +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/_health_servicer.py @@ -0,0 +1,32 @@ +import grpc +from typing import Callable, Optional + +from dapr.proto import appcallback_service_v1 +from dapr.proto.runtime.v1.appcallback_pb2 import HealthCheckResponse + +HealthCheckCallable = Optional[Callable[[], None]] + + +class _HealthCheckServicer(appcallback_service_v1.AppCallbackHealthCheckServicer): + """The implementation of HealthCheck Server. + + :class:`App` provides useful decorators to register method, topic, input bindings. + """ + + def __init__(self): + self._health_check_cb: Optional[HealthCheckCallable] = None + + def register_health_check(self, cb: HealthCheckCallable) -> None: + if not cb: + raise ValueError('health check callback must be defined') + self._health_check_cb = cb + + def HealthCheck(self, request, context): + """Health check.""" + + if not self._health_check_cb: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + self._health_check_cb() + return HealthCheckResponse() diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/_servicier.py b/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py similarity index 100% rename from ext/dapr-ext-grpc/dapr/ext/grpc/_servicier.py rename to ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/app.py b/ext/dapr-ext-grpc/dapr/ext/grpc/app.py index b08421d7d..13a0bec69 100644 --- a/ext/dapr-ext-grpc/dapr/ext/grpc/app.py +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/app.py @@ -19,7 +19,8 @@ from typing import Dict, Optional from dapr.conf import settings -from dapr.ext.grpc._servicier import _CallbackServicer, Rule # type: ignore +from dapr.ext.grpc._servicer import _CallbackServicer, Rule # type: ignore +from dapr.ext.grpc._health_servicer import _HealthCheckServicer # type: ignore from dapr.proto import appcallback_service_v1 @@ -43,6 +44,7 @@ def __init__(self, max_grpc_message_length: Optional[int] = None, **kwargs): kwargs: arguments to grpc.server() """ self._servicer = _CallbackServicer() + self._health_check_servicer = _HealthCheckServicer() if not kwargs: options = [] if max_grpc_message_length is not None: @@ -56,6 +58,9 @@ def __init__(self, max_grpc_message_length: Optional[int] = None, **kwargs): else: self._server = grpc.server(**kwargs) # type: ignore appcallback_service_v1.add_AppCallbackServicer_to_server(self._servicer, self._server) + appcallback_service_v1.add_AppCallbackHealthCheckServicer_to_server( + self._health_check_servicer, self._server + ) def __del__(self): self.stop() @@ -64,6 +69,15 @@ def add_external_service(self, servicer_callback, external_servicer): """Adds an external gRPC service to the same server""" servicer_callback(external_servicer, self._server) + def register_health_check(self, health_check_callback): + """Adds a health check callback + + The below example adds a basic health check to check Dapr gRPC is running + + @app.register_health_check(lambda: None) + """ + self._health_check_servicer.register_health_check(health_check_callback) + def run(self, app_port: Optional[int] = None, listen_address: Optional[str] = None) -> None: """Starts app gRPC server and waits until :class:`App`.stop() is called. diff --git a/ext/dapr-ext-grpc/tests/test_app.py b/ext/dapr-ext-grpc/tests/test_app.py index a8b63b359..2a33dd668 100644 --- a/ext/dapr-ext-grpc/tests/test_app.py +++ b/ext/dapr-ext-grpc/tests/test_app.py @@ -74,3 +74,17 @@ def handle_dead_letter(event: v1.Event) -> None: 'AppTests.test_subscribe_decorator..handle_dead_letter', str(subscription_map['pubsub:topic2:']), ) + + def test_register_health_check(self): + def health_check_cb(): + pass + + self._app.register_health_check(health_check_cb) + registered_cb = self._app._health_check_servicer._health_check_cb + self.assertIn( + 'AppTests.test_register_health_check..health_check_cb', str(registered_cb) + ) + + def test_no_health_check(self): + registered_cb = self._app._health_check_servicer._health_check_cb + self.assertIsNone(registered_cb) diff --git a/ext/dapr-ext-grpc/tests/test_health_servicer.py b/ext/dapr-ext-grpc/tests/test_health_servicer.py new file mode 100644 index 000000000..4e19af46a --- /dev/null +++ b/ext/dapr-ext-grpc/tests/test_health_servicer.py @@ -0,0 +1,20 @@ +import unittest +from unittest.mock import MagicMock + +from dapr.ext.grpc._health_servicer import _HealthCheckServicer + + +class OnInvokeTests(unittest.TestCase): + def setUp(self): + self._health_servicer = _HealthCheckServicer() + + def test_healthcheck_cb_called(self): + health_cb = MagicMock() + self._health_servicer.register_health_check(health_cb) + self._health_servicer.HealthCheck(None, MagicMock()) + health_cb.assert_called_once() + + def test_no_healthcheck_cb(self): + with self.assertRaises(NotImplementedError) as exception_context: + self._health_servicer.HealthCheck(None, MagicMock()) + self.assertIn('Method not implemented!', exception_context.exception.args[0]) diff --git a/ext/dapr-ext-grpc/tests/test_servicier.py b/ext/dapr-ext-grpc/tests/test_servicier.py index 4be179c70..2447eea3c 100644 --- a/ext/dapr-ext-grpc/tests/test_servicier.py +++ b/ext/dapr-ext-grpc/tests/test_servicier.py @@ -19,7 +19,7 @@ from dapr.clients.grpc._request import InvokeMethodRequest from dapr.clients.grpc._response import InvokeMethodResponse, TopicEventResponse -from dapr.ext.grpc._servicier import _CallbackServicer +from dapr.ext.grpc._servicer import _CallbackServicer from dapr.proto import common_v1, appcallback_v1 from google.protobuf.any_pb2 import Any as GrpcAny @@ -27,10 +27,10 @@ class OnInvokeTests(unittest.TestCase): def setUp(self): - self._servicier = _CallbackServicer() + self._servicer = _CallbackServicer() def _on_invoke(self, method_name, method_cb): - self._servicier.register_method(method_name, method_cb) + self._servicer.register_method(method_name, method_cb) # fake context fake_context = MagicMock() @@ -39,7 +39,7 @@ def _on_invoke(self, method_name, method_cb): ('key2', 'value1'), ) - return self._servicier.OnInvoke( + return self._servicer.OnInvoke( common_v1.InvokeRequest(method=method_name, data=GrpcAny()), fake_context, ) @@ -93,18 +93,18 @@ def method_cb(request: InvokeMethodRequest): class TopicSubscriptionTests(unittest.TestCase): def setUp(self): - self._servicier = _CallbackServicer() + self._servicer = _CallbackServicer() self._topic1_method = Mock() self._topic2_method = Mock() self._topic3_method = Mock() self._topic3_method.return_value = TopicEventResponse('success') self._topic4_method = Mock() - self._servicier.register_topic('pubsub1', 'topic1', self._topic1_method, {'session': 'key'}) - self._servicier.register_topic('pubsub1', 'topic3', self._topic3_method, {'session': 'key'}) - self._servicier.register_topic('pubsub2', 'topic2', self._topic2_method, {'session': 'key'}) - self._servicier.register_topic('pubsub2', 'topic3', self._topic3_method, {'session': 'key'}) - self._servicier.register_topic( + self._servicer.register_topic('pubsub1', 'topic1', self._topic1_method, {'session': 'key'}) + self._servicer.register_topic('pubsub1', 'topic3', self._topic3_method, {'session': 'key'}) + self._servicer.register_topic('pubsub2', 'topic2', self._topic2_method, {'session': 'key'}) + self._servicer.register_topic('pubsub2', 'topic3', self._topic3_method, {'session': 'key'}) + self._servicer.register_topic( 'pubsub3', 'topic4', self._topic4_method, @@ -121,12 +121,12 @@ def setUp(self): def test_duplicated_topic(self): with self.assertRaises(ValueError): - self._servicier.register_topic( + self._servicer.register_topic( 'pubsub1', 'topic1', self._topic1_method, {'session': 'key'} ) def test_list_topic_subscription(self): - resp = self._servicier.ListTopicSubscriptions(None, None) + resp = self._servicer.ListTopicSubscriptions(None, None) self.assertEqual('pubsub1', resp.subscriptions[0].pubsub_name) self.assertEqual('topic1', resp.subscriptions[0].topic) self.assertEqual({'session': 'key'}, resp.subscriptions[0].metadata) @@ -143,7 +143,7 @@ def test_list_topic_subscription(self): self.assertEqual({'session': 'key'}, resp.subscriptions[4].metadata) def test_topic_event(self): - self._servicier.OnTopicEvent( + self._servicer.OnTopicEvent( appcallback_v1.TopicEventRequest(pubsub_name='pubsub1', topic='topic1'), self.fake_context, ) @@ -151,7 +151,7 @@ def test_topic_event(self): self._topic1_method.assert_called_once() def test_topic3_event_called_once(self): - self._servicier.OnTopicEvent( + self._servicer.OnTopicEvent( appcallback_v1.TopicEventRequest(pubsub_name='pubsub1', topic='topic3'), self.fake_context, ) @@ -159,7 +159,7 @@ def test_topic3_event_called_once(self): self._topic3_method.assert_called_once() def test_topic3_event_response(self): - response = self._servicier.OnTopicEvent( + response = self._servicer.OnTopicEvent( appcallback_v1.TopicEventRequest(pubsub_name='pubsub1', topic='topic3'), self.fake_context, ) @@ -169,7 +169,7 @@ def test_topic3_event_response(self): ) def test_disable_topic_validation(self): - self._servicier.OnTopicEvent( + self._servicer.OnTopicEvent( appcallback_v1.TopicEventRequest(pubsub_name='pubsub3', topic='should_be_ignored'), self.fake_context, ) @@ -178,7 +178,7 @@ def test_disable_topic_validation(self): def test_non_registered_topic(self): with self.assertRaises(NotImplementedError): - self._servicier.OnTopicEvent( + self._servicer.OnTopicEvent( appcallback_v1.TopicEventRequest(pubsub_name='pubsub1', topic='topic_non_existed'), self.fake_context, ) @@ -186,12 +186,12 @@ def test_non_registered_topic(self): class BindingTests(unittest.TestCase): def setUp(self): - self._servicier = _CallbackServicer() + self._servicer = _CallbackServicer() self._binding1_method = Mock() self._binding2_method = Mock() - self._servicier.register_binding('binding1', self._binding1_method) - self._servicier.register_binding('binding2', self._binding2_method) + self._servicer.register_binding('binding1', self._binding1_method) + self._servicer.register_binding('binding2', self._binding2_method) # fake context self.fake_context = MagicMock() @@ -202,15 +202,15 @@ def setUp(self): def test_duplicated_binding(self): with self.assertRaises(ValueError): - self._servicier.register_binding('binding1', self._binding1_method) + self._servicer.register_binding('binding1', self._binding1_method) def test_list_bindings(self): - resp = self._servicier.ListInputBindings(None, None) + resp = self._servicer.ListInputBindings(None, None) self.assertEqual('binding1', resp.bindings[0]) self.assertEqual('binding2', resp.bindings[1]) def test_binding_event(self): - self._servicier.OnBindingEvent( + self._servicer.OnBindingEvent( appcallback_v1.BindingEventRequest(name='binding1'), self.fake_context, ) @@ -219,7 +219,7 @@ def test_binding_event(self): def test_non_registered_binding(self): with self.assertRaises(NotImplementedError): - self._servicier.OnBindingEvent( + self._servicer.OnBindingEvent( appcallback_v1.BindingEventRequest(name='binding3'), self.fake_context, )