Skip to content

Commit 1d842ea

Browse files
committed
Add TaskInstance sanitizer and fixes
1 parent 5319a3e commit 1d842ea

File tree

9 files changed

+40
-18
lines changed

9 files changed

+40
-18
lines changed

arrlio/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
import importlib.metadata
12
import logging
23
import sys
34

5+
__version__ = importlib.metadata.version("arrlio")
6+
47
logger = logging.getLogger("arrlio")
58

69
log_frmt = logging.Formatter("%(asctime)s %(levelname)-8s %(name)-27s lineno:%(lineno)4d -- %(message)s")

arrlio/backends/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ class SerializerConfig(BaseSettings, ModuleConfigValidatorMixIn):
3333
class Config(BaseSettings):
3434
"""Config for backend."""
3535

36+
model_config = SettingsConfigDict()
37+
3638
id: str = Field(default_factory=lambda: f"{uuid4()}")
3739
serializer: SerializerConfig = Field(default_factory=SerializerConfig)
3840

arrlio/backends/rabbitmq.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,14 @@ class ResultQueueMode(StrEnum):
6464
EVENTS_EXCHANGE_DURABLE = False
6565
EVENTS_QUEUE_TYPE = QueueType.CLASSIC
6666
EVENTS_QUEUE_DURABLE = False
67-
EVENTS_QUEUE_AUTO_DELETE = False
68-
EVENTS_QUEUE_PREFIX = "arrlio."
67+
EVENTS_QUEUE_AUTO_DELETE = True
68+
EVENTS_QUEUE = "arrlio.events"
6969
EVENTS_TTL = 600
7070
EVENTS_PREFETCH_COUNT = 10
7171

