From 957286c51caa933677c72b7d44fa97938dbcbbb1 Mon Sep 17 00:00:00 2001 From: Roman Koshel Date: Tue, 8 Aug 2023 21:41:53 +0300 Subject: [PATCH] Wip --- arrlio/__init__.py | 2 +- arrlio/backends/local.py | 21 ++++++------ arrlio/backends/rabbitmq.py | 15 ++++++--- arrlio/core.py | 5 +-- arrlio/models.py | 45 +++++++++++++++++++------ arrlio/plugins/graphs.py | 11 +++--- arrlio/serializers/json.py | 2 +- arrlio/settings.py | 9 +++++ arrlio/types.py | 2 +- tests/conftest.py | 3 +- tests/small/backends/test_rabbitmq.py | 48 +++++++++++++-------------- 11 files changed, 100 insertions(+), 63 deletions(-) diff --git a/arrlio/__init__.py b/arrlio/__init__.py index 5c53b4a..2dc1a9a 100644 --- a/arrlio/__init__.py +++ b/arrlio/__init__.py @@ -4,7 +4,7 @@ logger = logging.getLogger("arrlio") logger.setLevel(logging.ERROR) -log_frmt = logging.Formatter("%(asctime)s %(levelname)8s %(name)27s lineno:%(lineno)4d -- %(message)s") +log_frmt = logging.Formatter("%(asctime)s %(levelname)-8s %(name)-27s lineno:%(lineno)4d -- %(message)s") log_hndl = logging.StreamHandler(stream=sys.stderr) log_hndl.setFormatter(log_frmt) logger.addHandler(log_hndl) diff --git a/arrlio/backends/local.py b/arrlio/backends/local.py index 4e03773..e751649 100644 --- a/arrlio/backends/local.py +++ b/arrlio/backends/local.py @@ -11,8 +11,8 @@ from uuid import UUID from pydantic import Field, PositiveInt -from rich.pretty import pretty_repr +from arrlio import settings from arrlio.backends import base from arrlio.exc import TaskClosedError, TaskResultError from arrlio.models import Event, TaskInstance, TaskResult @@ -98,9 +98,6 @@ async def send_task(self, task_instance: TaskInstance, **kwds): if task_instance.result_return and task_instance.task_id not in self._results: self._results[task_instance.task_id] = [asyncio_Event(), [], None] - if is_debug_level(): - logger.debug("%s: send\n%s", self, pretty_repr(task_instance.dict())) - self._task_queues[task_instance.queue].put_nowait( ( (Priority.le - task_instance.priority) if task_instance.priority else Priority.ge, @@ -134,7 +131,11 @@ async def fn(queue: str): task_instance: TaskInstance = self._serializer.loads_task_instance(data) if is_debug_level(): - logger.debug("%s: got\n%s", self, pretty_repr(task_instance.dict())) + logger.debug( + "%s: got\n%s", + self, + task_instance.pretty_repr(sanitize=settings.LOG_SANITIZE), + ) aio_task: asyncio.Task = create_task(callback(task_instance)) @@ -174,7 +175,7 @@ async def push_task_result(self, task_instance: TaskInstance, task_result: TaskR self, task_id, task_instance.name, - pretty_repr(task_result.dict()), + task_result.pretty_repr(sanitize=settings.LOG_SANITIZE), ) results = self._results @@ -227,7 +228,7 @@ async def fn(): self, task_id, task_instance.name, - pretty_repr(task_result.dict()), + task_result.pretty_repr(sanitize=settings.LOG_SANITIZE), ) if isinstance(task_result.exc, TaskClosedError): @@ -250,7 +251,7 @@ async def fn(): self, task_id, task_instance.name, - pretty_repr(task_result.dict()), + task_result.pretty_repr(sanitize=settings.LOG_SANITIZE), ) yield task_result @@ -275,7 +276,7 @@ async def close_task(self, task_instance: TaskInstance, idx: Tuple[str, int] = N async def send_event(self, event: Event): if is_debug_level(): - logger.debug("%s: put\n%s", self, pretty_repr(event.dict())) + logger.debug("%s: put\n%s", self, event.pretty_repr(sanitize=settings.LOG_SANITIZE)) self._events[event.event_id] = self._serializer.dumps_event(event) @@ -323,7 +324,7 @@ async def fn(): event: Event = self._serializer.loads_event(events_pop(next(iter(events_keys())))) if is_debug_level(): - logger.debug("%s: got\n%s", self, pretty_repr(event.dict())) + logger.debug("%s: got\n%s", self, event.pretty_repr(sanitize=settings.LOG_SANITIZE)) for callback, event_types in event_callbacks.values(): if event_types is not None and event.type not in event_types: diff --git a/arrlio/backends/rabbitmq.py b/arrlio/backends/rabbitmq.py index ae1b650..884bfe3 100644 --- a/arrlio/backends/rabbitmq.py +++ b/arrlio/backends/rabbitmq.py @@ -14,8 +14,8 @@ import aiormq.exceptions import yarl from pydantic import Field -from rich.pretty import pretty_repr +from arrlio import settings from arrlio.backends import base from arrlio.exc import GraphError, TaskClosedError, TaskResultError from arrlio.models import Event, TaskInstance, TaskResult @@ -808,7 +808,7 @@ async def _on_result_message( self, channel, task_id, - pretty_repr(task_result.dict()), + task_result.pretty_repr(sanitize=settings.LOG_SANITIZE), ) storage = self._allocate_results_storage(task_id) @@ -851,7 +851,11 @@ async def _on_task_message(self, callback, channel: aiormq.Channel, message: aio task_instance.extra["rabbitmq:reply_to"] = message.header.properties.reply_to if is_debug_level(): - logger.debug("%s: got task\n%s", self, pretty_repr(task_instance.dict())) + logger.debug( + "%s: got task\n%s", + self, + task_instance.pretty_repr(sanitize=settings.LOG_SANITIZE), + ) if not task_instance.ack_late: await channel.basic_ack(message.delivery.delivery_tag) @@ -869,6 +873,7 @@ async def close(self): await self._tasks_exchange.close() for queue in self._task_queues.values(): await queue.close() + await self._results_queue.close(delete=True) await self._conn.close() def _reply_to(self, task_instance: TaskInstance) -> str: @@ -963,7 +968,7 @@ async def _push_task_result( task_instance.name, exchange.name, routing_key, - pretty_repr(task_result.dict()), + task_result.pretty_repr(sanitize=settings.LOG_SANITIZE), ) await exchange.publish( @@ -1100,7 +1105,7 @@ async def on_message(channel: aiormq.Channel, message: aiormq.abc.DeliveredMessa event: Event = self._serializer.loads_event(message.body) if is_debug_level(): - logger.debug("%s: got event\n%s", self, pretty_repr(event.dict())) + logger.debug("%s: got event\n%s", self, event.pretty_repr(sanitize=settings.LOG_SANITIZE)) await channel.basic_ack(message.delivery.delivery_tag) diff --git a/arrlio/core.py b/arrlio/core.py index b8db499..9d3149e 100644 --- a/arrlio/core.py +++ b/arrlio/core.py @@ -11,6 +11,7 @@ from rich.pretty import pretty_repr from roview import rodict +from arrlio import settings from arrlio.backends.base import Backend from arrlio.exc import GraphError, TaskClosedError, TaskError from arrlio.executor import Executor @@ -247,7 +248,7 @@ async def send_task( logger.info( "%s: send task instance\n%s", self, - pretty_repr(task_instance.dict(exclude=["args", "kwds"])), + task_instance.pretty_repr(sanitize=settings.LOG_SANITIZE), ) await self._execute_hooks("on_task_send", task_instance) @@ -258,7 +259,7 @@ async def send_task( async def send_event(self, event: Event): if is_info_level(): - logger.info("%s: send event\n%s", self, pretty_repr(event.dict())) + logger.info("%s: send event\n%s", self, event.pretty_repr(sanitize=settings.LOG_SANITIZE)) await self._backend.send_event(event) diff --git a/arrlio/models.py b/arrlio/models.py index c8c30b1..78a6bb3 100644 --- a/arrlio/models.py +++ b/arrlio/models.py @@ -6,6 +6,7 @@ from uuid import UUID, uuid4 from pydantic import Field, create_model +from rich.pretty import pretty_repr from roview import rodict, roset from arrlio.exc import GraphError @@ -72,7 +73,7 @@ def __call__(self, *args, **kwds) -> Any: return self.func(*args, **kwds) - def dict(self, exclude: List[str] = None): + def dict(self, exclude: List[str] = None, sanitize: bool = None): """Convert to dict. Args: @@ -85,6 +86,9 @@ def dict(self, exclude: List[str] = None): exclude = exclude or [] return {k: v for k, v in asdict(self).items() if k not in exclude} + def pretty_repr(self, exclude: List[str] = None, sanitize: bool = None): + return pretty_repr(self.dict(exclude=exclude, sanitize=sanitize)) + def instantiate( self, task_id: TaskId = None, @@ -126,8 +130,8 @@ class TaskInstance(Task): """ task_id: UUID = field(default_factory=uuid4) - args: tuple = field(default_factory=tuple) - kwds: dict = field(default_factory=dict) + args: Args = field(default_factory=tuple) + kwds: Kwds = field(default_factory=dict) meta: dict = field(default_factory=dict) def __post_init__(self): @@ -135,9 +139,18 @@ def __post_init__(self): object.__setattr__(self, "task_id", uuid4()) elif isinstance(self.task_id, str): object.__setattr__(self, "task_id", UUID(self.task_id)) - if isinstance(self.args, list): + if not isinstance(self.args, tuple): object.__setattr__(self, "args", tuple(self.args)) + def dict(self, exclude: List[str] = None, sanitize: bool = None): + data = super().dict(exclude=exclude, sanitize=sanitize) + if sanitize: + if data["args"]: + data["args"] = "..." + if data["kwds"]: + data["kwds"] = "..." + return data + def __call__(self, meta: bool = None): # pylint: disable=arguments-differ """Call `arrlio.models.TaskInstance`. @@ -193,7 +206,7 @@ def validate(self): kwds[k] = getattr(model, k) object.__setattr__(self, "args", tuple(args)) - object.__setattr__(self, "kwds", kwds) + object.__setattr__(self, "kwds", dict(kwds)) @dataclass(frozen=True) @@ -209,7 +222,7 @@ class TaskResult: def set_idx(self, idx: Tuple[str, int]): object.__setattr__(self, "idx", idx) - def dict(self): + def dict(self, sanitize: bool = None): """Convert to dict. Returns: @@ -217,13 +230,16 @@ def dict(self): """ return { - "res": self.res, + "res": self.res if self.res is None or not sanitize else "...", "exc": self.exc, "trb": self.trb, "idx": self.idx, "routes": self.routes, } + def pretty_repr(self, sanitize: bool = None): + return pretty_repr(self.dict(sanitize=sanitize)) + @dataclass(frozen=True) class Event: @@ -250,14 +266,20 @@ def __post_init__(self): elif isinstance(self.dt, str): object.__setattr__(self, "dt", datetime.datetime.fromisoformat(self.dt)) - def dict(self): + def dict(self, sanitize: bool = None): """Convert to dict. Returns: `arrlio.models.Event` as `dict`. """ - return asdict(self) + data = asdict(self) + if hasattr(data["data"], "sanitize"): + data["data"] = data["data"].sanitize() + return data + + def pretty_repr(self, sanitize: bool = None): + return pretty_repr(self.dict(sanitize=sanitize)) class Graph: @@ -332,7 +354,7 @@ def add_edge(self, node_id_from: str, node_id_to: str, routes: Union[str, List[s routes = [routes] self.edges.__original__.setdefault(node_id_from, []).append([node_id_to, routes]) - def dict(self): + def dict(self, sanitize: bool = None): """Convert to the dict. Returns: @@ -362,3 +384,6 @@ def from_dict(cls, data: Dict) -> "Graph": edges=data["edges"], roots=data["roots"], ) + + def pretty_repr(self, sanitize: bool = None): + return pretty_repr(self.dict(sanitize=sanitize)) diff --git a/arrlio/plugins/graphs.py b/arrlio/plugins/graphs.py index 3479142..2b6c98b 100644 --- a/arrlio/plugins/graphs.py +++ b/arrlio/plugins/graphs.py @@ -4,16 +4,13 @@ from typing import Dict, Tuple from uuid import uuid4 -from rich.pretty import pretty_repr - -from arrlio import AsyncResult, registered_tasks +from arrlio import AsyncResult, registered_tasks, settings from arrlio.exc import ArrlioError, GraphError from arrlio.models import Event, Graph, Task, TaskInstance, TaskResult from arrlio.plugins import base +from arrlio.utils import is_info_level logger = logging.getLogger("arrlio.plugins.graphs") -is_info = logger.isEnabledFor(logging.INFO) -is_debug = logger.isEnabledFor(logging.DEBUG) class Config(base.Config): @@ -199,12 +196,12 @@ async def _send_graph( extra = task_instance.extra extra["graph:call_id"] = f"{uuid4()}" - if is_info: + if is_info_level(): logger.info( "%s: send graph '%s' task\n%s", self, graph.name, - pretty_repr(task_instance.dict(exclude=["data.args", "data.kwds"])), + task_instance.pretty_repr(sanitize=settings.LOG_SANITIZE), ) await self.app.backend.send_task(task_instance) diff --git a/arrlio/serializers/json.py b/arrlio/serializers/json.py index fa22853..3b7be7b 100644 --- a/arrlio/serializers/json.py +++ b/arrlio/serializers/json.py @@ -50,7 +50,7 @@ def dumps_task_instance(self, task_instance: TaskInstance, **kwds) -> bytes: extra = data["extra"] if graph := extra.get("graph:graph"): extra["graph:graph"] = graph.dict() - return self.dumps({k: v for k, v in data.items() if v is not None}, cls=self.config.encoder) + return self.dumps({k: v for k, v in data.items() if v is not None}) def loads_task_instance(self, data: bytes) -> TaskInstance: """Loads `arrlio.models.TaskInstance` object from json encoded string.""" diff --git a/arrlio/settings.py b/arrlio/settings.py index b5c2250..138aad5 100644 --- a/arrlio/settings.py +++ b/arrlio/settings.py @@ -26,6 +26,9 @@ TASK_QUEUES = [TASK_QUEUE] +LOG_LEVEL = "ERROR" +LOG_SANITIZE = True + class BaseConfig(BaseSettings): class Config: @@ -108,6 +111,11 @@ class Config: env_prefix = f"{ENV_PREFIX}EXECUTOR_" +# class LoggingConfig(BaseConfig): +# level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = Field(default_factory=lambda: LOG_LEVEL) +# hide_args_kwds: bool = Field(default_factory=lambda: LOG_HIDE_ARGS_KWDS) + + class Config(BaseConfig): app_id: constr(min_length=1) = Field(default_factory=lambda: f"{uuid4()}") backend: BackendConfig = Field(default_factory=BackendConfig) @@ -116,6 +124,7 @@ class Config(BaseConfig): task_queues: Set[str] = Field(default_factory=lambda: TASK_QUEUES) plugins: List[PluginConfig] = Field(default_factory=list) executor: ExecutorConfig = Field(default_factory=ExecutorConfig) + # logging: LoggingConfig = Field(default_factory=LoggingConfig) class Config: env_prefix = ENV_PREFIX diff --git a/arrlio/types.py b/arrlio/types.py index b5eee15..bc2559d 100644 --- a/arrlio/types.py +++ b/arrlio/types.py @@ -23,7 +23,7 @@ class Priority(ConstrainedInt): TaskId = Union[str, UUID] Args = Union[List, Tuple] -Kwds = Dict +Kwds = Union[Dict] class SecretAnyUrl(AnyUrl): diff --git a/tests/conftest.py b/tests/conftest.py index 21f441a..d3c73de 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import gc +import logging # import os import time @@ -9,7 +10,7 @@ from arrlio import App, Config, logger from tests import utils -logger.setLevel("DEBUG") +logger.setLevel(logging.DEBUG) @pytest.fixture(scope="function") diff --git a/tests/small/backends/test_rabbitmq.py b/tests/small/backends/test_rabbitmq.py index 3fdca00..2bc65d4 100644 --- a/tests/small/backends/test_rabbitmq.py +++ b/tests/small/backends/test_rabbitmq.py @@ -97,6 +97,16 @@ def test__init_custom(self, cleanup): assert config.results_ttl == 10 +@pytest.fixture +def mock_connect(): + with mock.patch("aiormq.connect") as m: + mock_conn = mock.AsyncMock() + mock_conn.is_closed = False + mock_conn.closing = asyncio.Future() + m.return_value = mock_conn + yield m + + class TestConnection: @pytest.mark.asyncio async def test__init(self, cleanup): @@ -170,33 +180,23 @@ def cb_with_exception(): await conn.close() @pytest.mark.asyncio - async def test_connect(self, cleanup): + async def test_connect(self, mock_connect, cleanup): conn = rabbitmq.Connection(["amqp://admin@example.com"]) try: - with mock.patch("aiormq.connect") as mock_connect: - mock_conn = mock.AsyncMock() - mock_conn.is_closed = False - mock_conn.closing = asyncio.Future() - mock_connect.return_value = mock_conn - await conn.open() - mock_connect.assert_awaited_once_with("amqp://admin@example.com") - assert conn.is_open is True - assert conn.is_closed is False - assert conn._conn is not None + await conn.open() + mock_connect.assert_awaited_once_with("amqp://admin@example.com") + assert conn.is_open is True + assert conn.is_closed is False + assert conn._conn is not None finally: await conn.close() @pytest.mark.asyncio - async def test_channel(self, cleanup): + async def test_channel(self, mock_connect, cleanup): conn = rabbitmq.Connection(["amqp://admin@example.com"]) try: - with mock.patch("aiormq.connect") as mock_connect: - mock_conn = mock.AsyncMock() - mock_conn.is_closed = False - mock_conn.closing = asyncio.Future() - mock_connect.return_value = mock_conn - await conn.open() - await conn.channel() + await conn.open() + await conn.channel() finally: await conn.close() @@ -219,7 +219,7 @@ async def test_repr(self, cleanup): class TestBackend: @pytest.mark.asyncio - async def test__init(self, cleanup): + async def test__init(self, mock_connect, cleanup): backend = rabbitmq.Backend(rabbitmq.Config()) try: assert isinstance(backend._serializer, serializers.json.Serializer) @@ -232,7 +232,7 @@ async def test__init(self, cleanup): # assert isinstance(backend.serializer, nop.Serializer) @pytest.mark.asyncio - async def test_str(self, cleanup): + async def test_str(self, mock_connect, cleanup): backend = rabbitmq.Backend(rabbitmq.Config()) try: assert str(backend) == "Backend[Connection[localhost]]" @@ -240,7 +240,7 @@ async def test_str(self, cleanup): await backend.close() @pytest.mark.asyncio - async def test_repr(self, cleanup): + async def test_repr(self, mock_connect, cleanup): backend = rabbitmq.Backend(rabbitmq.Config()) try: assert repr(backend) == "Backend[Connection[localhost]]" @@ -248,11 +248,9 @@ async def test_repr(self, cleanup): await backend.close() @pytest.mark.asyncio - async def test_on_connection_open(self, cleanup): + async def test_on_connection_open(self, mock_connect, cleanup): backend = rabbitmq.Backend(rabbitmq.Config()) try: - mock_channel = mock.AsyncMock() - mock_channel.is_closed = False with mock.patch.object(backend._conn, "channel"): await backend._on_conn_open() assert backend._direct_reply_to_consumer is not None