From e3efd51162960bbb10b3656e03d700fdc8adfdcd Mon Sep 17 00:00:00 2001 From: Roman Koshel Date: Mon, 15 Apr 2024 14:01:42 +0300 Subject: [PATCH] Add TaskInstance sanitizer and fixes --- arrlio/__init__.py | 3 +++ arrlio/backends/base.py | 2 ++ arrlio/backends/rabbitmq.py | 13 ++++++++----- arrlio/core.py | 9 +++++++++ arrlio/models.py | 15 ++++++++++----- arrlio/plugins/base.py | 4 ++-- arrlio/serializers/base.py | 4 ++-- pyproject.toml | 2 +- tests/small/backends/test_rabbitmq.py | 6 +++--- 9 files changed, 40 insertions(+), 18 deletions(-) diff --git a/arrlio/__init__.py b/arrlio/__init__.py index a91a81a..2077239 100644 --- a/arrlio/__init__.py +++ b/arrlio/__init__.py @@ -1,6 +1,9 @@ +import importlib.metadata import logging import sys +__version__ = importlib.metadata.version("arrlio") + logger = logging.getLogger("arrlio") log_frmt = logging.Formatter("%(asctime)s %(levelname)-8s %(name)-27s lineno:%(lineno)4d -- %(message)s") diff --git a/arrlio/backends/base.py b/arrlio/backends/base.py index 7ae8821..171e28b 100644 --- a/arrlio/backends/base.py +++ b/arrlio/backends/base.py @@ -33,6 +33,8 @@ class SerializerConfig(BaseSettings, ModuleConfigValidatorMixIn): class Config(BaseSettings): """Config for backend.""" + model_config = SettingsConfigDict() + id: str = Field(default_factory=lambda: f"{uuid4()}") serializer: SerializerConfig = Field(default_factory=SerializerConfig) diff --git a/arrlio/backends/rabbitmq.py b/arrlio/backends/rabbitmq.py index 529689e..175f4a3 100644 --- a/arrlio/backends/rabbitmq.py +++ b/arrlio/backends/rabbitmq.py @@ -64,13 +64,14 @@ class ResultQueueMode(StrEnum): EVENTS_EXCHANGE_DURABLE = False EVENTS_QUEUE_TYPE = QueueType.CLASSIC EVENTS_QUEUE_DURABLE = False -EVENTS_QUEUE_AUTO_DELETE = False -EVENTS_QUEUE_PREFIX = "arrlio." +EVENTS_QUEUE_AUTO_DELETE = True +EVENTS_QUEUE = "arrlio.events" EVENTS_TTL = 600 EVENTS_PREFETCH_COUNT = 10 RESULTS_QUEUE_MODE = ResultQueueMode.COMMON RESULTS_QUEUE_PREFIX = "arrlio." +RESULTS_QUEUE_AUTO_DELETE = True RESULTS_QUEUE_DURABLE = False RESULTS_QUEUE_TYPE = QueueType.CLASSIC RESULTS_TTL = 600 @@ -667,7 +668,7 @@ class Config(base.Config): events_queue_type: QueueType = Field(default_factory=lambda: EVENTS_QUEUE_TYPE) events_queue_durable: bool = Field(default_factory=lambda: EVENTS_QUEUE_DURABLE) events_queue_auto_delete: bool = Field(default_factory=lambda: EVENTS_QUEUE_AUTO_DELETE) - events_queue_prefix: str = Field(default_factory=lambda: EVENTS_QUEUE_PREFIX) + events_queue: str = Field(default_factory=lambda: EVENTS_QUEUE) events_ttl: Optional[Timeout] = Field(default_factory=lambda: EVENTS_TTL) events_prefetch_count: Optional[PositiveInt] = Field(default_factory=lambda: EVENTS_PREFETCH_COUNT) results_queue_mode: ResultQueueMode = Field(default_factory=lambda: RESULTS_QUEUE_MODE) @@ -675,6 +676,8 @@ class Config(base.Config): """.. note:: Only valid for `ResultQueueMode.COMMON`.""" results_queue_durable: bool = Field(default_factory=lambda: RESULTS_QUEUE_DURABLE) """.. note:: Only valid for `ResultQueueMode.COMMON`.""" + results_queue_auto_delete: bool = Field(default_factory=lambda: RESULTS_QUEUE_AUTO_DELETE) + """.. note:: Only valid for `ResultQueueMode.COMMON`.""" results_queue_type: QueueType = Field(default_factory=lambda: RESULTS_QUEUE_TYPE) """.. note:: Only valid for `ResultQueueMode.COMMON`.""" results_ttl: Optional[Timeout] = Field(default_factory=lambda: RESULTS_TTL) @@ -755,7 +758,7 @@ def __init__(self, config: Config): conn=self._conn, type=config.results_queue_type, durable=config.results_queue_durable, - auto_delete=False, + auto_delete=config.results_queue_auto_delete, prefetch_count=config.results_prefetch_count, # expires=config.results_ttl, # msg_ttl=config.results_ttl, @@ -774,7 +777,7 @@ def __init__(self, config: Config): timeout=config.timeout, ) self._events_queue: Queue = Queue( - f"{config.events_queue_prefix}events.{config.id}", + config.events_queue, conn=self._conn, type=config.events_queue_type, durable=config.events_queue_durable, diff --git a/arrlio/core.py b/arrlio/core.py index 5c5ffc0..1246b6e 100644 --- a/arrlio/core.py +++ b/arrlio/core.py @@ -269,6 +269,15 @@ async def send_event(self, event: Event): async def pop_result(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]: async for task_result in self._backend.pop_task_result(task_instance): + if is_info_level(): + logger.info( + "%s got result[idx=%s, exc=%s] for task %s[%s]", + self, + task_result.idx, + task_result.exc is not None, + task_instance.name, + task_instance.task_id, + ) if task_result.exc: if isinstance(task_result.exc, TaskError): if task_result.exc.task_id is None: diff --git a/arrlio/models.py b/arrlio/models.py index 6733473..23cee6c 100644 --- a/arrlio/models.py +++ b/arrlio/models.py @@ -1,7 +1,7 @@ import datetime from dataclasses import asdict, dataclass, field from types import TracebackType -from typing import Any, Callable, Dict +from typing import Any, Callable, ClassVar, Dict from uuid import UUID, uuid4 from rich.pretty import pretty_repr @@ -128,6 +128,8 @@ class TaskInstance(Task): kwds: Kwds = field(default_factory=dict) # pylint: disable=used-before-assignment meta: Dict = field(default_factory=dict) # pylint: disable=used-before-assignment + sanitizer: ClassVar[Callable | None] = None + def __post_init__(self): if self.task_id is None: object.__setattr__(self, "task_id", uuid4()) @@ -139,10 +141,13 @@ def __post_init__(self): def dict(self, exclude: list[str] | None = None, sanitize: bool | None = None): data = super().dict(exclude=exclude, sanitize=sanitize) if sanitize: - if data["args"]: - data["args"] = "" - if data["kwds"]: - data["kwds"] = "" + if self.sanitizer: + data = self.sanitizer(data) # pylint: disable=not-callable + else: + if data["args"]: + data["args"] = "" + if data["kwds"]: + data["kwds"] = "" return data def __call__(self, meta: bool = None): # pylint: disable=arguments-differ diff --git a/arrlio/plugins/base.py b/arrlio/plugins/base.py index 001824a..5d73762 100644 --- a/arrlio/plugins/base.py +++ b/arrlio/plugins/base.py @@ -2,7 +2,7 @@ import contextlib import logging -from pydantic_settings import BaseSettings +from pydantic_settings import BaseSettings, SettingsConfigDict from arrlio.models import TaskInstance, TaskResult @@ -10,7 +10,7 @@ class Config(BaseSettings): - pass + model_config = SettingsConfigDict() class Plugin(abc.ABC): diff --git a/arrlio/serializers/base.py b/arrlio/serializers/base.py index 0da2ea9..df87ff5 100644 --- a/arrlio/serializers/base.py +++ b/arrlio/serializers/base.py @@ -1,13 +1,13 @@ import abc from typing import Any -from pydantic_settings import BaseSettings +from pydantic_settings import BaseSettings, SettingsConfigDict from arrlio.models import Event, TaskInstance, TaskResult class Config(BaseSettings): - pass + model_config = SettingsConfigDict() class Serializer(abc.ABC): diff --git a/pyproject.toml b/pyproject.toml index 6682585..11e13d1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "arrlio" -version = "0.21.0" +version = "0.22.0a0" description = "" authors = ["Roman Koshel "] license = "MIT" diff --git a/tests/small/backends/test_rabbitmq.py b/tests/small/backends/test_rabbitmq.py index 3a8c955..f853af9 100644 --- a/tests/small/backends/test_rabbitmq.py +++ b/tests/small/backends/test_rabbitmq.py @@ -29,7 +29,7 @@ def test__init(self, cleanup): assert config.events_queue_type == rabbitmq.EVENTS_QUEUE_TYPE assert config.events_queue_durable == rabbitmq.EVENTS_QUEUE_DURABLE assert config.events_queue_auto_delete == rabbitmq.EVENTS_QUEUE_AUTO_DELETE - assert config.events_queue_prefix == rabbitmq.EVENTS_QUEUE_PREFIX + assert config.events_queue == rabbitmq.EVENTS_QUEUE assert config.events_ttl == rabbitmq.EVENTS_TTL assert config.events_prefetch_count == rabbitmq.EVENTS_PREFETCH_COUNT assert config.results_queue_mode == rabbitmq.RESULTS_QUEUE_MODE @@ -59,7 +59,7 @@ def test__init_custom(self, cleanup): events_queue_type="quorum", events_queue_durable=False, events_queue_auto_delete=False, - events_queue_prefix="events_queue_prefix", + events_queue="events_queue", events_ttl=789, events_prefetch_count=20, results_queue_mode="common", @@ -87,7 +87,7 @@ def test__init_custom(self, cleanup): assert config.events_queue_type == "quorum" assert config.events_queue_durable is False assert config.events_queue_auto_delete is False - assert config.events_queue_prefix == "events_queue_prefix" + assert config.events_queue == "events_queue" assert config.events_ttl == 789 assert config.events_prefetch_count == 20 assert config.results_queue_mode == "common"