Skip to content
This repository has been archived by the owner on Jan 28, 2022. It is now read-only.

Commit

Permalink
Merge pull request #459 from Clariteia/0.3.1
Browse files Browse the repository at this point in the history
0.3.1
  • Loading branch information
Sergio García Prado authored Nov 30, 2021
2 parents 64c9b16 + 503ce00 commit ca4c525
Show file tree
Hide file tree
Showing 19 changed files with 195 additions and 170 deletions.
9 changes: 9 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,12 @@ History
* Rename `Consumer` and `ConsumerService` as `BrokerConsumer` and `BrokerConsumerService` respectively.
* Rename `Producer` and `ProducerService` as `BrokerProducer` and `BrokerProducerService` respectively.
* Rename `HandlerRequest`, `HandlerResponse` and `HandlerResponseException` as `BrokerRequest`, `BrokerResponse` and `BrokerResponseException` respectively.

0.3.1 (2021-11-30)
------------------

* Add `identifier: UUID` and `headers: dict[str, str]` attributes to `BrokerMessage`.
* Remove `saga: Optional[UUID]` and `service_name: str` attributes from `BrokerMessage`.
* Now `BrokerPublisher.send` returns the `BrokerMessage` identifier instead of the entry identifier on the `Producer`'s queue.
* Add `REQUEST_HEADERS_CONTEXT_VAR`.
* Rename `USER_CONTEXT_VAR` and `REPLY_TOPIC_CONTEXT_VAR` as `REQUEST_USER_CONTEXT_VAR` and `REQUEST_REPLY_TOPIC_CONTEXT_VAR` respectively.
7 changes: 4 additions & 3 deletions minos/networks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
__author__ = """Clariteia Devs"""
__email__ = "devs@clariteia.com"
__version__ = "0.3.0"
__version__ = "0.3.1"

