From 6925517c5944ec221908ef2d232054203d2d9716 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20W=C3=B6rpel?= Date: Sat, 7 Sep 2024 19:26:24 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=8A=20Fix=20logging=20configuration?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- investigraph/cli.py | 4 +- investigraph/inspect.py | 24 +++++----- investigraph/logging.py | 101 ++++++++++++++++++++++++++++++++++++++- investigraph/settings.py | 3 ++ tests/conftest.py | 2 + tests/test_cli.py | 15 +++--- 6 files changed, 127 insertions(+), 22 deletions(-) diff --git a/investigraph/cli.py b/investigraph/cli.py index a3019f7..5de1fd9 100644 --- a/investigraph/cli.py +++ b/investigraph/cli.py @@ -6,7 +6,6 @@ import typer from anystore import smart_stream from anystore.io import smart_write -from anystore.logging import configure_logging from ftmq.model import Catalog from rich import print from rich.console import Console @@ -18,6 +17,7 @@ inspect_seed, inspect_transform, ) +from investigraph.logging import configure_logging from investigraph.logic.extract import extract_records_from_config from investigraph.logic.transform import transform_record from investigraph.model.config import get_config @@ -34,10 +34,10 @@ def cli_version( version: Annotated[Optional[bool], typer.Option(..., help="Show version")] = False ): - configure_logging() if version: print(VERSION) raise typer.Exit() + configure_logging() @cli.command("run") diff --git a/investigraph/inspect.py b/investigraph/inspect.py index ed25a06..4c33115 100644 --- a/investigraph/inspect.py +++ b/investigraph/inspect.py @@ -6,22 +6,24 @@ import pandas as pd from nomenklatura.entity import CE -from rich import print +from investigraph.logging import get_logger from investigraph.logic.extract import extract_records_from_source from investigraph.logic.transform import transform_record from investigraph.model.config import Config, get_config from investigraph.model.context import BaseContext, Context from investigraph.util import PathLike +log = get_logger(__name__) -def print_error(msg: str): - print(f"[bold red]ERROR[/bold red] {msg}") + +def log_error(msg: str): + log.error(f"[bold red]ERROR[/bold red] {msg}") def get_records(ctx: Context, limit: int | None = 5) -> list[dict[str, Any]]: records: list[dict[str, Any]] = [] - print("Extracting `%s` ..." % ctx.source.uri) + log.info("Extracting `%s` ..." % ctx.source.uri) for record in extract_records_from_source(ctx): records.append(record) if len(records) == limit: @@ -33,21 +35,19 @@ def inspect_config(p: PathLike) -> Config: config = get_config(p) try: if not callable(config.extract.get_handler()): - print_error(f"module not found or not callable: `{config.extract.handler}`") + log_error(f"module not found or not callable: `{config.extract.handler}`") except ModuleNotFoundError: - print_error(f"no custom extract module: `{config.extract.handler}`") + log_error(f"no custom extract module: `{config.extract.handler}`") try: if not callable(config.transform.get_handler()): - print_error( - f"module not found or not callable: `{config.transform.handler}`" - ) + log_error(f"module not found or not callable: `{config.transform.handler}`") except ModuleNotFoundError: - print_error(f"no custom transform module: `{config.transform.handler}`") + log_error(f"no custom transform module: `{config.transform.handler}`") try: if not callable(config.load.get_handler()): - print_error(f"module not found or not callable: `{config.load.handler}`") + log_error(f"module not found or not callable: `{config.load.handler}`") except ModuleNotFoundError: - print_error(f"no custom load module: `{config.load.handler}`") + log_error(f"no custom load module: `{config.load.handler}`") return config diff --git a/investigraph/logging.py b/investigraph/logging.py index 1e22132..d14f686 100644 --- a/investigraph/logging.py +++ b/investigraph/logging.py @@ -1,11 +1,108 @@ import logging +import sys +from logging import Filter, LogRecord +from typing import Any, Dict, List +import structlog from prefect import get_run_logger from prefect.exceptions import MissingContextError +from structlog.contextvars import merge_contextvars +from structlog.dev import ConsoleRenderer, set_exc_info +from structlog.processors import ( + JSONRenderer, + TimeStamper, + UnicodeDecoder, + add_log_level, + format_exc_info, +) +from structlog.stdlib import ( + BoundLogger, + LoggerFactory, + ProcessorFormatter, + add_logger_name, +) +from structlog.stdlib import get_logger as get_raw_logger +from investigraph.settings import SETTINGS -def get_logger(name: str): + +def get_logger(name: str, *args, **kwargs): try: return get_run_logger() except MissingContextError: - return logging.getLogger(name) + return get_raw_logger(name, *args, **kwargs) + + +def configure_logging(level: int = logging.INFO) -> None: + """Configure log levels and structured logging""" + shared_processors: List[Any] = [ + add_log_level, + add_logger_name, + # structlog.stdlib.PositionalArgumentsFormatter(), + # structlog.processors.StackInfoRenderer(), + merge_contextvars, + set_exc_info, + TimeStamper(fmt="iso"), + # format_exc_info, + UnicodeDecoder(), + ] + + if SETTINGS.log_json: + shared_processors.append(format_exc_info) + shared_processors.append(format_json) + formatter = ProcessorFormatter( + foreign_pre_chain=shared_processors, + processor=JSONRenderer(), + ) + else: + formatter = ProcessorFormatter( + foreign_pre_chain=shared_processors, + processor=ConsoleRenderer( + exception_formatter=structlog.dev.plain_traceback + ), + ) + + processors = shared_processors + [ + ProcessorFormatter.wrap_for_formatter, + ] + + # configuration for structlog based loggers + structlog.configure( + cache_logger_on_first_use=True, + # wrapper_class=AsyncBoundLogger, + wrapper_class=BoundLogger, + processors=processors, + context_class=dict, + logger_factory=LoggerFactory(), + ) + + # handler for low level logs that should be sent to STDERR + out_handler = logging.StreamHandler(sys.stderr) + out_handler.setLevel(level) + out_handler.addFilter(_MaxLevelFilter(logging.WARNING)) + out_handler.setFormatter(formatter) + # handler for high level logs that should be sent to STDERR + error_handler = logging.StreamHandler(sys.stderr) + error_handler.setLevel(logging.ERROR) + error_handler.setFormatter(formatter) + + root_logger = logging.getLogger() + root_logger.handlers.clear() + root_logger.setLevel(SETTINGS.log_level.upper()) + root_logger.addHandler(out_handler) + root_logger.addHandler(error_handler) + + +def format_json(_: Any, __: Any, ed: Dict[str, str]) -> Dict[str, str]: + """Stackdriver uses `message` and `severity` keys to display logs""" + ed["message"] = ed.pop("event") + ed["severity"] = ed.pop("level", "info").upper() + return ed + + +class _MaxLevelFilter(Filter): + def __init__(self, highest_log_level: int) -> None: + self._highest_log_level = highest_log_level + + def filter(self, log_record: LogRecord) -> bool: + return log_record.levelno <= self._highest_log_level diff --git a/investigraph/settings.py b/investigraph/settings.py index b35f2cb..92b8278 100644 --- a/investigraph/settings.py +++ b/investigraph/settings.py @@ -49,6 +49,9 @@ class Settings(BaseSettings): archive_uri: str = Field(str((Path.cwd() / "data" / "archive").absolute().as_uri())) + log_json: bool = Field(alias="log_json", default=False) + log_level: str = Field(alias="log_level", default="info") + SETTINGS = Settings() DEBUG = SETTINGS.debug diff --git a/tests/conftest.py b/tests/conftest.py index cb2e5ed..c0749ba 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,6 +7,7 @@ import requests from prefect.testing.utilities import prefect_test_harness +from investigraph.logging import configure_logging from investigraph.model import Config FIXTURES_PATH = (Path(__file__).parent / "fixtures").absolute() @@ -14,6 +15,7 @@ @pytest.fixture(autouse=True, scope="session") def prefect_test_fixture(): + configure_logging() with prefect_test_harness(): yield diff --git a/tests/test_cli.py b/tests/test_cli.py index 64a4635..4df1724 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -25,6 +25,7 @@ def test_cli_run(fixtures_path: Path): def test_cli_inspect(fixtures_path: Path): + # FIXME stdout/stderr logging in result? config = str(fixtures_path / "gdho" / "config.local.yml") result = runner.invoke(cli, ["inspect", config]) assert result.exit_code == 0 @@ -41,15 +42,17 @@ def test_cli_inspect(fixtures_path: Path): assert result.exit_code == 0 tested = False for line in result.stdout.split("\n"): - data = orjson.loads(line) - proxy = make_proxy(data) - assert "gdho" in proxy.datasets - tested = True - break + if line.startswith("{"): + data = orjson.loads(line) + proxy = make_proxy(data) + assert "gdho" in proxy.datasets + tested = True + break assert tested result = runner.invoke( cli, ["inspect", config, "--transform", "--to-json", "-l", "1"] ) assert result.exit_code == 0 - assert len(result.stdout.strip().split("\n")) == 1 + proxies = [ln for ln in result.stdout.strip().split("\n") if ln.startswith("{")] + assert len(proxies) == 1