Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support both http and grpc for logging #474

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 112 additions & 14 deletions src/otaclient/_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,38 +13,136 @@
# limitations under the License.
"""Configure the logging for otaclient."""


from __future__ import annotations

import atexit
import contextlib
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from queue import Queue
from threading import Event, Thread
from urllib.parse import urljoin
from urllib.parse import urljoin, urlparse

import grpc
import requests
from pydantic import AnyHttpUrl

from otaclient.configs.cfg import cfg, ecu_info, proxy_info
from otaclient.grpc.log_v1 import (
otaclient_iot_logging_server_v1_pb2 as log_pb2,
)
from otaclient.grpc.log_v1 import (
otaclient_iot_logging_server_v1_pb2_grpc as log_v1_grpc,
)


class LogType(Enum):
LOG = "log"
METRICS = "metrics"


@dataclass
class QueueData:
"""Queue data format for logging."""

log_type: LogType
message: str
Comment on lines +46 to +51
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed queue from str to QueueData.



class Transmitter(ABC):
def __init__(self, logging_upload_endpoint, ecu_id: str):
self.ecu_id = ecu_id

@abstractmethod
def send(self, log_type: LogType, message: str, timeout: int) -> None:
pass

@abstractmethod
def check(self, timeout: int) -> None:
pass
Comment on lines +54 to +64
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Base Class.



class TransmitterGrpc(Transmitter):
def __init__(self, logging_upload_endpoint: AnyHttpUrl, ecu_id: str):
super().__init__(logging_upload_endpoint, ecu_id)

parsed_url = urlparse(str(logging_upload_endpoint))
log_upload_endpoint = parsed_url.netloc
channel = grpc.insecure_channel(log_upload_endpoint)
self._stub = log_v1_grpc.OtaClientIoTLoggingServiceStub(channel)

def send(self, log_type: LogType, message: str, timeout: int) -> None:
pb2_log_type = (
log_pb2.LogType.METRICS
if log_type == LogType.METRICS
else log_pb2.LogType.LOG
)
log_entry = log_pb2.PutLogRequest(
ecu_id=self.ecu_id, log_type=pb2_log_type, message=message
)
self._stub.PutLog(log_entry, timeout=timeout)

def check(self, timeout: int) -> None:
# health check
log_entry = log_pb2.HealthCheckRequest(service="")
self._stub.Check(log_entry)
Comment on lines +67 to +90
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation Class for gRPC.



class TransmitterHttp(Transmitter):
def __init__(self, logging_upload_endpoint: AnyHttpUrl, ecu_id: str):
super().__init__(logging_upload_endpoint, ecu_id)

_endpoint = f"{str(logging_upload_endpoint).strip('/')}/"
self.log_upload_endpoint = urljoin(_endpoint, ecu_id)
self._session = requests.Session()

def send(self, log_type: LogType, message: str, timeout: int) -> None:
# support only LogType.LOG
with contextlib.suppress(Exception):
self._session.post(self.log_upload_endpoint, data=message, timeout=timeout)

def check(self, timeout: int) -> None:
pass
Comment on lines +93 to +107
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation Class for HTTP.



class TransmitterFactory:
@staticmethod
def create(logging_upload_endpoint: AnyHttpUrl, ecu_id: str):
try:
_transmitter_grpc = TransmitterGrpc(logging_upload_endpoint, ecu_id)
_transmitter_grpc.check(timeout=3)
return _transmitter_grpc
except Exception:
return TransmitterHttp(logging_upload_endpoint, ecu_id)
Comment on lines +110 to +118
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Factory Class.



class _LogTeeHandler(logging.Handler):
"""Implementation of teeing local logs to a remote otaclient-iot-logger server."""

def __init__(self, max_backlog: int = 2048) -> None:
super().__init__()
self._queue: Queue[str | None] = Queue(maxsize=max_backlog)
self._queue: Queue[QueueData | None] = Queue(maxsize=max_backlog)

def emit(self, record: logging.LogRecord) -> None:
with contextlib.suppress(Exception):
self._queue.put_nowait(self.format(record))

def start_upload_thread(self, endpoint_url: str):
_log_type = getattr(record, "log_type", LogType.LOG) # default to LOG
# if a message is log message, format the message with the formatter
# otherwise(metric message), use the raw message
_message = (
self.format(record) if _log_type == LogType.LOG else record.getMessage()
)
self._queue.put_nowait(QueueData(_log_type, _message))

def start_upload_thread(
self, logging_upload_endpoint: AnyHttpUrl, ecu_id: str
) -> None:
log_queue = self._queue
stop_logging_upload = Event()

def _thread_main():
_session = requests.Session()
_transmitter = TransmitterFactory.create(logging_upload_endpoint, ecu_id)