7272
RESULTS_QUEUE_MODE = ResultQueueMode.COMMON
7373
RESULTS_QUEUE_PREFIX = "arrlio."
74+
RESULTS_QUEUE_AUTO_DELETE = True
7475
RESULTS_QUEUE_DURABLE = False
7576
RESULTS_QUEUE_TYPE = QueueType.CLASSIC
7677
RESULTS_TTL = 600
@@ -667,14 +668,16 @@ class Config(base.Config):
667668
events_queue_type: QueueType = Field(default_factory=lambda: EVENTS_QUEUE_TYPE)
668669
events_queue_durable: bool = Field(default_factory=lambda: EVENTS_QUEUE_DURABLE)
669670
events_queue_auto_delete: bool = Field(default_factory=lambda: EVENTS_QUEUE_AUTO_DELETE)
670-
events_queue_prefix: str = Field(default_factory=lambda: EVENTS_QUEUE_PREFIX)
671+
events_queue: str = Field(default_factory=lambda: EVENTS_QUEUE)
671672
events_ttl: Optional[Timeout] = Field(default_factory=lambda: EVENTS_TTL)
672673
events_prefetch_count: Optional[PositiveInt] = Field(default_factory=lambda: EVENTS_PREFETCH_COUNT)
673674
results_queue_mode: ResultQueueMode = Field(default_factory=lambda: RESULTS_QUEUE_MODE)
674675
results_queue_prefix: str = Field(default_factory=lambda: RESULTS_QUEUE_PREFIX)
675676
""".. note:: Only valid for `ResultQueueMode.COMMON`."""
676677
results_queue_durable: bool = Field(default_factory=lambda: RESULTS_QUEUE_DURABLE)
677678
""".. note:: Only valid for `ResultQueueMode.COMMON`."""
679+
results_queue_auto_delete: bool = Field(default_factory=lambda: RESULTS_QUEUE_AUTO_DELETE)
680+
""".. note:: Only valid for `ResultQueueMode.COMMON`."""
678681
results_queue_type: QueueType = Field(default_factory=lambda: RESULTS_QUEUE_TYPE)
679682
""".. note:: Only valid for `ResultQueueMode.COMMON`."""
680683
results_ttl: Optional[Timeout] = Field(default_factory=lambda: RESULTS_TTL)
@@ -755,7 +758,7 @@ def __init__(self, config: Config):
755758
conn=self._conn,
756759
type=config.results_queue_type,
757760
durable=config.results_queue_durable,
758-
auto_delete=False,
761+
auto_delete=config.results_queue_auto_delete,
759762
prefetch_count=config.results_prefetch_count,
760763
# expires=config.results_ttl,
761764
# msg_ttl=config.results_ttl,
@@ -774,7 +777,7 @@ def __init__(self, config: Config):
774777
timeout=config.timeout,
775778
)
776779
self._events_queue: Queue = Queue(
777-
f"{config.events_queue_prefix}events.{config.id}",
780+
config.events_queue,
778781
conn=self._conn,
779782
type=config.events_queue_type,
780783
durable=config.events_queue_durable,

arrlio/core.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,15 @@ async def send_event(self, event: Event):
269269

270270
async def pop_result(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]:
271271
async for task_result in self._backend.pop_task_result(task_instance):
272+
if is_info_level():
273+
logger.info(
274+
"%s got result[idx=%s, exc=%s] for task %s[%s]",
275+
self,
276+
task_result.idx,
277+
task_result.exc is not None,
278+
task_instance.name,
279+
task_instance.task_id,
280+
)
272281
if task_result.exc:
273282
if isinstance(task_result.exc, TaskError):
274283
if task_result.exc.task_id is None:

arrlio/models.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import datetime
22
from dataclasses import asdict, dataclass, field
33
from types import TracebackType
4-
from typing import Any, Callable, Dict
4+
from typing import Any, Callable, ClassVar, Dict
55
from uuid import UUID, uuid4
66

77
from rich.pretty import pretty_repr
@@ -128,6 +128,8 @@ class TaskInstance(Task):
128128
kwds: Kwds = field(default_factory=dict) # pylint: disable=used-before-assignment
129129
meta: Dict = field(default_factory=dict) # pylint: disable=used-before-assignment
130130

131+
sanitizer: ClassVar[Callable | None] = None
132+
131133
def __post_init__(self):
132134
if self.task_id is None:
133135
object.__setattr__(self, "task_id", uuid4())
@@ -139,10 +141,13 @@ def __post_init__(self):
139141
def dict(self, exclude: list[str] | None = None, sanitize: bool | None = None):
140142
data = super().dict(exclude=exclude, sanitize=sanitize)
141143
if sanitize:
142-
if data["args"]:
143-
data["args"] = "<hiden>"
144-
if data["kwds"]:
145-
data["kwds"] = "<hiden>"
144+
if self.sanitizer:
145+
data = self.sanitizer(data) # pylint: disable=not-callable
146+
else:
147+
if data["args"]:
148+
data["args"] = "<hiden>"
149+
if data["kwds"]:
150+
data["kwds"] = "<hiden>"
146151
return data
147152

148153
def __call__(self, meta: bool = None): # pylint: disable=arguments-differ

arrlio/plugins/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22
import contextlib
33
import logging
44

5-
from pydantic_settings import BaseSettings
5+
from pydantic_settings import BaseSettings, SettingsConfigDict
66

77
from arrlio.models import TaskInstance, TaskResult
88

99
logger = logging.getLogger("arrlio.plugins.base")
1010

1111

1212
class Config(BaseSettings):
13-
pass
13+
model_config = SettingsConfigDict()
1414

1515

1616
class Plugin(abc.ABC):

arrlio/serializers/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import abc
22
from typing import Any
33

4-
from pydantic_settings import BaseSettings
4+
from pydantic_settings import BaseSettings, SettingsConfigDict
55

66
from arrlio.models import Event, TaskInstance, TaskResult
77

88

99
class Config(BaseSettings):
10-
pass
10+
model_config = SettingsConfigDict()
1111

1212

1313
class Serializer(abc.ABC):

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "arrlio"
3-
version = "0.21.0"
3+
version = "0.22.0"
44
description = ""
55
authors = ["Roman Koshel <roma.koshel@gmail.com>"]
66
license = "MIT"

tests/small/backends/test_rabbitmq.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def test__init(self, cleanup):
2929
assert config.events_queue_type == rabbitmq.EVENTS_QUEUE_TYPE
3030
assert config.events_queue_durable == rabbitmq.EVENTS_QUEUE_DURABLE
3131
assert config.events_queue_auto_delete == rabbitmq.EVENTS_QUEUE_AUTO_DELETE
32-
assert config.events_queue_prefix == rabbitmq.EVENTS_QUEUE_PREFIX
32+
assert config.events_queue == rabbitmq.EVENTS_QUEUE
3333
assert config.events_ttl == rabbitmq.EVENTS_TTL
3434
assert config.events_prefetch_count == rabbitmq.EVENTS_PREFETCH_COUNT
3535
assert config.results_queue_mode == rabbitmq.RESULTS_QUEUE_MODE
@@ -59,7 +59,7 @@ def test__init_custom(self, cleanup):
5959
events_queue_type="quorum",
6060
events_queue_durable=False,
6161
events_queue_auto_delete=False,
62-
events_queue_prefix="events_queue_prefix",
62+
events_queue="events_queue",
6363
events_ttl=789,
6464
events_prefetch_count=20,
6565
results_queue_mode="common",
@@ -87,7 +87,7 @@ def test__init_custom(self, cleanup):
8787
assert config.events_queue_type == "quorum"
8888
assert config.events_queue_durable is False
8989
assert config.events_queue_auto_delete is False
90-
assert config.events_queue_prefix == "events_queue_prefix"
90+
assert config.events_queue == "events_queue"
9191
assert config.events_ttl == 789
9292
assert config.events_prefetch_count == 20
9393
assert config.results_queue_mode == "common"

0 commit comments

Comments
 (0)