-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommand_bus.py
35 lines (28 loc) · 1.09 KB
/
command_bus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import logging
import typing as t
from pymessagebus._commandbus import CommandBus # type: ignore
from pymessagebus.api import CommandHandlerNotFound # type: ignore
from pymessagebus.middleware.logger import ( # type: ignore
LoggingMiddlewareConfig,
get_logger_middleware,
)
# Levels at which the logging middleware reports each stage of a message.
# NOTE(review): the `mgs_*` spelling is kept exactly as the call site had it —
# presumably it mirrors the library's own parameter names; confirm before
# "correcting" it.
logging_middleware_config = LoggingMiddlewareConfig(
    mgs_failed_level=logging.CRITICAL,
    mgs_succeeded_level=logging.INFO,
    mgs_received_level=logging.INFO,
)

# Named logger handed to the middleware below.
logger = logging.getLogger("message_bus")

logging_middleware = get_logger_middleware(logger, logging_middleware_config)

# The single, module-wide bus instance that the public helpers in this module
# delegate to; the logging middleware is its only middleware.
_DEFAULT_COMMAND_BUS = CommandBus(middlewares=[logging_middleware])
# Public API:
# This is our handy decorator:
def register_handler(message_class: type):
    """Build a decorator that registers a handler on the default command bus.

    The returned decorator passes ``message_class`` and the decorated callable
    to ``_DEFAULT_COMMAND_BUS.add_handler`` and then gives the callable back
    untouched, so the decorated name still refers to the original function.
    Anything ``add_handler`` raises propagates to the decoration site.

    Example::

        @register_handler(SomeCommand)
        def handle_some_command(command):
            ...
    """

    def _register(handler: t.Callable):
        # Registration happens once, at decoration time.
        _DEFAULT_COMMAND_BUS.add_handler(message_class, handler)
        return handler

    return _register
# Module-level shortcuts: each name below is the bound method of the default
# singleton bus, so callers can use them as plain functions of this module.
# pylint: disable=invalid-name
has_handler_for = _DEFAULT_COMMAND_BUS.has_handler_for
handle = _DEFAULT_COMMAND_BUS.handle
add_handler = _DEFAULT_COMMAND_BUS.add_handler