Skip to content

Commit

Permalink
Add TaskInstance sanitizer and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
levsh committed Apr 15, 2024
1 parent 5319a3e commit e3efd51
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 18 deletions.
3 changes: 3 additions & 0 deletions arrlio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import importlib.metadata
import logging
import sys

__version__ = importlib.metadata.version("arrlio")

logger = logging.getLogger("arrlio")

log_frmt = logging.Formatter("%(asctime)s %(levelname)-8s %(name)-27s lineno:%(lineno)4d -- %(message)s")
Expand Down
2 changes: 2 additions & 0 deletions arrlio/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class SerializerConfig(BaseSettings, ModuleConfigValidatorMixIn):
class Config(BaseSettings):
"""Config for backend."""

model_config = SettingsConfigDict()

id: str = Field(default_factory=lambda: f"{uuid4()}")
serializer: SerializerConfig = Field(default_factory=SerializerConfig)

Expand Down
13 changes: 8 additions & 5 deletions arrlio/backends/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ class ResultQueueMode(StrEnum):
EVENTS_EXCHANGE_DURABLE = False
EVENTS_QUEUE_TYPE = QueueType.CLASSIC
EVENTS_QUEUE_DURABLE = False
EVENTS_QUEUE_AUTO_DELETE = False
EVENTS_QUEUE_PREFIX = "arrlio."
EVENTS_QUEUE_AUTO_DELETE = True
EVENTS_QUEUE = "arrlio.events"
EVENTS_TTL = 600
EVENTS_PREFETCH_COUNT = 10

