Skip to content
This repository has been archived by the owner on Jan 28, 2022. It is now read-only.

Commit

Permalink
Merge pull request #445 from Clariteia/0.2.0
Browse files Browse the repository at this point in the history
0.2.0
  • Loading branch information
Sergio García Prado authored Nov 15, 2021
2 parents e3ede6e + 5d28488 commit b01b1a7
Show file tree
Hide file tree
Showing 72 changed files with 666 additions and 995 deletions.
9 changes: 9 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,12 @@ History

* Add `REPLY_TOPIC_CONTEXT_VAR` and integrate with `DynamicHandlerPool`.
* Add support for `post_fn` callbacks following the same strategy as in `pre_fn` callbacks.

0.2.0 (2021-11-15)
------------------

* Remove dependency to `minos-microservice-aggregate` (now `minos.aggregate` package will require `minos.networks`).
* Add support for middleware functions.
* Add support variable number of services (previously only `CommandService` and `QueryService` were allowed).
* Migrate `Command`, `CommandReply`, `CommandStatus` and `Event` from `minos.common` to `minos.networks`.
* Add support for `minos-microservice-common=^0.3.0`
41 changes: 19 additions & 22 deletions minos/networks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
__author__ = """Clariteia Devs"""
__email__ = "devs@clariteia.com"
__version__ = "0.1.1"
__version__ = "0.2.0"

from .brokers import (
REPLY_TOPIC_CONTEXT_VAR,
Broker,
BrokerSetup,
Command,
CommandBroker,
CommandHandler,
CommandHandlerService,
CommandReply,
CommandReplyBroker,
CommandStatus,
Consumer,
ConsumerService,
DynamicHandler,
DynamicHandlerPool,
Event,
EventBroker,
EventHandler,
EventHandlerService,
Handler,
HandlerEntry,
HandlerRequest,
HandlerResponse,
HandlerResponseException,
HandlerSetup,
Producer,
ProducerService,
)
Expand Down Expand Up @@ -44,24 +62,6 @@
MinosNetworkException,
MinosRedefinedEnrouteDecoratorException,
)
from .handlers import (
CommandHandler,
CommandHandlerService,
CommandReplyHandler,
CommandReplyHandlerService,
Consumer,
ConsumerService,
DynamicHandler,
DynamicHandlerPool,
EventHandler,
EventHandlerService,
Handler,
HandlerEntry,
HandlerRequest,
HandlerResponse,
HandlerResponseException,
HandlerSetup,
)
from .messages import (
USER_CONTEXT_VAR,
Request,
Expand All @@ -84,9 +84,6 @@
ScheduledRequestContent,
ScheduledResponseException,
)
from .snapshots import (
SnapshotService,
)
from .utils import (
consume_queue,
get_host_ip,
Expand Down
38 changes: 25 additions & 13 deletions minos/networks/brokers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
from .abc import (
from .messages import (
REPLY_TOPIC_CONTEXT_VAR,
Command,
CommandReply,
CommandStatus,
Event,
)
from .publishers import (
Broker,
BrokerSetup,
)
from .command_replies import (
CommandReplyBroker,
)
from .commands import (
REPLY_TOPIC_CONTEXT_VAR,
CommandBroker,
)
from .events import (
CommandReplyBroker,
EventBroker,
)
from .producers import (
Producer,
)
from .services import (
ProducerService,
)
from .subscribers import (
CommandHandler,
CommandHandlerService,
Consumer,
ConsumerService,
DynamicHandler,
DynamicHandlerPool,
EventHandler,
EventHandlerService,
Handler,
HandlerEntry,
HandlerRequest,
HandlerResponse,
HandlerResponseException,
HandlerSetup,
)
67 changes: 67 additions & 0 deletions minos/networks/brokers/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from __future__ import (
annotations,
)

from contextvars import (
ContextVar,
)
from enum import (
IntEnum,
)
from typing import (
Any,
Final,
Optional,
)
from uuid import (
UUID,
)

from minos.common import (
DeclarativeModel,
)

REPLY_TOPIC_CONTEXT_VAR: Final[ContextVar[Optional[str]]] = ContextVar("reply_topic", default=None)


class Command(DeclarativeModel):
"""Base Command class."""

topic: str
data: Any
saga: Optional[UUID]
reply_topic: str
user: Optional[UUID]


class CommandReply(DeclarativeModel):
"""Base Command class."""

topic: str
data: Any
saga: Optional[UUID]
status: CommandStatus
service_name: Optional[str]

@property
def ok(self) -> bool:
"""Check if the reply is okay or not.
:return: ``True`` if the reply is okay or ``False`` otherwise.
"""
return self.status == CommandStatus.SUCCESS


class CommandStatus(IntEnum):
"""Command Status class."""

SUCCESS = 200
ERROR = 400
SYSTEM_ERROR = 500


class Event(DeclarativeModel):
"""Base Event class."""

topic: str
data: Any
19 changes: 19 additions & 0 deletions minos/networks/brokers/publishers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from .abc import (
Broker,
BrokerSetup,
)
from .command_replies import (
CommandReplyBroker,
)
from .commands import (
CommandBroker,
)
from .events import (
EventBroker,
)
from .producers import (
Producer,
)
from .services import (
ProducerService,
)
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
)

from minos.common import (
MinosBroker,
PostgreSqlMinosDatabase,
)

Expand All @@ -22,7 +21,7 @@ async def _create_broker_table(self) -> None:
await self.submit_query(_CREATE_TABLE_QUERY, lock=hash("producer_queue"))


class Broker(MinosBroker, BrokerSetup, ABC):
class Broker(BrokerSetup, ABC):
"""Minos Broker Class."""

ACTION: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
)