while not stop_logging_upload.is_set():
entry = log_queue.get()
Expand All @@ -53,8 +151,11 @@ def _thread_main():
if not entry:
continue # skip uploading empty log line

with contextlib.suppress(Exception):
_session.post(endpoint_url, data=entry, timeout=3)
try:
_transmitter.send(entry.log_type, entry.message, timeout=3)
except Exception:
# ignore the exception and continue
pass

log_upload_thread = Thread(target=_thread_main, daemon=True)
log_upload_thread.start()
Expand All @@ -68,7 +169,7 @@ def _thread_exit():


def configure_logging() -> None:
"""Configure logging with http handler."""
"""Configure logging with http/gRPC handler."""
# ------ suppress logging from non-first-party modules ------ #
# NOTE: force to reload the basicConfig, this is for overriding setting
# when launching subprocess.
Expand All @@ -79,15 +180,12 @@ def configure_logging() -> None:
# ------ configure each sub loggers and attach ota logging handler ------ #
log_upload_handler = None
if logging_upload_endpoint := proxy_info.logging_server:
logging_upload_endpoint = f"{str(logging_upload_endpoint).strip('/')}/"

log_upload_handler = _LogTeeHandler()
fmt = logging.Formatter(fmt=cfg.LOG_FORMAT)
log_upload_handler.setFormatter(fmt)

# star the logging thread
log_upload_endpoint = urljoin(logging_upload_endpoint, ecu_info.ecu_id)
log_upload_handler.start_upload_thread(log_upload_endpoint)
log_upload_handler.start_upload_thread(logging_upload_endpoint, ecu_info.ecu_id)

for logger_name, loglevel in cfg.LOG_LEVEL_TABLE.items():
_logger = logging.getLogger(logger_name)
Expand Down
1 change: 0 additions & 1 deletion src/otaclient/configs/_proxy_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.
"""proxy_info.yaml definition and parsing logic."""


from __future__ import annotations

import logging
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union

DESCRIPTOR: _descriptor.FileDescriptor

class LogType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
LOG: _ClassVar[LogType]
METRICS: _ClassVar[LogType]

class ErrorCode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
NO_FAILURE: _ClassVar[ErrorCode]
SERVER_QUEUE_FULL: _ClassVar[ErrorCode]
NOT_ALLOWED_ECU_ID: _ClassVar[ErrorCode]
NO_MESSAGE: _ClassVar[ErrorCode]

LOG: LogType
METRICS: LogType
NO_FAILURE: ErrorCode
SERVER_QUEUE_FULL: ErrorCode
NOT_ALLOWED_ECU_ID: ErrorCode
NO_MESSAGE: ErrorCode

class PutLogRequest(_message.Message):
__slots__ = ["ecu_id", "log_type", "message"]
ECU_ID_FIELD_NUMBER: _ClassVar[int]
LOG_TYPE_FIELD_NUMBER: _ClassVar[int]
MESSAGE_FIELD_NUMBER: _ClassVar[int]
ecu_id: str
log_type: LogType
message: str
def __init__(
self,
ecu_id: _Optional[str] = ...,
log_type: _Optional[_Union[LogType, str]] = ...,
message: _Optional[str] = ...,
) -> None: ...

class PutLogResponse(_message.Message):
__slots__ = ["code", "message"]
CODE_FIELD_NUMBER: _ClassVar[int]
MESSAGE_FIELD_NUMBER: _ClassVar[int]
code: ErrorCode
message: str
def __init__(
self,
code: _Optional[_Union[ErrorCode, str]] = ...,
message: _Optional[str] = ...,
) -> None: ...

class HealthCheckRequest(_message.Message):
__slots__ = ["service"]
SERVICE_FIELD_NUMBER: _ClassVar[int]
service: str
def __init__(self, service: _Optional[str] = ...) -> None: ...

class HealthCheckResponse(_message.Message):
__slots__ = ["status"]

class ServingStatus(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
UNKNOWN: _ClassVar[HealthCheckResponse.ServingStatus]
SERVING: _ClassVar[HealthCheckResponse.ServingStatus]
NOT_SERVING: _ClassVar[HealthCheckResponse.ServingStatus]
SERVICE_UNKNOWN: _ClassVar[HealthCheckResponse.ServingStatus]

UNKNOWN: HealthCheckResponse.ServingStatus
SERVING: HealthCheckResponse.ServingStatus
NOT_SERVING: HealthCheckResponse.ServingStatus
SERVICE_UNKNOWN: HealthCheckResponse.ServingStatus
STATUS_FIELD_NUMBER: _ClassVar[int]
status: HealthCheckResponse.ServingStatus
def __init__(
self, status: _Optional[_Union[HealthCheckResponse.ServingStatus, str]] = ...
) -> None: ...
Loading
Loading