RESULTS_QUEUE_MODE = ResultQueueMode.COMMON
RESULTS_QUEUE_PREFIX = "arrlio."
RESULTS_QUEUE_AUTO_DELETE = True
RESULTS_QUEUE_DURABLE = False
RESULTS_QUEUE_TYPE = QueueType.CLASSIC
RESULTS_TTL = 600
Expand Down Expand Up @@ -667,14 +668,16 @@ class Config(base.Config):
events_queue_type: QueueType = Field(default_factory=lambda: EVENTS_QUEUE_TYPE)
events_queue_durable: bool = Field(default_factory=lambda: EVENTS_QUEUE_DURABLE)
events_queue_auto_delete: bool = Field(default_factory=lambda: EVENTS_QUEUE_AUTO_DELETE)
events_queue_prefix: str = Field(default_factory=lambda: EVENTS_QUEUE_PREFIX)
events_queue: str = Field(default_factory=lambda: EVENTS_QUEUE)
events_ttl: Optional[Timeout] = Field(default_factory=lambda: EVENTS_TTL)
events_prefetch_count: Optional[PositiveInt] = Field(default_factory=lambda: EVENTS_PREFETCH_COUNT)
results_queue_mode: ResultQueueMode = Field(default_factory=lambda: RESULTS_QUEUE_MODE)
results_queue_prefix: str = Field(default_factory=lambda: RESULTS_QUEUE_PREFIX)
""".. note:: Only valid for `ResultQueueMode.COMMON`."""
results_queue_durable: bool = Field(default_factory=lambda: RESULTS_QUEUE_DURABLE)
""".. note:: Only valid for `ResultQueueMode.COMMON`."""
results_queue_auto_delete: bool = Field(default_factory=lambda: RESULTS_QUEUE_AUTO_DELETE)
""".. note:: Only valid for `ResultQueueMode.COMMON`."""
results_queue_type: QueueType = Field(default_factory=lambda: RESULTS_QUEUE_TYPE)
""".. note:: Only valid for `ResultQueueMode.COMMON`."""
results_ttl: Optional[Timeout] = Field(default_factory=lambda: RESULTS_TTL)
Expand Down Expand Up @@ -755,7 +758,7 @@ def __init__(self, config: Config):
conn=self._conn,
type=config.results_queue_type,
durable=config.results_queue_durable,
auto_delete=False,
auto_delete=config.results_queue_auto_delete,
prefetch_count=config.results_prefetch_count,
# expires=config.results_ttl,
# msg_ttl=config.results_ttl,
Expand All @@ -774,7 +777,7 @@ def __init__(self, config: Config):
timeout=config.timeout,
)
self._events_queue: Queue = Queue(
f"{config.events_queue_prefix}events.{config.id}",
config.events_queue,
conn=self._conn,
type=config.events_queue_type,
durable=config.events_queue_durable,
Expand Down
9 changes: 9 additions & 0 deletions arrlio/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,15 @@ async def send_event(self, event: Event):

async def pop_result(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]:
async for task_result in self._backend.pop_task_result(task_instance):
if is_info_level():
logger.info(
"%s got result[idx=%s, exc=%s] for task %s[%s]",
self,
task_result.idx,
task_result.exc is not None,
task_instance.name,
task_instance.task_id,
)
if task_result.exc:
if isinstance(task_result.exc, TaskError):
if task_result.exc.task_id is None:
Expand Down
15 changes: 10 additions & 5 deletions arrlio/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime
from dataclasses import asdict, dataclass, field
from types import TracebackType
from typing import Any, Callable, Dict
from typing import Any, Callable, ClassVar, Dict
from uuid import UUID, uuid4

from rich.pretty import pretty_repr
Expand Down Expand Up @@ -128,6 +128,8 @@ class TaskInstance(Task):
kwds: Kwds = field(default_factory=dict) # pylint: disable=used-before-assignment
meta: Dict = field(default_factory=dict) # pylint: disable=used-before-assignment

sanitizer: ClassVar[Callable | None] = None

def __post_init__(self):
if self.task_id is None:
object.__setattr__(self, "task_id", uuid4())
Expand All @@ -139,10 +141,13 @@ def __post_init__(self):
def dict(self, exclude: list[str] | None = None, sanitize: bool | None = None):
data = super().dict(exclude=exclude, sanitize=sanitize)
if sanitize:
if data["args"]:
data["args"] = "<hiden>"
if data["kwds"]:
data["kwds"] = "<hiden>"
if self.sanitizer:
data = self.sanitizer(data) # pylint: disable=not-callable
else:
if data["args"]:
data["args"] = "<hiden>"
if data["kwds"]:
data["kwds"] = "<hiden>"
return data

def __call__(self, meta: bool = None): # pylint: disable=arguments-differ
Expand Down
4 changes: 2 additions & 2 deletions arrlio/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
import contextlib
import logging

from pydantic_settings import BaseSettings
from pydantic_settings import BaseSettings, SettingsConfigDict

from arrlio.models import TaskInstance, TaskResult

logger = logging.getLogger("arrlio.plugins.base")


class Config(BaseSettings):
pass
model_config = SettingsConfigDict()


class Plugin(abc.ABC):
Expand Down
4 changes: 2 additions & 2 deletions arrlio/serializers/base.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import abc
from typing import Any

from pydantic_settings import BaseSettings
from pydantic_settings import BaseSettings, SettingsConfigDict

from arrlio.models import Event, TaskInstance, TaskResult


class Config(BaseSettings):
pass
model_config = SettingsConfigDict()


class Serializer(abc.ABC):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "arrlio"
version = "0.21.0"
version = "0.22.0a0"
description = ""
authors = ["Roman Koshel <roma.koshel@gmail.com>"]
license = "MIT"
Expand Down
6 changes: 3 additions & 3 deletions tests/small/backends/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test__init(self, cleanup):
assert config.events_queue_type == rabbitmq.EVENTS_QUEUE_TYPE
assert config.events_queue_durable == rabbitmq.EVENTS_QUEUE_DURABLE
assert config.events_queue_auto_delete == rabbitmq.EVENTS_QUEUE_AUTO_DELETE
assert config.events_queue_prefix == rabbitmq.EVENTS_QUEUE_PREFIX
assert config.events_queue == rabbitmq.EVENTS_QUEUE
assert config.events_ttl == rabbitmq.EVENTS_TTL
assert config.events_prefetch_count == rabbitmq.EVENTS_PREFETCH_COUNT
assert config.results_queue_mode == rabbitmq.RESULTS_QUEUE_MODE
Expand Down Expand Up @@ -59,7 +59,7 @@ def test__init_custom(self, cleanup):
events_queue_type="quorum",
events_queue_durable=False,
events_queue_auto_delete=False,
events_queue_prefix="events_queue_prefix",
events_queue="events_queue",
events_ttl=789,
events_prefetch_count=20,
results_queue_mode="common",
Expand Down Expand Up @@ -87,7 +87,7 @@ def test__init_custom(self, cleanup):
assert config.events_queue_type == "quorum"
assert config.events_queue_durable is False
assert config.events_queue_auto_delete is False
assert config.events_queue_prefix == "events_queue_prefix"
assert config.events_queue == "events_queue"
assert config.events_ttl == 789
assert config.events_prefetch_count == 20
assert config.results_queue_mode == "common"
Expand Down

0 comments on commit e3efd51

Please sign in to comment.