From d221f8c4970a5d4edf7dce3cc25f310c10524518 Mon Sep 17 00:00:00 2001 From: Keisuke Nakata Date: Thu, 9 Jan 2025 10:54:34 +0900 Subject: [PATCH 1/8] feat!: migrate logging from http to grpc --- samples/proxy_info.yaml | 2 +- src/otaclient/_logging.py | 26 +++--- src/otaclient/configs/_proxy_info.py | 6 +- .../v1/otaclient_iot_logging_server_v1_pb2.py | 41 +++++++++ .../otaclient_iot_logging_server_v1_pb2.pyi | 75 +++++++++++++++++ ...taclient_iot_logging_server_v1_pb2_grpc.py | 83 +++++++++++++++++++ .../test_configs/test_proxy_info.py | 4 +- 7 files changed, 219 insertions(+), 18 deletions(-) create mode 100644 src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.py create mode 100644 src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.pyi create mode 100644 src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2_grpc.py diff --git a/samples/proxy_info.yaml b/samples/proxy_info.yaml index 5c893ca05..d9063d88b 100644 --- a/samples/proxy_info.yaml +++ b/samples/proxy_info.yaml @@ -6,4 +6,4 @@ enable_local_ota_proxy_cache: true local_ota_proxy_listen_addr: 127.0.0.1 local_ota_proxy_listen_port: 8082 # if otaclient-logger is installed locally -logging_server: "http://127.0.0.1:8083" +logging_server: "127.0.0.1:8083" diff --git a/src/otaclient/_logging.py b/src/otaclient/_logging.py index 687d17c33..7dc78c9bd 100644 --- a/src/otaclient/_logging.py +++ b/src/otaclient/_logging.py @@ -21,11 +21,16 @@ import logging from queue import Queue from threading import Event, Thread -from urllib.parse import urljoin -import requests +import grpc from otaclient.configs.cfg import cfg, ecu_info, proxy_info +from otaclient_common.otaclient_iot_logging_server.v1 import ( + otaclient_iot_logging_server_v1_pb2 as log_pb2, +) +from otaclient_common.otaclient_iot_logging_server.v1 import ( + otaclient_iot_logging_server_v1_pb2_grpc as log_pb2_grpc, +) class _LogTeeHandler(logging.Handler): @@ -39,12 +44,13 @@ def emit(self, record: logging.LogRecord) -> None: with contextlib.suppress(Exception): self._queue.put_nowait(self.format(record)) - def start_upload_thread(self, endpoint_url: str): + def start_upload_thread(self, logging_upload_channel: str, ecu_id: str) -> None: log_queue = self._queue stop_logging_upload = Event() def _thread_main(): - _session = requests.Session() + channel = grpc.insecure_channel(logging_upload_channel) + stub = log_pb2_grpc.OtaClientIoTLoggingServiceStub(channel) while not stop_logging_upload.is_set(): entry = log_queue.get() @@ -54,7 +60,8 @@ def _thread_main(): continue # skip uploading empty log line with contextlib.suppress(Exception): - _session.post(endpoint_url, data=entry, timeout=3) + log_entry = log_pb2.PutLogRequest(message=entry, ecu_id=ecu_id) + stub.PutLog(log_entry) log_upload_thread = Thread(target=_thread_main, daemon=True) log_upload_thread.start() @@ -68,7 +75,7 @@ def _thread_exit(): def configure_logging() -> None: - """Configure logging with http handler.""" + """Configure logging with gRPC handler.""" # ------ suppress logging from non-first-party modules ------ # # NOTE: force to reload the basicConfig, this is for overriding setting # when launching subprocess. @@ -78,16 +85,13 @@ def configure_logging() -> None: # ------ configure each sub loggers and attach ota logging handler ------ # log_upload_handler = None - if logging_upload_endpoint := proxy_info.logging_server: - logging_upload_endpoint = f"{str(logging_upload_endpoint).strip('/')}/" - + if logging_upload_channel := proxy_info.logging_server: log_upload_handler = _LogTeeHandler() fmt = logging.Formatter(fmt=cfg.LOG_FORMAT) log_upload_handler.setFormatter(fmt) # star the logging thread - log_upload_endpoint = urljoin(logging_upload_endpoint, ecu_info.ecu_id) - log_upload_handler.start_upload_thread(log_upload_endpoint) + log_upload_handler.start_upload_thread(logging_upload_channel, ecu_info.ecu_id) for logger_name, loglevel in cfg.LOG_LEVEL_TABLE.items(): _logger = logging.getLogger(logger_name) diff --git a/src/otaclient/configs/_proxy_info.py b/src/otaclient/configs/_proxy_info.py index 9b140c13f..64e47ca4c 100644 --- a/src/otaclient/configs/_proxy_info.py +++ b/src/otaclient/configs/_proxy_info.py @@ -45,7 +45,7 @@ class ProxyInfo(BaseFixedConfig): local_ota_proxy_listen_port: port ota_proxy used. upper_ota_proxy: the URL of upper OTA proxy used by local ota_proxy server or otaclient(proxy chain). - logging_server: the URL of AWS IoT otaclient logs upload server. + logging_server: the channel of AWS IoT otaclient logs upload server. """ format_version: int = 1 @@ -75,9 +75,7 @@ class ProxyInfo(BaseFixedConfig): # NOTE: when logging_server is not configured, it implicitly means the logging server # is located at localhost. # check roles/ota_client/templates/run.sh.j2 in ecu_setup repo. - logging_server: Optional[AnyHttpUrl] = AnyHttpUrl( - f"http://127.0.0.1:{LOGGING_SERVER_PORT}" - ) + logging_server: Optional[str] = f"127.0.0.1:{LOGGING_SERVER_PORT}" def get_proxy_for_local_ota(self) -> str | None: """Tell local otaclient which proxy to use(or not use any).""" diff --git a/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.py b/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.py new file mode 100644 index 000000000..41ca8174b --- /dev/null +++ b/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: otaclient_iot_logging_server_pb2/v1/otaclient_iot_logging_server_v1.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\nIotaclient_iot_logging_server_pb2/v1/otaclient_iot_logging_server_v1.proto"y\n\rPutLogRequest\x12\x0e\n\x06\x65\x63u_id\x18\x01 \x01(\t\x12\x1a\n\x08log_type\x18\x02 \x01(\x0e\x32\x08.LogType\x12\x11\n\ttimestamp\x18\x03 \x01(\x04\x12\x18\n\x05level\x18\x04 \x01(\x0e\x32\t.LogLevel\x12\x0f\n\x07message\x18\x05 \x01(\t";\n\x0ePutLogResponse\x12\x18\n\x04\x63ode\x18\x01 \x01(\x0e\x32\n.ErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t*\x1f\n\x07LogType\x12\x07\n\x03LOG\x10\x00\x12\x0b\n\x07METRICS\x10\x01*Z\n\x08LogLevel\x12\x0e\n\nUNSPECIFIC\x10\x00\x12\t\n\x05TRACE\x10\x01\x12\t\n\x05\x44\x45\x42UG\x10\x02\x12\x08\n\x04INFO\x10\x03\x12\x08\n\x04WARN\x10\x04\x12\t\n\x05\x45RROR\x10\x05\x12\t\n\x05\x46\x41TAL\x10\x06*Z\n\tErrorCode\x12\x0e\n\nNO_FAILURE\x10\x00\x12\x15\n\x11SERVER_QUEUE_FULL\x10\x01\x12\x16\n\x12NOT_ALLOWED_ECU_ID\x10\x02\x12\x0e\n\nNO_MESSAGE\x10\x03\x32I\n\x1aOtaClientIoTLoggingService\x12+\n\x06PutLog\x12\x0e.PutLogRequest\x1a\x0f.PutLogResponse"\x00\x62\x06proto3' +) + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, + "otaclient_iot_logging_server_pb2.v1.otaclient_iot_logging_server_v1_pb2", + _globals, +) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _globals["_LOGTYPE"]._serialized_start = 261 + _globals["_LOGTYPE"]._serialized_end = 292 + _globals["_LOGLEVEL"]._serialized_start = 294 + _globals["_LOGLEVEL"]._serialized_end = 384 + _globals["_ERRORCODE"]._serialized_start = 386 + _globals["_ERRORCODE"]._serialized_end = 476 + _globals["_PUTLOGREQUEST"]._serialized_start = 77 + _globals["_PUTLOGREQUEST"]._serialized_end = 198 + _globals["_PUTLOGRESPONSE"]._serialized_start = 200 + _globals["_PUTLOGRESPONSE"]._serialized_end = 259 + _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_start = 478 + _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_end = 551 +# @@protoc_insertion_point(module_scope) diff --git a/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.pyi b/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.pyi new file mode 100644 index 000000000..bddccd3ed --- /dev/null +++ b/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.pyi @@ -0,0 +1,75 @@ +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union + +DESCRIPTOR: _descriptor.FileDescriptor + +class LogType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = [] + LOG: _ClassVar[LogType] + METRICS: _ClassVar[LogType] + +class LogLevel(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = [] + UNSPECIFIC: _ClassVar[LogLevel] + TRACE: _ClassVar[LogLevel] + DEBUG: _ClassVar[LogLevel] + INFO: _ClassVar[LogLevel] + WARN: _ClassVar[LogLevel] + ERROR: _ClassVar[LogLevel] + FATAL: _ClassVar[LogLevel] + +class ErrorCode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = [] + NO_FAILURE: _ClassVar[ErrorCode] + SERVER_QUEUE_FULL: _ClassVar[ErrorCode] + NOT_ALLOWED_ECU_ID: _ClassVar[ErrorCode] + NO_MESSAGE: _ClassVar[ErrorCode] + +LOG: LogType +METRICS: LogType +UNSPECIFIC: LogLevel +TRACE: LogLevel +DEBUG: LogLevel +INFO: LogLevel +WARN: LogLevel +ERROR: LogLevel +FATAL: LogLevel +NO_FAILURE: ErrorCode +SERVER_QUEUE_FULL: ErrorCode +NOT_ALLOWED_ECU_ID: ErrorCode +NO_MESSAGE: ErrorCode + +class PutLogRequest(_message.Message): + __slots__ = ["ecu_id", "log_type", "timestamp", "level", "message"] + ECU_ID_FIELD_NUMBER: _ClassVar[int] + LOG_TYPE_FIELD_NUMBER: _ClassVar[int] + TIMESTAMP_FIELD_NUMBER: _ClassVar[int] + LEVEL_FIELD_NUMBER: _ClassVar[int] + MESSAGE_FIELD_NUMBER: _ClassVar[int] + ecu_id: str + log_type: LogType + timestamp: int + level: LogLevel + message: str + def __init__( + self, + ecu_id: _Optional[str] = ..., + log_type: _Optional[_Union[LogType, str]] = ..., + timestamp: _Optional[int] = ..., + level: _Optional[_Union[LogLevel, str]] = ..., + message: _Optional[str] = ..., + ) -> None: ... + +class PutLogResponse(_message.Message): + __slots__ = ["code", "message"] + CODE_FIELD_NUMBER: _ClassVar[int] + MESSAGE_FIELD_NUMBER: _ClassVar[int] + code: ErrorCode + message: str + def __init__( + self, + code: _Optional[_Union[ErrorCode, str]] = ..., + message: _Optional[str] = ..., + ) -> None: ... diff --git a/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2_grpc.py b/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2_grpc.py new file mode 100644 index 000000000..2fdd440f9 --- /dev/null +++ b/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2_grpc.py @@ -0,0 +1,83 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from otaclient_common.otaclient_iot_logging_server.v1 import ( + otaclient_iot_logging_server_v1_pb2 as otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2, +) + + +class OtaClientIoTLoggingServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.PutLog = channel.unary_unary( + "/OtaClientIoTLoggingService/PutLog", + request_serializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogRequest.SerializeToString, + response_deserializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogResponse.FromString, + ) + + +class OtaClientIoTLoggingServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def PutLog(self, request, context): + """ + `PutLog` service requests OTA Client logging service to put log. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + +def add_OtaClientIoTLoggingServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + "PutLog": grpc.unary_unary_rpc_method_handler( + servicer.PutLog, + request_deserializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogRequest.FromString, + response_serializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + "OtaClientIoTLoggingService", rpc_method_handlers + ) + server.add_generic_rpc_handlers((generic_handler,)) + + +# This class is part of an EXPERIMENTAL API. +class OtaClientIoTLoggingService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def PutLog( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/OtaClientIoTLoggingService/PutLog", + otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogRequest.SerializeToString, + otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/tests/test_otaclient/test_configs/test_proxy_info.py b/tests/test_otaclient/test_configs/test_proxy_info.py index 38a639485..8438308e8 100644 --- a/tests/test_otaclient/test_configs/test_proxy_info.py +++ b/tests/test_otaclient/test_configs/test_proxy_info.py @@ -47,7 +47,7 @@ "enable_local_ota_proxy: true\n" 'upper_ota_proxy: "http://10.0.0.1:8082"\n' "enable_local_ota_proxy_cache: true\n" - 'logging_server: "http://10.0.0.1:8083"\n' + 'logging_server: "10.0.0.1:8083"\n' ), ( True, @@ -59,7 +59,7 @@ "enable_local_ota_proxy_cache": True, "local_ota_proxy_listen_addr": "0.0.0.0", "local_ota_proxy_listen_port": 8082, - "logging_server": "http://10.0.0.1:8083", + "logging_server": "10.0.0.1:8083", } ), ), From d4a14d5e8a525b666202bf164b83251f5797eeb1 Mon Sep 17 00:00:00 2001 From: Keisuke Nakata Date: Fri, 10 Jan 2025 13:49:35 +0900 Subject: [PATCH 2/8] support metrics logging and add test codes --- src/otaclient/_logging.py | 42 +++++- src/otaclient/_types.py | 1 - .../otaclient_iot_logging_server_v1_pb2.py | 39 ++++++ .../otaclient_iot_logging_server_v1_pb2.pyi | 25 +--- ...taclient_iot_logging_server_v1_pb2_grpc.py | 2 +- .../v1/otaclient_iot_logging_server_v1_pb2.py | 41 ------ tests/test_otaclient/test_log_setting.py | 29 +++- tests/test_otaclient/test_logging.py | 129 ++++++++++++++++++ 8 files changed, 229 insertions(+), 79 deletions(-) create mode 100644 src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py rename src/{otaclient_common/otaclient_iot_logging_server/v1 => otaclient/grpc/log_v1}/otaclient_iot_logging_server_v1_pb2.pyi (68%) rename src/{otaclient_common/otaclient_iot_logging_server/v1 => otaclient/grpc/log_v1}/otaclient_iot_logging_server_v1_pb2_grpc.py (98%) delete mode 100644 src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.py create mode 100644 tests/test_otaclient/test_logging.py diff --git a/src/otaclient/_logging.py b/src/otaclient/_logging.py index 7dc78c9bd..064d71720 100644 --- a/src/otaclient/_logging.py +++ b/src/otaclient/_logging.py @@ -19,30 +19,50 @@ import atexit import contextlib import logging +from dataclasses import dataclass +from enum import Enum from queue import Queue from threading import Event, Thread import grpc from otaclient.configs.cfg import cfg, ecu_info, proxy_info -from otaclient_common.otaclient_iot_logging_server.v1 import ( +from otaclient.grpc.log_v1 import ( otaclient_iot_logging_server_v1_pb2 as log_pb2, ) -from otaclient_common.otaclient_iot_logging_server.v1 import ( - otaclient_iot_logging_server_v1_pb2_grpc as log_pb2_grpc, +from otaclient.grpc.log_v1 import ( + otaclient_iot_logging_server_v1_pb2_grpc as log_v1_grpc, ) +class LogType(Enum): + LOG = 0 + METRICS = 1 + + class _LogTeeHandler(logging.Handler): """Implementation of teeing local logs to a remote otaclient-iot-logger server.""" + @dataclass + class QueueData: + """Queue data format for logging.""" + + log_type: LogType + message: str + def __init__(self, max_backlog: int = 2048) -> None: super().__init__() - self._queue: Queue[str | None] = Queue(maxsize=max_backlog) + self._queue: Queue[_LogTeeHandler.QueueData | None] = Queue(maxsize=max_backlog) def emit(self, record: logging.LogRecord) -> None: with contextlib.suppress(Exception): - self._queue.put_nowait(self.format(record)) + _log_type = getattr(record, "log_type", LogType.LOG) # default to LOG + # if a message is log message, format the message with the formatter + # otherwise(metric message), use the raw message + _message = ( + self.format(record) if _log_type == LogType.LOG else record.getMessage() + ) + self._queue.put_nowait(_LogTeeHandler.QueueData(_log_type, _message)) def start_upload_thread(self, logging_upload_channel: str, ecu_id: str) -> None: log_queue = self._queue @@ -50,7 +70,7 @@ def start_upload_thread(self, logging_upload_channel: str, ecu_id: str) -> None: def _thread_main(): channel = grpc.insecure_channel(logging_upload_channel) - stub = log_pb2_grpc.OtaClientIoTLoggingServiceStub(channel) + stub = log_v1_grpc.OtaClientIoTLoggingServiceStub(channel) while not stop_logging_upload.is_set(): entry = log_queue.get() @@ -60,7 +80,15 @@ def _thread_main(): continue # skip uploading empty log line with contextlib.suppress(Exception): - log_entry = log_pb2.PutLogRequest(message=entry, ecu_id=ecu_id) + # convert to the protobuf log type + pb2_log_type = ( + log_pb2.LogType.METRICS + if entry.log_type == LogType.METRICS + else log_pb2.LogType.LOG + ) + log_entry = log_pb2.PutLogRequest( + ecu_id=ecu_id, log_type=pb2_log_type, message=entry.message + ) stub.PutLog(log_entry) log_upload_thread = Thread(target=_thread_main, daemon=True) diff --git a/src/otaclient/_types.py b/src/otaclient/_types.py index 603aaea5c..223e94774 100644 --- a/src/otaclient/_types.py +++ b/src/otaclient/_types.py @@ -21,7 +21,6 @@ from typing import ClassVar, Optional from _otaclient_version import __version__ - from otaclient.configs.cfg import ecu_info from otaclient_common.typing import StrEnum diff --git a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py new file mode 100644 index 000000000..e1ff355ef --- /dev/null +++ b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: otaclient_iot_logging_server_pb2/v1/otaclient_iot_logging_server_v1.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\nIotaclient_iot_logging_server_pb2/v1/otaclient_iot_logging_server_v1.proto"L\n\rPutLogRequest\x12\x0e\n\x06\x65\x63u_id\x18\x01 \x01(\t\x12\x1a\n\x08log_type\x18\x02 \x01(\x0e\x32\x08.LogType\x12\x0f\n\x07message\x18\x03 \x01(\t";\n\x0ePutLogResponse\x12\x18\n\x04\x63ode\x18\x01 \x01(\x0e\x32\n.ErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t*\x1f\n\x07LogType\x12\x07\n\x03LOG\x10\x00\x12\x0b\n\x07METRICS\x10\x01*Z\n\tErrorCode\x12\x0e\n\nNO_FAILURE\x10\x00\x12\x15\n\x11SERVER_QUEUE_FULL\x10\x01\x12\x16\n\x12NOT_ALLOWED_ECU_ID\x10\x02\x12\x0e\n\nNO_MESSAGE\x10\x03\x32I\n\x1aOtaClientIoTLoggingService\x12+\n\x06PutLog\x12\x0e.PutLogRequest\x1a\x0f.PutLogResponse"\x00\x62\x06proto3' +) + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, + "otaclient_iot_logging_server_pb2.v1.otaclient_iot_logging_server_v1_pb2", + _globals, +) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _globals["_LOGTYPE"]._serialized_start = 216 + _globals["_LOGTYPE"]._serialized_end = 247 + _globals["_ERRORCODE"]._serialized_start = 249 + _globals["_ERRORCODE"]._serialized_end = 339 + _globals["_PUTLOGREQUEST"]._serialized_start = 77 + _globals["_PUTLOGREQUEST"]._serialized_end = 153 + _globals["_PUTLOGRESPONSE"]._serialized_start = 155 + _globals["_PUTLOGRESPONSE"]._serialized_end = 214 + _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_start = 341 + _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_end = 414 +# @@protoc_insertion_point(module_scope) diff --git a/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.pyi b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.pyi similarity index 68% rename from src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.pyi rename to src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.pyi index bddccd3ed..6c5124be8 100644 --- a/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.pyi +++ b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.pyi @@ -10,16 +10,6 @@ class LogType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): LOG: _ClassVar[LogType] METRICS: _ClassVar[LogType] -class LogLevel(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): - __slots__ = [] - UNSPECIFIC: _ClassVar[LogLevel] - TRACE: _ClassVar[LogLevel] - DEBUG: _ClassVar[LogLevel] - INFO: _ClassVar[LogLevel] - WARN: _ClassVar[LogLevel] - ERROR: _ClassVar[LogLevel] - FATAL: _ClassVar[LogLevel] - class ErrorCode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = [] NO_FAILURE: _ClassVar[ErrorCode] @@ -29,36 +19,23 @@ class ErrorCode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): LOG: LogType METRICS: LogType -UNSPECIFIC: LogLevel -TRACE: LogLevel -DEBUG: LogLevel -INFO: LogLevel -WARN: LogLevel -ERROR: LogLevel -FATAL: LogLevel NO_FAILURE: ErrorCode SERVER_QUEUE_FULL: ErrorCode NOT_ALLOWED_ECU_ID: ErrorCode NO_MESSAGE: ErrorCode class PutLogRequest(_message.Message): - __slots__ = ["ecu_id", "log_type", "timestamp", "level", "message"] + __slots__ = ["ecu_id", "log_type", "message"] ECU_ID_FIELD_NUMBER: _ClassVar[int] LOG_TYPE_FIELD_NUMBER: _ClassVar[int] - TIMESTAMP_FIELD_NUMBER: _ClassVar[int] - LEVEL_FIELD_NUMBER: _ClassVar[int] MESSAGE_FIELD_NUMBER: _ClassVar[int] ecu_id: str log_type: LogType - timestamp: int - level: LogLevel message: str def __init__( self, ecu_id: _Optional[str] = ..., log_type: _Optional[_Union[LogType, str]] = ..., - timestamp: _Optional[int] = ..., - level: _Optional[_Union[LogLevel, str]] = ..., message: _Optional[str] = ..., ) -> None: ... diff --git a/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2_grpc.py b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2_grpc.py similarity index 98% rename from src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2_grpc.py rename to src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2_grpc.py index 2fdd440f9..d24ceaefd 100644 --- a/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2_grpc.py +++ b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2_grpc.py @@ -2,7 +2,7 @@ """Client and server classes corresponding to protobuf-defined services.""" import grpc -from otaclient_common.otaclient_iot_logging_server.v1 import ( +from otaclient.grpc.log_v1 import ( otaclient_iot_logging_server_v1_pb2 as otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2, ) diff --git a/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.py b/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.py deleted file mode 100644 index 41ca8174b..000000000 --- a/src/otaclient_common/otaclient_iot_logging_server/v1/otaclient_iot_logging_server_v1_pb2.py +++ /dev/null @@ -1,41 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: otaclient_iot_logging_server_pb2/v1/otaclient_iot_logging_server_v1.proto -"""Generated protocol buffer code.""" -from google.protobuf import descriptor as _descriptor -from google.protobuf import descriptor_pool as _descriptor_pool -from google.protobuf import symbol_database as _symbol_database -from google.protobuf.internal import builder as _builder - -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\nIotaclient_iot_logging_server_pb2/v1/otaclient_iot_logging_server_v1.proto"y\n\rPutLogRequest\x12\x0e\n\x06\x65\x63u_id\x18\x01 \x01(\t\x12\x1a\n\x08log_type\x18\x02 \x01(\x0e\x32\x08.LogType\x12\x11\n\ttimestamp\x18\x03 \x01(\x04\x12\x18\n\x05level\x18\x04 \x01(\x0e\x32\t.LogLevel\x12\x0f\n\x07message\x18\x05 \x01(\t";\n\x0ePutLogResponse\x12\x18\n\x04\x63ode\x18\x01 \x01(\x0e\x32\n.ErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t*\x1f\n\x07LogType\x12\x07\n\x03LOG\x10\x00\x12\x0b\n\x07METRICS\x10\x01*Z\n\x08LogLevel\x12\x0e\n\nUNSPECIFIC\x10\x00\x12\t\n\x05TRACE\x10\x01\x12\t\n\x05\x44\x45\x42UG\x10\x02\x12\x08\n\x04INFO\x10\x03\x12\x08\n\x04WARN\x10\x04\x12\t\n\x05\x45RROR\x10\x05\x12\t\n\x05\x46\x41TAL\x10\x06*Z\n\tErrorCode\x12\x0e\n\nNO_FAILURE\x10\x00\x12\x15\n\x11SERVER_QUEUE_FULL\x10\x01\x12\x16\n\x12NOT_ALLOWED_ECU_ID\x10\x02\x12\x0e\n\nNO_MESSAGE\x10\x03\x32I\n\x1aOtaClientIoTLoggingService\x12+\n\x06PutLog\x12\x0e.PutLogRequest\x1a\x0f.PutLogResponse"\x00\x62\x06proto3' -) - -_globals = globals() -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages( - DESCRIPTOR, - "otaclient_iot_logging_server_pb2.v1.otaclient_iot_logging_server_v1_pb2", - _globals, -) -if _descriptor._USE_C_DESCRIPTORS == False: - - DESCRIPTOR._options = None - _globals["_LOGTYPE"]._serialized_start = 261 - _globals["_LOGTYPE"]._serialized_end = 292 - _globals["_LOGLEVEL"]._serialized_start = 294 - _globals["_LOGLEVEL"]._serialized_end = 384 - _globals["_ERRORCODE"]._serialized_start = 386 - _globals["_ERRORCODE"]._serialized_end = 476 - _globals["_PUTLOGREQUEST"]._serialized_start = 77 - _globals["_PUTLOGREQUEST"]._serialized_end = 198 - _globals["_PUTLOGRESPONSE"]._serialized_start = 200 - _globals["_PUTLOGRESPONSE"]._serialized_end = 259 - _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_start = 478 - _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_end = 551 -# @@protoc_insertion_point(module_scope) diff --git a/tests/test_otaclient/test_log_setting.py b/tests/test_otaclient/test_log_setting.py index 60ce8b5a7..9b14585af 100644 --- a/tests/test_otaclient/test_log_setting.py +++ b/tests/test_otaclient/test_log_setting.py @@ -17,21 +17,38 @@ import logging +import pytest + from otaclient import _logging MODULE = _logging.__name__ logger = logging.getLogger(__name__) -def test_server_logger(): - test_log_msg = "emit one logging entry" - +@pytest.mark.parametrize( + "test_log_msg, test_extra, expected_log_type", + [ + (None, None, _logging.LogType.LOG), + ("emit one logging entry", None, _logging.LogType.LOG), + ( + "emit one logging entry", + {"log_type": _logging.LogType.LOG}, + _logging.LogType.LOG, + ), + ( + "emit one metrics entry", + {"log_type": _logging.LogType.METRICS}, + _logging.LogType.METRICS, + ), + ], +) +def test_server_logger(test_log_msg, test_extra, expected_log_type): # ------ setup test ------ # _handler = _logging._LogTeeHandler() logger.addHandler(_handler) # ------ execution ------ # - logger.info(test_log_msg) + logger.info(test_log_msg, extra=test_extra) # ------ clenaup ------ # logger.removeHandler(_handler) @@ -39,4 +56,6 @@ def test_server_logger(): # ------ check result ------ # _queue = _handler._queue _log = _queue.get_nowait() - assert _log == test_log_msg + assert _log is not None + assert _log.log_type == expected_log_type + assert _log.message == test_log_msg diff --git a/tests/test_otaclient/test_logging.py b/tests/test_otaclient/test_logging.py new file mode 100644 index 000000000..8fab9eb8c --- /dev/null +++ b/tests/test_otaclient/test_logging.py @@ -0,0 +1,129 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import logging +from dataclasses import dataclass +from queue import Queue + +import grpc +import pytest +from pytest_mock import MockerFixture + +import otaclient._logging as _logging +from otaclient._logging import LogType, configure_logging +from otaclient.configs._ecu_info import ECUInfo +from otaclient.configs._proxy_info import ProxyInfo +from otaclient.grpc.log_v1 import ( + otaclient_iot_logging_server_v1_pb2 as log_pb2, +) +from otaclient.grpc.log_v1 import ( + otaclient_iot_logging_server_v1_pb2_grpc as log_v1_grpc, +) + +logger = logging.getLogger(__name__) + +MODULE = _logging.__name__ + + +class TestLogClient: + OTA_CLIENT_LOGGING_SERVER = "127.0.0.1:8083" + + @dataclass + class DummyQueueData: + ecu_id: str + log_type: log_pb2.LogType + message: str + + class DummyLogServerService(log_v1_grpc.OtaClientIoTLoggingServiceServicer): + def __init__(self, client): + self.client = client + + async def PutLog(self, request: log_pb2.PutLogRequest, context): + """ + Dummy gRPC method to put a log message to the queue. + """ + self.client.queue.put( + TestLogClient.DummyQueueData( + ecu_id=request.ecu_id, + log_type=request.log_type, + message=request.message, + ) + ) + return log_pb2.PutLogResponse(code=log_pb2.NO_FAILURE) + + @pytest.fixture(autouse=True) + async def launch_server(self): + self.queue: Queue[TestLogClient.DummyQueueData] = Queue() + + client = TestLogClient() + server = grpc.aio.server() + log_v1_grpc.add_OtaClientIoTLoggingServiceServicer_to_server( + servicer=TestLogClient.DummyLogServerService(client), server=server + ) + server.add_insecure_port(TestLogClient.OTA_CLIENT_LOGGING_SERVER) + try: + await server.start() + yield + finally: + await server.stop(None) + + @pytest.fixture(autouse=True) + def mock_ecu_info(self, mocker: MockerFixture): + self._ecu_info = ECUInfo(ecu_id="otaclient") + mocker.patch(f"{MODULE}.ecu_info", self._ecu_info) + + @pytest.fixture(autouse=True) + def mock_proxy_info(self, mocker: MockerFixture): + self._proxy_info = ProxyInfo( + logging_server=TestLogClient.OTA_CLIENT_LOGGING_SERVER + ) + mocker.patch(f"{MODULE}.proxy_info", self._proxy_info) + + @pytest.mark.parametrize( + "log_message, extra, expected_log_type, expected_message", + [ + (None, {}, log_pb2.LogType.LOG, "None"), + ( + "some log message without extra", + {}, + log_pb2.LogType.LOG, + "some log message without extra", + ), + ( + "some log message", + {"log_type": LogType.LOG}, + log_pb2.LogType.LOG, + "some log message", + ), + ( + "some metrics message", + {"log_type": LogType.METRICS}, + log_pb2.LogType.METRICS, + "some metrics message", + ), + ], + ) + async def test_logging( + self, log_message, extra, expected_log_type, expected_message + ): + configure_logging() + # send a test log message + logger.info(log_message, extra=extra) + + _response = self.queue.get() + assert _response.ecu_id == "otaclient" + assert _response.log_type == expected_log_type + assert _response.message == expected_message From dbe1991f091b63108eea4b5fbe5ec7617ce3a4fc Mon Sep 17 00:00:00 2001 From: Keisuke Nakata Date: Fri, 10 Jan 2025 21:13:25 +0900 Subject: [PATCH 3/8] add test code for logging --- tests/test_otaclient/test_log_setting.py | 1 - tests/test_otaclient/test_logging.py | 129 +++++++++++++++-------- 2 files changed, 83 insertions(+), 47 deletions(-) diff --git a/tests/test_otaclient/test_log_setting.py b/tests/test_otaclient/test_log_setting.py index 9b14585af..c8f5c6b69 100644 --- a/tests/test_otaclient/test_log_setting.py +++ b/tests/test_otaclient/test_log_setting.py @@ -28,7 +28,6 @@ @pytest.mark.parametrize( "test_log_msg, test_extra, expected_log_type", [ - (None, None, _logging.LogType.LOG), ("emit one logging entry", None, _logging.LogType.LOG), ( "emit one logging entry", diff --git a/tests/test_otaclient/test_logging.py b/tests/test_otaclient/test_logging.py index 8fab9eb8c..f76800009 100644 --- a/tests/test_otaclient/test_logging.py +++ b/tests/test_otaclient/test_logging.py @@ -14,16 +14,20 @@ from __future__ import annotations +import asyncio import logging +import re from dataclasses import dataclass from queue import Queue import grpc import pytest +import pytest_asyncio from pytest_mock import MockerFixture import otaclient._logging as _logging from otaclient._logging import LogType, configure_logging +from otaclient.configs._cfg_configurable import _OTAClientSettings from otaclient.configs._ecu_info import ECUInfo from otaclient.configs._proxy_info import ProxyInfo from otaclient.grpc.log_v1 import ( @@ -38,51 +42,53 @@ MODULE = _logging.__name__ -class TestLogClient: - OTA_CLIENT_LOGGING_SERVER = "127.0.0.1:8083" +@dataclass +class DummyQueueData: + ecu_id: str + log_type: log_pb2.LogType + message: str + - @dataclass - class DummyQueueData: - ecu_id: str - log_type: log_pb2.LogType - message: str - - class DummyLogServerService(log_v1_grpc.OtaClientIoTLoggingServiceServicer): - def __init__(self, client): - self.client = client - - async def PutLog(self, request: log_pb2.PutLogRequest, context): - """ - Dummy gRPC method to put a log message to the queue. - """ - self.client.queue.put( - TestLogClient.DummyQueueData( - ecu_id=request.ecu_id, - log_type=request.log_type, - message=request.message, - ) +class DummyLogServerService(log_v1_grpc.OtaClientIoTLoggingServiceServicer): + def __init__(self, test_queue, data_ready): + self._test_queue = test_queue + self._data_ready = data_ready + + async def PutLog(self, request: log_pb2.PutLogRequest, context): + """ + Dummy gRPC method to put a log message to the queue. + """ + self._test_queue.put_nowait( + DummyQueueData( + ecu_id=request.ecu_id, + log_type=request.log_type, + message=request.message, ) - return log_pb2.PutLogResponse(code=log_pb2.NO_FAILURE) + ) + self._data_ready.set() + return log_pb2.PutLogResponse(code=log_pb2.NO_FAILURE) + + +class TestLogClient: + OTA_CLIENT_LOGGING_SERVER = "127.0.0.1:8083" + ECU_ID = "testclient" @pytest.fixture(autouse=True) - async def launch_server(self): - self.queue: Queue[TestLogClient.DummyQueueData] = Queue() + async def initialize_queue(self): + self.test_queue: Queue[DummyQueueData] = Queue() + self.data_ready = asyncio.Event() - client = TestLogClient() - server = grpc.aio.server() - log_v1_grpc.add_OtaClientIoTLoggingServiceServicer_to_server( - servicer=TestLogClient.DummyLogServerService(client), server=server + @pytest.fixture(autouse=True) + def mock_cfg(self, mocker: MockerFixture): + self._cfg = _OTAClientSettings( + LOG_LEVEL_TABLE={__name__: "INFO"}, + LOG_FORMAT="[%(asctime)s][%(levelname)s]-%(name)s:%(funcName)s:%(lineno)d,%(message)s", ) - server.add_insecure_port(TestLogClient.OTA_CLIENT_LOGGING_SERVER) - try: - await server.start() - yield - finally: - await server.stop(None) + mocker.patch(f"{MODULE}.cfg", self._cfg) @pytest.fixture(autouse=True) def mock_ecu_info(self, mocker: MockerFixture): - self._ecu_info = ECUInfo(ecu_id="otaclient") + self._ecu_info = ECUInfo(ecu_id=TestLogClient.ECU_ID) mocker.patch(f"{MODULE}.ecu_info", self._ecu_info) @pytest.fixture(autouse=True) @@ -92,38 +98,69 @@ def mock_proxy_info(self, mocker: MockerFixture): ) mocker.patch(f"{MODULE}.proxy_info", self._proxy_info) + @pytest_asyncio.fixture + async def launch_grpc_server(self): + server = grpc.aio.server() + log_v1_grpc.add_OtaClientIoTLoggingServiceServicer_to_server( + servicer=DummyLogServerService(self.test_queue, self.data_ready), + server=server, + ) + server.add_insecure_port(TestLogClient.OTA_CLIENT_LOGGING_SERVER) + try: + await server.start() + yield + finally: + await server.stop(None) + @pytest.mark.parametrize( - "log_message, extra, expected_log_type, expected_message", + "log_message, extra, expected_log_type", [ - (None, {}, log_pb2.LogType.LOG, "None"), ( "some log message without extra", {}, log_pb2.LogType.LOG, - "some log message without extra", ), ( "some log message", {"log_type": LogType.LOG}, log_pb2.LogType.LOG, - "some log message", ), ( "some metrics message", {"log_type": LogType.METRICS}, log_pb2.LogType.METRICS, - "some metrics message", ), ], ) async def test_logging( - self, log_message, extra, expected_log_type, expected_message + self, + launch_grpc_server, + log_message, + extra, + expected_log_type, ): configure_logging() # send a test log message - logger.info(log_message, extra=extra) + logger.error(log_message, extra=extra) + # wait for the log message to be received + await self.data_ready.wait() + try: + _response = self.test_queue.get_nowait() + except Exception as e: + pytest.fail(f"Failed to get a log message from the queue: {e}") - _response = self.queue.get() - assert _response.ecu_id == "otaclient" + assert _response.ecu_id == TestLogClient.ECU_ID assert _response.log_type == expected_log_type - assert _response.message == expected_message + + if _response.log_type == log_pb2.LogType.LOG: + # Extract the message part from the log format + log_format_regex = r"\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}\]\[ERROR\]-.*?:test_logging:\d+,(.*)" + match = re.match(log_format_regex, _response.message) + assert match is not None, "Log message format does not match" + extracted_message = match.group(1) + assert extracted_message == log_message + elif _response.log_type == log_pb2.LogType.METRICS: + # Expect the message to be the same as the input + assert _response.message == log_message + else: + pytest.fail(f"Unexpected log type: {_response.log_type}") From d60be40bef491f9d37d5c0f9d805f1b34ad3f300 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 10 Jan 2025 12:17:24 +0000 Subject: [PATCH 4/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/otaclient/_types.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/otaclient/_types.py b/src/otaclient/_types.py index 223e94774..603aaea5c 100644 --- a/src/otaclient/_types.py +++ b/src/otaclient/_types.py @@ -21,6 +21,7 @@ from typing import ClassVar, Optional from _otaclient_version import __version__ + from otaclient.configs.cfg import ecu_info from otaclient_common.typing import StrEnum From 341cdda67bb03511886601b09ca4f35e98b8cfab Mon Sep 17 00:00:00 2001 From: Keisuke Nakata Date: Tue, 14 Jan 2025 09:38:04 +0900 Subject: [PATCH 5/8] fix: add restore control for logging test --- tests/test_otaclient/test_logging.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/tests/test_otaclient/test_logging.py b/tests/test_otaclient/test_logging.py index f76800009..aff3259a1 100644 --- a/tests/test_otaclient/test_logging.py +++ b/tests/test_otaclient/test_logging.py @@ -111,6 +111,23 @@ async def launch_grpc_server(self): yield finally: await server.stop(None) + await server.wait_for_termination() + + @pytest.fixture + def restore_logging(self): + # confiure_logging() is called in the test, so we need to restore the original logging configuration + # Save the current logging configuration + original_handlers = logging.root.handlers[:] + original_level = logging.root.level + original_formatters = [handler.formatter for handler in logging.root.handlers] + + yield + + # Restore the original logging configuration + logging.root.handlers = original_handlers + logging.root.level = original_level + for handler, formatter in zip(logging.root.handlers, original_formatters): + handler.setFormatter(formatter) @pytest.mark.parametrize( "log_message, extra, expected_log_type", @@ -135,6 +152,7 @@ async def launch_grpc_server(self): async def test_logging( self, launch_grpc_server, + restore_logging, log_message, extra, expected_log_type, @@ -144,6 +162,8 @@ async def test_logging( logger.error(log_message, extra=extra) # wait for the log message to be received await self.data_ready.wait() + self.data_ready.clear() + try: _response = self.test_queue.get_nowait() except Exception as e: @@ -154,7 +174,8 @@ async def test_logging( if _response.log_type == log_pb2.LogType.LOG: # Extract the message part from the log format - log_format_regex = r"\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}\]\[ERROR\]-.*?:test_logging:\d+,(.*)" + # e.g. [2022-01-01 00:00:00,000][ERROR]-test_log_client:test_log_client:123,some log message + log_format_regex = r"\[.*?\]\[.*?\]-.*?:.*?:\d+,(.*)" match = re.match(log_format_regex, _response.message) assert match is not None, "Log message format does not match" extracted_message = match.group(1) From 7acbe6dc6e7dea98d3068914710f74834ce3459d Mon Sep 17 00:00:00 2001 From: Keisuke Nakata Date: Tue, 14 Jan 2025 10:44:22 +0900 Subject: [PATCH 6/8] add README for logging --- src/otaclient/logging_README.md | 72 +++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 src/otaclient/logging_README.md diff --git a/src/otaclient/logging_README.md b/src/otaclient/logging_README.md new file mode 100644 index 000000000..c41d37914 --- /dev/null +++ b/src/otaclient/logging_README.md @@ -0,0 +1,72 @@ +# Protocol Documentation + +## Table of Contents + +- [Protocol Documentation](#protocol-documentation) + - [Table of Contents](#table-of-contents) + - [proto/otaclient\_iot\_logging\_server\_v1.proto](#protootaclient_iot_logging_server_v1proto) + - [PutLogRequest](#putlogrequest) + - [PutLogResponse](#putlogresponse) + - [ErrorCode](#errorcode) + - [LogType](#logtype) + - [OtaClientIoTLoggingService](#otaclientiotloggingservice) + - [Scalar Value Types](#scalar-value-types) + +## proto/otaclient_iot_logging_server_v1.proto + +### PutLogRequest + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| ecu_id | string | | target ECU ID | +| log_type | [LogType](#logtype) | | log type | +| message | string | | log message | + +### PutLogResponse + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| code | [ErrorCode](#errorcode) | | error code | +| message | string | | error message | + +### ErrorCode + +| Name | Number | Description | +| ---- | ------ | ----------- | +| NO_FAILURE | 0 | Success | +| SERVER_QUEUE_FULL | 1 | Error: Server queue is full | +| NOT_ALLOWED_ECU_ID | 2 | Error: Specified ECU ID is not allowed | +| NO_MESSAGE | 3 | Error: No message in the request | + +### LogType + +| Name | Number | Description | +| ---- | ------ | ----------- | +| LOG | 0 | | +| METRICS | 1 | | + +### OtaClientIoTLoggingService + +| Method Name | Request Type | Response Type | Description | +| ----------- | ------------ | ------------- | ------------| +| PutLog | [.PutLogRequest](#putlogrequest) | [.PutLogResponse](#putlogresponse) | `PutLog` service requests OTA Client logging service to put log. | + +## Scalar Value Types + +| .proto Type | Notes | C++ | Java | Python | Go | C# | PHP | Ruby | +| ----------- | ----- | --- | ---- | ------ | -- | -- | --- | ---- | +| double | | double | double | float | float64 | double | float | Float | +| float | | float | float | float | float32 | float | float | Float | +| int32 | Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint32 instead. | int32 | int | int | int32 | int | integer | Bignum or Fixnum (as required) | +| int64 | Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint64 instead. | int64 | long | int/long | int64 | long | integer/string | Bignum | +| uint32 | Uses variable-length encoding. | uint32 | int | int/long | uint32 | uint | integer | Bignum or Fixnum (as required) | +| uint64 | Uses variable-length encoding. | uint64 | long | int/long | uint64 | ulong | integer/string | Bignum or Fixnum (as required) | +| sint32 | Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int32s. | int32 | int | int | int32 | int | integer | Bignum or Fixnum (as required) | +| sint64 | Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int64s. | int64 | long | int/long | int64 | long | integer/string | Bignum | +| fixed32 | Always four bytes. More efficient than uint32 if values are often greater than 2^28. | uint32 | int | int | uint32 | uint | integer | Bignum or Fixnum (as required) | +| fixed64 | Always eight bytes. More efficient than uint64 if values are often greater than 2^56. | uint64 | long | int/long | uint64 | ulong | integer/string | Bignum | +| sfixed32 | Always four bytes. | int32 | int | int | int32 | int | integer | Bignum or Fixnum (as required) | +| sfixed64 | Always eight bytes. | int64 | long | int/long | int64 | long | integer/string | Bignum | +| bool | | bool | boolean | boolean | bool | bool | boolean | TrueClass/FalseClass | +| string | A string must always contain UTF-8 encoded or 7-bit ASCII text. | string | String | str/unicode | string | string | string | String (UTF-8) | +| bytes | May contain any arbitrary sequence of bytes. | string | ByteString | str | []byte | ByteString | string | String (ASCII-8BIT) | From 2db43035f6d94a2f3eee9d9e4785002250f2e26b Mon Sep 17 00:00:00 2001 From: Keisuke Nakata Date: Mon, 3 Feb 2025 17:20:45 +0900 Subject: [PATCH 7/8] support both HTTP/gRPC --- samples/proxy_info.yaml | 2 +- src/otaclient/_logging.py | 112 +++++++++++++----- src/otaclient/configs/_proxy_info.py | 7 +- .../test_configs/test_proxy_info.py | 4 +- tests/test_otaclient/test_logging.py | 11 +- 5 files changed, 98 insertions(+), 38 deletions(-) diff --git a/samples/proxy_info.yaml b/samples/proxy_info.yaml index d9063d88b..5c893ca05 100644 --- a/samples/proxy_info.yaml +++ b/samples/proxy_info.yaml @@ -6,4 +6,4 @@ enable_local_ota_proxy_cache: true local_ota_proxy_listen_addr: 127.0.0.1 local_ota_proxy_listen_port: 8082 # if otaclient-logger is installed locally -logging_server: "127.0.0.1:8083" +logging_server: "http://127.0.0.1:8083" diff --git a/src/otaclient/_logging.py b/src/otaclient/_logging.py index 064d71720..bf6ff7bd6 100644 --- a/src/otaclient/_logging.py +++ b/src/otaclient/_logging.py @@ -19,12 +19,16 @@ import atexit import contextlib import logging +from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum from queue import Queue from threading import Event, Thread +from urllib.parse import urljoin, urlparse import grpc +import requests +from pydantic import AnyHttpUrl from otaclient.configs.cfg import cfg, ecu_info, proxy_info from otaclient.grpc.log_v1 import ( @@ -36,23 +40,84 @@ class LogType(Enum): - LOG = 0 - METRICS = 1 + LOG = "log" + METRICS = "metrics" -class _LogTeeHandler(logging.Handler): - """Implementation of teeing local logs to a remote otaclient-iot-logger server.""" +@dataclass +class QueueData: + """Queue data format for logging.""" + + log_type: LogType + message: str + + +class Transmitter(ABC): + def __init__(self, logging_upload_endpoint, ecu_id: str): + self.ecu_id = ecu_id + + @abstractmethod + def send(self, log_type: LogType, message: str, timeout: int) -> None: + pass + + +class TransmitterGrpc(Transmitter): + def __init__(self, logging_upload_endpoint: AnyHttpUrl, ecu_id: str): + super().__init__(logging_upload_endpoint, ecu_id) + + parsed_url = urlparse(str(logging_upload_endpoint)) + log_upload_endpoint = parsed_url.netloc + channel = grpc.insecure_channel(log_upload_endpoint) + self._stub = log_v1_grpc.OtaClientIoTLoggingServiceStub(channel) + + def send(self, log_type: LogType, message: str, timeout: int) -> None: + pb2_log_type = ( + log_pb2.LogType.METRICS + if log_type == LogType.METRICS + else log_pb2.LogType.LOG + ) + log_entry = log_pb2.PutLogRequest( + ecu_id=self.ecu_id, log_type=pb2_log_type, message=message + ) + self._stub.PutLog(log_entry, timeout=timeout) - @dataclass - class QueueData: - """Queue data format for logging.""" + def check(self, timeout: int) -> None: + # health check + log_entry = log_pb2.HealthCheckRequest(service="") + self._stub.Check(log_entry) - log_type: LogType - message: str + +class TransmitterHttp(Transmitter): + def __init__(self, logging_upload_endpoint, ecu_id: str): + super().__init__(logging_upload_endpoint, ecu_id) + + logging_upload_endpoint = f"{str(logging_upload_endpoint).strip('/')}/" + self.log_upload_endpoint = urljoin(logging_upload_endpoint, ecu_id) + self._session = requests.Session() + + def send(self, log_type: LogType, message: str, timeout: int) -> None: + # support only LogType.LOG + with contextlib.suppress(Exception): + self._session.post(self.log_upload_endpoint, data=message, timeout=timeout) + + +class TransmitterFactory: + @staticmethod + def create(logging_upload_endpoint: AnyHttpUrl, ecu_id: str): + try: + _transmitter_grpc = TransmitterGrpc(logging_upload_endpoint, ecu_id) + _transmitter_grpc.check(timeout=3) + return _transmitter_grpc + except Exception: + return TransmitterHttp(logging_upload_endpoint, ecu_id) + + +class _LogTeeHandler(logging.Handler): + """Implementation of teeing local logs to a remote otaclient-iot-logger server.""" def __init__(self, max_backlog: int = 2048) -> None: super().__init__() - self._queue: Queue[_LogTeeHandler.QueueData | None] = Queue(maxsize=max_backlog) + self._queue: Queue[QueueData | None] = Queue(maxsize=max_backlog) def emit(self, record: logging.LogRecord) -> None: with contextlib.suppress(Exception): @@ -62,15 +127,16 @@ def emit(self, record: logging.LogRecord) -> None: _message = ( self.format(record) if _log_type == LogType.LOG else record.getMessage() ) - self._queue.put_nowait(_LogTeeHandler.QueueData(_log_type, _message)) + self._queue.put_nowait(QueueData(_log_type, _message)) - def start_upload_thread(self, logging_upload_channel: str, ecu_id: str) -> None: + def start_upload_thread( + self, logging_upload_endpoint: AnyHttpUrl, ecu_id: str + ) -> None: log_queue = self._queue stop_logging_upload = Event() def _thread_main(): - channel = grpc.insecure_channel(logging_upload_channel) - stub = log_v1_grpc.OtaClientIoTLoggingServiceStub(channel) + _transmitter = TransmitterFactory.create(logging_upload_endpoint, ecu_id) while not stop_logging_upload.is_set(): entry = log_queue.get() @@ -79,17 +145,7 @@ def _thread_main(): if not entry: continue # skip uploading empty log line - with contextlib.suppress(Exception): - # convert to the protobuf log type - pb2_log_type = ( - log_pb2.LogType.METRICS - if entry.log_type == LogType.METRICS - else log_pb2.LogType.LOG - ) - log_entry = log_pb2.PutLogRequest( - ecu_id=ecu_id, log_type=pb2_log_type, message=entry.message - ) - stub.PutLog(log_entry) + _transmitter.send(entry.log_type, entry.message, timeout=3) log_upload_thread = Thread(target=_thread_main, daemon=True) log_upload_thread.start() @@ -103,7 +159,7 @@ def _thread_exit(): def configure_logging() -> None: - """Configure logging with gRPC handler.""" + """Configure logging with http/gRPC handler.""" # ------ suppress logging from non-first-party modules ------ # # NOTE: force to reload the basicConfig, this is for overriding setting # when launching subprocess. @@ -113,13 +169,13 @@ def configure_logging() -> None: # ------ configure each sub loggers and attach ota logging handler ------ # log_upload_handler = None - if logging_upload_channel := proxy_info.logging_server: + if logging_upload_endpoint := proxy_info.logging_server: log_upload_handler = _LogTeeHandler() fmt = logging.Formatter(fmt=cfg.LOG_FORMAT) log_upload_handler.setFormatter(fmt) # star the logging thread - log_upload_handler.start_upload_thread(logging_upload_channel, ecu_info.ecu_id) + log_upload_handler.start_upload_thread(logging_upload_endpoint, ecu_info.ecu_id) for logger_name, loglevel in cfg.LOG_LEVEL_TABLE.items(): _logger = logging.getLogger(logger_name) diff --git a/src/otaclient/configs/_proxy_info.py b/src/otaclient/configs/_proxy_info.py index 64e47ca4c..eaa97141b 100644 --- a/src/otaclient/configs/_proxy_info.py +++ b/src/otaclient/configs/_proxy_info.py @@ -13,7 +13,6 @@ # limitations under the License. """proxy_info.yaml definition and parsing logic.""" - from __future__ import annotations import logging @@ -45,7 +44,7 @@ class ProxyInfo(BaseFixedConfig): local_ota_proxy_listen_port: port ota_proxy used. upper_ota_proxy: the URL of upper OTA proxy used by local ota_proxy server or otaclient(proxy chain). - logging_server: the channel of AWS IoT otaclient logs upload server. + logging_server: the URL of AWS IoT otaclient logs upload server. """ format_version: int = 1 @@ -75,7 +74,9 @@ class ProxyInfo(BaseFixedConfig): # NOTE: when logging_server is not configured, it implicitly means the logging server # is located at localhost. # check roles/ota_client/templates/run.sh.j2 in ecu_setup repo. - logging_server: Optional[str] = f"127.0.0.1:{LOGGING_SERVER_PORT}" + logging_server: Optional[AnyHttpUrl] = AnyHttpUrl( + f"http://127.0.0.1:{LOGGING_SERVER_PORT}" + ) def get_proxy_for_local_ota(self) -> str | None: """Tell local otaclient which proxy to use(or not use any).""" diff --git a/tests/test_otaclient/test_configs/test_proxy_info.py b/tests/test_otaclient/test_configs/test_proxy_info.py index 8438308e8..38a639485 100644 --- a/tests/test_otaclient/test_configs/test_proxy_info.py +++ b/tests/test_otaclient/test_configs/test_proxy_info.py @@ -47,7 +47,7 @@ "enable_local_ota_proxy: true\n" 'upper_ota_proxy: "http://10.0.0.1:8082"\n' "enable_local_ota_proxy_cache: true\n" - 'logging_server: "10.0.0.1:8083"\n' + 'logging_server: "http://10.0.0.1:8083"\n' ), ( True, @@ -59,7 +59,7 @@ "enable_local_ota_proxy_cache": True, "local_ota_proxy_listen_addr": "0.0.0.0", "local_ota_proxy_listen_port": 8082, - "logging_server": "10.0.0.1:8083", + "logging_server": "http://10.0.0.1:8083", } ), ), diff --git a/tests/test_otaclient/test_logging.py b/tests/test_otaclient/test_logging.py index aff3259a1..9b8612c1e 100644 --- a/tests/test_otaclient/test_logging.py +++ b/tests/test_otaclient/test_logging.py @@ -19,10 +19,12 @@ import re from dataclasses import dataclass from queue import Queue +from urllib.parse import urlparse import grpc import pytest import pytest_asyncio +from pydantic import AnyHttpUrl from pytest_mock import MockerFixture import otaclient._logging as _logging @@ -70,7 +72,7 @@ async def PutLog(self, request: log_pb2.PutLogRequest, context): class TestLogClient: - OTA_CLIENT_LOGGING_SERVER = "127.0.0.1:8083" + OTA_CLIENT_LOGGING_SERVER = "http://127.0.0.1:8083" ECU_ID = "testclient" @pytest.fixture(autouse=True) @@ -94,7 +96,7 @@ def mock_ecu_info(self, mocker: MockerFixture): @pytest.fixture(autouse=True) def mock_proxy_info(self, mocker: MockerFixture): self._proxy_info = ProxyInfo( - logging_server=TestLogClient.OTA_CLIENT_LOGGING_SERVER + logging_server=AnyHttpUrl(TestLogClient.OTA_CLIENT_LOGGING_SERVER) ) mocker.patch(f"{MODULE}.proxy_info", self._proxy_info) @@ -105,7 +107,8 @@ async def launch_grpc_server(self): servicer=DummyLogServerService(self.test_queue, self.data_ready), server=server, ) - server.add_insecure_port(TestLogClient.OTA_CLIENT_LOGGING_SERVER) + parsed_url = urlparse(TestLogClient.OTA_CLIENT_LOGGING_SERVER) + server.add_insecure_port(parsed_url.netloc) try: await server.start() yield @@ -149,7 +152,7 @@ def restore_logging(self): ), ], ) - async def test_logging( + async def test_grpc_logging( self, launch_grpc_server, restore_logging, From 58a0bad3747eb4fdf19376115fe5dc7d3887fd7e Mon Sep 17 00:00:00 2001 From: Keisuke Nakata Date: Tue, 4 Feb 2025 08:52:00 +0900 Subject: [PATCH 8/8] implement healthcheck --- src/otaclient/_logging.py | 20 ++++++-- .../otaclient_iot_logging_server_v1_pb2.py | 20 +++++--- .../otaclient_iot_logging_server_v1_pb2.pyi | 26 ++++++++++ ...taclient_iot_logging_server_v1_pb2_grpc.py | 48 +++++++++++++++++++ tests/test_otaclient/test_logging.py | 10 +++- 5 files changed, 111 insertions(+), 13 deletions(-) diff --git a/src/otaclient/_logging.py b/src/otaclient/_logging.py index bf6ff7bd6..82918682d 100644 --- a/src/otaclient/_logging.py +++ b/src/otaclient/_logging.py @@ -13,7 +13,6 @@ # limitations under the License. """Configure the logging for otaclient.""" - from __future__ import annotations import atexit @@ -60,6 +59,10 @@ def __init__(self, logging_upload_endpoint, ecu_id: str): def send(self, log_type: LogType, message: str, timeout: int) -> None: pass + @abstractmethod + def check(self, timeout: int) -> None: + pass + class TransmitterGrpc(Transmitter): def __init__(self, logging_upload_endpoint: AnyHttpUrl, ecu_id: str): @@ -88,11 +91,11 @@ def check(self, timeout: int) -> None: class TransmitterHttp(Transmitter): - def __init__(self, logging_upload_endpoint, ecu_id: str): + def __init__(self, logging_upload_endpoint: AnyHttpUrl, ecu_id: str): super().__init__(logging_upload_endpoint, ecu_id) - logging_upload_endpoint = f"{str(logging_upload_endpoint).strip('/')}/" - self.log_upload_endpoint = urljoin(logging_upload_endpoint, ecu_id) + _endpoint = f"{str(logging_upload_endpoint).strip('/')}/" + self.log_upload_endpoint = urljoin(_endpoint, ecu_id) self._session = requests.Session() def send(self, log_type: LogType, message: str, timeout: int) -> None: @@ -100,6 +103,9 @@ def send(self, log_type: LogType, message: str, timeout: int) -> None: with contextlib.suppress(Exception): self._session.post(self.log_upload_endpoint, data=message, timeout=timeout) + def check(self, timeout: int) -> None: + pass + class TransmitterFactory: @staticmethod @@ -145,7 +151,11 @@ def _thread_main(): if not entry: continue # skip uploading empty log line - _transmitter.send(entry.log_type, entry.message, timeout=3) + try: + _transmitter.send(entry.log_type, entry.message, timeout=3) + except Exception: + # ignore the exception and continue + pass log_upload_thread = Thread(target=_thread_main, daemon=True) log_upload_thread.start() diff --git a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py index e1ff355ef..e6a1b00bc 100644 --- a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py +++ b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py @@ -13,7 +13,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\nIotaclient_iot_logging_server_pb2/v1/otaclient_iot_logging_server_v1.proto"L\n\rPutLogRequest\x12\x0e\n\x06\x65\x63u_id\x18\x01 \x01(\t\x12\x1a\n\x08log_type\x18\x02 \x01(\x0e\x32\x08.LogType\x12\x0f\n\x07message\x18\x03 \x01(\t";\n\x0ePutLogResponse\x12\x18\n\x04\x63ode\x18\x01 \x01(\x0e\x32\n.ErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t*\x1f\n\x07LogType\x12\x07\n\x03LOG\x10\x00\x12\x0b\n\x07METRICS\x10\x01*Z\n\tErrorCode\x12\x0e\n\nNO_FAILURE\x10\x00\x12\x15\n\x11SERVER_QUEUE_FULL\x10\x01\x12\x16\n\x12NOT_ALLOWED_ECU_ID\x10\x02\x12\x0e\n\nNO_MESSAGE\x10\x03\x32I\n\x1aOtaClientIoTLoggingService\x12+\n\x06PutLog\x12\x0e.PutLogRequest\x1a\x0f.PutLogResponse"\x00\x62\x06proto3' + b'\nIotaclient_iot_logging_server_pb2/v1/otaclient_iot_logging_server_v1.proto"L\n\rPutLogRequest\x12\x0e\n\x06\x65\x63u_id\x18\x01 \x01(\t\x12\x1a\n\x08log_type\x18\x02 \x01(\x0e\x32\x08.LogType\x12\x0f\n\x07message\x18\x03 \x01(\t";\n\x0ePutLogResponse\x12\x18\n\x04\x63ode\x18\x01 \x01(\x0e\x32\n.ErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t"%\n\x12HealthCheckRequest\x12\x0f\n\x07service\x18\x01 \x01(\t"\x9a\x01\n\x13HealthCheckResponse\x12\x32\n\x06status\x18\x01 \x01(\x0e\x32".HealthCheckResponse.ServingStatus"O\n\rServingStatus\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0b\n\x07SERVING\x10\x01\x12\x0f\n\x0bNOT_SERVING\x10\x02\x12\x13\n\x0fSERVICE_UNKNOWN\x10\x03*\x1f\n\x07LogType\x12\x07\n\x03LOG\x10\x00\x12\x0b\n\x07METRICS\x10\x01*Z\n\tErrorCode\x12\x0e\n\nNO_FAILURE\x10\x00\x12\x15\n\x11SERVER_QUEUE_FULL\x10\x01\x12\x16\n\x12NOT_ALLOWED_ECU_ID\x10\x02\x12\x0e\n\nNO_MESSAGE\x10\x03\x32}\n\x1aOtaClientIoTLoggingService\x12+\n\x06PutLog\x12\x0e.PutLogRequest\x1a\x0f.PutLogResponse"\x00\x12\x32\n\x05\x43heck\x12\x13.HealthCheckRequest\x1a\x14.HealthCheckResponseb\x06proto3' ) _globals = globals() @@ -26,14 +26,20 @@ if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _globals["_LOGTYPE"]._serialized_start = 216 - _globals["_LOGTYPE"]._serialized_end = 247 - _globals["_ERRORCODE"]._serialized_start = 249 - _globals["_ERRORCODE"]._serialized_end = 339 + _globals["_LOGTYPE"]._serialized_start = 412 + _globals["_LOGTYPE"]._serialized_end = 443 + _globals["_ERRORCODE"]._serialized_start = 445 + _globals["_ERRORCODE"]._serialized_end = 535 _globals["_PUTLOGREQUEST"]._serialized_start = 77 _globals["_PUTLOGREQUEST"]._serialized_end = 153 _globals["_PUTLOGRESPONSE"]._serialized_start = 155 _globals["_PUTLOGRESPONSE"]._serialized_end = 214 - _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_start = 341 - _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_end = 414 + _globals["_HEALTHCHECKREQUEST"]._serialized_start = 216 + _globals["_HEALTHCHECKREQUEST"]._serialized_end = 253 + _globals["_HEALTHCHECKRESPONSE"]._serialized_start = 256 + _globals["_HEALTHCHECKRESPONSE"]._serialized_end = 410 + _globals["_HEALTHCHECKRESPONSE_SERVINGSTATUS"]._serialized_start = 331 + _globals["_HEALTHCHECKRESPONSE_SERVINGSTATUS"]._serialized_end = 410 + _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_start = 537 + _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_end = 662 # @@protoc_insertion_point(module_scope) diff --git a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.pyi b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.pyi index 6c5124be8..52246c1b8 100644 --- a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.pyi +++ b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.pyi @@ -50,3 +50,29 @@ class PutLogResponse(_message.Message): code: _Optional[_Union[ErrorCode, str]] = ..., message: _Optional[str] = ..., ) -> None: ... + +class HealthCheckRequest(_message.Message): + __slots__ = ["service"] + SERVICE_FIELD_NUMBER: _ClassVar[int] + service: str + def __init__(self, service: _Optional[str] = ...) -> None: ... + +class HealthCheckResponse(_message.Message): + __slots__ = ["status"] + + class ServingStatus(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = [] + UNKNOWN: _ClassVar[HealthCheckResponse.ServingStatus] + SERVING: _ClassVar[HealthCheckResponse.ServingStatus] + NOT_SERVING: _ClassVar[HealthCheckResponse.ServingStatus] + SERVICE_UNKNOWN: _ClassVar[HealthCheckResponse.ServingStatus] + + UNKNOWN: HealthCheckResponse.ServingStatus + SERVING: HealthCheckResponse.ServingStatus + NOT_SERVING: HealthCheckResponse.ServingStatus + SERVICE_UNKNOWN: HealthCheckResponse.ServingStatus + STATUS_FIELD_NUMBER: _ClassVar[int] + status: HealthCheckResponse.ServingStatus + def __init__( + self, status: _Optional[_Union[HealthCheckResponse.ServingStatus, str]] = ... + ) -> None: ... diff --git a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2_grpc.py b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2_grpc.py index d24ceaefd..fab8a3d5d 100644 --- a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2_grpc.py +++ b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2_grpc.py @@ -21,6 +21,11 @@ def __init__(self, channel): request_serializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogRequest.SerializeToString, response_deserializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogResponse.FromString, ) + self.Check = channel.unary_unary( + "/OtaClientIoTLoggingService/Check", + request_serializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckRequest.SerializeToString, + response_deserializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckResponse.FromString, + ) class OtaClientIoTLoggingServiceServicer(object): @@ -34,6 +39,15 @@ def PutLog(self, request, context): context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") + def Check(self, request, context): + """ + `Check` requests OTA Client logging service to check the health of the + service. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + def add_OtaClientIoTLoggingServiceServicer_to_server(servicer, server): rpc_method_handlers = { @@ -42,6 +56,11 @@ def add_OtaClientIoTLoggingServiceServicer_to_server(servicer, server): request_deserializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogRequest.FromString, response_serializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogResponse.SerializeToString, ), + "Check": grpc.unary_unary_rpc_method_handler( + servicer.Check, + request_deserializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckRequest.FromString, + response_serializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( "OtaClientIoTLoggingService", rpc_method_handlers @@ -81,3 +100,32 @@ def PutLog( timeout, metadata, ) + + @staticmethod + def Check( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/OtaClientIoTLoggingService/Check", + otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckRequest.SerializeToString, + otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/tests/test_otaclient/test_logging.py b/tests/test_otaclient/test_logging.py index 9b8612c1e..6d3a26077 100644 --- a/tests/test_otaclient/test_logging.py +++ b/tests/test_otaclient/test_logging.py @@ -56,6 +56,14 @@ def __init__(self, test_queue, data_ready): self._test_queue = test_queue self._data_ready = data_ready + async def Check(self, request: log_pb2.HealthCheckRequest, context): + """ + Dummy gRPC method for health check. + """ + return log_pb2.HealthCheckResponse( + status=log_pb2.HealthCheckResponse.ServingStatus.SERVING + ) + async def PutLog(self, request: log_pb2.PutLogRequest, context): """ Dummy gRPC method to put a log message to the queue. @@ -72,7 +80,7 @@ async def PutLog(self, request: log_pb2.PutLogRequest, context): class TestLogClient: - OTA_CLIENT_LOGGING_SERVER = "http://127.0.0.1:8083" + OTA_CLIENT_LOGGING_SERVER = "http://127.0.0.1:8084" ECU_ID = "testclient" @pytest.fixture(autouse=True)