from .brokers import (
REPLY_TOPIC_CONTEXT_VAR,
REQUEST_HEADERS_CONTEXT_VAR,
REQUEST_REPLY_TOPIC_CONTEXT_VAR,
BrokerConsumer,
BrokerConsumerService,
BrokerHandler,
Expand Down Expand Up @@ -56,7 +57,7 @@
MinosRedefinedEnrouteDecoratorException,
)
from .requests import (
USER_CONTEXT_VAR,
REQUEST_USER_CONTEXT_VAR,
Request,
Response,
ResponseException,
Expand Down
3 changes: 2 additions & 1 deletion minos/networks/brokers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
BrokerResponseException,
)
from .messages import (
REPLY_TOPIC_CONTEXT_VAR,
REQUEST_HEADERS_CONTEXT_VAR,
REQUEST_REPLY_TOPIC_CONTEXT_VAR,
BrokerMessage,
BrokerMessageStatus,
BrokerMessageStrategy,
Expand Down
9 changes: 6 additions & 3 deletions minos/networks/brokers/dynamic/brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
from typing import (
Optional,
)
from uuid import (
UUID,
)

from aiopg import (
Cursor,
Expand Down Expand Up @@ -84,15 +87,15 @@ async def _destroy(self) -> None:
await super()._destroy()

# noinspection PyUnusedLocal
async def send(self, *args, reply_topic: None = None, **kwargs) -> None:
async def send(self, *args, reply_topic: None = None, **kwargs) -> UUID:
"""Send a ``BrokerMessage``.
:param args: Additional positional arguments.
:param reply_topic: This argument is ignored if ignored in favor of ``self.topic``.
:param kwargs: Additional named arguments.
:return: This method does not return anything.
:return: The ``UUID`` identifier of the message.
"""
await self.publisher.send(*args, reply_topic=self.topic, **kwargs)
return await self.publisher.send(*args, reply_topic=self.topic, **kwargs)

async def get_one(self, *args, **kwargs) -> BrokerHandlerEntry:
"""Get one handler entry from the given topics.
Expand Down
6 changes: 3 additions & 3 deletions minos/networks/brokers/dynamic/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
BrokerConsumer,
)
from ..messages import (
REPLY_TOPIC_CONTEXT_VAR,
REQUEST_REPLY_TOPIC_CONTEXT_VAR,
)
from ..publishers import (
BrokerPublisher,
Expand Down Expand Up @@ -150,9 +150,9 @@ def __init__(self, wrapper: AsyncContextManager[DynamicBroker]):

async def __aenter__(self) -> DynamicBroker:
handler = await self.wrapper.__aenter__()
self._token = REPLY_TOPIC_CONTEXT_VAR.set(handler.topic)
self._token = REQUEST_REPLY_TOPIC_CONTEXT_VAR.set(handler.topic)
return handler

async def __aexit__(self, exc_type, exc_val, exc_tb):
REPLY_TOPIC_CONTEXT_VAR.reset(self._token)
REQUEST_REPLY_TOPIC_CONTEXT_VAR.reset(self._token)
await self.wrapper.__aexit__(exc_type, exc_val, exc_tb)
28 changes: 18 additions & 10 deletions minos/networks/brokers/handlers/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@
MinosActionNotFoundException,
)
from ...requests import (
USER_CONTEXT_VAR,
REQUEST_USER_CONTEXT_VAR,
Response,
ResponseException,
)
from ...utils import (
consume_queue,
)
from ..messages import (
REQUEST_HEADERS_CONTEXT_VAR,
BrokerMessage,
BrokerMessageStatus,
)
Expand Down Expand Up @@ -304,42 +305,49 @@ async def dispatch_one(self, entry: BrokerHandlerEntry) -> None:

fn = self.get_callback(entry.callback)
message = entry.data
data, status = await fn(message)
data, status, headers = await fn(message)

if message.reply_topic is not None:
await self.publisher.send(
data, topic=message.reply_topic, saga=message.saga, status=status, user=message.user
data,
topic=message.reply_topic,
identifier=message.identifier,
status=status,
user=message.user,
headers=headers,
)

@staticmethod
def get_callback(
fn: Callable[[BrokerRequest], Union[Optional[BrokerRequest], Awaitable[Optional[BrokerRequest]]]]
) -> Callable[[BrokerMessage], Awaitable[tuple[Any, BrokerMessageStatus]]]:
) -> Callable[[BrokerMessage], Awaitable[tuple[Any, BrokerMessageStatus, dict[str, str]]]]:
"""Get the handler function to be used by the Broker Handler.
:param fn: The action function.
:return: A wrapper function around the given one that is compatible with the Broker Handler API.
"""

async def _fn(raw: BrokerMessage) -> tuple[Any, BrokerMessageStatus]:
async def _fn(raw: BrokerMessage) -> tuple[Any, BrokerMessageStatus, dict[str, str]]:
request = BrokerRequest(raw)
token = USER_CONTEXT_VAR.set(request.user)
user_token = REQUEST_USER_CONTEXT_VAR.set(request.user)
headers_token = REQUEST_HEADERS_CONTEXT_VAR.set(raw.headers)

try:
response = fn(request)
if isawaitable(response):
response = await response
if isinstance(response, Response):
response = await response.content()
return response, BrokerMessageStatus.SUCCESS
return response, BrokerMessageStatus.SUCCESS, REQUEST_HEADERS_CONTEXT_VAR.get()
except ResponseException as exc:
logger.warning(f"Raised an application exception: {exc!s}")
return repr(exc), BrokerMessageStatus.ERROR
return repr(exc), BrokerMessageStatus.ERROR, REQUEST_HEADERS_CONTEXT_VAR.get()
except Exception as exc:
logger.exception(f"Raised a system exception: {exc!r}")
return repr(exc), BrokerMessageStatus.SYSTEM_ERROR
return repr(exc), BrokerMessageStatus.SYSTEM_ERROR, REQUEST_HEADERS_CONTEXT_VAR.get()
finally:
USER_CONTEXT_VAR.reset(token)
REQUEST_USER_CONTEXT_VAR.reset(user_token)
REQUEST_HEADERS_CONTEXT_VAR.reset(headers_token)

return _fn

Expand Down
19 changes: 14 additions & 5 deletions minos/networks/brokers/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,51 @@
)
from uuid import (
UUID,
uuid4,
)

from minos.common import (
DeclarativeModel,
)

REPLY_TOPIC_CONTEXT_VAR: Final[ContextVar[Optional[str]]] = ContextVar("reply_topic", default=None)
REQUEST_REPLY_TOPIC_CONTEXT_VAR: Final[ContextVar[Optional[str]]] = ContextVar("reply_topic", default=None)
REQUEST_HEADERS_CONTEXT_VAR: Final[ContextVar[Optional[dict[str, str]]]] = ContextVar("headers", default=None)


class BrokerMessage(DeclarativeModel):
"""Broker Message class."""

topic: str
data: Any
service_name: str
saga: Optional[UUID]
identifier: UUID
reply_topic: Optional[str]
user: Optional[UUID]
status: BrokerMessageStatus
strategy: BrokerMessageStrategy
headers: dict[str, str]

def __init__(
self,
topic: str,
data: Any,
service_name: str,
*,
identifier: Optional[UUID] = None,
status: Optional[BrokerMessageStatus] = None,
strategy: Optional[BrokerMessageStrategy] = None,
headers: Optional[dict[str, str]] = None,
**kwargs
):
if identifier is None:
identifier = uuid4()
if status is None:
status = BrokerMessageStatus.SUCCESS
if strategy is None:
strategy = BrokerMessageStrategy.UNICAST
super().__init__(topic, data, service_name, status=status, strategy=strategy, **kwargs)
if headers is None:
headers = dict()
super().__init__(
topic=topic, data=data, identifier=identifier, status=status, strategy=strategy, headers=headers, **kwargs
)

@property
def ok(self) -> bool:
Expand Down
28 changes: 12 additions & 16 deletions minos/networks/brokers/publishers/publishers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
)

import logging
from abc import (
ABC,
)
from typing import (
Any,
Optional,
Expand Down Expand Up @@ -34,56 +31,55 @@
logger = logging.getLogger(__name__)


class BrokerPublisher(BrokerPublisherSetup, ABC):
class BrokerPublisher(BrokerPublisherSetup):
"""Broker Publisher class."""

def __init__(self, *args, service_name: str, **kwargs):
super().__init__(*args, **kwargs)
self.service_name = service_name

@classmethod
def _from_config(cls, *args, config: MinosConfig, **kwargs) -> BrokerPublisher:
# noinspection PyProtectedMember
return cls(*args, service_name=config.service.name, **config.broker.queue._asdict(), **kwargs)
return cls(*args, **config.broker.queue._asdict(), **kwargs)

# noinspection PyMethodOverriding
async def send(
self,
data: Any,
topic: str,
*,
saga: Optional[UUID] = None,
identifier: Optional[UUID] = None,
reply_topic: Optional[str] = None,
user: Optional[UUID] = None,
status: BrokerMessageStatus = BrokerMessageStatus.SUCCESS,
strategy: BrokerMessageStrategy = BrokerMessageStrategy.UNICAST,
headers: Optional[dict[str, str]] = None,
**kwargs,
) -> int:
) -> UUID:
"""Send a ``BrokerMessage``.
:param data: The data to be send.
:param topic: Topic in which the message will be published.
:param saga: Saga identifier.
:param identifier: The identifier of the message.
:param reply_topic: An optional topic name to wait for a response.
:param user: The user identifier that send the message.
:param status: The status code of the message.
:param strategy: The publishing strategy.
:param headers: A mapping of string values identified by a string key.
:param kwargs: Additional named arguments.
:return: This method does not return anything.
:return: The ``UUID`` identifier of the message.
"""

message = BrokerMessage(
topic=topic,
data=data,
saga=saga,
identifier=identifier,
status=status,
reply_topic=reply_topic,
user=user,
service_name=self.service_name,
strategy=strategy,
headers=headers,
)
logger.info(f"Publishing '{message!s}'...")
return await self.enqueue(message.topic, message.strategy, message.avro_bytes)
await self.enqueue(message.topic, message.strategy, message.avro_bytes)
return message.identifier

async def enqueue(self, topic: str, strategy: BrokerMessageStrategy, raw: bytes) -> int:
"""Send a sequence of bytes to the given topic.
Expand Down
3 changes: 1 addition & 2 deletions minos/networks/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
MinosException,
)

USER_CONTEXT_VAR: Final[ContextVar[Optional[UUID]]] = ContextVar("user", default=None)
USER_CONTEXT_VAR.set(None) # needed to "register" the context variable.
REQUEST_USER_CONTEXT_VAR: Final[ContextVar[Optional[UUID]]] = ContextVar("user", default=None)


class Request(ABC):
Expand Down
6 changes: 3 additions & 3 deletions minos/networks/rest/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
EnrouteBuilder,
)
from ..requests import (
USER_CONTEXT_VAR,
REQUEST_USER_CONTEXT_VAR,
Response,
ResponseException,
)
Expand Down Expand Up @@ -128,7 +128,7 @@ async def _fn(request: web.Request) -> web.Response:
logger.info(f"Dispatching '{request!s}' from '{request.remote!s}'...")