from minos.common import (
CommandReply,
CommandStatus,
MinosConfig,
)

from ..messages import (
CommandReply,
CommandStatus,
)
from .abc import (
Broker,
)
Expand All @@ -28,9 +30,13 @@ class CommandReplyBroker(Broker):

ACTION = "commandReply"

def __init__(self, *args, service_name: str, **kwargs):
super().__init__(*args, **kwargs)
self.service_name = service_name

@classmethod
def _from_config(cls, *args, config: MinosConfig, **kwargs) -> CommandReplyBroker:
return cls(*args, **config.broker.queue._asdict(), **kwargs)
return cls(*args, service_name=config.service.name, **config.broker.queue._asdict(), **kwargs)

# noinspection PyMethodOverriding
async def send(self, data: Any, topic: str, saga: UUID, status: CommandStatus, **kwargs) -> int:
Expand All @@ -39,10 +45,10 @@ async def send(self, data: Any, topic: str, saga: UUID, status: CommandStatus, *
:param data: The data to be send.
:param topic: Topic in which the message will be published.
:param saga: Saga identifier.
:param status: Command status.
:param status: command status.
:return: This method does not return anything.
"""

command_reply = CommandReply(topic, data, saga, status)
command_reply = CommandReply(topic, data, saga=saga, status=status, service_name=self.service_name)
logger.info(f"Sending '{command_reply!s}'...")
return await self.enqueue(command_reply.topic, command_reply.avro_bytes)
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,28 @@
)

import logging
from contextvars import (
ContextVar,
)
from typing import (
Any,
Final,
Optional,
)
from uuid import (
UUID,
)

from minos.common import (
Command,
MinosConfig,
)

from ..messages import (
REPLY_TOPIC_CONTEXT_VAR,
Command,
)
from .abc import (
Broker,
)

logger = logging.getLogger(__name__)

REPLY_TOPIC_CONTEXT_VAR: Final[ContextVar[Optional[str]]] = ContextVar("reply_topic", default=None)


class CommandBroker(Broker):
"""Minos Command Broker Class."""
Expand All @@ -48,7 +45,7 @@ async def send(
self,
data: Any,
topic: str,
saga: UUID,
saga: Optional[UUID] = None,
reply_topic: Optional[str] = None,
user: Optional[UUID] = None,
**kwargs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
)

import logging

from minos.aggregate import (
AggregateDiff,
from typing import (
Any,
)

from minos.common import (
Event,
MinosConfig,
)

from ..messages import (
Event,
)
from .abc import (
Broker,
)
Expand All @@ -29,7 +31,7 @@ def _from_config(cls, *args, config: MinosConfig, **kwargs) -> EventBroker:
return cls(*args, **config.broker.queue._asdict(), **kwargs)

# noinspection PyMethodOverriding
async def send(self, data: AggregateDiff, topic: str, **kwargs) -> int:
async def send(self, data: Any, topic: str, **kwargs) -> int:
"""Send an ``Event``.
:param data: The data to be send.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
MinosConfig,
)

from ..handlers import (
Consumer,
)
from ..utils import (
from ...utils import (
consume_queue,
)
from ..subscribers import (
Consumer,
)
from .abc import (
BrokerSetup,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@
Handler,
HandlerSetup,
)
from .command_replies import (
CommandReplyHandler,
CommandReplyHandlerService,
)
from .commands import (
CommandHandler,
CommandHandlerService,
Expand Down
Loading

0 comments on commit b01b1a7

Please sign in to comment.