Skip to content

Commit

Permalink
Wip
Browse files Browse the repository at this point in the history
  • Loading branch information
levsh committed Aug 14, 2023
1 parent 2c527f6 commit 957286c
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 63 deletions.
2 changes: 1 addition & 1 deletion arrlio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
logger = logging.getLogger("arrlio")
logger.setLevel(logging.ERROR)

log_frmt = logging.Formatter("%(asctime)s %(levelname)8s %(name)27s lineno:%(lineno)4d -- %(message)s")
log_frmt = logging.Formatter("%(asctime)s %(levelname)-8s %(name)-27s lineno:%(lineno)4d -- %(message)s")
log_hndl = logging.StreamHandler(stream=sys.stderr)
log_hndl.setFormatter(log_frmt)
logger.addHandler(log_hndl)
Expand Down
21 changes: 11 additions & 10 deletions arrlio/backends/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from uuid import UUID

from pydantic import Field, PositiveInt
from rich.pretty import pretty_repr

from arrlio import settings
from arrlio.backends import base
from arrlio.exc import TaskClosedError, TaskResultError
from arrlio.models import Event, TaskInstance, TaskResult
Expand Down Expand Up @@ -98,9 +98,6 @@ async def send_task(self, task_instance: TaskInstance, **kwds):
if task_instance.result_return and task_instance.task_id not in self._results:
self._results[task_instance.task_id] = [asyncio_Event(), [], None]

if is_debug_level():
logger.debug("%s: send\n%s", self, pretty_repr(task_instance.dict()))

self._task_queues[task_instance.queue].put_nowait(
(
(Priority.le - task_instance.priority) if task_instance.priority else Priority.ge,
Expand Down Expand Up @@ -134,7 +131,11 @@ async def fn(queue: str):
task_instance: TaskInstance = self._serializer.loads_task_instance(data)

if is_debug_level():
logger.debug("%s: got\n%s", self, pretty_repr(task_instance.dict()))
logger.debug(
"%s: got\n%s",
self,
task_instance.pretty_repr(sanitize=settings.LOG_SANITIZE),
)

aio_task: asyncio.Task = create_task(callback(task_instance))

Expand Down Expand Up @@ -174,7 +175,7 @@ async def push_task_result(self, task_instance: TaskInstance, task_result: TaskR
self,
task_id,
task_instance.name,
pretty_repr(task_result.dict()),
task_result.pretty_repr(sanitize=settings.LOG_SANITIZE),
)

results = self._results
Expand Down Expand Up @@ -227,7 +228,7 @@ async def fn():
self,
task_id,
task_instance.name,
pretty_repr(task_result.dict()),
task_result.pretty_repr(sanitize=settings.LOG_SANITIZE),
)

if isinstance(task_result.exc, TaskClosedError):
Expand All @@ -250,7 +251,7 @@ async def fn():
self,
task_id,
task_instance.name,
pretty_repr(task_result.dict()),
task_result.pretty_repr(sanitize=settings.LOG_SANITIZE),
)