request = RestRequest(request)
token = USER_CONTEXT_VAR.set(request.user)
token = REQUEST_USER_CONTEXT_VAR.set(request.user)

try:
response = fn(request)
Expand All @@ -146,7 +146,7 @@ async def _fn(request: web.Request) -> web.Response:
logger.exception(f"Raised a system exception: {exc!r}")
raise web.HTTPInternalServerError()
finally:
USER_CONTEXT_VAR.reset(token)
REQUEST_USER_CONTEXT_VAR.reset(token)

return _fn

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "minos_microservice_networks"
version = "0.3.0"
version = "0.3.1"
description = "Python Package with the common network classes and utilities used in Minos Microservice."
readme = "README.md"
repository = "https://github.com/clariteia/minos_microservice_network"
Expand Down
8 changes: 4 additions & 4 deletions tests/test_networks/test_brokers/test_dynamic/test_pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
PostgresAsyncTestCase,
)
from minos.networks import (
REPLY_TOPIC_CONTEXT_VAR,
REQUEST_REPLY_TOPIC_CONTEXT_VAR,
BrokerConsumer,
BrokerPublisher,
DynamicBroker,
Expand Down Expand Up @@ -70,12 +70,12 @@ async def test_acquire(self):
self.assertIn(broker.topic, self.pool.client.list_topics())

async def test_acquire_reply_topic_context_var(self):
self.assertEqual(None, REPLY_TOPIC_CONTEXT_VAR.get())
self.assertEqual(None, REQUEST_REPLY_TOPIC_CONTEXT_VAR.get())

async with self.pool.acquire() as broker:
self.assertEqual(broker.topic, REPLY_TOPIC_CONTEXT_VAR.get())
self.assertEqual(broker.topic, REQUEST_REPLY_TOPIC_CONTEXT_VAR.get())

self.assertEqual(None, REPLY_TOPIC_CONTEXT_VAR.get())
self.assertEqual(None, REQUEST_REPLY_TOPIC_CONTEXT_VAR.get())


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit ca4c525

Please sign in to comment.