From 0a523e2ff396918b1e245c525dc494e2da10c629 Mon Sep 17 00:00:00 2001 From: Keisuke Nakata Date: Tue, 4 Feb 2025 08:49:03 +0900 Subject: [PATCH] implement healthcheck --- src/otaclient/_logging.py | 14 +++++- .../otaclient_iot_logging_server_v1_pb2.py | 20 +++++--- .../otaclient_iot_logging_server_v1_pb2.pyi | 26 ++++++++++ ...taclient_iot_logging_server_v1_pb2_grpc.py | 48 +++++++++++++++++++ tests/test_otaclient/test_logging.py | 10 +++- 5 files changed, 108 insertions(+), 10 deletions(-) diff --git a/src/otaclient/_logging.py b/src/otaclient/_logging.py index bf6ff7bd6..b17858008 100644 --- a/src/otaclient/_logging.py +++ b/src/otaclient/_logging.py @@ -13,7 +13,6 @@ # limitations under the License. """Configure the logging for otaclient.""" - from __future__ import annotations import atexit @@ -60,6 +59,10 @@ def __init__(self, logging_upload_endpoint, ecu_id: str): def send(self, log_type: LogType, message: str, timeout: int) -> None: pass + @abstractmethod + def check(self, timeout: int) -> None: + pass + class TransmitterGrpc(Transmitter): def __init__(self, logging_upload_endpoint: AnyHttpUrl, ecu_id: str): @@ -100,6 +103,9 @@ def send(self, log_type: LogType, message: str, timeout: int) -> None: with contextlib.suppress(Exception): self._session.post(self.log_upload_endpoint, data=message, timeout=timeout) + def check(self, timeout: int) -> None: + pass + class TransmitterFactory: @staticmethod @@ -145,7 +151,11 @@ def _thread_main(): if not entry: continue # skip uploading empty log line - _transmitter.send(entry.log_type, entry.message, timeout=3) + try: + _transmitter.send(entry.log_type, entry.message, timeout=3) + except Exception: + # ignore the exception and continue + pass log_upload_thread = Thread(target=_thread_main, daemon=True) log_upload_thread.start() diff --git a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py index e1ff355ef..e6a1b00bc 100644 --- a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py +++ b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py @@ -13,7 +13,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\nIotaclient_iot_logging_server_pb2/v1/otaclient_iot_logging_server_v1.proto"L\n\rPutLogRequest\x12\x0e\n\x06\x65\x63u_id\x18\x01 \x01(\t\x12\x1a\n\x08log_type\x18\x02 \x01(\x0e\x32\x08.LogType\x12\x0f\n\x07message\x18\x03 \x01(\t";\n\x0ePutLogResponse\x12\x18\n\x04\x63ode\x18\x01 \x01(\x0e\x32\n.ErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t*\x1f\n\x07LogType\x12\x07\n\x03LOG\x10\x00\x12\x0b\n\x07METRICS\x10\x01*Z\n\tErrorCode\x12\x0e\n\nNO_FAILURE\x10\x00\x12\x15\n\x11SERVER_QUEUE_FULL\x10\x01\x12\x16\n\x12NOT_ALLOWED_ECU_ID\x10\x02\x12\x0e\n\nNO_MESSAGE\x10\x03\x32I\n\x1aOtaClientIoTLoggingService\x12+\n\x06PutLog\x12\x0e.PutLogRequest\x1a\x0f.PutLogResponse"\x00\x62\x06proto3' + b'\nIotaclient_iot_logging_server_pb2/v1/otaclient_iot_logging_server_v1.proto"L\n\rPutLogRequest\x12\x0e\n\x06\x65\x63u_id\x18\x01 \x01(\t\x12\x1a\n\x08log_type\x18\x02 \x01(\x0e\x32\x08.LogType\x12\x0f\n\x07message\x18\x03 \x01(\t";\n\x0ePutLogResponse\x12\x18\n\x04\x63ode\x18\x01 \x01(\x0e\x32\n.ErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t"%\n\x12HealthCheckRequest\x12\x0f\n\x07service\x18\x01 \x01(\t"\x9a\x01\n\x13HealthCheckResponse\x12\x32\n\x06status\x18\x01 \x01(\x0e\x32".HealthCheckResponse.ServingStatus"O\n\rServingStatus\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0b\n\x07SERVING\x10\x01\x12\x0f\n\x0bNOT_SERVING\x10\x02\x12\x13\n\x0fSERVICE_UNKNOWN\x10\x03*\x1f\n\x07LogType\x12\x07\n\x03LOG\x10\x00\x12\x0b\n\x07METRICS\x10\x01*Z\n\tErrorCode\x12\x0e\n\nNO_FAILURE\x10\x00\x12\x15\n\x11SERVER_QUEUE_FULL\x10\x01\x12\x16\n\x12NOT_ALLOWED_ECU_ID\x10\x02\x12\x0e\n\nNO_MESSAGE\x10\x03\x32}\n\x1aOtaClientIoTLoggingService\x12+\n\x06PutLog\x12\x0e.PutLogRequest\x1a\x0f.PutLogResponse"\x00\x12\x32\n\x05\x43heck\x12\x13.HealthCheckRequest\x1a\x14.HealthCheckResponseb\x06proto3' ) _globals = globals() @@ -26,14 +26,20 @@ if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _globals["_LOGTYPE"]._serialized_start = 216 - _globals["_LOGTYPE"]._serialized_end = 247 - _globals["_ERRORCODE"]._serialized_start = 249 - _globals["_ERRORCODE"]._serialized_end = 339 + _globals["_LOGTYPE"]._serialized_start = 412 + _globals["_LOGTYPE"]._serialized_end = 443 + _globals["_ERRORCODE"]._serialized_start = 445 + _globals["_ERRORCODE"]._serialized_end = 535 _globals["_PUTLOGREQUEST"]._serialized_start = 77 _globals["_PUTLOGREQUEST"]._serialized_end = 153 _globals["_PUTLOGRESPONSE"]._serialized_start = 155 _globals["_PUTLOGRESPONSE"]._serialized_end = 214 - _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_start = 341 - _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_end = 414 + _globals["_HEALTHCHECKREQUEST"]._serialized_start = 216 + _globals["_HEALTHCHECKREQUEST"]._serialized_end = 253 + _globals["_HEALTHCHECKRESPONSE"]._serialized_start = 256 + _globals["_HEALTHCHECKRESPONSE"]._serialized_end = 410 + _globals["_HEALTHCHECKRESPONSE_SERVINGSTATUS"]._serialized_start = 331 + _globals["_HEALTHCHECKRESPONSE_SERVINGSTATUS"]._serialized_end = 410 + _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_start = 537 + _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_end = 662 # @@protoc_insertion_point(module_scope) diff --git a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.pyi b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.pyi index 6c5124be8..52246c1b8 100644 --- a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.pyi +++ b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.pyi @@ -50,3 +50,29 @@ class PutLogResponse(_message.Message): code: _Optional[_Union[ErrorCode, str]] = ..., message: _Optional[str] = ..., ) -> None: ... + +class HealthCheckRequest(_message.Message): + __slots__ = ["service"] + SERVICE_FIELD_NUMBER: _ClassVar[int] + service: str + def __init__(self, service: _Optional[str] = ...) -> None: ... + +class HealthCheckResponse(_message.Message): + __slots__ = ["status"] + + class ServingStatus(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = [] + UNKNOWN: _ClassVar[HealthCheckResponse.ServingStatus] + SERVING: _ClassVar[HealthCheckResponse.ServingStatus] + NOT_SERVING: _ClassVar[HealthCheckResponse.ServingStatus] + SERVICE_UNKNOWN: _ClassVar[HealthCheckResponse.ServingStatus] + + UNKNOWN: HealthCheckResponse.ServingStatus + SERVING: HealthCheckResponse.ServingStatus + NOT_SERVING: HealthCheckResponse.ServingStatus + SERVICE_UNKNOWN: HealthCheckResponse.ServingStatus + STATUS_FIELD_NUMBER: _ClassVar[int] + status: HealthCheckResponse.ServingStatus + def __init__( + self, status: _Optional[_Union[HealthCheckResponse.ServingStatus, str]] = ... + ) -> None: ... diff --git a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2_grpc.py b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2_grpc.py index d24ceaefd..fab8a3d5d 100644 --- a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2_grpc.py +++ b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2_grpc.py @@ -21,6 +21,11 @@ def __init__(self, channel): request_serializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogRequest.SerializeToString, response_deserializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogResponse.FromString, ) + self.Check = channel.unary_unary( + "/OtaClientIoTLoggingService/Check", + request_serializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckRequest.SerializeToString, + response_deserializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckResponse.FromString, + ) class OtaClientIoTLoggingServiceServicer(object): @@ -34,6 +39,15 @@ def PutLog(self, request, context): context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") + def Check(self, request, context): + """ + `Check` requests OTA Client logging service to check the health of the + service. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + def add_OtaClientIoTLoggingServiceServicer_to_server(servicer, server): rpc_method_handlers = { @@ -42,6 +56,11 @@ def add_OtaClientIoTLoggingServiceServicer_to_server(servicer, server): request_deserializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogRequest.FromString, response_serializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogResponse.SerializeToString, ), + "Check": grpc.unary_unary_rpc_method_handler( + servicer.Check, + request_deserializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckRequest.FromString, + response_serializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( "OtaClientIoTLoggingService", rpc_method_handlers @@ -81,3 +100,32 @@ def PutLog( timeout, metadata, ) + + @staticmethod + def Check( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/OtaClientIoTLoggingService/Check", + otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckRequest.SerializeToString, + otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/tests/test_otaclient/test_logging.py b/tests/test_otaclient/test_logging.py index 9b8612c1e..6d3a26077 100644 --- a/tests/test_otaclient/test_logging.py +++ b/tests/test_otaclient/test_logging.py @@ -56,6 +56,14 @@ def __init__(self, test_queue, data_ready): self._test_queue = test_queue self._data_ready = data_ready + async def Check(self, request: log_pb2.HealthCheckRequest, context): + """ + Dummy gRPC method for health check. + """ + return log_pb2.HealthCheckResponse( + status=log_pb2.HealthCheckResponse.ServingStatus.SERVING + ) + async def PutLog(self, request: log_pb2.PutLogRequest, context): """ Dummy gRPC method to put a log message to the queue. @@ -72,7 +80,7 @@ async def PutLog(self, request: log_pb2.PutLogRequest, context): class TestLogClient: - OTA_CLIENT_LOGGING_SERVER = "http://127.0.0.1:8083" + OTA_CLIENT_LOGGING_SERVER = "http://127.0.0.1:8084" ECU_ID = "testclient" @pytest.fixture(autouse=True)