diff --git a/minos/networks/__init__.py b/minos/networks/__init__.py index f5011386..660b1040 100644 --- a/minos/networks/__init__.py +++ b/minos/networks/__init__.py @@ -3,7 +3,8 @@ __version__ = "0.3.0" from .brokers import ( - REPLY_TOPIC_CONTEXT_VAR, + REQUEST_HEADERS_CONTEXT_VAR, + REQUEST_REPLY_TOPIC_CONTEXT_VAR, BrokerConsumer, BrokerConsumerService, BrokerHandler, @@ -56,7 +57,7 @@ MinosRedefinedEnrouteDecoratorException, ) from .requests import ( - USER_CONTEXT_VAR, + REQUEST_USER_CONTEXT_VAR, Request, Response, ResponseException, diff --git a/minos/networks/brokers/__init__.py b/minos/networks/brokers/__init__.py index c0d438b0..c23cb10d 100644 --- a/minos/networks/brokers/__init__.py +++ b/minos/networks/brokers/__init__.py @@ -14,7 +14,8 @@ BrokerResponseException, ) from .messages import ( - REPLY_TOPIC_CONTEXT_VAR, + REQUEST_HEADERS_CONTEXT_VAR, + REQUEST_REPLY_TOPIC_CONTEXT_VAR, BrokerMessage, BrokerMessageStatus, BrokerMessageStrategy, diff --git a/minos/networks/brokers/dynamic/brokers.py b/minos/networks/brokers/dynamic/brokers.py index 81f92967..881e6517 100644 --- a/minos/networks/brokers/dynamic/brokers.py +++ b/minos/networks/brokers/dynamic/brokers.py @@ -10,6 +10,9 @@ from typing import ( Optional, ) +from uuid import ( + UUID, +) from aiopg import ( Cursor, @@ -84,15 +87,15 @@ async def _destroy(self) -> None: await super()._destroy() # noinspection PyUnusedLocal - async def send(self, *args, reply_topic: None = None, **kwargs) -> None: + async def send(self, *args, reply_topic: None = None, **kwargs) -> UUID: """Send a ``BrokerMessage``. :param args: Additional positional arguments. :param reply_topic: This argument is ignored if ignored in favor of ``self.topic``. :param kwargs: Additional named arguments. - :return: This method does not return anything. + :return: The ``UUID`` identifier of the message. """ - await self.publisher.send(*args, reply_topic=self.topic, **kwargs) + return await self.publisher.send(*args, reply_topic=self.topic, **kwargs) async def get_one(self, *args, **kwargs) -> BrokerHandlerEntry: """Get one handler entry from the given topics. diff --git a/minos/networks/brokers/dynamic/pools.py b/minos/networks/brokers/dynamic/pools.py index 8e8cd7e9..3581c380 100644 --- a/minos/networks/brokers/dynamic/pools.py +++ b/minos/networks/brokers/dynamic/pools.py @@ -35,7 +35,7 @@ BrokerConsumer, ) from ..messages import ( - REPLY_TOPIC_CONTEXT_VAR, + REQUEST_REPLY_TOPIC_CONTEXT_VAR, ) from ..publishers import ( BrokerPublisher, @@ -150,9 +150,9 @@ def __init__(self, wrapper: AsyncContextManager[DynamicBroker]): async def __aenter__(self) -> DynamicBroker: handler = await self.wrapper.__aenter__() - self._token = REPLY_TOPIC_CONTEXT_VAR.set(handler.topic) + self._token = REQUEST_REPLY_TOPIC_CONTEXT_VAR.set(handler.topic) return handler async def __aexit__(self, exc_type, exc_val, exc_tb): - REPLY_TOPIC_CONTEXT_VAR.reset(self._token) + REQUEST_REPLY_TOPIC_CONTEXT_VAR.reset(self._token) await self.wrapper.__aexit__(exc_type, exc_val, exc_tb) diff --git a/minos/networks/brokers/handlers/handlers.py b/minos/networks/brokers/handlers/handlers.py index 5d99cc38..6989e350 100644 --- a/minos/networks/brokers/handlers/handlers.py +++ b/minos/networks/brokers/handlers/handlers.py @@ -52,7 +52,7 @@ MinosActionNotFoundException, ) from ...requests import ( - USER_CONTEXT_VAR, + REQUEST_USER_CONTEXT_VAR, Response, ResponseException, ) @@ -60,6 +60,7 @@ consume_queue, ) from ..messages import ( + REQUEST_HEADERS_CONTEXT_VAR, BrokerMessage, BrokerMessageStatus, ) @@ -304,26 +305,32 @@ async def dispatch_one(self, entry: BrokerHandlerEntry) -> None: fn = self.get_callback(entry.callback) message = entry.data - data, status = await fn(message) + data, status, headers = await fn(message) if message.reply_topic is not None: await self.publisher.send( - data, topic=message.reply_topic, saga=message.saga, status=status, user=message.user + data, + topic=message.reply_topic, + identifier=message.identifier, + status=status, + user=message.user, + headers=headers, ) @staticmethod def get_callback( fn: Callable[[BrokerRequest], Union[Optional[BrokerRequest], Awaitable[Optional[BrokerRequest]]]] - ) -> Callable[[BrokerMessage], Awaitable[tuple[Any, BrokerMessageStatus]]]: + ) -> Callable[[BrokerMessage], Awaitable[tuple[Any, BrokerMessageStatus, dict[str, str]]]]: """Get the handler function to be used by the Broker Handler. :param fn: The action function. :return: A wrapper function around the given one that is compatible with the Broker Handler API. """ - async def _fn(raw: BrokerMessage) -> tuple[Any, BrokerMessageStatus]: + async def _fn(raw: BrokerMessage) -> tuple[Any, BrokerMessageStatus, dict[str, str]]: request = BrokerRequest(raw) - token = USER_CONTEXT_VAR.set(request.user) + user_token = REQUEST_USER_CONTEXT_VAR.set(request.user) + headers_token = REQUEST_HEADERS_CONTEXT_VAR.set(raw.headers) try: response = fn(request) @@ -331,15 +338,16 @@ async def _fn(raw: BrokerMessage) -> tuple[Any, BrokerMessageStatus]: response = await response if isinstance(response, Response): response = await response.content() - return response, BrokerMessageStatus.SUCCESS + return response, BrokerMessageStatus.SUCCESS, REQUEST_HEADERS_CONTEXT_VAR.get() except ResponseException as exc: logger.warning(f"Raised an application exception: {exc!s}") - return repr(exc), BrokerMessageStatus.ERROR + return repr(exc), BrokerMessageStatus.ERROR, REQUEST_HEADERS_CONTEXT_VAR.get() except Exception as exc: logger.exception(f"Raised a system exception: {exc!r}") - return repr(exc), BrokerMessageStatus.SYSTEM_ERROR + return repr(exc), BrokerMessageStatus.SYSTEM_ERROR, REQUEST_HEADERS_CONTEXT_VAR.get() finally: - USER_CONTEXT_VAR.reset(token) + REQUEST_USER_CONTEXT_VAR.reset(user_token) + REQUEST_HEADERS_CONTEXT_VAR.reset(headers_token) return _fn diff --git a/minos/networks/brokers/messages.py b/minos/networks/brokers/messages.py index e9487d0f..0799b9d6 100644 --- a/minos/networks/brokers/messages.py +++ b/minos/networks/brokers/messages.py @@ -16,13 +16,15 @@ ) from uuid import ( UUID, + uuid4, ) from minos.common import ( DeclarativeModel, ) -REPLY_TOPIC_CONTEXT_VAR: Final[ContextVar[Optional[str]]] = ContextVar("reply_topic", default=None) +REQUEST_REPLY_TOPIC_CONTEXT_VAR: Final[ContextVar[Optional[str]]] = ContextVar("reply_topic", default=None) +REQUEST_HEADERS_CONTEXT_VAR: Final[ContextVar[Optional[dict[str, str]]]] = ContextVar("headers", default=None) class BrokerMessage(DeclarativeModel): @@ -30,28 +32,35 @@ class BrokerMessage(DeclarativeModel): topic: str data: Any - service_name: str - saga: Optional[UUID] + identifier: UUID reply_topic: Optional[str] user: Optional[UUID] status: BrokerMessageStatus strategy: BrokerMessageStrategy + headers: dict[str, str] def __init__( self, topic: str, data: Any, - service_name: str, *, + identifier: Optional[UUID] = None, status: Optional[BrokerMessageStatus] = None, strategy: Optional[BrokerMessageStrategy] = None, + headers: Optional[dict[str, str]] = None, **kwargs ): + if identifier is None: + identifier = uuid4() if status is None: status = BrokerMessageStatus.SUCCESS if strategy is None: strategy = BrokerMessageStrategy.UNICAST - super().__init__(topic, data, service_name, status=status, strategy=strategy, **kwargs) + if headers is None: + headers = dict() + super().__init__( + topic=topic, data=data, identifier=identifier, status=status, strategy=strategy, headers=headers, **kwargs + ) @property def ok(self) -> bool: diff --git a/minos/networks/brokers/publishers/publishers.py b/minos/networks/brokers/publishers/publishers.py index e7c25970..52554e5a 100644 --- a/minos/networks/brokers/publishers/publishers.py +++ b/minos/networks/brokers/publishers/publishers.py @@ -3,9 +3,6 @@ ) import logging -from abc import ( - ABC, -) from typing import ( Any, Optional, @@ -34,17 +31,13 @@ logger = logging.getLogger(__name__) -class BrokerPublisher(BrokerPublisherSetup, ABC): +class BrokerPublisher(BrokerPublisherSetup): """Broker Publisher class.""" - def __init__(self, *args, service_name: str, **kwargs): - super().__init__(*args, **kwargs) - self.service_name = service_name - @classmethod def _from_config(cls, *args, config: MinosConfig, **kwargs) -> BrokerPublisher: # noinspection PyProtectedMember - return cls(*args, service_name=config.service.name, **config.broker.queue._asdict(), **kwargs) + return cls(*args, **config.broker.queue._asdict(), **kwargs) # noinspection PyMethodOverriding async def send( @@ -52,38 +45,41 @@ async def send( data: Any, topic: str, *, - saga: Optional[UUID] = None, + identifier: Optional[UUID] = None, reply_topic: Optional[str] = None, user: Optional[UUID] = None, status: BrokerMessageStatus = BrokerMessageStatus.SUCCESS, strategy: BrokerMessageStrategy = BrokerMessageStrategy.UNICAST, + headers: Optional[dict[str, str]] = None, **kwargs, - ) -> int: + ) -> UUID: """Send a ``BrokerMessage``. :param data: The data to be send. :param topic: Topic in which the message will be published. - :param saga: Saga identifier. + :param identifier: The identifier of the message. :param reply_topic: An optional topic name to wait for a response. :param user: The user identifier that send the message. :param status: The status code of the message. :param strategy: The publishing strategy. + :param headers: A mapping of string values identified by a string key. :param kwargs: Additional named arguments. - :return: This method does not return anything. + :return: The ``UUID`` identifier of the message. """ message = BrokerMessage( topic=topic, data=data, - saga=saga, + identifier=identifier, status=status, reply_topic=reply_topic, user=user, - service_name=self.service_name, strategy=strategy, + headers=headers, ) logger.info(f"Publishing '{message!s}'...") - return await self.enqueue(message.topic, message.strategy, message.avro_bytes) + await self.enqueue(message.topic, message.strategy, message.avro_bytes) + return message.identifier async def enqueue(self, topic: str, strategy: BrokerMessageStrategy, raw: bytes) -> int: """Send a sequence of bytes to the given topic. diff --git a/minos/networks/requests.py b/minos/networks/requests.py index f52b8046..c74c92bf 100644 --- a/minos/networks/requests.py +++ b/minos/networks/requests.py @@ -30,8 +30,7 @@ MinosException, ) -USER_CONTEXT_VAR: Final[ContextVar[Optional[UUID]]] = ContextVar("user", default=None) -USER_CONTEXT_VAR.set(None) # needed to "register" the context variable. +REQUEST_USER_CONTEXT_VAR: Final[ContextVar[Optional[UUID]]] = ContextVar("user", default=None) class Request(ABC): diff --git a/minos/networks/rest/handlers.py b/minos/networks/rest/handlers.py index de9f61b5..5aa26179 100644 --- a/minos/networks/rest/handlers.py +++ b/minos/networks/rest/handlers.py @@ -29,7 +29,7 @@ EnrouteBuilder, ) from ..requests import ( - USER_CONTEXT_VAR, + REQUEST_USER_CONTEXT_VAR, Response, ResponseException, ) @@ -128,7 +128,7 @@ async def _fn(request: web.Request) -> web.Response: logger.info(f"Dispatching '{request!s}' from '{request.remote!s}'...") request = RestRequest(request) - token = USER_CONTEXT_VAR.set(request.user) + token = REQUEST_USER_CONTEXT_VAR.set(request.user) try: response = fn(request) @@ -146,7 +146,7 @@ async def _fn(request: web.Request) -> web.Response: logger.exception(f"Raised a system exception: {exc!r}") raise web.HTTPInternalServerError() finally: - USER_CONTEXT_VAR.reset(token) + REQUEST_USER_CONTEXT_VAR.reset(token) return _fn diff --git a/tests/test_networks/test_brokers/test_dynamic/test_pools.py b/tests/test_networks/test_brokers/test_dynamic/test_pools.py index 1e460894..19210450 100644 --- a/tests/test_networks/test_brokers/test_dynamic/test_pools.py +++ b/tests/test_networks/test_brokers/test_dynamic/test_pools.py @@ -11,7 +11,7 @@ PostgresAsyncTestCase, ) from minos.networks import ( - REPLY_TOPIC_CONTEXT_VAR, + REQUEST_REPLY_TOPIC_CONTEXT_VAR, BrokerConsumer, BrokerPublisher, DynamicBroker, @@ -70,12 +70,12 @@ async def test_acquire(self): self.assertIn(broker.topic, self.pool.client.list_topics()) async def test_acquire_reply_topic_context_var(self): - self.assertEqual(None, REPLY_TOPIC_CONTEXT_VAR.get()) + self.assertEqual(None, REQUEST_REPLY_TOPIC_CONTEXT_VAR.get()) async with self.pool.acquire() as broker: - self.assertEqual(broker.topic, REPLY_TOPIC_CONTEXT_VAR.get()) + self.assertEqual(broker.topic, REQUEST_REPLY_TOPIC_CONTEXT_VAR.get()) - self.assertEqual(None, REPLY_TOPIC_CONTEXT_VAR.get()) + self.assertEqual(None, REQUEST_REPLY_TOPIC_CONTEXT_VAR.get()) if __name__ == "__main__": diff --git a/tests/test_networks/test_brokers/test_handlers/test_entries.py b/tests/test_networks/test_brokers/test_handlers/test_entries.py index dae7a44e..2d030c23 100644 --- a/tests/test_networks/test_brokers/test_handlers/test_entries.py +++ b/tests/test_networks/test_brokers/test_handlers/test_entries.py @@ -17,12 +17,11 @@ class TestHandlerEntry(unittest.IsolatedAsyncioTestCase): def setUp(self) -> None: - self.saga = uuid4() + self.identifier = uuid4() self.user = uuid4() - self.service_name = "foo" self.message = BrokerMessage( - "AddOrder", FakeModel("foo"), self.service_name, saga=self.saga, user=self.user, reply_topic="UpdateTicket", + "AddOrder", FakeModel("foo"), identifier=self.identifier, user=self.user, reply_topic="UpdateTicket", ) def test_constructor(self): @@ -49,12 +48,12 @@ def test_constructor_extended(self): def test_sort(self): unsorted = [ - BrokerHandlerEntry(1, "", 0, BrokerMessage("", "foo", "").avro_bytes, 1), - BrokerHandlerEntry(1, "", 0, BrokerMessage("", 4, "").avro_bytes, 1), - BrokerHandlerEntry(1, "", 0, BrokerMessage("", 2, "").avro_bytes, 1), - BrokerHandlerEntry(1, "", 0, BrokerMessage("", 3, "").avro_bytes, 1), - BrokerHandlerEntry(1, "", 0, BrokerMessage("", 1, "").avro_bytes, 1), - BrokerHandlerEntry(1, "", 0, BrokerMessage("", "bar", "").avro_bytes, 1), + BrokerHandlerEntry(1, "", 0, BrokerMessage("", "foo").avro_bytes, 1), + BrokerHandlerEntry(1, "", 0, BrokerMessage("", 4).avro_bytes, 1), + BrokerHandlerEntry(1, "", 0, BrokerMessage("", 2).avro_bytes, 1), + BrokerHandlerEntry(1, "", 0, BrokerMessage("", 3).avro_bytes, 1), + BrokerHandlerEntry(1, "", 0, BrokerMessage("", 1).avro_bytes, 1), + BrokerHandlerEntry(1, "", 0, BrokerMessage("", "bar").avro_bytes, 1), ] expected = [unsorted[0], unsorted[4], unsorted[2], unsorted[3], unsorted[1], unsorted[5]] diff --git a/tests/test_networks/test_brokers/test_handlers/test_handlers.py b/tests/test_networks/test_brokers/test_handlers/test_handlers.py index eba4f4dd..af50d911 100644 --- a/tests/test_networks/test_brokers/test_handlers/test_handlers.py +++ b/tests/test_networks/test_brokers/test_handlers/test_handlers.py @@ -33,7 +33,8 @@ PostgresAsyncTestCase, ) from minos.networks import ( - USER_CONTEXT_VAR, + REQUEST_HEADERS_CONTEXT_VAR, + REQUEST_USER_CONTEXT_VAR, BrokerHandler, BrokerHandlerEntry, BrokerMessage, @@ -80,12 +81,16 @@ def setUp(self) -> None: self.publisher = BrokerPublisher.from_config(self.config) self.handler = BrokerHandler.from_config(self.config, publisher=self.publisher) - self.saga = uuid4() + self.identifier = uuid4() self.user = uuid4() - self.service_name = self.config.service.name self.message = BrokerMessage( - "AddOrder", FakeModel("foo"), self.service_name, saga=self.saga, user=self.user, reply_topic="UpdateTicket", + "AddOrder", + FakeModel("foo"), + identifier=self.identifier, + user=self.user, + reply_topic="UpdateTicket", + headers={"foo": "bar"}, ) async def asyncSetUp(self): @@ -219,9 +224,10 @@ async def test_dispatch(self): call( "add_order", topic="UpdateTicket", - saga=self.message.saga, + identifier=self.message.identifier, status=BrokerMessageStatus.SUCCESS, user=self.user, + headers={"foo": "bar"}, ) ], send_mock.call_args_list, @@ -234,7 +240,7 @@ async def test_dispatch(self): async def test_dispatch_wrong(self): instance_1 = namedtuple("FakeCommand", ("topic", "avro_bytes"))("AddOrder", bytes(b"Test")) - instance_2 = BrokerMessage("NoActionTopic", FakeModel("Foo"), self.service_name) + instance_2 = BrokerMessage("NoActionTopic", FakeModel("Foo")) queue_id_1 = await self._insert_one(instance_1) queue_id_2 = await self._insert_one(instance_2) @@ -247,11 +253,9 @@ async def test_dispatch_concurrent(self): FakeModel, ) - saga = uuid4() + identifier = uuid4() - instance = BrokerMessage( - "AddOrder", [FakeModel("foo")], self.service_name, saga=saga, reply_topic="UpdateTicket" - ) + instance = BrokerMessage("AddOrder", [FakeModel("foo")], identifier=identifier, reply_topic="UpdateTicket") instance_wrong = namedtuple("FakeCommand", ("topic", "avro_bytes"))("AddOrder", bytes(b"Test")) for _ in range(10): @@ -275,7 +279,7 @@ async def test_dispatch_one(self): lookup_mock = MagicMock(return_value=callback_mock) topic = "TicketAdded" - event = BrokerMessage(topic, FakeModel("Foo"), self.service_name) + event = BrokerMessage(topic, FakeModel("Foo")) entry = BrokerHandlerEntry(1, topic, 0, event.avro_bytes, 1, callback_lookup=lookup_mock) await self.handler.dispatch_one(entry) @@ -288,26 +292,26 @@ async def test_dispatch_one(self): async def test_get_callback(self): fn = self.handler.get_callback(_Cls._fn) - self.assertEqual((FakeModel("foo"), BrokerMessageStatus.SUCCESS), await fn(self.message)) + self.assertEqual((FakeModel("foo"), BrokerMessageStatus.SUCCESS, {"foo": "bar"}), await fn(self.message)) async def test_get_callback_none(self): fn = self.handler.get_callback(_Cls._fn_none) - self.assertEqual((None, BrokerMessageStatus.SUCCESS), await fn(self.message)) + self.assertEqual((None, BrokerMessageStatus.SUCCESS, {"foo": "bar"}), await fn(self.message)) async def test_get_callback_raises_response(self): fn = self.handler.get_callback(_Cls._fn_raises_response) - expected = (repr(BrokerResponseException("foo")), BrokerMessageStatus.ERROR) + expected = (repr(BrokerResponseException("foo")), BrokerMessageStatus.ERROR, {"foo": "bar"}) self.assertEqual(expected, await fn(self.message)) async def test_get_callback_raises_exception(self): fn = self.handler.get_callback(_Cls._fn_raises_exception) - expected = (repr(ValueError()), BrokerMessageStatus.SYSTEM_ERROR) + expected = (repr(ValueError()), BrokerMessageStatus.SYSTEM_ERROR, {"foo": "bar"}) self.assertEqual(expected, await fn(self.message)) async def test_get_callback_with_user(self): async def _fn(request) -> None: self.assertEqual(self.user, request.user) - self.assertEqual(self.user, USER_CONTEXT_VAR.get()) + self.assertEqual(self.user, REQUEST_USER_CONTEXT_VAR.get()) mock = AsyncMock(side_effect=_fn) @@ -316,6 +320,18 @@ async def _fn(request) -> None: self.assertEqual(1, mock.call_count) + async def test_get_callback_with_headers(self): + async def _fn(request) -> None: + self.assertEqual({"foo": "bar"}, request.raw.headers) + REQUEST_HEADERS_CONTEXT_VAR.get()["bar"] = "foo" + + mock = AsyncMock(side_effect=_fn) + + handler = self.handler.get_callback(mock) + _, _, observed = await handler(self.message) + + self.assertEqual({"foo": "bar", "bar": "foo"}, observed) + async def test_dispatch_without_sorting(self): observed = list() @@ -326,8 +342,8 @@ async def _fn2(request): self.handler.get_action = MagicMock(return_value=_fn2) events = [ - BrokerMessage("TicketAdded", FakeModel("uuid1"), self.service_name), - BrokerMessage("TicketAdded", FakeModel("uuid2"), self.service_name), + BrokerMessage("TicketAdded", FakeModel("uuid1")), + BrokerMessage("TicketAdded", FakeModel("uuid2")), ] for event in events: @@ -349,12 +365,7 @@ async def _fn2(request): events = list() for i in range(1, 6): - events.extend( - [ - BrokerMessage("TicketAdded", ["uuid1", i], self.service_name), - BrokerMessage("TicketAdded", ["uuid2", i], self.service_name), - ] - ) + events.extend([BrokerMessage("TicketAdded", ["uuid1", i]), BrokerMessage("TicketAdded", ["uuid2", i])]) shuffle(events) for event in events: diff --git a/tests/test_networks/test_brokers/test_handlers/test_requests.py b/tests/test_networks/test_brokers/test_handlers/test_requests.py index 7f73781a..9142df7b 100644 --- a/tests/test_networks/test_brokers/test_handlers/test_requests.py +++ b/tests/test_networks/test_brokers/test_handlers/test_requests.py @@ -16,11 +16,8 @@ class TestHandlerRequest(unittest.IsolatedAsyncioTestCase): def setUp(self) -> None: self.data = [FakeModel("foo"), FakeModel("bar")] - self.saga = uuid4() - self.service_name = "foo" - self.raw = BrokerMessage( - "FooCreated", self.data, self.service_name, saga=self.saga, reply_topic="AddOrderReply" - ) + self.identifier = uuid4() + self.raw = BrokerMessage("FooCreated", self.data, identifier=self.identifier, reply_topic="AddOrderReply") def test_repr(self): request = BrokerRequest(self.raw) @@ -32,7 +29,7 @@ def test_eq_true(self): def test_eq_false(self): another = BrokerRequest( - BrokerMessage("FooUpdated", self.data, self.service_name, saga=self.saga, reply_topic="AddOrderReply") + BrokerMessage("FooUpdated", self.data, identifier=self.identifier, reply_topic="AddOrderReply") ) self.assertNotEqual(BrokerRequest(self.raw), another) @@ -50,13 +47,13 @@ async def test_content(self): async def test_content_single(self): request = BrokerRequest( - BrokerMessage("FooCreated", self.data[0], self.service_name, saga=self.saga, reply_topic="AddOrderReply") + BrokerMessage("FooCreated", self.data[0], identifier=self.identifier, reply_topic="AddOrderReply") ) self.assertEqual(self.data[0], await request.content()) async def test_content_simple(self): request = BrokerRequest( - BrokerMessage("FooCreated", 1234, self.service_name, saga=self.saga, reply_topic="AddOrderReply") + BrokerMessage("FooCreated", 1234, identifier=self.identifier, reply_topic="AddOrderReply") ) self.assertEqual(1234, await request.content()) diff --git a/tests/test_networks/test_brokers/test_messages.py b/tests/test_networks/test_brokers/test_messages.py index 3b2f96ca..06d3c7f6 100644 --- a/tests/test_networks/test_brokers/test_messages.py +++ b/tests/test_networks/test_brokers/test_messages.py @@ -1,5 +1,6 @@ import unittest from uuid import ( + UUID, uuid4, ) @@ -17,19 +18,17 @@ class TestBrokerMessage(unittest.TestCase): def setUp(self) -> None: self.topic = "FooCreated" self.data = [FakeModel("blue"), FakeModel("red")] - self.saga = uuid4() + self.identifier = uuid4() self.reply_topic = "AddOrderReply" self.status = BrokerMessageStatus.SUCCESS self.user = uuid4() - self.service_name = "foo" self.strategy = BrokerMessageStrategy.MULTICAST def test_constructor_simple(self): - message = BrokerMessage(self.topic, self.data, self.service_name) + message = BrokerMessage(self.topic, self.data) self.assertEqual(self.topic, message.topic) self.assertEqual(self.data, message.data) - self.assertEqual(self.service_name, message.service_name) - self.assertEqual(None, message.saga) + self.assertIsInstance(message.identifier, UUID) self.assertEqual(None, message.reply_topic) self.assertEqual(None, message.user) self.assertEqual(BrokerMessageStatus.SUCCESS, message.status) @@ -39,8 +38,7 @@ def test_constructor(self): message = BrokerMessage( self.topic, self.data, - self.service_name, - saga=self.saga, + identifier=self.identifier, reply_topic=self.reply_topic, user=self.user, status=self.status, @@ -48,29 +46,25 @@ def test_constructor(self): ) self.assertEqual(self.topic, message.topic) self.assertEqual(self.data, message.data) - self.assertEqual(self.saga, message.saga) + self.assertEqual(self.identifier, message.identifier) self.assertEqual(self.reply_topic, message.reply_topic) self.assertEqual(self.user, message.user) self.assertEqual(self.status, message.status) - self.assertEqual(self.service_name, message.service_name) self.assertEqual(self.strategy, message.strategy) def test_ok(self): - self.assertTrue(BrokerMessage(self.topic, self.data, self.service_name, status=BrokerMessageStatus.SUCCESS).ok) - self.assertFalse(BrokerMessage(self.topic, self.data, self.service_name, status=BrokerMessageStatus.ERROR).ok) - self.assertFalse( - BrokerMessage(self.topic, self.data, self.service_name, status=BrokerMessageStatus.SYSTEM_ERROR).ok - ) + self.assertTrue(BrokerMessage(self.topic, self.data, status=BrokerMessageStatus.SUCCESS).ok) + self.assertFalse(BrokerMessage(self.topic, self.data, status=BrokerMessageStatus.ERROR).ok) + self.assertFalse(BrokerMessage(self.topic, self.data, status=BrokerMessageStatus.SYSTEM_ERROR).ok) def test_avro_serialization(self): message = BrokerMessage( self.topic, self.data, - saga=self.saga, + identifier=self.identifier, reply_topic=self.reply_topic, user=self.user, status=self.status, - service_name=self.service_name, strategy=self.strategy, ) observed = BrokerMessage.from_avro_bytes(message.avro_bytes) diff --git a/tests/test_networks/test_brokers/test_publishers/test_producers.py b/tests/test_networks/test_brokers/test_publishers/test_producers.py index 39a81c3f..21784a3d 100644 --- a/tests/test_networks/test_brokers/test_publishers/test_producers.py +++ b/tests/test_networks/test_brokers/test_publishers/test_producers.py @@ -139,13 +139,15 @@ async def test_dispatch_forever_without_notify(self): async def test_concurrency_dispatcher(self): model = FakeModel("foo") - saga = uuid4() + identifier = uuid4() broker_publisher = BrokerPublisher.from_config(config=self.config) async with broker_publisher: for x in range(60): - await broker_publisher.send(model, "CommandBroker-Delete", saga=saga, reply_topic="TestDeleteReply") + await broker_publisher.send( + model, "CommandBroker-Delete", identifier=identifier, reply_topic="TestDeleteReply" + ) async with aiopg.connect(**self.broker_queue_db) as connect: async with connect.cursor() as cur: @@ -165,30 +167,26 @@ async def test_concurrency_dispatcher(self): async def test_if_commands_was_deleted(self): async with BrokerPublisher.from_config(config=self.config) as broker_publisher: - queue_id_1 = await broker_publisher.send(FakeModel("Foo"), "TestDeleteReply") - queue_id_2 = await broker_publisher.send(FakeModel("Foo"), "TestDeleteReply") + await broker_publisher.send(FakeModel("Foo"), "TestDeleteReply") + await broker_publisher.send(FakeModel("Foo"), "TestDeleteReply") await self.producer.dispatch() async with aiopg.connect(**self.broker_queue_db) as connection: async with connection.cursor() as cursor: await cursor.execute("SELECT COUNT(*) FROM producer_queue WHERE topic = '%s'" % "TestDeleteReply") - records = await cursor.fetchone() - - assert queue_id_1 > 0 - assert queue_id_2 > 0 - assert records[0] == 0 + self.assertEqual(0, (await cursor.fetchone())[0]) async def test_if_commands_retry_was_incremented(self): model = FakeModel("foo") - saga = uuid4() + identifier = uuid4() async with BrokerPublisher.from_config(config=self.config) as broker_publisher: - queue_id_1 = await broker_publisher.send( - model, "TestDeleteOrderReply", saga=saga, status=BrokerMessageStatus.SUCCESS + await broker_publisher.send( + model, "TestDeleteOrderReply", identifier=identifier, status=BrokerMessageStatus.SUCCESS ) - queue_id_2 = await broker_publisher.send( - model, "TestDeleteOrderReply", saga=saga, status=BrokerMessageStatus.SUCCESS + await broker_publisher.send( + model, "TestDeleteOrderReply", identifier=identifier, status=BrokerMessageStatus.SUCCESS ) self.producer.publish = AsyncMock(return_value=False) @@ -196,20 +194,14 @@ async def test_if_commands_retry_was_incremented(self): async with aiopg.connect(**self.broker_queue_db) as connection: async with connection.cursor() as cursor: - await cursor.execute("SELECT COUNT(*) FROM producer_queue WHERE topic = '%s'" % "TestDeleteOrderReply") - records = await cursor.fetchone() - - await cursor.execute("SELECT retry FROM producer_queue WHERE id=%d;" % queue_id_1) - retry_1 = await cursor.fetchone() + await cursor.execute("SELECT COUNT(*) FROM producer_queue WHERE topic = 'TestDeleteOrderReply'") + self.assertEqual(2, (await cursor.fetchone())[0]) - await cursor.execute("SELECT retry FROM producer_queue WHERE id=%d;" % queue_id_2) - retry_2 = await cursor.fetchone() + await cursor.execute("SELECT retry FROM producer_queue WHERE id=1;") + self.assertEqual(1, (await cursor.fetchone())[0]) - assert queue_id_1 > 0 - assert queue_id_2 > 0 - assert records[0] == 2 - assert retry_1[0] > 0 - assert retry_2[0] > 0 + await cursor.execute("SELECT retry FROM producer_queue WHERE id=2;") + self.assertEqual(1, (await cursor.fetchone())[0]) async def _notify(self, name): await sleep(0.2) diff --git a/tests/test_networks/test_brokers/test_publishers/test_publishers.py b/tests/test_networks/test_brokers/test_publishers/test_publishers.py index 628ef2b7..3f3390df 100644 --- a/tests/test_networks/test_brokers/test_publishers/test_publishers.py +++ b/tests/test_networks/test_brokers/test_publishers/test_publishers.py @@ -4,6 +4,7 @@ call, ) from uuid import ( + UUID, uuid4, ) @@ -48,70 +49,79 @@ def test_from_config_default(self): self.assertIsInstance(BrokerPublisher.from_config(config=self.config), BrokerPublisher) async def test_send(self): - mock = AsyncMock(return_value=56) + mock = AsyncMock() self.publisher.enqueue = mock - identifier = await self.publisher.send(FakeModel("Foo"), topic="fake") + observed = await self.publisher.send(FakeModel("Foo"), topic="fake") - self.assertEqual(56, identifier) + self.assertIsInstance(observed, UUID) self.assertEqual(1, mock.call_count) args = mock.call_args.args self.assertEqual("fake", args[0]) self.assertEqual(BrokerMessageStrategy.UNICAST, args[1]) - expected = BrokerMessage("fake", FakeModel("Foo"), service_name="Order") + expected = BrokerMessage("fake", FakeModel("Foo"), identifier=observed) self.assertEqual(expected, Model.from_avro_bytes(args[2])) - async def test_send_with_reply_topic(self): - mock = AsyncMock(return_value=56) + async def test_send_with_identifier(self): + mock = AsyncMock() self.publisher.enqueue = mock - saga = uuid4() + identifier = uuid4() + observed = await self.publisher.send(FakeModel("Foo"), topic="fake", identifier=identifier) + + self.assertEqual(identifier, observed) + self.assertEqual(1, mock.call_count) + + args = mock.call_args.args + self.assertEqual("fake", args[0]) + self.assertEqual(BrokerMessageStrategy.UNICAST, args[1]) - identifier = await self.publisher.send(FakeModel("foo"), "fake", saga=saga, reply_topic="ekaf") + expected = BrokerMessage("fake", FakeModel("Foo"), identifier=identifier) + self.assertEqual(expected, Model.from_avro_bytes(args[2])) - self.assertEqual(56, identifier) + async def test_send_with_reply_topic(self): + mock = AsyncMock() + self.publisher.enqueue = mock + + observed = await self.publisher.send(FakeModel("foo"), "fake", reply_topic="ekaf") + + self.assertIsInstance(observed, UUID) self.assertEqual(1, mock.call_count) args = mock.call_args.args self.assertEqual("fake", args[0]) self.assertEqual(BrokerMessageStrategy.UNICAST, args[1]) - expected = BrokerMessage("fake", FakeModel("foo"), saga=saga, reply_topic="ekaf", service_name="Order") + expected = BrokerMessage("fake", FakeModel("foo"), identifier=observed, reply_topic="ekaf") self.assertEqual(expected, Model.from_avro_bytes(args[2])) async def test_send_with_user(self): - mock = AsyncMock(return_value=56) + mock = AsyncMock() self.publisher.enqueue = mock - saga = uuid4() user = uuid4() - identifier = await self.publisher.send(FakeModel("foo"), "fake", saga=saga, reply_topic="ekaf", user=user) + observed = await self.publisher.send(FakeModel("foo"), "fake", reply_topic="ekaf", user=user) - self.assertEqual(56, identifier) + self.assertIsInstance(observed, UUID) self.assertEqual(1, mock.call_count) args = mock.call_args.args self.assertEqual("fake", args[0]) self.assertEqual(BrokerMessageStrategy.UNICAST, args[1]) - expected = BrokerMessage( - "fake", FakeModel("foo"), saga=saga, reply_topic="ekaf", user=user, service_name="Order" - ) + expected = BrokerMessage("fake", FakeModel("foo"), identifier=observed, reply_topic="ekaf", user=user) self.assertEqual(expected, Model.from_avro_bytes(args[2])) async def test_send_with_status(self): - mock = AsyncMock(return_value=56) + mock = AsyncMock() self.publisher.enqueue = mock - saga = uuid4() reply_topic = "fakeReply" - identifier = await self.publisher.send( - FakeModel("foo"), saga=saga, topic=reply_topic, status=BrokerMessageStatus.SUCCESS - ) + observed = await self.publisher.send(FakeModel("foo"), topic=reply_topic, status=BrokerMessageStatus.SUCCESS) - self.assertEqual(56, identifier) + self.assertIsInstance(observed, UUID) self.assertEqual(1, mock.call_count) args = mock.call_args.args @@ -119,24 +129,20 @@ async def test_send_with_status(self): self.assertEqual(BrokerMessageStrategy.UNICAST, args[1]) expected = BrokerMessage( - reply_topic, - FakeModel("foo"), - saga=saga, - status=BrokerMessageStatus.SUCCESS, - service_name=self.config.service.name, + reply_topic, FakeModel("foo"), identifier=observed, status=BrokerMessageStatus.SUCCESS, ) observed = Model.from_avro_bytes(args[2]) self.assertEqual(expected, observed) async def test_send_with_multicast_strategy(self): - mock = AsyncMock(return_value=56) + mock = AsyncMock() self.publisher.enqueue = mock topic = "fakeReply" - identifier = await self.publisher.send(FakeModel("foo"), topic=topic, strategy=BrokerMessageStrategy.MULTICAST) + observed = await self.publisher.send(FakeModel("foo"), topic=topic, strategy=BrokerMessageStrategy.MULTICAST) - self.assertEqual(56, identifier) + self.assertIsInstance(observed, UUID) self.assertEqual(1, mock.call_count) args = mock.call_args.args @@ -144,7 +150,7 @@ async def test_send_with_multicast_strategy(self): self.assertEqual(BrokerMessageStrategy.MULTICAST, args[1]) expected = BrokerMessage( - topic, FakeModel("foo"), service_name=self.config.service.name, strategy=BrokerMessageStrategy.MULTICAST, + topic, FakeModel("foo"), identifier=observed, strategy=BrokerMessageStrategy.MULTICAST, ) observed = Model.from_avro_bytes(args[2]) self.assertEqual(expected, observed) @@ -155,9 +161,9 @@ async def test_enqueue(self): mock = AsyncMock(return_value=(56,)) self.publisher.submit_query_and_fetchone = mock - identifier = await self.publisher.enqueue("test_topic", BrokerMessageStrategy.UNICAST, b"test") + observed = await self.publisher.enqueue("test_topic", BrokerMessageStrategy.UNICAST, b"test") - self.assertEqual(56, identifier) + self.assertEqual(56, observed) self.assertEqual(1, mock.call_count) self.assertEqual(call(query, ("test_topic", b"test", BrokerMessageStrategy.UNICAST)), mock.call_args) diff --git a/tests/test_networks/test_rest/test_handlers.py b/tests/test_networks/test_rest/test_handlers.py index ab44c20f..87a9d15a 100644 --- a/tests/test_networks/test_rest/test_handlers.py +++ b/tests/test_networks/test_rest/test_handlers.py @@ -18,7 +18,7 @@ PostgresAsyncTestCase, ) from minos.networks import ( - USER_CONTEXT_VAR, + REQUEST_USER_CONTEXT_VAR, Request, Response, RestHandler, @@ -98,7 +98,7 @@ async def test_get_callback_with_user(self): async def _fn(request) -> None: self.assertEqual(user, request.user) - self.assertEqual(user, USER_CONTEXT_VAR.get()) + self.assertEqual(user, REQUEST_USER_CONTEXT_VAR.get()) mock = AsyncMock(side_effect=_fn)