diff --git a/src/otaclient/_logging.py b/src/otaclient/_logging.py index 687d17c33..82918682d 100644 --- a/src/otaclient/_logging.py +++ b/src/otaclient/_logging.py @@ -13,19 +13,109 @@ # limitations under the License. """Configure the logging for otaclient.""" - from __future__ import annotations import atexit import contextlib import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass +from enum import Enum from queue import Queue from threading import Event, Thread -from urllib.parse import urljoin +from urllib.parse import urljoin, urlparse +import grpc import requests +from pydantic import AnyHttpUrl from otaclient.configs.cfg import cfg, ecu_info, proxy_info +from otaclient.grpc.log_v1 import ( + otaclient_iot_logging_server_v1_pb2 as log_pb2, +) +from otaclient.grpc.log_v1 import ( + otaclient_iot_logging_server_v1_pb2_grpc as log_v1_grpc, +) + + +class LogType(Enum): + LOG = "log" + METRICS = "metrics" + + +@dataclass +class QueueData: + """Queue data format for logging.""" + + log_type: LogType + message: str + + +class Transmitter(ABC): + def __init__(self, logging_upload_endpoint, ecu_id: str): + self.ecu_id = ecu_id + + @abstractmethod + def send(self, log_type: LogType, message: str, timeout: int) -> None: + pass + + @abstractmethod + def check(self, timeout: int) -> None: + pass + + +class TransmitterGrpc(Transmitter): + def __init__(self, logging_upload_endpoint: AnyHttpUrl, ecu_id: str): + super().__init__(logging_upload_endpoint, ecu_id) + + parsed_url = urlparse(str(logging_upload_endpoint)) + log_upload_endpoint = parsed_url.netloc + channel = grpc.insecure_channel(log_upload_endpoint) + self._stub = log_v1_grpc.OtaClientIoTLoggingServiceStub(channel) + + def send(self, log_type: LogType, message: str, timeout: int) -> None: + pb2_log_type = ( + log_pb2.LogType.METRICS + if log_type == LogType.METRICS + else log_pb2.LogType.LOG + ) + log_entry = log_pb2.PutLogRequest( + ecu_id=self.ecu_id, log_type=pb2_log_type, message=message + ) + self._stub.PutLog(log_entry, timeout=timeout) + + def check(self, timeout: int) -> None: + # health check + log_entry = log_pb2.HealthCheckRequest(service="") + self._stub.Check(log_entry) + + +class TransmitterHttp(Transmitter): + def __init__(self, logging_upload_endpoint: AnyHttpUrl, ecu_id: str): + super().__init__(logging_upload_endpoint, ecu_id) + + _endpoint = f"{str(logging_upload_endpoint).strip('/')}/" + self.log_upload_endpoint = urljoin(_endpoint, ecu_id) + self._session = requests.Session() + + def send(self, log_type: LogType, message: str, timeout: int) -> None: + # support only LogType.LOG + with contextlib.suppress(Exception): + self._session.post(self.log_upload_endpoint, data=message, timeout=timeout) + + def check(self, timeout: int) -> None: + pass + + +class TransmitterFactory: + @staticmethod + def create(logging_upload_endpoint: AnyHttpUrl, ecu_id: str): + try: + _transmitter_grpc = TransmitterGrpc(logging_upload_endpoint, ecu_id) + _transmitter_grpc.check(timeout=3) + return _transmitter_grpc + except Exception: + return TransmitterHttp(logging_upload_endpoint, ecu_id) class _LogTeeHandler(logging.Handler): @@ -33,18 +123,26 @@ class _LogTeeHandler(logging.Handler): def __init__(self, max_backlog: int = 2048) -> None: super().__init__() - self._queue: Queue[str | None] = Queue(maxsize=max_backlog) + self._queue: Queue[QueueData | None] = Queue(maxsize=max_backlog) def emit(self, record: logging.LogRecord) -> None: with contextlib.suppress(Exception): - self._queue.put_nowait(self.format(record)) - - def start_upload_thread(self, endpoint_url: str): + _log_type = getattr(record, "log_type", LogType.LOG) # default to LOG + # if a message is log message, format the message with the formatter + # otherwise(metric message), use the raw message + _message = ( + self.format(record) if _log_type == LogType.LOG else record.getMessage() + ) + self._queue.put_nowait(QueueData(_log_type, _message)) + + def start_upload_thread( + self, logging_upload_endpoint: AnyHttpUrl, ecu_id: str + ) -> None: log_queue = self._queue stop_logging_upload = Event() def _thread_main(): - _session = requests.Session() + _transmitter = TransmitterFactory.create(logging_upload_endpoint, ecu_id) while not stop_logging_upload.is_set(): entry = log_queue.get() @@ -53,8 +151,11 @@ def _thread_main(): if not entry: continue # skip uploading empty log line - with contextlib.suppress(Exception): - _session.post(endpoint_url, data=entry, timeout=3) + try: + _transmitter.send(entry.log_type, entry.message, timeout=3) + except Exception: + # ignore the exception and continue + pass log_upload_thread = Thread(target=_thread_main, daemon=True) log_upload_thread.start() @@ -68,7 +169,7 @@ def _thread_exit(): def configure_logging() -> None: - """Configure logging with http handler.""" + """Configure logging with http/gRPC handler.""" # ------ suppress logging from non-first-party modules ------ # # NOTE: force to reload the basicConfig, this is for overriding setting # when launching subprocess. @@ -79,15 +180,12 @@ def configure_logging() -> None: # ------ configure each sub loggers and attach ota logging handler ------ # log_upload_handler = None if logging_upload_endpoint := proxy_info.logging_server: - logging_upload_endpoint = f"{str(logging_upload_endpoint).strip('/')}/" - log_upload_handler = _LogTeeHandler() fmt = logging.Formatter(fmt=cfg.LOG_FORMAT) log_upload_handler.setFormatter(fmt) # star the logging thread - log_upload_endpoint = urljoin(logging_upload_endpoint, ecu_info.ecu_id) - log_upload_handler.start_upload_thread(log_upload_endpoint) + log_upload_handler.start_upload_thread(logging_upload_endpoint, ecu_info.ecu_id) for logger_name, loglevel in cfg.LOG_LEVEL_TABLE.items(): _logger = logging.getLogger(logger_name) diff --git a/src/otaclient/configs/_proxy_info.py b/src/otaclient/configs/_proxy_info.py index 9b140c13f..eaa97141b 100644 --- a/src/otaclient/configs/_proxy_info.py +++ b/src/otaclient/configs/_proxy_info.py @@ -13,7 +13,6 @@ # limitations under the License. """proxy_info.yaml definition and parsing logic.""" - from __future__ import annotations import logging diff --git a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py new file mode 100644 index 000000000..e6a1b00bc --- /dev/null +++ b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: otaclient_iot_logging_server_pb2/v1/otaclient_iot_logging_server_v1.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\nIotaclient_iot_logging_server_pb2/v1/otaclient_iot_logging_server_v1.proto"L\n\rPutLogRequest\x12\x0e\n\x06\x65\x63u_id\x18\x01 \x01(\t\x12\x1a\n\x08log_type\x18\x02 \x01(\x0e\x32\x08.LogType\x12\x0f\n\x07message\x18\x03 \x01(\t";\n\x0ePutLogResponse\x12\x18\n\x04\x63ode\x18\x01 \x01(\x0e\x32\n.ErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t"%\n\x12HealthCheckRequest\x12\x0f\n\x07service\x18\x01 \x01(\t"\x9a\x01\n\x13HealthCheckResponse\x12\x32\n\x06status\x18\x01 \x01(\x0e\x32".HealthCheckResponse.ServingStatus"O\n\rServingStatus\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0b\n\x07SERVING\x10\x01\x12\x0f\n\x0bNOT_SERVING\x10\x02\x12\x13\n\x0fSERVICE_UNKNOWN\x10\x03*\x1f\n\x07LogType\x12\x07\n\x03LOG\x10\x00\x12\x0b\n\x07METRICS\x10\x01*Z\n\tErrorCode\x12\x0e\n\nNO_FAILURE\x10\x00\x12\x15\n\x11SERVER_QUEUE_FULL\x10\x01\x12\x16\n\x12NOT_ALLOWED_ECU_ID\x10\x02\x12\x0e\n\nNO_MESSAGE\x10\x03\x32}\n\x1aOtaClientIoTLoggingService\x12+\n\x06PutLog\x12\x0e.PutLogRequest\x1a\x0f.PutLogResponse"\x00\x12\x32\n\x05\x43heck\x12\x13.HealthCheckRequest\x1a\x14.HealthCheckResponseb\x06proto3' +) + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, + "otaclient_iot_logging_server_pb2.v1.otaclient_iot_logging_server_v1_pb2", + _globals, +) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _globals["_LOGTYPE"]._serialized_start = 412 + _globals["_LOGTYPE"]._serialized_end = 443 + _globals["_ERRORCODE"]._serialized_start = 445 + _globals["_ERRORCODE"]._serialized_end = 535 + _globals["_PUTLOGREQUEST"]._serialized_start = 77 + _globals["_PUTLOGREQUEST"]._serialized_end = 153 + _globals["_PUTLOGRESPONSE"]._serialized_start = 155 + _globals["_PUTLOGRESPONSE"]._serialized_end = 214 + _globals["_HEALTHCHECKREQUEST"]._serialized_start = 216 + _globals["_HEALTHCHECKREQUEST"]._serialized_end = 253 + _globals["_HEALTHCHECKRESPONSE"]._serialized_start = 256 + _globals["_HEALTHCHECKRESPONSE"]._serialized_end = 410 + _globals["_HEALTHCHECKRESPONSE_SERVINGSTATUS"]._serialized_start = 331 + _globals["_HEALTHCHECKRESPONSE_SERVINGSTATUS"]._serialized_end = 410 + _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_start = 537 + _globals["_OTACLIENTIOTLOGGINGSERVICE"]._serialized_end = 662 +# @@protoc_insertion_point(module_scope) diff --git a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.pyi b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.pyi new file mode 100644 index 000000000..52246c1b8 --- /dev/null +++ b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2.pyi @@ -0,0 +1,78 @@ +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union + +DESCRIPTOR: _descriptor.FileDescriptor + +class LogType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = [] + LOG: _ClassVar[LogType] + METRICS: _ClassVar[LogType] + +class ErrorCode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = [] + NO_FAILURE: _ClassVar[ErrorCode] + SERVER_QUEUE_FULL: _ClassVar[ErrorCode] + NOT_ALLOWED_ECU_ID: _ClassVar[ErrorCode] + NO_MESSAGE: _ClassVar[ErrorCode] + +LOG: LogType +METRICS: LogType +NO_FAILURE: ErrorCode +SERVER_QUEUE_FULL: ErrorCode +NOT_ALLOWED_ECU_ID: ErrorCode +NO_MESSAGE: ErrorCode + +class PutLogRequest(_message.Message): + __slots__ = ["ecu_id", "log_type", "message"] + ECU_ID_FIELD_NUMBER: _ClassVar[int] + LOG_TYPE_FIELD_NUMBER: _ClassVar[int] + MESSAGE_FIELD_NUMBER: _ClassVar[int] + ecu_id: str + log_type: LogType + message: str + def __init__( + self, + ecu_id: _Optional[str] = ..., + log_type: _Optional[_Union[LogType, str]] = ..., + message: _Optional[str] = ..., + ) -> None: ... + +class PutLogResponse(_message.Message): + __slots__ = ["code", "message"] + CODE_FIELD_NUMBER: _ClassVar[int] + MESSAGE_FIELD_NUMBER: _ClassVar[int] + code: ErrorCode + message: str + def __init__( + self, + code: _Optional[_Union[ErrorCode, str]] = ..., + message: _Optional[str] = ..., + ) -> None: ... + +class HealthCheckRequest(_message.Message): + __slots__ = ["service"] + SERVICE_FIELD_NUMBER: _ClassVar[int] + service: str + def __init__(self, service: _Optional[str] = ...) -> None: ... + +class HealthCheckResponse(_message.Message): + __slots__ = ["status"] + + class ServingStatus(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = [] + UNKNOWN: _ClassVar[HealthCheckResponse.ServingStatus] + SERVING: _ClassVar[HealthCheckResponse.ServingStatus] + NOT_SERVING: _ClassVar[HealthCheckResponse.ServingStatus] + SERVICE_UNKNOWN: _ClassVar[HealthCheckResponse.ServingStatus] + + UNKNOWN: HealthCheckResponse.ServingStatus + SERVING: HealthCheckResponse.ServingStatus + NOT_SERVING: HealthCheckResponse.ServingStatus + SERVICE_UNKNOWN: HealthCheckResponse.ServingStatus + STATUS_FIELD_NUMBER: _ClassVar[int] + status: HealthCheckResponse.ServingStatus + def __init__( + self, status: _Optional[_Union[HealthCheckResponse.ServingStatus, str]] = ... + ) -> None: ... diff --git a/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2_grpc.py b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2_grpc.py new file mode 100644 index 000000000..fab8a3d5d --- /dev/null +++ b/src/otaclient/grpc/log_v1/otaclient_iot_logging_server_v1_pb2_grpc.py @@ -0,0 +1,131 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from otaclient.grpc.log_v1 import ( + otaclient_iot_logging_server_v1_pb2 as otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2, +) + + +class OtaClientIoTLoggingServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.PutLog = channel.unary_unary( + "/OtaClientIoTLoggingService/PutLog", + request_serializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogRequest.SerializeToString, + response_deserializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogResponse.FromString, + ) + self.Check = channel.unary_unary( + "/OtaClientIoTLoggingService/Check", + request_serializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckRequest.SerializeToString, + response_deserializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckResponse.FromString, + ) + + +class OtaClientIoTLoggingServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def PutLog(self, request, context): + """ + `PutLog` service requests OTA Client logging service to put log. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def Check(self, request, context): + """ + `Check` requests OTA Client logging service to check the health of the + service. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + +def add_OtaClientIoTLoggingServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + "PutLog": grpc.unary_unary_rpc_method_handler( + servicer.PutLog, + request_deserializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogRequest.FromString, + response_serializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogResponse.SerializeToString, + ), + "Check": grpc.unary_unary_rpc_method_handler( + servicer.Check, + request_deserializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckRequest.FromString, + response_serializer=otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + "OtaClientIoTLoggingService", rpc_method_handlers + ) + server.add_generic_rpc_handlers((generic_handler,)) + + +# This class is part of an EXPERIMENTAL API. +class OtaClientIoTLoggingService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def PutLog( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/OtaClientIoTLoggingService/PutLog", + otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogRequest.SerializeToString, + otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.PutLogResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def Check( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/OtaClientIoTLoggingService/Check", + otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckRequest.SerializeToString, + otaclient__iot__logging__server__pb2_dot_v1_dot_otaclient__iot__logging__server__v1__pb2.HealthCheckResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/src/otaclient/logging_README.md b/src/otaclient/logging_README.md new file mode 100644 index 000000000..c41d37914 --- /dev/null +++ b/src/otaclient/logging_README.md @@ -0,0 +1,72 @@ +# Protocol Documentation + +## Table of Contents + +- [Protocol Documentation](#protocol-documentation) + - [Table of Contents](#table-of-contents) + - [proto/otaclient\_iot\_logging\_server\_v1.proto](#protootaclient_iot_logging_server_v1proto) + - [PutLogRequest](#putlogrequest) + - [PutLogResponse](#putlogresponse) + - [ErrorCode](#errorcode) + - [LogType](#logtype) + - [OtaClientIoTLoggingService](#otaclientiotloggingservice) + - [Scalar Value Types](#scalar-value-types) + +## proto/otaclient_iot_logging_server_v1.proto + +### PutLogRequest + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| ecu_id | string | | target ECU ID | +| log_type | [LogType](#logtype) | | log type | +| message | string | | log message | + +### PutLogResponse + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| code | [ErrorCode](#errorcode) | | error code | +| message | string | | error message | + +### ErrorCode + +| Name | Number | Description | +| ---- | ------ | ----------- | +| NO_FAILURE | 0 | Success | +| SERVER_QUEUE_FULL | 1 | Error: Server queue is full | +| NOT_ALLOWED_ECU_ID | 2 | Error: Specified ECU ID is not allowed | +| NO_MESSAGE | 3 | Error: No message in the request | + +### LogType + +| Name | Number | Description | +| ---- | ------ | ----------- | +| LOG | 0 | | +| METRICS | 1 | | + +### OtaClientIoTLoggingService + +| Method Name | Request Type | Response Type | Description | +| ----------- | ------------ | ------------- | ------------| +| PutLog | [.PutLogRequest](#putlogrequest) | [.PutLogResponse](#putlogresponse) | `PutLog` service requests OTA Client logging service to put log. | + +## Scalar Value Types + +| .proto Type | Notes | C++ | Java | Python | Go | C# | PHP | Ruby | +| ----------- | ----- | --- | ---- | ------ | -- | -- | --- | ---- | +| double | | double | double | float | float64 | double | float | Float | +| float | | float | float | float | float32 | float | float | Float | +| int32 | Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint32 instead. | int32 | int | int | int32 | int | integer | Bignum or Fixnum (as required) | +| int64 | Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint64 instead. | int64 | long | int/long | int64 | long | integer/string | Bignum | +| uint32 | Uses variable-length encoding. | uint32 | int | int/long | uint32 | uint | integer | Bignum or Fixnum (as required) | +| uint64 | Uses variable-length encoding. | uint64 | long | int/long | uint64 | ulong | integer/string | Bignum or Fixnum (as required) | +| sint32 | Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int32s. | int32 | int | int | int32 | int | integer | Bignum or Fixnum (as required) | +| sint64 | Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int64s. | int64 | long | int/long | int64 | long | integer/string | Bignum | +| fixed32 | Always four bytes. More efficient than uint32 if values are often greater than 2^28. | uint32 | int | int | uint32 | uint | integer | Bignum or Fixnum (as required) | +| fixed64 | Always eight bytes. More efficient than uint64 if values are often greater than 2^56. | uint64 | long | int/long | uint64 | ulong | integer/string | Bignum | +| sfixed32 | Always four bytes. | int32 | int | int | int32 | int | integer | Bignum or Fixnum (as required) | +| sfixed64 | Always eight bytes. | int64 | long | int/long | int64 | long | integer/string | Bignum | +| bool | | bool | boolean | boolean | bool | bool | boolean | TrueClass/FalseClass | +| string | A string must always contain UTF-8 encoded or 7-bit ASCII text. | string | String | str/unicode | string | string | string | String (UTF-8) | +| bytes | May contain any arbitrary sequence of bytes. | string | ByteString | str | []byte | ByteString | string | String (ASCII-8BIT) | diff --git a/tests/test_otaclient/test_log_setting.py b/tests/test_otaclient/test_log_setting.py index 60ce8b5a7..c8f5c6b69 100644 --- a/tests/test_otaclient/test_log_setting.py +++ b/tests/test_otaclient/test_log_setting.py @@ -17,21 +17,37 @@ import logging +import pytest + from otaclient import _logging MODULE = _logging.__name__ logger = logging.getLogger(__name__) -def test_server_logger(): - test_log_msg = "emit one logging entry" - +@pytest.mark.parametrize( + "test_log_msg, test_extra, expected_log_type", + [ + ("emit one logging entry", None, _logging.LogType.LOG), + ( + "emit one logging entry", + {"log_type": _logging.LogType.LOG}, + _logging.LogType.LOG, + ), + ( + "emit one metrics entry", + {"log_type": _logging.LogType.METRICS}, + _logging.LogType.METRICS, + ), + ], +) +def test_server_logger(test_log_msg, test_extra, expected_log_type): # ------ setup test ------ # _handler = _logging._LogTeeHandler() logger.addHandler(_handler) # ------ execution ------ # - logger.info(test_log_msg) + logger.info(test_log_msg, extra=test_extra) # ------ clenaup ------ # logger.removeHandler(_handler) @@ -39,4 +55,6 @@ def test_server_logger(): # ------ check result ------ # _queue = _handler._queue _log = _queue.get_nowait() - assert _log == test_log_msg + assert _log is not None + assert _log.log_type == expected_log_type + assert _log.message == test_log_msg diff --git a/tests/test_otaclient/test_logging.py b/tests/test_otaclient/test_logging.py new file mode 100644 index 000000000..6d3a26077 --- /dev/null +++ b/tests/test_otaclient/test_logging.py @@ -0,0 +1,198 @@ +# Copyright 2022 TIER IV, INC. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import asyncio +import logging +import re +from dataclasses import dataclass +from queue import Queue +from urllib.parse import urlparse + +import grpc +import pytest +import pytest_asyncio +from pydantic import AnyHttpUrl +from pytest_mock import MockerFixture + +import otaclient._logging as _logging +from otaclient._logging import LogType, configure_logging +from otaclient.configs._cfg_configurable import _OTAClientSettings +from otaclient.configs._ecu_info import ECUInfo +from otaclient.configs._proxy_info import ProxyInfo +from otaclient.grpc.log_v1 import ( + otaclient_iot_logging_server_v1_pb2 as log_pb2, +) +from otaclient.grpc.log_v1 import ( + otaclient_iot_logging_server_v1_pb2_grpc as log_v1_grpc, +) + +logger = logging.getLogger(__name__) + +MODULE = _logging.__name__ + + +@dataclass +class DummyQueueData: + ecu_id: str + log_type: log_pb2.LogType + message: str + + +class DummyLogServerService(log_v1_grpc.OtaClientIoTLoggingServiceServicer): + def __init__(self, test_queue, data_ready): + self._test_queue = test_queue + self._data_ready = data_ready + + async def Check(self, request: log_pb2.HealthCheckRequest, context): + """ + Dummy gRPC method for health check. + """ + return log_pb2.HealthCheckResponse( + status=log_pb2.HealthCheckResponse.ServingStatus.SERVING + ) + + async def PutLog(self, request: log_pb2.PutLogRequest, context): + """ + Dummy gRPC method to put a log message to the queue. + """ + self._test_queue.put_nowait( + DummyQueueData( + ecu_id=request.ecu_id, + log_type=request.log_type, + message=request.message, + ) + ) + self._data_ready.set() + return log_pb2.PutLogResponse(code=log_pb2.NO_FAILURE) + + +class TestLogClient: + OTA_CLIENT_LOGGING_SERVER = "http://127.0.0.1:8084" + ECU_ID = "testclient" + + @pytest.fixture(autouse=True) + async def initialize_queue(self): + self.test_queue: Queue[DummyQueueData] = Queue() + self.data_ready = asyncio.Event() + + @pytest.fixture(autouse=True) + def mock_cfg(self, mocker: MockerFixture): + self._cfg = _OTAClientSettings( + LOG_LEVEL_TABLE={__name__: "INFO"}, + LOG_FORMAT="[%(asctime)s][%(levelname)s]-%(name)s:%(funcName)s:%(lineno)d,%(message)s", + ) + mocker.patch(f"{MODULE}.cfg", self._cfg) + + @pytest.fixture(autouse=True) + def mock_ecu_info(self, mocker: MockerFixture): + self._ecu_info = ECUInfo(ecu_id=TestLogClient.ECU_ID) + mocker.patch(f"{MODULE}.ecu_info", self._ecu_info) + + @pytest.fixture(autouse=True) + def mock_proxy_info(self, mocker: MockerFixture): + self._proxy_info = ProxyInfo( + logging_server=AnyHttpUrl(TestLogClient.OTA_CLIENT_LOGGING_SERVER) + ) + mocker.patch(f"{MODULE}.proxy_info", self._proxy_info) + + @pytest_asyncio.fixture + async def launch_grpc_server(self): + server = grpc.aio.server() + log_v1_grpc.add_OtaClientIoTLoggingServiceServicer_to_server( + servicer=DummyLogServerService(self.test_queue, self.data_ready), + server=server, + ) + parsed_url = urlparse(TestLogClient.OTA_CLIENT_LOGGING_SERVER) + server.add_insecure_port(parsed_url.netloc) + try: + await server.start() + yield + finally: + await server.stop(None) + await server.wait_for_termination() + + @pytest.fixture + def restore_logging(self): + # confiure_logging() is called in the test, so we need to restore the original logging configuration + # Save the current logging configuration + original_handlers = logging.root.handlers[:] + original_level = logging.root.level + original_formatters = [handler.formatter for handler in logging.root.handlers] + + yield + + # Restore the original logging configuration + logging.root.handlers = original_handlers + logging.root.level = original_level + for handler, formatter in zip(logging.root.handlers, original_formatters): + handler.setFormatter(formatter) + + @pytest.mark.parametrize( + "log_message, extra, expected_log_type", + [ + ( + "some log message without extra", + {}, + log_pb2.LogType.LOG, + ), + ( + "some log message", + {"log_type": LogType.LOG}, + log_pb2.LogType.LOG, + ), + ( + "some metrics message", + {"log_type": LogType.METRICS}, + log_pb2.LogType.METRICS, + ), + ], + ) + async def test_grpc_logging( + self, + launch_grpc_server, + restore_logging, + log_message, + extra, + expected_log_type, + ): + configure_logging() + # send a test log message + logger.error(log_message, extra=extra) + # wait for the log message to be received + await self.data_ready.wait() + self.data_ready.clear() + + try: + _response = self.test_queue.get_nowait() + except Exception as e: + pytest.fail(f"Failed to get a log message from the queue: {e}") + + assert _response.ecu_id == TestLogClient.ECU_ID + assert _response.log_type == expected_log_type + + if _response.log_type == log_pb2.LogType.LOG: + # Extract the message part from the log format + # e.g. [2022-01-01 00:00:00,000][ERROR]-test_log_client:test_log_client:123,some log message + log_format_regex = r"\[.*?\]\[.*?\]-.*?:.*?:\d+,(.*)" + match = re.match(log_format_regex, _response.message) + assert match is not None, "Log message format does not match" + extracted_message = match.group(1) + assert extracted_message == log_message + elif _response.log_type == log_pb2.LogType.METRICS: + # Expect the message to be the same as the input + assert _response.message == log_message + else: + pytest.fail(f"Unexpected log type: {_response.log_type}")