From 6f7b0648feb7f66d83ed605dd69215f2cd966f6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Mon, 29 Nov 2021 10:49:30 +0100 Subject: [PATCH 1/7] ISSUE #457 * Add `headers` field to `BrokerMessage`. --- minos/networks/__init__.py | 1 + minos/networks/brokers/__init__.py | 1 + minos/networks/brokers/handlers/handlers.py | 21 +++++++++++-------- minos/networks/brokers/messages.py | 7 ++++++- .../networks/brokers/publishers/publishers.py | 3 +++ .../test_handlers/test_handlers.py | 9 ++++---- 6 files changed, 28 insertions(+), 14 deletions(-) diff --git a/minos/networks/__init__.py b/minos/networks/__init__.py index f5011386..787c5565 100644 --- a/minos/networks/__init__.py +++ b/minos/networks/__init__.py @@ -3,6 +3,7 @@ __version__ = "0.3.0" from .brokers import ( + HEADERS_CONTEXT_VAR, REPLY_TOPIC_CONTEXT_VAR, BrokerConsumer, BrokerConsumerService, diff --git a/minos/networks/brokers/__init__.py b/minos/networks/brokers/__init__.py index c0d438b0..1b804ae5 100644 --- a/minos/networks/brokers/__init__.py +++ b/minos/networks/brokers/__init__.py @@ -14,6 +14,7 @@ BrokerResponseException, ) from .messages import ( + HEADERS_CONTEXT_VAR, REPLY_TOPIC_CONTEXT_VAR, BrokerMessage, BrokerMessageStatus, diff --git a/minos/networks/brokers/handlers/handlers.py b/minos/networks/brokers/handlers/handlers.py index 5d99cc38..e550fdd2 100644 --- a/minos/networks/brokers/handlers/handlers.py +++ b/minos/networks/brokers/handlers/handlers.py @@ -60,6 +60,7 @@ consume_queue, ) from ..messages import ( + HEADERS_CONTEXT_VAR, BrokerMessage, BrokerMessageStatus, ) @@ -304,26 +305,27 @@ async def dispatch_one(self, entry: BrokerHandlerEntry) -> None: fn = self.get_callback(entry.callback) message = entry.data - data, status = await fn(message) + data, status, headers = await fn(message) if message.reply_topic is not None: await self.publisher.send( - data, topic=message.reply_topic, saga=message.saga, status=status, user=message.user + data, topic=message.reply_topic, saga=message.saga, status=status, user=message.user, headers=headers, ) @staticmethod def get_callback( fn: Callable[[BrokerRequest], Union[Optional[BrokerRequest], Awaitable[Optional[BrokerRequest]]]] - ) -> Callable[[BrokerMessage], Awaitable[tuple[Any, BrokerMessageStatus]]]: + ) -> Callable[[BrokerMessage], Awaitable[tuple[Any, BrokerMessageStatus, dict[str, str]]]]: """Get the handler function to be used by the Broker Handler. :param fn: The action function. :return: A wrapper function around the given one that is compatible with the Broker Handler API. """ - async def _fn(raw: BrokerMessage) -> tuple[Any, BrokerMessageStatus]: + async def _fn(raw: BrokerMessage) -> tuple[Any, BrokerMessageStatus, dict[str, str]]: request = BrokerRequest(raw) - token = USER_CONTEXT_VAR.set(request.user) + user_token = USER_CONTEXT_VAR.set(request.user) + headers_token = HEADERS_CONTEXT_VAR.set(raw.headers) try: response = fn(request) @@ -331,15 +333,16 @@ async def _fn(raw: BrokerMessage) -> tuple[Any, BrokerMessageStatus]: response = await response if isinstance(response, Response): response = await response.content() - return response, BrokerMessageStatus.SUCCESS + return response, BrokerMessageStatus.SUCCESS, HEADERS_CONTEXT_VAR.get() except ResponseException as exc: logger.warning(f"Raised an application exception: {exc!s}") - return repr(exc), BrokerMessageStatus.ERROR + return repr(exc), BrokerMessageStatus.ERROR, HEADERS_CONTEXT_VAR.get() except Exception as exc: logger.exception(f"Raised a system exception: {exc!r}") - return repr(exc), BrokerMessageStatus.SYSTEM_ERROR + return repr(exc), BrokerMessageStatus.SYSTEM_ERROR, HEADERS_CONTEXT_VAR.get() finally: - USER_CONTEXT_VAR.reset(token) + USER_CONTEXT_VAR.reset(user_token) + HEADERS_CONTEXT_VAR.reset(headers_token) return _fn diff --git a/minos/networks/brokers/messages.py b/minos/networks/brokers/messages.py index e9487d0f..0c3ddbfd 100644 --- a/minos/networks/brokers/messages.py +++ b/minos/networks/brokers/messages.py @@ -23,6 +23,7 @@ ) REPLY_TOPIC_CONTEXT_VAR: Final[ContextVar[Optional[str]]] = ContextVar("reply_topic", default=None) +HEADERS_CONTEXT_VAR: Final[ContextVar[Optional[dict[str, str]]]] = ContextVar("headers", default=None) class BrokerMessage(DeclarativeModel): @@ -36,6 +37,7 @@ class BrokerMessage(DeclarativeModel): user: Optional[UUID] status: BrokerMessageStatus strategy: BrokerMessageStrategy + headers: dict[str, str] def __init__( self, @@ -45,13 +47,16 @@ def __init__( *, status: Optional[BrokerMessageStatus] = None, strategy: Optional[BrokerMessageStrategy] = None, + headers: Optional[dict[str, str]] = None, **kwargs ): if status is None: status = BrokerMessageStatus.SUCCESS if strategy is None: strategy = BrokerMessageStrategy.UNICAST - super().__init__(topic, data, service_name, status=status, strategy=strategy, **kwargs) + if headers is None: + headers = dict() + super().__init__(topic, data, service_name, status=status, strategy=strategy, headers=headers, **kwargs) @property def ok(self) -> bool: diff --git a/minos/networks/brokers/publishers/publishers.py b/minos/networks/brokers/publishers/publishers.py index e7c25970..969eeb6f 100644 --- a/minos/networks/brokers/publishers/publishers.py +++ b/minos/networks/brokers/publishers/publishers.py @@ -57,6 +57,7 @@ async def send( user: Optional[UUID] = None, status: BrokerMessageStatus = BrokerMessageStatus.SUCCESS, strategy: BrokerMessageStrategy = BrokerMessageStrategy.UNICAST, + headers: Optional[dict[str, str]] = None, **kwargs, ) -> int: """Send a ``BrokerMessage``. @@ -68,6 +69,7 @@ async def send( :param user: The user identifier that send the message. :param status: The status code of the message. :param strategy: The publishing strategy. + :param headers: TODO :param kwargs: Additional named arguments. :return: This method does not return anything. """ @@ -81,6 +83,7 @@ async def send( user=user, service_name=self.service_name, strategy=strategy, + headers=headers, ) logger.info(f"Publishing '{message!s}'...") return await self.enqueue(message.topic, message.strategy, message.avro_bytes) diff --git a/tests/test_networks/test_brokers/test_handlers/test_handlers.py b/tests/test_networks/test_brokers/test_handlers/test_handlers.py index eba4f4dd..f4dad723 100644 --- a/tests/test_networks/test_brokers/test_handlers/test_handlers.py +++ b/tests/test_networks/test_brokers/test_handlers/test_handlers.py @@ -222,6 +222,7 @@ async def test_dispatch(self): saga=self.message.saga, status=BrokerMessageStatus.SUCCESS, user=self.user, + headers=dict(), ) ], send_mock.call_args_list, @@ -288,20 +289,20 @@ async def test_dispatch_one(self): async def test_get_callback(self): fn = self.handler.get_callback(_Cls._fn) - self.assertEqual((FakeModel("foo"), BrokerMessageStatus.SUCCESS), await fn(self.message)) + self.assertEqual((FakeModel("foo"), BrokerMessageStatus.SUCCESS, dict()), await fn(self.message)) async def test_get_callback_none(self): fn = self.handler.get_callback(_Cls._fn_none) - self.assertEqual((None, BrokerMessageStatus.SUCCESS), await fn(self.message)) + self.assertEqual((None, BrokerMessageStatus.SUCCESS, dict()), await fn(self.message)) async def test_get_callback_raises_response(self): fn = self.handler.get_callback(_Cls._fn_raises_response) - expected = (repr(BrokerResponseException("foo")), BrokerMessageStatus.ERROR) + expected = (repr(BrokerResponseException("foo")), BrokerMessageStatus.ERROR, dict()) self.assertEqual(expected, await fn(self.message)) async def test_get_callback_raises_exception(self): fn = self.handler.get_callback(_Cls._fn_raises_exception) - expected = (repr(ValueError()), BrokerMessageStatus.SYSTEM_ERROR) + expected = (repr(ValueError()), BrokerMessageStatus.SYSTEM_ERROR, dict()) self.assertEqual(expected, await fn(self.message)) async def test_get_callback_with_user(self): From f1b8488cd569a7a1566e511b95dd8c5cf32e51bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Mon, 29 Nov 2021 15:19:05 +0100 Subject: [PATCH 2/7] ISSUE #457 * Rename `saga` as `identifier`. --- minos/networks/brokers/dynamic/brokers.py | 9 ++- minos/networks/brokers/handlers/handlers.py | 7 +- minos/networks/brokers/messages.py | 17 ++++- .../networks/brokers/publishers/publishers.py | 15 ++-- .../test_handlers/test_entries.py | 9 ++- .../test_handlers/test_handlers.py | 15 ++-- .../test_handlers/test_requests.py | 16 ++-- .../test_brokers/test_messages.py | 11 +-- .../test_publishers/test_producers.py | 44 +++++------ .../test_publishers/test_publishers.py | 74 ++++++++++++------- 10 files changed, 133 insertions(+), 84 deletions(-) diff --git a/minos/networks/brokers/dynamic/brokers.py b/minos/networks/brokers/dynamic/brokers.py index 81f92967..881e6517 100644 --- a/minos/networks/brokers/dynamic/brokers.py +++ b/minos/networks/brokers/dynamic/brokers.py @@ -10,6 +10,9 @@ from typing import ( Optional, ) +from uuid import ( + UUID, +) from aiopg import ( Cursor, @@ -84,15 +87,15 @@ async def _destroy(self) -> None: await super()._destroy() # noinspection PyUnusedLocal - async def send(self, *args, reply_topic: None = None, **kwargs) -> None: + async def send(self, *args, reply_topic: None = None, **kwargs) -> UUID: """Send a ``BrokerMessage``. :param args: Additional positional arguments. :param reply_topic: This argument is ignored if ignored in favor of ``self.topic``. :param kwargs: Additional named arguments. - :return: This method does not return anything. + :return: The ``UUID`` identifier of the message. """ - await self.publisher.send(*args, reply_topic=self.topic, **kwargs) + return await self.publisher.send(*args, reply_topic=self.topic, **kwargs) async def get_one(self, *args, **kwargs) -> BrokerHandlerEntry: """Get one handler entry from the given topics. diff --git a/minos/networks/brokers/handlers/handlers.py b/minos/networks/brokers/handlers/handlers.py index e550fdd2..07aa54a9 100644 --- a/minos/networks/brokers/handlers/handlers.py +++ b/minos/networks/brokers/handlers/handlers.py @@ -309,7 +309,12 @@ async def dispatch_one(self, entry: BrokerHandlerEntry) -> None: if message.reply_topic is not None: await self.publisher.send( - data, topic=message.reply_topic, saga=message.saga, status=status, user=message.user, headers=headers, + data, + topic=message.reply_topic, + identifier=message.identifier, + status=status, + user=message.user, + headers=headers, ) @staticmethod diff --git a/minos/networks/brokers/messages.py b/minos/networks/brokers/messages.py index 0c3ddbfd..df95f4ff 100644 --- a/minos/networks/brokers/messages.py +++ b/minos/networks/brokers/messages.py @@ -16,6 +16,7 @@ ) from uuid import ( UUID, + uuid4, ) from minos.common import ( @@ -32,7 +33,7 @@ class BrokerMessage(DeclarativeModel): topic: str data: Any service_name: str - saga: Optional[UUID] + identifier: UUID reply_topic: Optional[str] user: Optional[UUID] status: BrokerMessageStatus @@ -45,18 +46,30 @@ def __init__( data: Any, service_name: str, *, + identifier: Optional[UUID] = None, status: Optional[BrokerMessageStatus] = None, strategy: Optional[BrokerMessageStrategy] = None, headers: Optional[dict[str, str]] = None, **kwargs ): + if identifier is None: + identifier = uuid4() if status is None: status = BrokerMessageStatus.SUCCESS if strategy is None: strategy = BrokerMessageStrategy.UNICAST if headers is None: headers = dict() - super().__init__(topic, data, service_name, status=status, strategy=strategy, headers=headers, **kwargs) + super().__init__( + topic=topic, + data=data, + service_name=service_name, + identifier=identifier, + status=status, + strategy=strategy, + headers=headers, + **kwargs + ) @property def ok(self) -> bool: diff --git a/minos/networks/brokers/publishers/publishers.py b/minos/networks/brokers/publishers/publishers.py index 969eeb6f..047075c0 100644 --- a/minos/networks/brokers/publishers/publishers.py +++ b/minos/networks/brokers/publishers/publishers.py @@ -52,32 +52,32 @@ async def send( data: Any, topic: str, *, - saga: Optional[UUID] = None, + identifier: Optional[UUID] = None, reply_topic: Optional[str] = None, user: Optional[UUID] = None, status: BrokerMessageStatus = BrokerMessageStatus.SUCCESS, strategy: BrokerMessageStrategy = BrokerMessageStrategy.UNICAST, headers: Optional[dict[str, str]] = None, **kwargs, - ) -> int: + ) -> UUID: """Send a ``BrokerMessage``. :param data: The data to be send. :param topic: Topic in which the message will be published. - :param saga: Saga identifier. + :param identifier: The identifier of the message. :param reply_topic: An optional topic name to wait for a response. :param user: The user identifier that send the message. :param status: The status code of the message. :param strategy: The publishing strategy. - :param headers: TODO + :param headers: A mapping of string values identified by a string key. :param kwargs: Additional named arguments. - :return: This method does not return anything. + :return: The ``UUID`` identifier of the message. """ message = BrokerMessage( topic=topic, data=data, - saga=saga, + identifier=identifier, status=status, reply_topic=reply_topic, user=user, @@ -86,7 +86,8 @@ async def send( headers=headers, ) logger.info(f"Publishing '{message!s}'...") - return await self.enqueue(message.topic, message.strategy, message.avro_bytes) + await self.enqueue(message.topic, message.strategy, message.avro_bytes) + return message.identifier async def enqueue(self, topic: str, strategy: BrokerMessageStrategy, raw: bytes) -> int: """Send a sequence of bytes to the given topic. diff --git a/tests/test_networks/test_brokers/test_handlers/test_entries.py b/tests/test_networks/test_brokers/test_handlers/test_entries.py index dae7a44e..a2091fbb 100644 --- a/tests/test_networks/test_brokers/test_handlers/test_entries.py +++ b/tests/test_networks/test_brokers/test_handlers/test_entries.py @@ -17,12 +17,17 @@ class TestHandlerEntry(unittest.IsolatedAsyncioTestCase): def setUp(self) -> None: - self.saga = uuid4() + self.identifier = uuid4() self.user = uuid4() self.service_name = "foo" self.message = BrokerMessage( - "AddOrder", FakeModel("foo"), self.service_name, saga=self.saga, user=self.user, reply_topic="UpdateTicket", + "AddOrder", + FakeModel("foo"), + self.service_name, + identifier=self.identifier, + user=self.user, + reply_topic="UpdateTicket", ) def test_constructor(self): diff --git a/tests/test_networks/test_brokers/test_handlers/test_handlers.py b/tests/test_networks/test_brokers/test_handlers/test_handlers.py index f4dad723..7b26e289 100644 --- a/tests/test_networks/test_brokers/test_handlers/test_handlers.py +++ b/tests/test_networks/test_brokers/test_handlers/test_handlers.py @@ -80,12 +80,17 @@ def setUp(self) -> None: self.publisher = BrokerPublisher.from_config(self.config) self.handler = BrokerHandler.from_config(self.config, publisher=self.publisher) - self.saga = uuid4() + self.identifier = uuid4() self.user = uuid4() self.service_name = self.config.service.name self.message = BrokerMessage( - "AddOrder", FakeModel("foo"), self.service_name, saga=self.saga, user=self.user, reply_topic="UpdateTicket", + "AddOrder", + FakeModel("foo"), + self.service_name, + identifier=self.identifier, + user=self.user, + reply_topic="UpdateTicket", ) async def asyncSetUp(self): @@ -219,7 +224,7 @@ async def test_dispatch(self): call( "add_order", topic="UpdateTicket", - saga=self.message.saga, + identifier=self.message.identifier, status=BrokerMessageStatus.SUCCESS, user=self.user, headers=dict(), @@ -248,10 +253,10 @@ async def test_dispatch_concurrent(self): FakeModel, ) - saga = uuid4() + identifier = uuid4() instance = BrokerMessage( - "AddOrder", [FakeModel("foo")], self.service_name, saga=saga, reply_topic="UpdateTicket" + "AddOrder", [FakeModel("foo")], self.service_name, identifier=identifier, reply_topic="UpdateTicket" ) instance_wrong = namedtuple("FakeCommand", ("topic", "avro_bytes"))("AddOrder", bytes(b"Test")) diff --git a/tests/test_networks/test_brokers/test_handlers/test_requests.py b/tests/test_networks/test_brokers/test_handlers/test_requests.py index 7f73781a..d1ec62f7 100644 --- a/tests/test_networks/test_brokers/test_handlers/test_requests.py +++ b/tests/test_networks/test_brokers/test_handlers/test_requests.py @@ -16,10 +16,10 @@ class TestHandlerRequest(unittest.IsolatedAsyncioTestCase): def setUp(self) -> None: self.data = [FakeModel("foo"), FakeModel("bar")] - self.saga = uuid4() + self.identifier = uuid4() self.service_name = "foo" self.raw = BrokerMessage( - "FooCreated", self.data, self.service_name, saga=self.saga, reply_topic="AddOrderReply" + "FooCreated", self.data, self.service_name, identifier=self.identifier, reply_topic="AddOrderReply" ) def test_repr(self): @@ -32,7 +32,9 @@ def test_eq_true(self): def test_eq_false(self): another = BrokerRequest( - BrokerMessage("FooUpdated", self.data, self.service_name, saga=self.saga, reply_topic="AddOrderReply") + BrokerMessage( + "FooUpdated", self.data, self.service_name, identifier=self.identifier, reply_topic="AddOrderReply" + ) ) self.assertNotEqual(BrokerRequest(self.raw), another) @@ -50,13 +52,17 @@ async def test_content(self): async def test_content_single(self): request = BrokerRequest( - BrokerMessage("FooCreated", self.data[0], self.service_name, saga=self.saga, reply_topic="AddOrderReply") + BrokerMessage( + "FooCreated", self.data[0], self.service_name, identifier=self.identifier, reply_topic="AddOrderReply" + ) ) self.assertEqual(self.data[0], await request.content()) async def test_content_simple(self): request = BrokerRequest( - BrokerMessage("FooCreated", 1234, self.service_name, saga=self.saga, reply_topic="AddOrderReply") + BrokerMessage( + "FooCreated", 1234, self.service_name, identifier=self.identifier, reply_topic="AddOrderReply" + ) ) self.assertEqual(1234, await request.content()) diff --git a/tests/test_networks/test_brokers/test_messages.py b/tests/test_networks/test_brokers/test_messages.py index 3b2f96ca..0915dbf6 100644 --- a/tests/test_networks/test_brokers/test_messages.py +++ b/tests/test_networks/test_brokers/test_messages.py @@ -1,5 +1,6 @@ import unittest from uuid import ( + UUID, uuid4, ) @@ -17,7 +18,7 @@ class TestBrokerMessage(unittest.TestCase): def setUp(self) -> None: self.topic = "FooCreated" self.data = [FakeModel("blue"), FakeModel("red")] - self.saga = uuid4() + self.identifier = uuid4() self.reply_topic = "AddOrderReply" self.status = BrokerMessageStatus.SUCCESS self.user = uuid4() @@ -29,7 +30,7 @@ def test_constructor_simple(self): self.assertEqual(self.topic, message.topic) self.assertEqual(self.data, message.data) self.assertEqual(self.service_name, message.service_name) - self.assertEqual(None, message.saga) + self.assertIsInstance(message.identifier, UUID) self.assertEqual(None, message.reply_topic) self.assertEqual(None, message.user) self.assertEqual(BrokerMessageStatus.SUCCESS, message.status) @@ -40,7 +41,7 @@ def test_constructor(self): self.topic, self.data, self.service_name, - saga=self.saga, + identifier=self.identifier, reply_topic=self.reply_topic, user=self.user, status=self.status, @@ -48,7 +49,7 @@ def test_constructor(self): ) self.assertEqual(self.topic, message.topic) self.assertEqual(self.data, message.data) - self.assertEqual(self.saga, message.saga) + self.assertEqual(self.identifier, message.identifier) self.assertEqual(self.reply_topic, message.reply_topic) self.assertEqual(self.user, message.user) self.assertEqual(self.status, message.status) @@ -66,7 +67,7 @@ def test_avro_serialization(self): message = BrokerMessage( self.topic, self.data, - saga=self.saga, + identifier=self.identifier, reply_topic=self.reply_topic, user=self.user, status=self.status, diff --git a/tests/test_networks/test_brokers/test_publishers/test_producers.py b/tests/test_networks/test_brokers/test_publishers/test_producers.py index 39a81c3f..21784a3d 100644 --- a/tests/test_networks/test_brokers/test_publishers/test_producers.py +++ b/tests/test_networks/test_brokers/test_publishers/test_producers.py @@ -139,13 +139,15 @@ async def test_dispatch_forever_without_notify(self): async def test_concurrency_dispatcher(self): model = FakeModel("foo") - saga = uuid4() + identifier = uuid4() broker_publisher = BrokerPublisher.from_config(config=self.config) async with broker_publisher: for x in range(60): - await broker_publisher.send(model, "CommandBroker-Delete", saga=saga, reply_topic="TestDeleteReply") + await broker_publisher.send( + model, "CommandBroker-Delete", identifier=identifier, reply_topic="TestDeleteReply" + ) async with aiopg.connect(**self.broker_queue_db) as connect: async with connect.cursor() as cur: @@ -165,30 +167,26 @@ async def test_concurrency_dispatcher(self): async def test_if_commands_was_deleted(self): async with BrokerPublisher.from_config(config=self.config) as broker_publisher: - queue_id_1 = await broker_publisher.send(FakeModel("Foo"), "TestDeleteReply") - queue_id_2 = await broker_publisher.send(FakeModel("Foo"), "TestDeleteReply") + await broker_publisher.send(FakeModel("Foo"), "TestDeleteReply") + await broker_publisher.send(FakeModel("Foo"), "TestDeleteReply") await self.producer.dispatch() async with aiopg.connect(**self.broker_queue_db) as connection: async with connection.cursor() as cursor: await cursor.execute("SELECT COUNT(*) FROM producer_queue WHERE topic = '%s'" % "TestDeleteReply") - records = await cursor.fetchone() - - assert queue_id_1 > 0 - assert queue_id_2 > 0 - assert records[0] == 0 + self.assertEqual(0, (await cursor.fetchone())[0]) async def test_if_commands_retry_was_incremented(self): model = FakeModel("foo") - saga = uuid4() + identifier = uuid4() async with BrokerPublisher.from_config(config=self.config) as broker_publisher: - queue_id_1 = await broker_publisher.send( - model, "TestDeleteOrderReply", saga=saga, status=BrokerMessageStatus.SUCCESS + await broker_publisher.send( + model, "TestDeleteOrderReply", identifier=identifier, status=BrokerMessageStatus.SUCCESS ) - queue_id_2 = await broker_publisher.send( - model, "TestDeleteOrderReply", saga=saga, status=BrokerMessageStatus.SUCCESS + await broker_publisher.send( + model, "TestDeleteOrderReply", identifier=identifier, status=BrokerMessageStatus.SUCCESS ) self.producer.publish = AsyncMock(return_value=False) @@ -196,20 +194,14 @@ async def test_if_commands_retry_was_incremented(self): async with aiopg.connect(**self.broker_queue_db) as connection: async with connection.cursor() as cursor: - await cursor.execute("SELECT COUNT(*) FROM producer_queue WHERE topic = '%s'" % "TestDeleteOrderReply") - records = await cursor.fetchone() - - await cursor.execute("SELECT retry FROM producer_queue WHERE id=%d;" % queue_id_1) - retry_1 = await cursor.fetchone() + await cursor.execute("SELECT COUNT(*) FROM producer_queue WHERE topic = 'TestDeleteOrderReply'") + self.assertEqual(2, (await cursor.fetchone())[0]) - await cursor.execute("SELECT retry FROM producer_queue WHERE id=%d;" % queue_id_2) - retry_2 = await cursor.fetchone() + await cursor.execute("SELECT retry FROM producer_queue WHERE id=1;") + self.assertEqual(1, (await cursor.fetchone())[0]) - assert queue_id_1 > 0 - assert queue_id_2 > 0 - assert records[0] == 2 - assert retry_1[0] > 0 - assert retry_2[0] > 0 + await cursor.execute("SELECT retry FROM producer_queue WHERE id=2;") + self.assertEqual(1, (await cursor.fetchone())[0]) async def _notify(self, name): await sleep(0.2) diff --git a/tests/test_networks/test_brokers/test_publishers/test_publishers.py b/tests/test_networks/test_brokers/test_publishers/test_publishers.py index 628ef2b7..e29cdec7 100644 --- a/tests/test_networks/test_brokers/test_publishers/test_publishers.py +++ b/tests/test_networks/test_brokers/test_publishers/test_publishers.py @@ -4,6 +4,7 @@ call, ) from uuid import ( + UUID, uuid4, ) @@ -48,70 +49,83 @@ def test_from_config_default(self): self.assertIsInstance(BrokerPublisher.from_config(config=self.config), BrokerPublisher) async def test_send(self): - mock = AsyncMock(return_value=56) + mock = AsyncMock() self.publisher.enqueue = mock - identifier = await self.publisher.send(FakeModel("Foo"), topic="fake") + observed = await self.publisher.send(FakeModel("Foo"), topic="fake") - self.assertEqual(56, identifier) + self.assertIsInstance(observed, UUID) self.assertEqual(1, mock.call_count) args = mock.call_args.args self.assertEqual("fake", args[0]) self.assertEqual(BrokerMessageStrategy.UNICAST, args[1]) - expected = BrokerMessage("fake", FakeModel("Foo"), service_name="Order") + expected = BrokerMessage("fake", FakeModel("Foo"), service_name="Order", identifier=observed) self.assertEqual(expected, Model.from_avro_bytes(args[2])) - async def test_send_with_reply_topic(self): - mock = AsyncMock(return_value=56) + async def test_send_with_identifier(self): + mock = AsyncMock() self.publisher.enqueue = mock - saga = uuid4() + identifier = uuid4() + observed = await self.publisher.send(FakeModel("Foo"), topic="fake", identifier=identifier) - identifier = await self.publisher.send(FakeModel("foo"), "fake", saga=saga, reply_topic="ekaf") + self.assertEqual(identifier, observed) + self.assertEqual(1, mock.call_count) - self.assertEqual(56, identifier) + args = mock.call_args.args + self.assertEqual("fake", args[0]) + self.assertEqual(BrokerMessageStrategy.UNICAST, args[1]) + + expected = BrokerMessage("fake", FakeModel("Foo"), service_name="Order", identifier=identifier) + self.assertEqual(expected, Model.from_avro_bytes(args[2])) + + async def test_send_with_reply_topic(self): + mock = AsyncMock() + self.publisher.enqueue = mock + + observed = await self.publisher.send(FakeModel("foo"), "fake", reply_topic="ekaf") + + self.assertIsInstance(observed, UUID) self.assertEqual(1, mock.call_count) args = mock.call_args.args self.assertEqual("fake", args[0]) self.assertEqual(BrokerMessageStrategy.UNICAST, args[1]) - expected = BrokerMessage("fake", FakeModel("foo"), saga=saga, reply_topic="ekaf", service_name="Order") + expected = BrokerMessage( + "fake", FakeModel("foo"), identifier=observed, reply_topic="ekaf", service_name="Order" + ) self.assertEqual(expected, Model.from_avro_bytes(args[2])) async def test_send_with_user(self): - mock = AsyncMock(return_value=56) + mock = AsyncMock() self.publisher.enqueue = mock - saga = uuid4() user = uuid4() - identifier = await self.publisher.send(FakeModel("foo"), "fake", saga=saga, reply_topic="ekaf", user=user) + observed = await self.publisher.send(FakeModel("foo"), "fake", reply_topic="ekaf", user=user) - self.assertEqual(56, identifier) + self.assertIsInstance(observed, UUID) self.assertEqual(1, mock.call_count) args = mock.call_args.args self.assertEqual("fake", args[0]) self.assertEqual(BrokerMessageStrategy.UNICAST, args[1]) expected = BrokerMessage( - "fake", FakeModel("foo"), saga=saga, reply_topic="ekaf", user=user, service_name="Order" + "fake", FakeModel("foo"), identifier=observed, reply_topic="ekaf", user=user, service_name="Order" ) self.assertEqual(expected, Model.from_avro_bytes(args[2])) async def test_send_with_status(self): - mock = AsyncMock(return_value=56) + mock = AsyncMock() self.publisher.enqueue = mock - saga = uuid4() reply_topic = "fakeReply" - identifier = await self.publisher.send( - FakeModel("foo"), saga=saga, topic=reply_topic, status=BrokerMessageStatus.SUCCESS - ) + observed = await self.publisher.send(FakeModel("foo"), topic=reply_topic, status=BrokerMessageStatus.SUCCESS) - self.assertEqual(56, identifier) + self.assertIsInstance(observed, UUID) self.assertEqual(1, mock.call_count) args = mock.call_args.args @@ -121,7 +135,7 @@ async def test_send_with_status(self): expected = BrokerMessage( reply_topic, FakeModel("foo"), - saga=saga, + identifier=observed, status=BrokerMessageStatus.SUCCESS, service_name=self.config.service.name, ) @@ -129,14 +143,14 @@ async def test_send_with_status(self): self.assertEqual(expected, observed) async def test_send_with_multicast_strategy(self): - mock = AsyncMock(return_value=56) + mock = AsyncMock() self.publisher.enqueue = mock topic = "fakeReply" - identifier = await self.publisher.send(FakeModel("foo"), topic=topic, strategy=BrokerMessageStrategy.MULTICAST) + observed = await self.publisher.send(FakeModel("foo"), topic=topic, strategy=BrokerMessageStrategy.MULTICAST) - self.assertEqual(56, identifier) + self.assertIsInstance(observed, UUID) self.assertEqual(1, mock.call_count) args = mock.call_args.args @@ -144,7 +158,11 @@ async def test_send_with_multicast_strategy(self): self.assertEqual(BrokerMessageStrategy.MULTICAST, args[1]) expected = BrokerMessage( - topic, FakeModel("foo"), service_name=self.config.service.name, strategy=BrokerMessageStrategy.MULTICAST, + topic, + FakeModel("foo"), + identifier=observed, + service_name=self.config.service.name, + strategy=BrokerMessageStrategy.MULTICAST, ) observed = Model.from_avro_bytes(args[2]) self.assertEqual(expected, observed) @@ -155,9 +173,9 @@ async def test_enqueue(self): mock = AsyncMock(return_value=(56,)) self.publisher.submit_query_and_fetchone = mock - identifier = await self.publisher.enqueue("test_topic", BrokerMessageStrategy.UNICAST, b"test") + observed = await self.publisher.enqueue("test_topic", BrokerMessageStrategy.UNICAST, b"test") - self.assertEqual(56, identifier) + self.assertEqual(56, observed) self.assertEqual(1, mock.call_count) self.assertEqual(call(query, ("test_topic", b"test", BrokerMessageStrategy.UNICAST)), mock.call_args) From 6f16aae7714475f813acaa5b408de9f6a8807682 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Mon, 29 Nov 2021 15:22:32 +0100 Subject: [PATCH 3/7] ISSUE #457 * Minor improvement. --- minos/networks/brokers/publishers/publishers.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/minos/networks/brokers/publishers/publishers.py b/minos/networks/brokers/publishers/publishers.py index 047075c0..906a0cd6 100644 --- a/minos/networks/brokers/publishers/publishers.py +++ b/minos/networks/brokers/publishers/publishers.py @@ -3,9 +3,6 @@ ) import logging -from abc import ( - ABC, -) from typing import ( Any, Optional, @@ -34,7 +31,7 @@ logger = logging.getLogger(__name__) -class BrokerPublisher(BrokerPublisherSetup, ABC): +class BrokerPublisher(BrokerPublisherSetup): """Broker Publisher class.""" def __init__(self, *args, service_name: str, **kwargs): From 708dc172b509bcbf55ecc129ff90c29499db3e08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Mon, 29 Nov 2021 15:48:27 +0100 Subject: [PATCH 4/7] ISSUE #457 * Rename `USER_CONTEXT_VAR` as `REQUEST_USER_CONTEXT_VAR`. * Rename `REPLY_TOPIC_CONTEXT_VAR` as `REQUEST_REPLY_TOPIC_CONTEXT_VAR`. * Rename `HEADERS_CONTEXT_VAR` as `REQUEST_HEADERS_CONTEXT_VAR`. --- minos/networks/__init__.py | 6 +++--- minos/networks/brokers/__init__.py | 4 ++-- minos/networks/brokers/dynamic/pools.py | 6 +++--- minos/networks/brokers/handlers/handlers.py | 18 +++++++++--------- minos/networks/brokers/messages.py | 4 ++-- minos/networks/requests.py | 3 +-- minos/networks/rest/handlers.py | 6 +++--- .../test_brokers/test_dynamic/test_pools.py | 8 ++++---- .../test_handlers/test_handlers.py | 4 ++-- tests/test_networks/test_rest/test_handlers.py | 4 ++-- 10 files changed, 31 insertions(+), 32 deletions(-) diff --git a/minos/networks/__init__.py b/minos/networks/__init__.py index 787c5565..660b1040 100644 --- a/minos/networks/__init__.py +++ b/minos/networks/__init__.py @@ -3,8 +3,8 @@ __version__ = "0.3.0" from .brokers import ( - HEADERS_CONTEXT_VAR, - REPLY_TOPIC_CONTEXT_VAR, + REQUEST_HEADERS_CONTEXT_VAR, + REQUEST_REPLY_TOPIC_CONTEXT_VAR, BrokerConsumer, BrokerConsumerService, BrokerHandler, @@ -57,7 +57,7 @@ MinosRedefinedEnrouteDecoratorException, ) from .requests import ( - USER_CONTEXT_VAR, + REQUEST_USER_CONTEXT_VAR, Request, Response, ResponseException, diff --git a/minos/networks/brokers/__init__.py b/minos/networks/brokers/__init__.py index 1b804ae5..c23cb10d 100644 --- a/minos/networks/brokers/__init__.py +++ b/minos/networks/brokers/__init__.py @@ -14,8 +14,8 @@ BrokerResponseException, ) from .messages import ( - HEADERS_CONTEXT_VAR, - REPLY_TOPIC_CONTEXT_VAR, + REQUEST_HEADERS_CONTEXT_VAR, + REQUEST_REPLY_TOPIC_CONTEXT_VAR, BrokerMessage, BrokerMessageStatus, BrokerMessageStrategy, diff --git a/minos/networks/brokers/dynamic/pools.py b/minos/networks/brokers/dynamic/pools.py index 8e8cd7e9..3581c380 100644 --- a/minos/networks/brokers/dynamic/pools.py +++ b/minos/networks/brokers/dynamic/pools.py @@ -35,7 +35,7 @@ BrokerConsumer, ) from ..messages import ( - REPLY_TOPIC_CONTEXT_VAR, + REQUEST_REPLY_TOPIC_CONTEXT_VAR, ) from ..publishers import ( BrokerPublisher, @@ -150,9 +150,9 @@ def __init__(self, wrapper: AsyncContextManager[DynamicBroker]): async def __aenter__(self) -> DynamicBroker: handler = await self.wrapper.__aenter__() - self._token = REPLY_TOPIC_CONTEXT_VAR.set(handler.topic) + self._token = REQUEST_REPLY_TOPIC_CONTEXT_VAR.set(handler.topic) return handler async def __aexit__(self, exc_type, exc_val, exc_tb): - REPLY_TOPIC_CONTEXT_VAR.reset(self._token) + REQUEST_REPLY_TOPIC_CONTEXT_VAR.reset(self._token) await self.wrapper.__aexit__(exc_type, exc_val, exc_tb) diff --git a/minos/networks/brokers/handlers/handlers.py b/minos/networks/brokers/handlers/handlers.py index 07aa54a9..6989e350 100644 --- a/minos/networks/brokers/handlers/handlers.py +++ b/minos/networks/brokers/handlers/handlers.py @@ -52,7 +52,7 @@ MinosActionNotFoundException, ) from ...requests import ( - USER_CONTEXT_VAR, + REQUEST_USER_CONTEXT_VAR, Response, ResponseException, ) @@ -60,7 +60,7 @@ consume_queue, ) from ..messages import ( - HEADERS_CONTEXT_VAR, + REQUEST_HEADERS_CONTEXT_VAR, BrokerMessage, BrokerMessageStatus, ) @@ -329,8 +329,8 @@ def get_callback( async def _fn(raw: BrokerMessage) -> tuple[Any, BrokerMessageStatus, dict[str, str]]: request = BrokerRequest(raw) - user_token = USER_CONTEXT_VAR.set(request.user) - headers_token = HEADERS_CONTEXT_VAR.set(raw.headers) + user_token = REQUEST_USER_CONTEXT_VAR.set(request.user) + headers_token = REQUEST_HEADERS_CONTEXT_VAR.set(raw.headers) try: response = fn(request) @@ -338,16 +338,16 @@ async def _fn(raw: BrokerMessage) -> tuple[Any, BrokerMessageStatus, dict[str, s response = await response if isinstance(response, Response): response = await response.content() - return response, BrokerMessageStatus.SUCCESS, HEADERS_CONTEXT_VAR.get() + return response, BrokerMessageStatus.SUCCESS, REQUEST_HEADERS_CONTEXT_VAR.get() except ResponseException as exc: logger.warning(f"Raised an application exception: {exc!s}") - return repr(exc), BrokerMessageStatus.ERROR, HEADERS_CONTEXT_VAR.get() + return repr(exc), BrokerMessageStatus.ERROR, REQUEST_HEADERS_CONTEXT_VAR.get() except Exception as exc: logger.exception(f"Raised a system exception: {exc!r}") - return repr(exc), BrokerMessageStatus.SYSTEM_ERROR, HEADERS_CONTEXT_VAR.get() + return repr(exc), BrokerMessageStatus.SYSTEM_ERROR, REQUEST_HEADERS_CONTEXT_VAR.get() finally: - USER_CONTEXT_VAR.reset(user_token) - HEADERS_CONTEXT_VAR.reset(headers_token) + REQUEST_USER_CONTEXT_VAR.reset(user_token) + REQUEST_HEADERS_CONTEXT_VAR.reset(headers_token) return _fn diff --git a/minos/networks/brokers/messages.py b/minos/networks/brokers/messages.py index df95f4ff..3145e253 100644 --- a/minos/networks/brokers/messages.py +++ b/minos/networks/brokers/messages.py @@ -23,8 +23,8 @@ DeclarativeModel, ) -REPLY_TOPIC_CONTEXT_VAR: Final[ContextVar[Optional[str]]] = ContextVar("reply_topic", default=None) -HEADERS_CONTEXT_VAR: Final[ContextVar[Optional[dict[str, str]]]] = ContextVar("headers", default=None) +REQUEST_REPLY_TOPIC_CONTEXT_VAR: Final[ContextVar[Optional[str]]] = ContextVar("reply_topic", default=None) +REQUEST_HEADERS_CONTEXT_VAR: Final[ContextVar[Optional[dict[str, str]]]] = ContextVar("headers", default=None) class BrokerMessage(DeclarativeModel): diff --git a/minos/networks/requests.py b/minos/networks/requests.py index f52b8046..c74c92bf 100644 --- a/minos/networks/requests.py +++ b/minos/networks/requests.py @@ -30,8 +30,7 @@ MinosException, ) -USER_CONTEXT_VAR: Final[ContextVar[Optional[UUID]]] = ContextVar("user", default=None) -USER_CONTEXT_VAR.set(None) # needed to "register" the context variable. +REQUEST_USER_CONTEXT_VAR: Final[ContextVar[Optional[UUID]]] = ContextVar("user", default=None) class Request(ABC): diff --git a/minos/networks/rest/handlers.py b/minos/networks/rest/handlers.py index de9f61b5..5aa26179 100644 --- a/minos/networks/rest/handlers.py +++ b/minos/networks/rest/handlers.py @@ -29,7 +29,7 @@ EnrouteBuilder, ) from ..requests import ( - USER_CONTEXT_VAR, + REQUEST_USER_CONTEXT_VAR, Response, ResponseException, ) @@ -128,7 +128,7 @@ async def _fn(request: web.Request) -> web.Response: logger.info(f"Dispatching '{request!s}' from '{request.remote!s}'...") request = RestRequest(request) - token = USER_CONTEXT_VAR.set(request.user) + token = REQUEST_USER_CONTEXT_VAR.set(request.user) try: response = fn(request) @@ -146,7 +146,7 @@ async def _fn(request: web.Request) -> web.Response: logger.exception(f"Raised a system exception: {exc!r}") raise web.HTTPInternalServerError() finally: - USER_CONTEXT_VAR.reset(token) + REQUEST_USER_CONTEXT_VAR.reset(token) return _fn diff --git a/tests/test_networks/test_brokers/test_dynamic/test_pools.py b/tests/test_networks/test_brokers/test_dynamic/test_pools.py index 1e460894..19210450 100644 --- a/tests/test_networks/test_brokers/test_dynamic/test_pools.py +++ b/tests/test_networks/test_brokers/test_dynamic/test_pools.py @@ -11,7 +11,7 @@ PostgresAsyncTestCase, ) from minos.networks import ( - REPLY_TOPIC_CONTEXT_VAR, + REQUEST_REPLY_TOPIC_CONTEXT_VAR, BrokerConsumer, BrokerPublisher, DynamicBroker, @@ -70,12 +70,12 @@ async def test_acquire(self): self.assertIn(broker.topic, self.pool.client.list_topics()) async def test_acquire_reply_topic_context_var(self): - self.assertEqual(None, REPLY_TOPIC_CONTEXT_VAR.get()) + self.assertEqual(None, REQUEST_REPLY_TOPIC_CONTEXT_VAR.get()) async with self.pool.acquire() as broker: - self.assertEqual(broker.topic, REPLY_TOPIC_CONTEXT_VAR.get()) + self.assertEqual(broker.topic, REQUEST_REPLY_TOPIC_CONTEXT_VAR.get()) - self.assertEqual(None, REPLY_TOPIC_CONTEXT_VAR.get()) + self.assertEqual(None, REQUEST_REPLY_TOPIC_CONTEXT_VAR.get()) if __name__ == "__main__": diff --git a/tests/test_networks/test_brokers/test_handlers/test_handlers.py b/tests/test_networks/test_brokers/test_handlers/test_handlers.py index 7b26e289..a1bbfa95 100644 --- a/tests/test_networks/test_brokers/test_handlers/test_handlers.py +++ b/tests/test_networks/test_brokers/test_handlers/test_handlers.py @@ -33,7 +33,7 @@ PostgresAsyncTestCase, ) from minos.networks import ( - USER_CONTEXT_VAR, + REQUEST_USER_CONTEXT_VAR, BrokerHandler, BrokerHandlerEntry, BrokerMessage, @@ -313,7 +313,7 @@ async def test_get_callback_raises_exception(self): async def test_get_callback_with_user(self): async def _fn(request) -> None: self.assertEqual(self.user, request.user) - self.assertEqual(self.user, USER_CONTEXT_VAR.get()) + self.assertEqual(self.user, REQUEST_USER_CONTEXT_VAR.get()) mock = AsyncMock(side_effect=_fn) diff --git a/tests/test_networks/test_rest/test_handlers.py b/tests/test_networks/test_rest/test_handlers.py index ab44c20f..87a9d15a 100644 --- a/tests/test_networks/test_rest/test_handlers.py +++ b/tests/test_networks/test_rest/test_handlers.py @@ -18,7 +18,7 @@ PostgresAsyncTestCase, ) from minos.networks import ( - USER_CONTEXT_VAR, + REQUEST_USER_CONTEXT_VAR, Request, Response, RestHandler, @@ -98,7 +98,7 @@ async def test_get_callback_with_user(self): async def _fn(request) -> None: self.assertEqual(user, request.user) - self.assertEqual(user, USER_CONTEXT_VAR.get()) + self.assertEqual(user, REQUEST_USER_CONTEXT_VAR.get()) mock = AsyncMock(side_effect=_fn) From 81f64b657b92eb16d61a5d0dd4d5ae4ff3fec541 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Mon, 29 Nov 2021 16:51:16 +0100 Subject: [PATCH 5/7] ISSUE #457 * Add tests for `REQUEST_HEADERS_CONTEXT_VARS` --- .../test_handlers/test_handlers.py | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/tests/test_networks/test_brokers/test_handlers/test_handlers.py b/tests/test_networks/test_brokers/test_handlers/test_handlers.py index a1bbfa95..daa873df 100644 --- a/tests/test_networks/test_brokers/test_handlers/test_handlers.py +++ b/tests/test_networks/test_brokers/test_handlers/test_handlers.py @@ -33,6 +33,7 @@ PostgresAsyncTestCase, ) from minos.networks import ( + REQUEST_HEADERS_CONTEXT_VAR, REQUEST_USER_CONTEXT_VAR, BrokerHandler, BrokerHandlerEntry, @@ -91,6 +92,7 @@ def setUp(self) -> None: identifier=self.identifier, user=self.user, reply_topic="UpdateTicket", + headers={"foo": "bar"}, ) async def asyncSetUp(self): @@ -227,7 +229,7 @@ async def test_dispatch(self): identifier=self.message.identifier, status=BrokerMessageStatus.SUCCESS, user=self.user, - headers=dict(), + headers={"foo": "bar"}, ) ], send_mock.call_args_list, @@ -294,20 +296,20 @@ async def test_dispatch_one(self): async def test_get_callback(self): fn = self.handler.get_callback(_Cls._fn) - self.assertEqual((FakeModel("foo"), BrokerMessageStatus.SUCCESS, dict()), await fn(self.message)) + self.assertEqual((FakeModel("foo"), BrokerMessageStatus.SUCCESS, {"foo": "bar"}), await fn(self.message)) async def test_get_callback_none(self): fn = self.handler.get_callback(_Cls._fn_none) - self.assertEqual((None, BrokerMessageStatus.SUCCESS, dict()), await fn(self.message)) + self.assertEqual((None, BrokerMessageStatus.SUCCESS, {"foo": "bar"}), await fn(self.message)) async def test_get_callback_raises_response(self): fn = self.handler.get_callback(_Cls._fn_raises_response) - expected = (repr(BrokerResponseException("foo")), BrokerMessageStatus.ERROR, dict()) + expected = (repr(BrokerResponseException("foo")), BrokerMessageStatus.ERROR, {"foo": "bar"}) self.assertEqual(expected, await fn(self.message)) async def test_get_callback_raises_exception(self): fn = self.handler.get_callback(_Cls._fn_raises_exception) - expected = (repr(ValueError()), BrokerMessageStatus.SYSTEM_ERROR, dict()) + expected = (repr(ValueError()), BrokerMessageStatus.SYSTEM_ERROR, {"foo": "bar"}) self.assertEqual(expected, await fn(self.message)) async def test_get_callback_with_user(self): @@ -322,6 +324,18 @@ async def _fn(request) -> None: self.assertEqual(1, mock.call_count) + async def test_get_callback_with_headers(self): + async def _fn(request) -> None: + self.assertEqual({"foo": "bar"}, request.raw.headers) + REQUEST_HEADERS_CONTEXT_VAR.get()["bar"] = "foo" + + mock = AsyncMock(side_effect=_fn) + + handler = self.handler.get_callback(mock) + _, _, observed = await handler(self.message) + + self.assertEqual({"foo": "bar", "bar": "foo"}, observed) + async def test_dispatch_without_sorting(self): observed = list() From 9c00d38d7d177ea1b6853f7d8c4dc9faa9aa7a02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Mon, 29 Nov 2021 17:22:00 +0100 Subject: [PATCH 6/7] ISSUE #457 * Remove `service_name: str` from `BrokerMessage`. --- minos/networks/brokers/messages.py | 11 +-------- .../networks/brokers/publishers/publishers.py | 7 +----- .../test_handlers/test_entries.py | 20 ++++++---------- .../test_handlers/test_handlers.py | 21 +++++----------- .../test_handlers/test_requests.py | 17 ++++--------- .../test_brokers/test_messages.py | 15 ++++-------- .../test_publishers/test_publishers.py | 24 +++++-------------- 7 files changed, 29 insertions(+), 86 deletions(-) diff --git a/minos/networks/brokers/messages.py b/minos/networks/brokers/messages.py index 3145e253..0799b9d6 100644 --- a/minos/networks/brokers/messages.py +++ b/minos/networks/brokers/messages.py @@ -32,7 +32,6 @@ class BrokerMessage(DeclarativeModel): topic: str data: Any - service_name: str identifier: UUID reply_topic: Optional[str] user: Optional[UUID] @@ -44,7 +43,6 @@ def __init__( self, topic: str, data: Any, - service_name: str, *, identifier: Optional[UUID] = None, status: Optional[BrokerMessageStatus] = None, @@ -61,14 +59,7 @@ def __init__( if headers is None: headers = dict() super().__init__( - topic=topic, - data=data, - service_name=service_name, - identifier=identifier, - status=status, - strategy=strategy, - headers=headers, - **kwargs + topic=topic, data=data, identifier=identifier, status=status, strategy=strategy, headers=headers, **kwargs ) @property diff --git a/minos/networks/brokers/publishers/publishers.py b/minos/networks/brokers/publishers/publishers.py index 906a0cd6..52554e5a 100644 --- a/minos/networks/brokers/publishers/publishers.py +++ b/minos/networks/brokers/publishers/publishers.py @@ -34,14 +34,10 @@ class BrokerPublisher(BrokerPublisherSetup): """Broker Publisher class.""" - def __init__(self, *args, service_name: str, **kwargs): - super().__init__(*args, **kwargs) - self.service_name = service_name - @classmethod def _from_config(cls, *args, config: MinosConfig, **kwargs) -> BrokerPublisher: # noinspection PyProtectedMember - return cls(*args, service_name=config.service.name, **config.broker.queue._asdict(), **kwargs) + return cls(*args, **config.broker.queue._asdict(), **kwargs) # noinspection PyMethodOverriding async def send( @@ -78,7 +74,6 @@ async def send( status=status, reply_topic=reply_topic, user=user, - service_name=self.service_name, strategy=strategy, headers=headers, ) diff --git a/tests/test_networks/test_brokers/test_handlers/test_entries.py b/tests/test_networks/test_brokers/test_handlers/test_entries.py index a2091fbb..2d030c23 100644 --- a/tests/test_networks/test_brokers/test_handlers/test_entries.py +++ b/tests/test_networks/test_brokers/test_handlers/test_entries.py @@ -19,15 +19,9 @@ class TestHandlerEntry(unittest.IsolatedAsyncioTestCase): def setUp(self) -> None: self.identifier = uuid4() self.user = uuid4() - self.service_name = "foo" self.message = BrokerMessage( - "AddOrder", - FakeModel("foo"), - self.service_name, - identifier=self.identifier, - user=self.user, - reply_topic="UpdateTicket", + "AddOrder", FakeModel("foo"), identifier=self.identifier, user=self.user, reply_topic="UpdateTicket", ) def test_constructor(self): @@ -54,12 +48,12 @@ def test_constructor_extended(self): def test_sort(self): unsorted = [ - BrokerHandlerEntry(1, "", 0, BrokerMessage("", "foo", "").avro_bytes, 1), - BrokerHandlerEntry(1, "", 0, BrokerMessage("", 4, "").avro_bytes, 1), - BrokerHandlerEntry(1, "", 0, BrokerMessage("", 2, "").avro_bytes, 1), - BrokerHandlerEntry(1, "", 0, BrokerMessage("", 3, "").avro_bytes, 1), - BrokerHandlerEntry(1, "", 0, BrokerMessage("", 1, "").avro_bytes, 1), - BrokerHandlerEntry(1, "", 0, BrokerMessage("", "bar", "").avro_bytes, 1), + BrokerHandlerEntry(1, "", 0, BrokerMessage("", "foo").avro_bytes, 1), + BrokerHandlerEntry(1, "", 0, BrokerMessage("", 4).avro_bytes, 1), + BrokerHandlerEntry(1, "", 0, BrokerMessage("", 2).avro_bytes, 1), + BrokerHandlerEntry(1, "", 0, BrokerMessage("", 3).avro_bytes, 1), + BrokerHandlerEntry(1, "", 0, BrokerMessage("", 1).avro_bytes, 1), + BrokerHandlerEntry(1, "", 0, BrokerMessage("", "bar").avro_bytes, 1), ] expected = [unsorted[0], unsorted[4], unsorted[2], unsorted[3], unsorted[1], unsorted[5]] diff --git a/tests/test_networks/test_brokers/test_handlers/test_handlers.py b/tests/test_networks/test_brokers/test_handlers/test_handlers.py index daa873df..af50d911 100644 --- a/tests/test_networks/test_brokers/test_handlers/test_handlers.py +++ b/tests/test_networks/test_brokers/test_handlers/test_handlers.py @@ -83,12 +83,10 @@ def setUp(self) -> None: self.identifier = uuid4() self.user = uuid4() - self.service_name = self.config.service.name self.message = BrokerMessage( "AddOrder", FakeModel("foo"), - self.service_name, identifier=self.identifier, user=self.user, reply_topic="UpdateTicket", @@ -242,7 +240,7 @@ async def test_dispatch(self): async def test_dispatch_wrong(self): instance_1 = namedtuple("FakeCommand", ("topic", "avro_bytes"))("AddOrder", bytes(b"Test")) - instance_2 = BrokerMessage("NoActionTopic", FakeModel("Foo"), self.service_name) + instance_2 = BrokerMessage("NoActionTopic", FakeModel("Foo")) queue_id_1 = await self._insert_one(instance_1) queue_id_2 = await self._insert_one(instance_2) @@ -257,9 +255,7 @@ async def test_dispatch_concurrent(self): identifier = uuid4() - instance = BrokerMessage( - "AddOrder", [FakeModel("foo")], self.service_name, identifier=identifier, reply_topic="UpdateTicket" - ) + instance = BrokerMessage("AddOrder", [FakeModel("foo")], identifier=identifier, reply_topic="UpdateTicket") instance_wrong = namedtuple("FakeCommand", ("topic", "avro_bytes"))("AddOrder", bytes(b"Test")) for _ in range(10): @@ -283,7 +279,7 @@ async def test_dispatch_one(self): lookup_mock = MagicMock(return_value=callback_mock) topic = "TicketAdded" - event = BrokerMessage(topic, FakeModel("Foo"), self.service_name) + event = BrokerMessage(topic, FakeModel("Foo")) entry = BrokerHandlerEntry(1, topic, 0, event.avro_bytes, 1, callback_lookup=lookup_mock) await self.handler.dispatch_one(entry) @@ -346,8 +342,8 @@ async def _fn2(request): self.handler.get_action = MagicMock(return_value=_fn2) events = [ - BrokerMessage("TicketAdded", FakeModel("uuid1"), self.service_name), - BrokerMessage("TicketAdded", FakeModel("uuid2"), self.service_name), + BrokerMessage("TicketAdded", FakeModel("uuid1")), + BrokerMessage("TicketAdded", FakeModel("uuid2")), ] for event in events: @@ -369,12 +365,7 @@ async def _fn2(request): events = list() for i in range(1, 6): - events.extend( - [ - BrokerMessage("TicketAdded", ["uuid1", i], self.service_name), - BrokerMessage("TicketAdded", ["uuid2", i], self.service_name), - ] - ) + events.extend([BrokerMessage("TicketAdded", ["uuid1", i]), BrokerMessage("TicketAdded", ["uuid2", i])]) shuffle(events) for event in events: diff --git a/tests/test_networks/test_brokers/test_handlers/test_requests.py b/tests/test_networks/test_brokers/test_handlers/test_requests.py index d1ec62f7..9142df7b 100644 --- a/tests/test_networks/test_brokers/test_handlers/test_requests.py +++ b/tests/test_networks/test_brokers/test_handlers/test_requests.py @@ -17,10 +17,7 @@ class TestHandlerRequest(unittest.IsolatedAsyncioTestCase): def setUp(self) -> None: self.data = [FakeModel("foo"), FakeModel("bar")] self.identifier = uuid4() - self.service_name = "foo" - self.raw = BrokerMessage( - "FooCreated", self.data, self.service_name, identifier=self.identifier, reply_topic="AddOrderReply" - ) + self.raw = BrokerMessage("FooCreated", self.data, identifier=self.identifier, reply_topic="AddOrderReply") def test_repr(self): request = BrokerRequest(self.raw) @@ -32,9 +29,7 @@ def test_eq_true(self): def test_eq_false(self): another = BrokerRequest( - BrokerMessage( - "FooUpdated", self.data, self.service_name, identifier=self.identifier, reply_topic="AddOrderReply" - ) + BrokerMessage("FooUpdated", self.data, identifier=self.identifier, reply_topic="AddOrderReply") ) self.assertNotEqual(BrokerRequest(self.raw), another) @@ -52,17 +47,13 @@ async def test_content(self): async def test_content_single(self): request = BrokerRequest( - BrokerMessage( - "FooCreated", self.data[0], self.service_name, identifier=self.identifier, reply_topic="AddOrderReply" - ) + BrokerMessage("FooCreated", self.data[0], identifier=self.identifier, reply_topic="AddOrderReply") ) self.assertEqual(self.data[0], await request.content()) async def test_content_simple(self): request = BrokerRequest( - BrokerMessage( - "FooCreated", 1234, self.service_name, identifier=self.identifier, reply_topic="AddOrderReply" - ) + BrokerMessage("FooCreated", 1234, identifier=self.identifier, reply_topic="AddOrderReply") ) self.assertEqual(1234, await request.content()) diff --git a/tests/test_networks/test_brokers/test_messages.py b/tests/test_networks/test_brokers/test_messages.py index 0915dbf6..06d3c7f6 100644 --- a/tests/test_networks/test_brokers/test_messages.py +++ b/tests/test_networks/test_brokers/test_messages.py @@ -22,14 +22,12 @@ def setUp(self) -> None: self.reply_topic = "AddOrderReply" self.status = BrokerMessageStatus.SUCCESS self.user = uuid4() - self.service_name = "foo" self.strategy = BrokerMessageStrategy.MULTICAST def test_constructor_simple(self): - message = BrokerMessage(self.topic, self.data, self.service_name) + message = BrokerMessage(self.topic, self.data) self.assertEqual(self.topic, message.topic) self.assertEqual(self.data, message.data) - self.assertEqual(self.service_name, message.service_name) self.assertIsInstance(message.identifier, UUID) self.assertEqual(None, message.reply_topic) self.assertEqual(None, message.user) @@ -40,7 +38,6 @@ def test_constructor(self): message = BrokerMessage( self.topic, self.data, - self.service_name, identifier=self.identifier, reply_topic=self.reply_topic, user=self.user, @@ -53,15 +50,12 @@ def test_constructor(self): self.assertEqual(self.reply_topic, message.reply_topic) self.assertEqual(self.user, message.user) self.assertEqual(self.status, message.status) - self.assertEqual(self.service_name, message.service_name) self.assertEqual(self.strategy, message.strategy) def test_ok(self): - self.assertTrue(BrokerMessage(self.topic, self.data, self.service_name, status=BrokerMessageStatus.SUCCESS).ok) - self.assertFalse(BrokerMessage(self.topic, self.data, self.service_name, status=BrokerMessageStatus.ERROR).ok) - self.assertFalse( - BrokerMessage(self.topic, self.data, self.service_name, status=BrokerMessageStatus.SYSTEM_ERROR).ok - ) + self.assertTrue(BrokerMessage(self.topic, self.data, status=BrokerMessageStatus.SUCCESS).ok) + self.assertFalse(BrokerMessage(self.topic, self.data, status=BrokerMessageStatus.ERROR).ok) + self.assertFalse(BrokerMessage(self.topic, self.data, status=BrokerMessageStatus.SYSTEM_ERROR).ok) def test_avro_serialization(self): message = BrokerMessage( @@ -71,7 +65,6 @@ def test_avro_serialization(self): reply_topic=self.reply_topic, user=self.user, status=self.status, - service_name=self.service_name, strategy=self.strategy, ) observed = BrokerMessage.from_avro_bytes(message.avro_bytes) diff --git a/tests/test_networks/test_brokers/test_publishers/test_publishers.py b/tests/test_networks/test_brokers/test_publishers/test_publishers.py index e29cdec7..3f3390df 100644 --- a/tests/test_networks/test_brokers/test_publishers/test_publishers.py +++ b/tests/test_networks/test_brokers/test_publishers/test_publishers.py @@ -61,7 +61,7 @@ async def test_send(self): self.assertEqual("fake", args[0]) self.assertEqual(BrokerMessageStrategy.UNICAST, args[1]) - expected = BrokerMessage("fake", FakeModel("Foo"), service_name="Order", identifier=observed) + expected = BrokerMessage("fake", FakeModel("Foo"), identifier=observed) self.assertEqual(expected, Model.from_avro_bytes(args[2])) async def test_send_with_identifier(self): @@ -78,7 +78,7 @@ async def test_send_with_identifier(self): self.assertEqual("fake", args[0]) self.assertEqual(BrokerMessageStrategy.UNICAST, args[1]) - expected = BrokerMessage("fake", FakeModel("Foo"), service_name="Order", identifier=identifier) + expected = BrokerMessage("fake", FakeModel("Foo"), identifier=identifier) self.assertEqual(expected, Model.from_avro_bytes(args[2])) async def test_send_with_reply_topic(self): @@ -93,9 +93,7 @@ async def test_send_with_reply_topic(self): args = mock.call_args.args self.assertEqual("fake", args[0]) self.assertEqual(BrokerMessageStrategy.UNICAST, args[1]) - expected = BrokerMessage( - "fake", FakeModel("foo"), identifier=observed, reply_topic="ekaf", service_name="Order" - ) + expected = BrokerMessage("fake", FakeModel("foo"), identifier=observed, reply_topic="ekaf") self.assertEqual(expected, Model.from_avro_bytes(args[2])) async def test_send_with_user(self): @@ -112,9 +110,7 @@ async def test_send_with_user(self): args = mock.call_args.args self.assertEqual("fake", args[0]) self.assertEqual(BrokerMessageStrategy.UNICAST, args[1]) - expected = BrokerMessage( - "fake", FakeModel("foo"), identifier=observed, reply_topic="ekaf", user=user, service_name="Order" - ) + expected = BrokerMessage("fake", FakeModel("foo"), identifier=observed, reply_topic="ekaf", user=user) self.assertEqual(expected, Model.from_avro_bytes(args[2])) async def test_send_with_status(self): @@ -133,11 +129,7 @@ async def test_send_with_status(self): self.assertEqual(BrokerMessageStrategy.UNICAST, args[1]) expected = BrokerMessage( - reply_topic, - FakeModel("foo"), - identifier=observed, - status=BrokerMessageStatus.SUCCESS, - service_name=self.config.service.name, + reply_topic, FakeModel("foo"), identifier=observed, status=BrokerMessageStatus.SUCCESS, ) observed = Model.from_avro_bytes(args[2]) self.assertEqual(expected, observed) @@ -158,11 +150,7 @@ async def test_send_with_multicast_strategy(self): self.assertEqual(BrokerMessageStrategy.MULTICAST, args[1]) expected = BrokerMessage( - topic, - FakeModel("foo"), - identifier=observed, - service_name=self.config.service.name, - strategy=BrokerMessageStrategy.MULTICAST, + topic, FakeModel("foo"), identifier=observed, strategy=BrokerMessageStrategy.MULTICAST, ) observed = Model.from_avro_bytes(args[2]) self.assertEqual(expected, observed) From 503ce0091db93fac9609151d893c4bd3d418f925 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Tue, 30 Nov 2021 09:19:34 +0100 Subject: [PATCH 7/7] v0.3.1 --- HISTORY.md | 9 +++++++++ minos/networks/__init__.py | 2 +- pyproject.toml | 2 +- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index f971672e..70d94e7b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -151,3 +151,12 @@ History * Rename `Consumer` and `ConsumerService` as `BrokerConsumer` and `BrokerConsumerService` respectively. * Rename `Producer` and `ProducerService` as `BrokerProducer` and `BrokerProducerService` respectively. * Rename `HandlerRequest`, `HandlerResponse` and `HandlerResponseException` as `BrokerRequest`, `BrokerResponse` and `BrokerResponseException` respectively. + +0.3.1 (2021-11-30) +------------------ + +* Add `identifier: UUID` and `headers: dict[str, str]` attributes to `BrokerMessage`. +* Remove `saga: Optional[UUID]` and `service_name: str` attributes from `BrokerMessage`. +* Now `BrokerPublisher.send` returns the `BrokerMessage` identifier instead of the entry identifier on the `Producer`'s queue. +* Add `REQUEST_HEADERS_CONTEXT_VAR`. +* Rename `USER_CONTEXT_VAR` and `REPLY_TOPIC_CONTEXT_VAR` as `REQUEST_USER_CONTEXT_VAR` and `REQUEST_REPLY_TOPIC_CONTEXT_VAR` respectively. diff --git a/minos/networks/__init__.py b/minos/networks/__init__.py index 660b1040..01b65a85 100644 --- a/minos/networks/__init__.py +++ b/minos/networks/__init__.py @@ -1,6 +1,6 @@ __author__ = """Clariteia Devs""" __email__ = "devs@clariteia.com" -__version__ = "0.3.0" +__version__ = "0.3.1" from .brokers import ( REQUEST_HEADERS_CONTEXT_VAR, diff --git a/pyproject.toml b/pyproject.toml index 281b3a30..061d8ccf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "minos_microservice_networks" -version = "0.3.0" +version = "0.3.1" description = "Python Package with the common network classes and utilities used in Minos Microservice." readme = "README.md" repository = "https://github.com/clariteia/minos_microservice_network"