yield task_result
Expand All @@ -275,7 +276,7 @@ async def close_task(self, task_instance: TaskInstance, idx: Tuple[str, int] = N

async def send_event(self, event: Event):
if is_debug_level():
logger.debug("%s: put\n%s", self, pretty_repr(event.dict()))
logger.debug("%s: put\n%s", self, event.pretty_repr(sanitize=settings.LOG_SANITIZE))

self._events[event.event_id] = self._serializer.dumps_event(event)

Expand Down Expand Up @@ -323,7 +324,7 @@ async def fn():
event: Event = self._serializer.loads_event(events_pop(next(iter(events_keys()))))

if is_debug_level():
logger.debug("%s: got\n%s", self, pretty_repr(event.dict()))
logger.debug("%s: got\n%s", self, event.pretty_repr(sanitize=settings.LOG_SANITIZE))

for callback, event_types in event_callbacks.values():
if event_types is not None and event.type not in event_types:
Expand Down
15 changes: 10 additions & 5 deletions arrlio/backends/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import aiormq.exceptions
import yarl
from pydantic import Field
from rich.pretty import pretty_repr

from arrlio import settings
from arrlio.backends import base
from arrlio.exc import GraphError, TaskClosedError, TaskResultError
from arrlio.models import Event, TaskInstance, TaskResult
Expand Down Expand Up @@ -808,7 +808,7 @@ async def _on_result_message(
self,
channel,
task_id,
pretty_repr(task_result.dict()),
task_result.pretty_repr(sanitize=settings.LOG_SANITIZE),
)

storage = self._allocate_results_storage(task_id)
Expand Down Expand Up @@ -851,7 +851,11 @@ async def _on_task_message(self, callback, channel: aiormq.Channel, message: aio
task_instance.extra["rabbitmq:reply_to"] = message.header.properties.reply_to

if is_debug_level():
logger.debug("%s: got task\n%s", self, pretty_repr(task_instance.dict()))
logger.debug(
"%s: got task\n%s",
self,
task_instance.pretty_repr(sanitize=settings.LOG_SANITIZE),
)

if not task_instance.ack_late:
await channel.basic_ack(message.delivery.delivery_tag)
Expand All @@ -869,6 +873,7 @@ async def close(self):
await self._tasks_exchange.close()
for queue in self._task_queues.values():
await queue.close()
await self._results_queue.close(delete=True)
await self._conn.close()

def _reply_to(self, task_instance: TaskInstance) -> str:
Expand Down Expand Up @@ -963,7 +968,7 @@ async def _push_task_result(
task_instance.name,
exchange.name,
routing_key,
pretty_repr(task_result.dict()),
task_result.pretty_repr(sanitize=settings.LOG_SANITIZE),
)

await exchange.publish(
Expand Down Expand Up @@ -1100,7 +1105,7 @@ async def on_message(channel: aiormq.Channel, message: aiormq.abc.DeliveredMessa
event: Event = self._serializer.loads_event(message.body)

if is_debug_level():
logger.debug("%s: got event\n%s", self, pretty_repr(event.dict()))
logger.debug("%s: got event\n%s", self, event.pretty_repr(sanitize=settings.LOG_SANITIZE))

await channel.basic_ack(message.delivery.delivery_tag)

Expand Down
5 changes: 3 additions & 2 deletions arrlio/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from rich.pretty import pretty_repr
from roview import rodict

from arrlio import settings
from arrlio.backends.base import Backend
from arrlio.exc import GraphError, TaskClosedError, TaskError
from arrlio.executor import Executor
Expand Down Expand Up @@ -247,7 +248,7 @@ async def send_task(
logger.info(
"%s: send task instance\n%s",
self,
pretty_repr(task_instance.dict(exclude=["args", "kwds"])),
task_instance.pretty_repr(sanitize=settings.LOG_SANITIZE),
)

await self._execute_hooks("on_task_send", task_instance)
Expand All @@ -258,7 +259,7 @@ async def send_task(

async def send_event(self, event: Event):
if is_info_level():
logger.info("%s: send event\n%s", self, pretty_repr(event.dict()))
logger.info("%s: send event\n%s", self, event.pretty_repr(sanitize=settings.LOG_SANITIZE))

await self._backend.send_event(event)

Expand Down
45 changes: 35 additions & 10 deletions arrlio/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from uuid import UUID, uuid4

from pydantic import Field, create_model
from rich.pretty import pretty_repr
from roview import rodict, roset

from arrlio.exc import GraphError
Expand Down Expand Up @@ -72,7 +73,7 @@ def __call__(self, *args, **kwds) -> Any:

return self.func(*args, **kwds)

def dict(self, exclude: List[str] = None):
def dict(self, exclude: List[str] = None, sanitize: bool = None):
"""Convert to dict.
Args:
Expand All @@ -85,6 +86,9 @@ def dict(self, exclude: List[str] = None):
exclude = exclude or []
return {k: v for k, v in asdict(self).items() if k not in exclude}

def pretty_repr(self, exclude: List[str] = None, sanitize: bool = None):
return pretty_repr(self.dict(exclude=exclude, sanitize=sanitize))

def instantiate(
self,
task_id: TaskId = None,
Expand Down Expand Up @@ -126,18 +130,27 @@ class TaskInstance(Task):
"""

task_id: UUID = field(default_factory=uuid4)
args: tuple = field(default_factory=tuple)
kwds: dict = field(default_factory=dict)
args: Args = field(default_factory=tuple)
kwds: Kwds = field(default_factory=dict)
meta: dict = field(default_factory=dict)

def __post_init__(self):
if self.task_id is None:
object.__setattr__(self, "task_id", uuid4())
elif isinstance(self.task_id, str):
object.__setattr__(self, "task_id", UUID(self.task_id))
if isinstance(self.args, list):
if not isinstance(self.args, tuple):
object.__setattr__(self, "args", tuple(self.args))

def dict(self, exclude: List[str] = None, sanitize: bool = None):
data = super().dict(exclude=exclude, sanitize=sanitize)
if sanitize:
if data["args"]:
data["args"] = "..."
if data["kwds"]:
data["kwds"] = "..."
return data

def __call__(self, meta: bool = None): # pylint: disable=arguments-differ
"""Call `arrlio.models.TaskInstance`.
Expand Down Expand Up @@ -193,7 +206,7 @@ def validate(self):
kwds[k] = getattr(model, k)

object.__setattr__(self, "args", tuple(args))
object.__setattr__(self, "kwds", kwds)
object.__setattr__(self, "kwds", dict(kwds))


@dataclass(frozen=True)
Expand All @@ -209,21 +222,24 @@ class TaskResult:
def set_idx(self, idx: Tuple[str, int]):
object.__setattr__(self, "idx", idx)

def dict(self):
def dict(self, sanitize: bool = None):
"""Convert to dict.
Returns:
`arrlio.models.TaskResult` as `dict`.
"""

return {
"res": self.res,
"res": self.res if self.res is None or not sanitize else "...",
"exc": self.exc,
"trb": self.trb,
"idx": self.idx,
"routes": self.routes,
}

def pretty_repr(self, sanitize: bool = None):
return pretty_repr(self.dict(sanitize=sanitize))


@dataclass(frozen=True)
class Event:
Expand All @@ -250,14 +266,20 @@ def __post_init__(self):
elif isinstance(self.dt, str):
object.__setattr__(self, "dt", datetime.datetime.fromisoformat(self.dt))

def dict(self):
def dict(self, sanitize: bool = None):
"""Convert to dict.
Returns:
`arrlio.models.Event` as `dict`.
"""

return asdict(self)
data = asdict(self)
if hasattr(data["data"], "sanitize"):
data["data"] = data["data"].sanitize()
return data

def pretty_repr(self, sanitize: bool = None):
return pretty_repr(self.dict(sanitize=sanitize))


class Graph:
Expand Down Expand Up @@ -332,7 +354,7 @@ def add_edge(self, node_id_from: str, node_id_to: str, routes: Union[str, List[s
routes = [routes]
self.edges.__original__.setdefault(node_id_from, []).append([node_id_to, routes])

def dict(self):
def dict(self, sanitize: bool = None):
"""Convert to the dict.
Returns:
Expand Down Expand Up @@ -362,3 +384,6 @@ def from_dict(cls, data: Dict) -> "Graph":
edges=data["edges"],
roots=data["roots"],
)

def pretty_repr(self, sanitize: bool = None):
return pretty_repr(self.dict(sanitize=sanitize))
11 changes: 4 additions & 7 deletions arrlio/plugins/graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@
from typing import Dict, Tuple
from uuid import uuid4

from rich.pretty import pretty_repr

from arrlio import AsyncResult, registered_tasks
from arrlio import AsyncResult, registered_tasks, settings
from arrlio.exc import ArrlioError, GraphError
from arrlio.models import Event, Graph, Task, TaskInstance, TaskResult
from arrlio.plugins import base
from arrlio.utils import is_info_level

logger = logging.getLogger("arrlio.plugins.graphs")
is_info = logger.isEnabledFor(logging.INFO)
is_debug = logger.isEnabledFor(logging.DEBUG)


class Config(base.Config):
Expand Down Expand Up @@ -199,12 +196,12 @@ async def _send_graph(
extra = task_instance.extra
extra["graph:call_id"] = f"{uuid4()}"

if is_info:
if is_info_level():
logger.info(
"%s: send graph '%s' task\n%s",
self,
graph.name,
pretty_repr(task_instance.dict(exclude=["data.args", "data.kwds"])),
task_instance.pretty_repr(sanitize=settings.LOG_SANITIZE),
)

await self.app.backend.send_task(task_instance)
Expand Down
2 changes: 1 addition & 1 deletion arrlio/serializers/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def dumps_task_instance(self, task_instance: TaskInstance, **kwds) -> bytes:
extra = data["extra"]
if graph := extra.get("graph:graph"):
extra["graph:graph"] = graph.dict()
return self.dumps({k: v for k, v in data.items() if v is not None}, cls=self.config.encoder)
return self.dumps({k: v for k, v in data.items() if v is not None})

def loads_task_instance(self, data: bytes) -> TaskInstance:
"""Loads `arrlio.models.TaskInstance` object from json encoded string."""
Expand Down
9 changes: 9 additions & 0 deletions arrlio/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

TASK_QUEUES = [TASK_QUEUE]

LOG_LEVEL = "ERROR"
LOG_SANITIZE = True


class BaseConfig(BaseSettings):
class Config:
Expand Down Expand Up @@ -108,6 +111,11 @@ class Config:
env_prefix = f"{ENV_PREFIX}EXECUTOR_"


# class LoggingConfig(BaseConfig):
# level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = Field(default_factory=lambda: LOG_LEVEL)
# hide_args_kwds: bool = Field(default_factory=lambda: LOG_HIDE_ARGS_KWDS)


class Config(BaseConfig):
app_id: constr(min_length=1) = Field(default_factory=lambda: f"{uuid4()}")
backend: BackendConfig = Field(default_factory=BackendConfig)
Expand All @@ -116,6 +124,7 @@ class Config(BaseConfig):
task_queues: Set[str] = Field(default_factory=lambda: TASK_QUEUES)
plugins: List[PluginConfig] = Field(default_factory=list)
executor: ExecutorConfig = Field(default_factory=ExecutorConfig)
# logging: LoggingConfig = Field(default_factory=LoggingConfig)

class Config:
env_prefix = ENV_PREFIX
2 changes: 1 addition & 1 deletion arrlio/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class Priority(ConstrainedInt):

TaskId = Union[str, UUID]
Args = Union[List, Tuple]
Kwds = Dict
Kwds = Union[Dict]


class SecretAnyUrl(AnyUrl):
Expand Down
Loading

0 comments on commit 957286c

Please sign in to comment.