🔊 Fix logging configuration
simonwoerpel committed Sep 7, 2024
1 parent eeeac47 commit 6925517
Showing 6 changed files with 127 additions and 22 deletions.
4 changes: 2 additions & 2 deletions investigraph/cli.py
@@ -6,7 +6,6 @@
import typer
from anystore import smart_stream
from anystore.io import smart_write
from anystore.logging import configure_logging
from ftmq.model import Catalog
from rich import print
from rich.console import Console
@@ -18,6 +17,7 @@
inspect_seed,
inspect_transform,
)
from investigraph.logging import configure_logging
from investigraph.logic.extract import extract_records_from_config
from investigraph.logic.transform import transform_record
from investigraph.model.config import get_config
@@ -34,10 +34,10 @@
def cli_version(
version: Annotated[Optional[bool], typer.Option(..., help="Show version")] = False
):
configure_logging()
if version:
print(VERSION)
raise typer.Exit()
configure_logging()


@cli.command("run")
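
For orientation, a minimal self-contained sketch of the pattern this hunk establishes (the app name, the placeholder version string, and the callback registration are assumptions, not code from this commit): configure_logging() is now imported from investigraph's own logging module and is only reached when the command does not exit early to print the version.

# Hypothetical stand-alone sketch of the callback pattern above.
import logging
from typing import Optional

import typer

app = typer.Typer()


@app.callback(invoke_without_command=True)
def main(version: Optional[bool] = typer.Option(False, help="Show version")) -> None:
    if version:
        print("0.0.0")      # placeholder; the real CLI prints VERSION
        raise typer.Exit()  # exits before any log handlers are installed
    # only configure logging when an actual command is about to run
    logging.basicConfig(level=logging.INFO)  # stands in for configure_logging()


if __name__ == "__main__":
    app()
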
24 changes: 12 additions & 12 deletions investigraph/inspect.py
@@ -6,22 +6,24 @@

import pandas as pd
from nomenklatura.entity import CE
from rich import print

from investigraph.logging import get_logger
from investigraph.logic.extract import extract_records_from_source
from investigraph.logic.transform import transform_record
from investigraph.model.config import Config, get_config
from investigraph.model.context import BaseContext, Context
from investigraph.util import PathLike

log = get_logger(__name__)

def print_error(msg: str):
print(f"[bold red]ERROR[/bold red] {msg}")

def log_error(msg: str):
log.error(f"[bold red]ERROR[/bold red] {msg}")


def get_records(ctx: Context, limit: int | None = 5) -> list[dict[str, Any]]:
records: list[dict[str, Any]] = []
print("Extracting `%s` ..." % ctx.source.uri)
log.info("Extracting `%s` ..." % ctx.source.uri)
for record in extract_records_from_source(ctx):
records.append(record)
if len(records) == limit:
@@ -33,21 +35,19 @@ def inspect_config(p: PathLike) -> Config:
config = get_config(p)
try:
if not callable(config.extract.get_handler()):
print_error(f"module not found or not callable: `{config.extract.handler}`")
log_error(f"module not found or not callable: `{config.extract.handler}`")
except ModuleNotFoundError:
print_error(f"no custom extract module: `{config.extract.handler}`")
log_error(f"no custom extract module: `{config.extract.handler}`")
try:
if not callable(config.transform.get_handler()):
print_error(
f"module not found or not callable: `{config.transform.handler}`"
)
log_error(f"module not found or not callable: `{config.transform.handler}`")
except ModuleNotFoundError:
print_error(f"no custom transform module: `{config.transform.handler}`")
log_error(f"no custom transform module: `{config.transform.handler}`")
try:
if not callable(config.load.get_handler()):
print_error(f"module not found or not callable: `{config.load.handler}`")
log_error(f"module not found or not callable: `{config.load.handler}`")
except ModuleNotFoundError:
print_error(f"no custom load module: `{config.load.handler}`")
log_error(f"no custom load module: `{config.load.handler}`")
return config


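The practical effect of swapping rich print for the module logger: diagnostics now flow through the handlers installed by configure_logging() (both attached to stderr, see logging.py below), so stdout stays reserved for data such as --to-json output. A rough illustration under that assumption (the URL, handler path, and JSON line are made up):

import sys

from investigraph.logging import configure_logging, get_logger

configure_logging()
log = get_logger(__name__)

# status and errors go to the logger -> stderr handlers
log.info("Extracting `%s` ..." % "https://example.org/source.csv")
log.error("module not found or not callable: `my.broken.handler`")

# data stays on stdout, so `investigraph inspect ... --to-json > out.json`
# captures only JSON lines
sys.stdout.write('{"id": "example-entity"}\n')
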
101 changes: 99 additions & 2 deletions investigraph/logging.py
@@ -1,11 +1,108 @@
import logging
import sys
from logging import Filter, LogRecord
from typing import Any, Dict, List

import structlog
from prefect import get_run_logger
from prefect.exceptions import MissingContextError
from structlog.contextvars import merge_contextvars
from structlog.dev import ConsoleRenderer, set_exc_info
from structlog.processors import (
JSONRenderer,
TimeStamper,
UnicodeDecoder,
add_log_level,
format_exc_info,
)
from structlog.stdlib import (
BoundLogger,
LoggerFactory,
ProcessorFormatter,
add_logger_name,
)
from structlog.stdlib import get_logger as get_raw_logger

from investigraph.settings import SETTINGS

def get_logger(name: str):

def get_logger(name: str, *args, **kwargs):
try:
return get_run_logger()
except MissingContextError:
return logging.getLogger(name)
return get_raw_logger(name, *args, **kwargs)


def configure_logging(level: int = logging.INFO) -> None:
"""Configure log levels and structured logging"""
shared_processors: List[Any] = [
add_log_level,
add_logger_name,
# structlog.stdlib.PositionalArgumentsFormatter(),
# structlog.processors.StackInfoRenderer(),
merge_contextvars,
set_exc_info,
TimeStamper(fmt="iso"),
# format_exc_info,
UnicodeDecoder(),
]

if SETTINGS.log_json:
shared_processors.append(format_exc_info)
shared_processors.append(format_json)
formatter = ProcessorFormatter(
foreign_pre_chain=shared_processors,
processor=JSONRenderer(),
)
else:
formatter = ProcessorFormatter(
foreign_pre_chain=shared_processors,
processor=ConsoleRenderer(
exception_formatter=structlog.dev.plain_traceback
),
)

processors = shared_processors + [
ProcessorFormatter.wrap_for_formatter,
]

# configuration for structlog based loggers
structlog.configure(
cache_logger_on_first_use=True,
# wrapper_class=AsyncBoundLogger,
wrapper_class=BoundLogger,
processors=processors,
context_class=dict,
logger_factory=LoggerFactory(),
)

# handler for low level logs that should be sent to STDERR
out_handler = logging.StreamHandler(sys.stderr)
out_handler.setLevel(level)
out_handler.addFilter(_MaxLevelFilter(logging.WARNING))
out_handler.setFormatter(formatter)
# handler for high level logs that should be sent to STDERR
error_handler = logging.StreamHandler(sys.stderr)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(formatter)

root_logger = logging.getLogger()
root_logger.handlers.clear()
root_logger.setLevel(SETTINGS.log_level.upper())
root_logger.addHandler(out_handler)
root_logger.addHandler(error_handler)


def format_json(_: Any, __: Any, ed: Dict[str, str]) -> Dict[str, str]:
"""Stackdriver uses `message` and `severity` keys to display logs"""
ed["message"] = ed.pop("event")
ed["severity"] = ed.pop("level", "info").upper()
return ed


class _MaxLevelFilter(Filter):
def __init__(self, highest_log_level: int) -> None:
self._highest_log_level = highest_log_level

def filter(self, log_record: LogRecord) -> bool:
return log_record.levelno <= self._highest_log_level
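
Taken together, a likely usage sketch of the new module (assuming the package is importable; this is not code from the commit): configure_logging() wires structlog and the two stderr handlers according to SETTINGS, and get_logger() returns the Prefect run logger inside a flow run, falling back to a structlog stdlib logger everywhere else.

import logging

from investigraph.logging import configure_logging, get_logger

configure_logging()          # console renderer, or JSON when SETTINGS.log_json is set
log = get_logger(__name__)   # Prefect run logger in a flow run, structlog logger otherwise

log.info("pipeline started")       # <= WARNING: passes the _MaxLevelFilter handler
log.warning("source looks empty")  # still within the filtered handler
log.error("extract failed")        # >= ERROR: picked up by the dedicated error handler
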
3 changes: 3 additions & 0 deletions investigraph/settings.py
@@ -49,6 +49,9 @@ class Settings(BaseSettings):

archive_uri: str = Field(str((Path.cwd() / "data" / "archive").absolute().as_uri()))

log_json: bool = Field(alias="log_json", default=False)
log_level: str = Field(alias="log_level", default="info")


SETTINGS = Settings()
DEBUG = SETTINGS.debug
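
The logging module above reads both new fields: SETTINGS.log_json switches the ProcessorFormatter to the JSON renderer (plus the Stackdriver-style format_json processor), and SETTINGS.log_level sets the root logger level. A quick sketch of exercising them directly; since Settings is a pydantic BaseSettings subclass (per the hunk header), the same values can also come from the environment, though the exact variable names depend on env configuration not shown in this hunk:

from investigraph.settings import Settings

settings = Settings(log_json=True, log_level="debug")
assert settings.log_json is True
assert settings.log_level == "debug"
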
2 changes: 2 additions & 0 deletions tests/conftest.py
@@ -7,13 +7,15 @@
import requests
from prefect.testing.utilities import prefect_test_harness

from investigraph.logging import configure_logging
from investigraph.model import Config

FIXTURES_PATH = (Path(__file__).parent / "fixtures").absolute()


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
configure_logging()
with prefect_test_harness():
yield

15 changes: 9 additions & 6 deletions tests/test_cli.py
@@ -25,6 +25,7 @@ def test_cli_run(fixtures_path: Path):


def test_cli_inspect(fixtures_path: Path):
# FIXME stdout/stderr logging in result?
config = str(fixtures_path / "gdho" / "config.local.yml")
result = runner.invoke(cli, ["inspect", config])
assert result.exit_code == 0
@@ -41,15 +42,17 @@ def test_cli_inspect(fixtures_path: Path):
assert result.exit_code == 0
tested = False
for line in result.stdout.split("\n"):
data = orjson.loads(line)
proxy = make_proxy(data)
assert "gdho" in proxy.datasets
tested = True
break
if line.startswith("{"):
data = orjson.loads(line)
proxy = make_proxy(data)
assert "gdho" in proxy.datasets
tested = True
break
assert tested

result = runner.invoke(
cli, ["inspect", config, "--transform", "--to-json", "-l", "1"]
)
assert result.exit_code == 0
assert len(result.stdout.strip().split("\n")) == 1
proxies = [ln for ln in result.stdout.strip().split("\n") if ln.startswith("{")]
assert len(proxies) == 1
