From 3aad5baaad1df9ee1e8d3abe0bdd213dcdf8d983 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Tue, 9 Nov 2021 12:14:02 +0100 Subject: [PATCH 1/3] ISSUE #429 * Add support for `post_fn`. --- minos/networks/decorators/builders.py | 38 +++++++++---------- minos/networks/decorators/definitions/abc.py | 8 ++++ .../networks/decorators/definitions/kinds.py | 13 +++++++ .../test_decorators/test_builders.py | 4 +- .../test_definitions/test_abc.py | 6 +++ tests/utils.py | 8 ++++ 6 files changed, 54 insertions(+), 23 deletions(-) diff --git a/minos/networks/decorators/builders.py b/minos/networks/decorators/builders.py index 1e0e5379..afcb88c7 100644 --- a/minos/networks/decorators/builders.py +++ b/minos/networks/decorators/builders.py @@ -5,7 +5,7 @@ defaultdict, ) from inspect import ( - iscoroutinefunction, + isawaitable, ) from typing import ( Awaitable, @@ -112,34 +112,30 @@ def _build_one_class( for name, decorators in mapping.items(): for decorator in decorators: - ans[decorator].add(self._build_one_method(class_, name, decorator.pre_fn_name)) + ans[decorator].add(self._build_one_method(class_, name, decorator.pre_fn_name, decorator.post_fn_name)) @staticmethod - def _build_one_method(class_: type, name: str, pref_fn_name: str, **kwargs) -> Handler: + def _build_one_method(class_: type, name: str, pref_fn_name: str, post_fn_name: str, **kwargs) -> Handler: instance = class_(**kwargs) fn = getattr(instance, name) pre_fn = getattr(instance, pref_fn_name, None) + post_fn = getattr(instance, post_fn_name, None) - if iscoroutinefunction(fn): - _awaitable_fn = fn - else: + async def _wrapped_fn(request: Request) -> Optional[Response]: + if pre_fn is not None: + request = pre_fn(request) + if isawaitable(request): + request = await request - async def _awaitable_fn(request: Request) -> Optional[Response]: - return fn(request) + response = fn(request) + if isawaitable(response): + response = await response - if pre_fn is None: - _wrapped_fn = _awaitable_fn - else: - if iscoroutinefunction(pre_fn): + if post_fn is not None: + response = post_fn(response) + if isawaitable(response): + response = await response - async def _wrapped_fn(request: Request) -> Optional[Response]: - request = await pre_fn(request) - return await _awaitable_fn(request) - - else: - - async def _wrapped_fn(request: Request) -> Optional[Response]: - request = pre_fn(request) - return await _awaitable_fn(request) + return response return _wrapped_fn diff --git a/minos/networks/decorators/definitions/abc.py b/minos/networks/decorators/definitions/abc.py index 42382d41..907622a9 100644 --- a/minos/networks/decorators/definitions/abc.py +++ b/minos/networks/decorators/definitions/abc.py @@ -81,3 +81,11 @@ def pre_fn_name(self) -> str: :return: A string value containing the function name. """ return self.KIND.pre_fn_name + + @property + def post_fn_name(self) -> str: + """Get the post execution function name. + + :return: A string value containing the function name. + """ + return self.KIND.post_fn_name diff --git a/minos/networks/decorators/definitions/kinds.py b/minos/networks/decorators/definitions/kinds.py index 79815597..513947c0 100644 --- a/minos/networks/decorators/definitions/kinds.py +++ b/minos/networks/decorators/definitions/kinds.py @@ -23,3 +23,16 @@ def pre_fn_name(self) -> str: self.Event: "_pre_event_handle", } return mapping[self] + + @property + def post_fn_name(self) -> str: + """Get the post execution function name. + + :return: A string value containing the function name. + """ + mapping = { + self.Command: "_post_command_handle", + self.Query: "_post_query_handle", + self.Event: "_post_event_handle", + } + return mapping[self] diff --git a/tests/test_networks/test_decorators/test_builders.py b/tests/test_networks/test_decorators/test_builders.py index a36dbbbd..afb62eba 100644 --- a/tests/test_networks/test_decorators/test_builders.py +++ b/tests/test_networks/test_decorators/test_builders.py @@ -37,7 +37,7 @@ async def test_get_rest_command_query(self): handlers = self.builder.get_rest_command_query() self.assertEqual(3, len(handlers)) - expected = Response("Get Tickets: test") + expected = Response("(Get Tickets: test)") observed = await handlers[RestQueryEnrouteDecorator("tickets/", "GET")](self.request) self.assertEqual(expected, observed) @@ -70,7 +70,7 @@ async def test_get_broker_command_query(self): handlers = self.builder.get_broker_command_query() self.assertEqual(4, len(handlers)) - expected = Response("Get Tickets: test") + expected = Response("(Get Tickets: test)") observed = await handlers[BrokerQueryEnrouteDecorator("GetTickets")](self.request) self.assertEqual(expected, observed) diff --git a/tests/test_networks/test_decorators/test_definitions/test_abc.py b/tests/test_networks/test_decorators/test_definitions/test_abc.py index cb8f5059..4b4b96a7 100644 --- a/tests/test_networks/test_decorators/test_definitions/test_abc.py +++ b/tests/test_networks/test_decorators/test_definitions/test_abc.py @@ -66,6 +66,12 @@ def test_multiple_decorator_kind_raises(self): with self.assertRaises(MinosMultipleEnrouteDecoratorKindsException): another(self.decorator(_fn)) + def test_pre_fn_name(self): + self.assertEqual("_pre_command_handle", self.decorator.pre_fn_name) + + def test_post_fn_name(self): + self.assertEqual("_post_command_handle", self.decorator.post_fn_name) + if __name__ == "__main__": unittest.main() diff --git a/tests/utils.py b/tests/utils.py index 4cf07586..3b19c1b0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -232,6 +232,14 @@ def _pre_query_handle(request: Request) -> Request: async def _pre_event_handle(request: Request) -> Request: return WrappedRequest(request, lambda content: f"[{content}]") + @staticmethod + def _post_command_handle(response: Response) -> Response: + return response + + @staticmethod + async def _post_query_handle(response: Response) -> Response: + return Response(f"({await response.content()})") + # noinspection PyUnusedLocal @enroute.rest.command(url="orders/", method="GET") @enroute.broker.command(topic="CreateTicket") From 151f12d144adb5ef96b153f43f276ec08cfc8b39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Tue, 9 Nov 2021 17:04:18 +0100 Subject: [PATCH 2/3] ISSUE #431 * Add `REPLY_TOPIC_CONTEXT_VAR`. --- minos/networks/__init__.py | 1 + minos/networks/brokers/__init__.py | 1 + minos/networks/brokers/commands.py | 9 +++++ minos/networks/handlers/dynamic/pools.py | 34 +++++++++++++++++++ .../test_brokers/test_commands.py | 18 ++++++++++ .../test_handlers/test_dynamic/test_pools.py | 10 ++++++ 6 files changed, 73 insertions(+) diff --git a/minos/networks/__init__.py b/minos/networks/__init__.py index 987e034b..9e9420d8 100644 --- a/minos/networks/__init__.py +++ b/minos/networks/__init__.py @@ -3,6 +3,7 @@ __version__ = "0.1.0" from .brokers import ( + REPLY_TOPIC_CONTEXT_VAR, Broker, BrokerSetup, CommandBroker, diff --git a/minos/networks/brokers/__init__.py b/minos/networks/brokers/__init__.py index af2120b4..8980aa9e 100644 --- a/minos/networks/brokers/__init__.py +++ b/minos/networks/brokers/__init__.py @@ -6,6 +6,7 @@ CommandReplyBroker, ) from .commands import ( + REPLY_TOPIC_CONTEXT_VAR, CommandBroker, ) from .events import ( diff --git a/minos/networks/brokers/commands.py b/minos/networks/brokers/commands.py index 7d97571d..be9fce93 100644 --- a/minos/networks/brokers/commands.py +++ b/minos/networks/brokers/commands.py @@ -3,8 +3,12 @@ ) import logging +from contextvars import ( + ContextVar, +) from typing import ( Any, + Final, Optional, ) from uuid import ( @@ -22,6 +26,8 @@ logger = logging.getLogger(__name__) +REPLY_TOPIC_CONTEXT_VAR: Final[ContextVar[Optional[str]]] = ContextVar("reply_topic", default=None) + class CommandBroker(Broker): """Minos Command Broker Class.""" @@ -57,8 +63,11 @@ async def send( the command is not authenticated. :return: This method does not return anything. """ + if reply_topic is None: + reply_topic = REPLY_TOPIC_CONTEXT_VAR.get() if reply_topic is None: reply_topic = self.default_reply_topic + command = Command(topic, data, saga, reply_topic, user) logger.info(f"Sending '{command!s}'...") return await self.enqueue(command.topic, command.avro_bytes) diff --git a/minos/networks/handlers/dynamic/pools.py b/minos/networks/handlers/dynamic/pools.py index 3dd6f0f5..6cbd6b6c 100644 --- a/minos/networks/handlers/dynamic/pools.py +++ b/minos/networks/handlers/dynamic/pools.py @@ -3,6 +3,13 @@ ) import logging +from contextvars import ( + Token, +) +from typing import ( + AsyncContextManager, + Optional, +) from uuid import ( uuid4, ) @@ -23,6 +30,9 @@ MinosPool, ) +from ...brokers import ( + REPLY_TOPIC_CONTEXT_VAR, +) from ..consumers import ( Consumer, ) @@ -82,3 +92,27 @@ async def _subscribe_reply_topic(self, topic: str) -> None: async def _unsubscribe_reply_topic(self, topic: str) -> None: await self.consumer.remove_topic(topic) + + def acquire(self, *args, **kwargs) -> AsyncContextManager: + """Acquire a new instance wrapped on an asynchronous context manager. + + :return: An asynchronous context manager. + """ + return _ReplyTopicContextManager(super().acquire()) + + +class _ReplyTopicContextManager: + _token: Optional[Token] + + def __init__(self, wrapper: AsyncContextManager[DynamicHandler]): + self.wrapper = wrapper + self._token = None + + async def __aenter__(self) -> DynamicHandler: + handler = await self.wrapper.__aenter__() + self._token = REPLY_TOPIC_CONTEXT_VAR.set(handler.topic) + return handler + + async def __aexit__(self, exc_type, exc_val, exc_tb): + REPLY_TOPIC_CONTEXT_VAR.reset(self._token) + await self.wrapper.__aexit__(exc_type, exc_val, exc_tb) diff --git a/tests/test_networks/test_brokers/test_commands.py b/tests/test_networks/test_brokers/test_commands.py index 5a1ec6a5..8b0cbed8 100644 --- a/tests/test_networks/test_brokers/test_commands.py +++ b/tests/test_networks/test_brokers/test_commands.py @@ -13,6 +13,7 @@ PostgresAsyncTestCase, ) from minos.networks import ( + REPLY_TOPIC_CONTEXT_VAR, CommandBroker, ) from tests.utils import ( @@ -65,6 +66,23 @@ async def test_send_with_default_reply_topic(self): self.assertEqual("fake", args[0]) self.assertEqual(Command("fake", FakeModel("foo"), saga, "OrderReply"), Command.from_avro_bytes(args[1])) + async def test_send_with_reply_topic_context_var(self): + mock = AsyncMock(return_value=56) + saga = uuid4() + + REPLY_TOPIC_CONTEXT_VAR.set("onetwothree") + + async with CommandBroker.from_config(config=self.config) as broker: + broker.enqueue = mock + identifier = await broker.send(FakeModel("foo"), "fake", saga) + + self.assertEqual(56, identifier) + self.assertEqual(1, mock.call_count) + + args = mock.call_args.args + self.assertEqual("fake", args[0]) + self.assertEqual(Command("fake", FakeModel("foo"), saga, "onetwothree"), Command.from_avro_bytes(args[1])) + async def test_send_with_user(self): mock = AsyncMock(return_value=56) saga = uuid4() diff --git a/tests/test_networks/test_handlers/test_dynamic/test_pools.py b/tests/test_networks/test_handlers/test_dynamic/test_pools.py index 9fc62233..f075ec6c 100644 --- a/tests/test_networks/test_handlers/test_dynamic/test_pools.py +++ b/tests/test_networks/test_handlers/test_dynamic/test_pools.py @@ -8,6 +8,7 @@ PostgresAsyncTestCase, ) from minos.networks import ( + REPLY_TOPIC_CONTEXT_VAR, Consumer, DynamicHandler, DynamicHandlerPool, @@ -46,6 +47,15 @@ async def test_acquire(self): self.assertIsInstance(handler, DynamicHandler) self.assertIn(handler.topic, self.pool.client.list_topics()) + async def test_acquire_reply_topic_context_var(self): + self.assertEqual(None, REPLY_TOPIC_CONTEXT_VAR.get()) + + async with self.consumer, self.pool: + async with self.pool.acquire() as handler: + self.assertEqual(handler.topic, REPLY_TOPIC_CONTEXT_VAR.get()) + + self.assertEqual(None, REPLY_TOPIC_CONTEXT_VAR.get()) + if __name__ == "__main__": unittest.main() From 5900854c10ef677e073a8f54ba49fed5ffac25d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Tue, 9 Nov 2021 17:29:36 +0100 Subject: [PATCH 3/3] v0.1.1 --- HISTORY.md | 6 ++++++ minos/networks/__init__.py | 2 +- pyproject.toml | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 15165897..bee7c24b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -120,3 +120,9 @@ History ------------------ * Add `minos-microservice-common>=0.2.0` compatibility. + +0.1.1 (2021-11-09) +------------------ + +* Add `REPLY_TOPIC_CONTEXT_VAR` and integrate with `DynamicHandlerPool`. +* Add support for `post_fn` callbacks following the same strategy as in `pre_fn` callbacks. diff --git a/minos/networks/__init__.py b/minos/networks/__init__.py index 9e9420d8..d82ad291 100644 --- a/minos/networks/__init__.py +++ b/minos/networks/__init__.py @@ -1,6 +1,6 @@ __author__ = """Clariteia Devs""" __email__ = "devs@clariteia.com" -__version__ = "0.1.0" +__version__ = "0.1.1" from .brokers import ( REPLY_TOPIC_CONTEXT_VAR, diff --git a/pyproject.toml b/pyproject.toml index 5e05c0c1..59516d17 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "minos_microservice_networks" -version = "0.1.0" +version = "0.1.1" description = "Python Package with the common network classes and utilities used in Minos Microservice." readme = "README.md" repository = "https://github.com/clariteia/minos_microservice_network"