Skip to content
This repository has been archived by the owner on Jan 28, 2022. It is now read-only.

Commit

Permalink
ISSUE #457
Browse files Browse the repository at this point in the history
* Rename `saga` as `identifier`.
  • Loading branch information
Sergio García Prado committed Nov 29, 2021
1 parent 6f7b064 commit f1b8488
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 84 deletions.
9 changes: 6 additions & 3 deletions minos/networks/brokers/dynamic/brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
from typing import (
Optional,
)
from uuid import (
UUID,
)

from aiopg import (
Cursor,
Expand Down Expand Up @@ -84,15 +87,15 @@ async def _destroy(self) -> None:
await super()._destroy()

# noinspection PyUnusedLocal
async def send(self, *args, reply_topic: None = None, **kwargs) -> None:
async def send(self, *args, reply_topic: None = None, **kwargs) -> UUID:
"""Send a ``BrokerMessage``.
:param args: Additional positional arguments.
:param reply_topic: This argument is ignored if ignored in favor of ``self.topic``.
:param kwargs: Additional named arguments.
:return: This method does not return anything.
:return: The ``UUID`` identifier of the message.
"""
await self.publisher.send(*args, reply_topic=self.topic, **kwargs)
return await self.publisher.send(*args, reply_topic=self.topic, **kwargs)

async def get_one(self, *args, **kwargs) -> BrokerHandlerEntry:
"""Get one handler entry from the given topics.
Expand Down
7 changes: 6 additions & 1 deletion minos/networks/brokers/handlers/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,12 @@ async def dispatch_one(self, entry: BrokerHandlerEntry) -> None:

if message.reply_topic is not None:
await self.publisher.send(
data, topic=message.reply_topic, saga=message.saga, status=status, user=message.user, headers=headers,
data,
topic=message.reply_topic,
identifier=message.identifier,
status=status,
user=message.user,
headers=headers,
)

@staticmethod
Expand Down
17 changes: 15 additions & 2 deletions minos/networks/brokers/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
from uuid import (
UUID,
uuid4,
)

from minos.common import (
Expand All @@ -32,7 +33,7 @@ class BrokerMessage(DeclarativeModel):
topic: str
data: Any
service_name: str
saga: Optional[UUID]
identifier: UUID
reply_topic: Optional[str]
user: Optional[UUID]
status: BrokerMessageStatus
Expand All @@ -45,18 +46,30 @@ def __init__(
data: Any,
service_name: str,
*,
identifier: Optional[UUID] = None,
status: Optional[BrokerMessageStatus] = None,
strategy: Optional[BrokerMessageStrategy] = None,
headers: Optional[dict[str, str]] = None,
**kwargs
):
if identifier is None:
identifier = uuid4()
if status is None:
status = BrokerMessageStatus.SUCCESS
if strategy is None:
strategy = BrokerMessageStrategy.UNICAST
if headers is None:
headers = dict()
super().__init__(topic, data, service_name, status=status, strategy=strategy, headers=headers, **kwargs)
super().__init__(
topic=topic,
data=data,
service_name=service_name,
identifier=identifier,
status=status,
strategy=strategy,
headers=headers,
**kwargs
)

@property
def ok(self) -> bool:
Expand Down
15 changes: 8 additions & 7 deletions minos/networks/brokers/publishers/publishers.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,32 +52,32 @@ async def send(
data: Any,
topic: str,
*,
saga: Optional[UUID] = None,
identifier: Optional[UUID] = None,
reply_topic: Optional[str] = None,
user: Optional[UUID] = None,
status: BrokerMessageStatus = BrokerMessageStatus.SUCCESS,
strategy: BrokerMessageStrategy = BrokerMessageStrategy.UNICAST,
headers: Optional[dict[str, str]] = None,
**kwargs,
) -> int:
) -> UUID:
"""Send a ``BrokerMessage``.
:param data: The data to be send.
:param topic: Topic in which the message will be published.
:param saga: Saga identifier.
:param identifier: The identifier of the message.
:param reply_topic: An optional topic name to wait for a response.
:param user: The user identifier that send the message.
:param status: The status code of the message.
:param strategy: The publishing strategy.
:param headers: TODO
:param headers: A mapping of string values identified by a string key.
:param kwargs: Additional named arguments.
:return: This method does not return anything.
:return: The ``UUID`` identifier of the message.
"""

message = BrokerMessage(
topic=topic,
data=data,
saga=saga,
identifier=identifier,
status=status,
reply_topic=reply_topic,
user=user,
Expand All @@ -86,7 +86,8 @@ async def send(
headers=headers,
)
logger.info(f"Publishing '{message!s}'...")
return await self.enqueue(message.topic, message.strategy, message.avro_bytes)
await self.enqueue(message.topic, message.strategy, message.avro_bytes)
return message.identifier

async def enqueue(self, topic: str, strategy: BrokerMessageStrategy, raw: bytes) -> int:
"""Send a sequence of bytes to the given topic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@

class TestHandlerEntry(unittest.IsolatedAsyncioTestCase):
def setUp(self) -> None:
self.saga = uuid4()
self.identifier = uuid4()
self.user = uuid4()
self.service_name = "foo"

self.message = BrokerMessage(
"AddOrder", FakeModel("foo"), self.service_name, saga=self.saga, user=self.user, reply_topic="UpdateTicket",
"AddOrder",
FakeModel("foo"),
self.service_name,
identifier=self.identifier,
user=self.user,
reply_topic="UpdateTicket",
)

def test_constructor(self):
Expand Down
15 changes: 10 additions & 5 deletions tests/test_networks/test_brokers/test_handlers/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,17 @@ def setUp(self) -> None:
self.publisher = BrokerPublisher.from_config(self.config)
self.handler = BrokerHandler.from_config(self.config, publisher=self.publisher)

self.saga = uuid4()
self.identifier = uuid4()
self.user = uuid4()
self.service_name = self.config.service.name

self.message = BrokerMessage(
"AddOrder", FakeModel("foo"), self.service_name, saga=self.saga, user=self.user, reply_topic="UpdateTicket",
"AddOrder",
FakeModel("foo"),
self.service_name,
identifier=self.identifier,
user=self.user,
reply_topic="UpdateTicket",
)

async def asyncSetUp(self):
Expand Down Expand Up @@ -219,7 +224,7 @@ async def test_dispatch(self):
call(
"add_order",
topic="UpdateTicket",
saga=self.message.saga,
identifier=self.message.identifier,
status=BrokerMessageStatus.SUCCESS,
user=self.user,
headers=dict(),
Expand Down Expand Up @@ -248,10 +253,10 @@ async def test_dispatch_concurrent(self):
FakeModel,
)

saga = uuid4()
identifier = uuid4()

instance = BrokerMessage(
"AddOrder", [FakeModel("foo")], self.service_name, saga=saga, reply_topic="UpdateTicket"
"AddOrder", [FakeModel("foo")], self.service_name, identifier=identifier, reply_topic="UpdateTicket"
)
instance_wrong = namedtuple("FakeCommand", ("topic", "avro_bytes"))("AddOrder", bytes(b"Test"))

Expand Down
16 changes: 11 additions & 5 deletions tests/test_networks/test_brokers/test_handlers/test_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
class TestHandlerRequest(unittest.IsolatedAsyncioTestCase):
def setUp(self) -> None:
self.data = [FakeModel("foo"), FakeModel("bar")]
self.saga = uuid4()
self.identifier = uuid4()
self.service_name = "foo"
self.raw = BrokerMessage(
"FooCreated", self.data, self.service_name, saga=self.saga, reply_topic="AddOrderReply"
"FooCreated", self.data, self.service_name, identifier=self.identifier, reply_topic="AddOrderReply"
)

def test_repr(self):
Expand All @@ -32,7 +32,9 @@ def test_eq_true(self):

def test_eq_false(self):
another = BrokerRequest(
BrokerMessage("FooUpdated", self.data, self.service_name, saga=self.saga, reply_topic="AddOrderReply")
BrokerMessage(
"FooUpdated", self.data, self.service_name, identifier=self.identifier, reply_topic="AddOrderReply"
)
)
self.assertNotEqual(BrokerRequest(self.raw), another)

Expand All @@ -50,13 +52,17 @@ async def test_content(self):

async def test_content_single(self):
request = BrokerRequest(
BrokerMessage("FooCreated", self.data[0], self.service_name, saga=self.saga, reply_topic="AddOrderReply")
BrokerMessage(
"FooCreated", self.data[0], self.service_name, identifier=self.identifier, reply_topic="AddOrderReply"
)
)
self.assertEqual(self.data[0], await request.content())

async def test_content_simple(self):
request = BrokerRequest(
BrokerMessage("FooCreated", 1234, self.service_name, saga=self.saga, reply_topic="AddOrderReply")
BrokerMessage(
"FooCreated", 1234, self.service_name, identifier=self.identifier, reply_topic="AddOrderReply"
)
)
self.assertEqual(1234, await request.content())

Expand Down
11 changes: 6 additions & 5 deletions tests/test_networks/test_brokers/test_messages.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import unittest
from uuid import (
UUID,
uuid4,
)

Expand All @@ -17,7 +18,7 @@ class TestBrokerMessage(unittest.TestCase):
def setUp(self) -> None:
self.topic = "FooCreated"
self.data = [FakeModel("blue"), FakeModel("red")]
self.saga = uuid4()
self.identifier = uuid4()
self.reply_topic = "AddOrderReply"
self.status = BrokerMessageStatus.SUCCESS
self.user = uuid4()
Expand All @@ -29,7 +30,7 @@ def test_constructor_simple(self):
self.assertEqual(self.topic, message.topic)
self.assertEqual(self.data, message.data)
self.assertEqual(self.service_name, message.service_name)
self.assertEqual(None, message.saga)
self.assertIsInstance(message.identifier, UUID)
self.assertEqual(None, message.reply_topic)
self.assertEqual(None, message.user)
self.assertEqual(BrokerMessageStatus.SUCCESS, message.status)
Expand All @@ -40,15 +41,15 @@ def test_constructor(self):
self.topic,
self.data,
self.service_name,
saga=self.saga,
identifier=self.identifier,
reply_topic=self.reply_topic,
user=self.user,
status=self.status,
strategy=self.strategy,
)
self.assertEqual(self.topic, message.topic)
self.assertEqual(self.data, message.data)
self.assertEqual(self.saga, message.saga)
self.assertEqual(self.identifier, message.identifier)
self.assertEqual(self.reply_topic, message.reply_topic)
self.assertEqual(self.user, message.user)
self.assertEqual(self.status, message.status)
Expand All @@ -66,7 +67,7 @@ def test_avro_serialization(self):
message = BrokerMessage(
self.topic,
self.data,
saga=self.saga,
identifier=self.identifier,
reply_topic=self.reply_topic,
user=self.user,
status=self.status,
Expand Down
44 changes: 18 additions & 26 deletions tests/test_networks/test_brokers/test_publishers/test_producers.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,15 @@ async def test_dispatch_forever_without_notify(self):

async def test_concurrency_dispatcher(self):
model = FakeModel("foo")
saga = uuid4()
identifier = uuid4()

broker_publisher = BrokerPublisher.from_config(config=self.config)

async with broker_publisher:
for x in range(60):
await broker_publisher.send(model, "CommandBroker-Delete", saga=saga, reply_topic="TestDeleteReply")
await broker_publisher.send(
model, "CommandBroker-Delete", identifier=identifier, reply_topic="TestDeleteReply"
)

async with aiopg.connect(**self.broker_queue_db) as connect:
async with connect.cursor() as cur:
Expand All @@ -165,51 +167,41 @@ async def test_concurrency_dispatcher(self):

async def test_if_commands_was_deleted(self):
async with BrokerPublisher.from_config(config=self.config) as broker_publisher:
queue_id_1 = await broker_publisher.send(FakeModel("Foo"), "TestDeleteReply")
queue_id_2 = await broker_publisher.send(FakeModel("Foo"), "TestDeleteReply")
await broker_publisher.send(FakeModel("Foo"), "TestDeleteReply")
await broker_publisher.send(FakeModel("Foo"), "TestDeleteReply")

await self.producer.dispatch()

async with aiopg.connect(**self.broker_queue_db) as connection:
async with connection.cursor() as cursor:
await cursor.execute("SELECT COUNT(*) FROM producer_queue WHERE topic = '%s'" % "TestDeleteReply")
records = await cursor.fetchone()

assert queue_id_1 > 0
assert queue_id_2 > 0
assert records[0] == 0
self.assertEqual(0, (await cursor.fetchone())[0])

async def test_if_commands_retry_was_incremented(self):
model = FakeModel("foo")
saga = uuid4()
identifier = uuid4()

async with BrokerPublisher.from_config(config=self.config) as broker_publisher:
queue_id_1 = await broker_publisher.send(
model, "TestDeleteOrderReply", saga=saga, status=BrokerMessageStatus.SUCCESS
await broker_publisher.send(
model, "TestDeleteOrderReply", identifier=identifier, status=BrokerMessageStatus.SUCCESS
)
queue_id_2 = await broker_publisher.send(
model, "TestDeleteOrderReply", saga=saga, status=BrokerMessageStatus.SUCCESS
await broker_publisher.send(
model, "TestDeleteOrderReply", identifier=identifier, status=BrokerMessageStatus.SUCCESS
)

self.producer.publish = AsyncMock(return_value=False)
await self.producer.dispatch()

async with aiopg.connect(**self.broker_queue_db) as connection:
async with connection.cursor() as cursor:
await cursor.execute("SELECT COUNT(*) FROM producer_queue WHERE topic = '%s'" % "TestDeleteOrderReply")
records = await cursor.fetchone()

await cursor.execute("SELECT retry FROM producer_queue WHERE id=%d;" % queue_id_1)
retry_1 = await cursor.fetchone()
await cursor.execute("SELECT COUNT(*) FROM producer_queue WHERE topic = 'TestDeleteOrderReply'")
self.assertEqual(2, (await cursor.fetchone())[0])

await cursor.execute("SELECT retry FROM producer_queue WHERE id=%d;" % queue_id_2)
retry_2 = await cursor.fetchone()
await cursor.execute("SELECT retry FROM producer_queue WHERE id=1;")
self.assertEqual(1, (await cursor.fetchone())[0])

assert queue_id_1 > 0
assert queue_id_2 > 0
assert records[0] == 2
assert retry_1[0] > 0
assert retry_2[0] > 0
await cursor.execute("SELECT retry FROM producer_queue WHERE id=2;")
self.assertEqual(1, (await cursor.fetchone())[0])

async def _notify(self, name):
await sleep(0.2)
Expand Down
Loading

0 comments on commit f1b8488

Please sign in to comment.