Skip to content
This repository has been archived by the owner on Jan 28, 2022. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
…e_common into main
  • Loading branch information
andrea-mucci committed May 3, 2021
2 parents da93d33 + 984a7dd commit e1a4b63
Show file tree
Hide file tree
Showing 16 changed files with 459 additions and 169 deletions.
3 changes: 3 additions & 0 deletions minos/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
MinosConfig,
MinosConfigAbstract,
)
from .database import (
PostgreSqlMinosDatabase,
)
from .exceptions import (
EmptyMinosModelSequenceException,
MinosAttributeValidationException,
Expand Down
27 changes: 25 additions & 2 deletions minos/common/configuration/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@
EVENT = collections.namedtuple("Event", "name controller action")
COMMAND = collections.namedtuple("Command", "name controller action")
SERVICE = collections.namedtuple("Service", "name")

EVENTS = collections.namedtuple("Events", "broker items queue")
COMMANDS = collections.namedtuple("Commands", "broker items queue")
REST = collections.namedtuple("Rest", "broker endpoints")

REPOSITORY = collections.namedtuple("Repository", "database user password host port")
SNAPSHOT = collections.namedtuple("Snapshot", "database user password host port")

_ENVIRONMENT_MAPPER = {
"commands.queue.host": "MINOS_COMMANDS_QUEUE_HOST",
Expand All @@ -57,6 +56,11 @@
"repository.database": "MINOS_REPOSITORY_DATABASE",
"repository.user": "MINOS_REPOSITORY_USER",
"repository.password": "MINOS_REPOSITORY_PASSWORD",
"snapshot.host": "MINOS_SNAPSHOT_HOST",
"snapshot.port": "MINOS_SNAPSHOT_PORT",
"snapshot.database": "MINOS_SNAPSHOT_DATABASE",
"snapshot.user": "MINOS_SNAPSHOT_USER",
"snapshot.password": "MINOS_SNAPSHOT_PASSWORD",
}

_PARAMETERIZED_MAPPER = {
Expand All @@ -79,6 +83,11 @@
"repository.database": "repository_database",
"repository.user": "repository_user",
"repository.password": "repository_password",
"snapshot.host": "snapshot_host",
"snapshot.port": "snapshot_port",
"snapshot.database": "snapshot_database",
"snapshot.user": "snapshot_user",
"snapshot.password": "snapshot_password",
}

_default: t.Optional[MinosConfigAbstract] = None
Expand Down Expand Up @@ -311,3 +320,17 @@ def repository(self) -> REPOSITORY:
host=self._get("repository.host"),
port=int(self._get("repository.port")),
)

@property
def snapshot(self) -> SNAPSHOT:
"""Get the snapshot config.
:return: A ``SNAPSHOT`` NamedTuple instance.
"""
return SNAPSHOT(
database=self._get("snapshot.database"),
user=self._get("snapshot.user"),
password=self._get("snapshot.password"),
host=self._get("snapshot.host"),
port=int(self._get("snapshot.port")),
)
87 changes: 87 additions & 0 deletions minos/common/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""
Copyright (C) 2021 Clariteia SL
This file is part of minos framework.
Minos framework can not be copied and/or distributed without the express permission of Clariteia SL.
"""
from abc import (
ABC,
)
from typing import (
AsyncIterator,
NoReturn,
)

import aiopg
from aiopg import (
Pool,
)

from .setup import (
MinosSetup,
)


class PostgreSqlMinosDatabase(ABC, MinosSetup):
"""PostgreSql Minos Database base class."""

def __init__(self, host: str, port: int, database: str, user: str, password: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.host = host
self.port = port
self.database = database
self.user = user
self.password = password
self._pool = None

async def _destroy(self) -> NoReturn:
if self._pool is not None:
self._pool.close()
await self._pool.wait_closed()
self._pool = None

async def submit_query_and_fetchone(self, *args, **kwargs) -> tuple:
"""Submit a SQL query and gets the first response.
:param args: Additional positional arguments.
:param kwargs: Additional named arguments.
:return: This method does not return anything.
"""
return await self.submit_query_and_iter(*args, **kwargs).__anext__()

async def submit_query_and_iter(self, *args, **kwargs) -> AsyncIterator[tuple]:
"""Submit a SQL query and return an asynchronous iterator.
:param args: Additional positional arguments.
:param kwargs: Additional named arguments.
:return: This method does not return anything.
"""
pool = await self.pool
with await pool.cursor() as cursor:
await cursor.execute(*args, **kwargs)
async for row in cursor:
yield row

async def submit_query(self, *args, **kwargs) -> NoReturn:
"""Submit a SQL query.
:param args: Additional positional arguments.
:param kwargs: Additional named arguments.
:return: This method does not return anything.
"""
pool = await self.pool
with await pool.cursor() as cursor:
await cursor.execute(*args, **kwargs)

@property
async def pool(self) -> Pool:
"""Get the connections pool.
:return: A ``Pool`` object.
"""
if self._pool is None:
self._pool = await aiopg.create_pool(
host=self.host, port=self.port, dbname=self.database, user=self.user, password=self.password,
)
return self._pool
2 changes: 1 addition & 1 deletion minos/common/model/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async def get_one(
_repository = cls._build_repository(_config)

# noinspection PyTypeChecker
entries = await _repository.select(aggregate_name=cls.classname, aggregate_id=id)
entries = [v async for v in _repository.select(aggregate_name=cls.classname, aggregate_id=id)]
if not len(entries):
raise MinosRepositoryAggregateNotFoundException(f"Not found any entries for the {repr(id)} id.")

Expand Down
24 changes: 10 additions & 14 deletions minos/common/repository/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
)
from typing import (
TYPE_CHECKING,
NoReturn,
AsyncIterator,
Optional,
Union,
)
Expand Down Expand Up @@ -55,14 +55,7 @@ def from_config(cls, *args, config: MinosConfig = None, **kwargs) -> Optional[Mi
# noinspection PyProtectedMember
return cls(*args, **config.repository._asdict(), **kwargs)

async def __aenter__(self) -> MinosRepository:
await self.setup()
return self

async def __aexit__(self, exc_type, exc_value, exc_traceback):
pass

async def insert(self, entry: Union[Aggregate, MinosRepositoryEntry]) -> NoReturn:
async def insert(self, entry: Union[Aggregate, MinosRepositoryEntry]) -> MinosRepositoryEntry:
"""Store new insertion entry into de repository.
:param entry: Entry to be stored.
Expand All @@ -76,7 +69,7 @@ async def insert(self, entry: Union[Aggregate, MinosRepositoryEntry]) -> NoRetur
entry.action = MinosRepositoryAction.INSERT
return await self._submit(entry)

async def update(self, entry: Union[Aggregate, MinosRepositoryEntry]) -> NoReturn:
async def update(self, entry: Union[Aggregate, MinosRepositoryEntry]) -> MinosRepositoryEntry:
"""Store new update entry into de repository.
:param entry: Entry to be stored.
Expand All @@ -90,7 +83,7 @@ async def update(self, entry: Union[Aggregate, MinosRepositoryEntry]) -> NoRetur
entry.action = MinosRepositoryAction.UPDATE
return await self._submit(entry)

async def delete(self, entry: Union[Aggregate, MinosRepositoryEntry]) -> NoReturn:
async def delete(self, entry: Union[Aggregate, MinosRepositoryEntry]) -> MinosRepositoryEntry:
"""Store new deletion entry into de repository.
:param entry: Entry to be stored.
Expand Down Expand Up @@ -127,7 +120,7 @@ async def select(
id_gt: Optional[int] = None,
id_le: Optional[int] = None,
id_ge: Optional[int] = None,
) -> list[MinosRepositoryEntry]:
) -> AsyncIterator[MinosRepositoryEntry]:
"""Perform a selection query of entries stored in to the repository.
:param aggregate_id: Aggregate identifier.
Expand All @@ -146,7 +139,7 @@ async def select(
"""
await self.setup()

return await self._select(
generator = self._select(
aggregate_id=aggregate_id,
aggregate_name=aggregate_name,
version=version,
Expand All @@ -160,7 +153,10 @@ async def select(
id_le=id_le,
id_ge=id_ge,
)
# noinspection PyTypeChecker
async for entry in generator:
yield entry

@abstractmethod
async def _select(self, *args, **kwargs) -> list[MinosRepositoryEntry]:
async def _select(self, *args, **kwargs) -> AsyncIterator[MinosRepositoryEntry]:
"""Perform a selection query of entries stored in to the repository."""
30 changes: 22 additions & 8 deletions minos/common/repository/entries.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
annotations,
)

from datetime import (
datetime,
)
from enum import (
Enum,
)
Expand Down Expand Up @@ -48,7 +51,7 @@ def value_of(cls, value: str) -> Optional[MinosRepositoryAction]:
class MinosRepositoryEntry(object):
"""Class that represents an entry (or row) on the events repository database which stores the aggregate changes."""

__slots__ = "id", "action", "aggregate_id", "aggregate_name", "version", "data"
__slots__ = "aggregate_id", "aggregate_name", "version", "data", "id", "action", "created_at"

# noinspection PyShadowingBuiltins
def __init__(
Expand All @@ -59,27 +62,30 @@ def __init__(
data: Union[bytes, memoryview] = bytes(),
id: Optional[int] = None,
action: Optional[Union[str, MinosRepositoryAction]] = None,
created_at: Optional[datetime] = None,
):
if isinstance(data, memoryview):
data = data.tobytes()
if action is not None and isinstance(action, str):
action = MinosRepositoryAction.value_of(action)

self.id = id
self.action = action

self.aggregate_id = aggregate_id
self.aggregate_name = aggregate_name
self.version = version
self.data = data

self.id = id
self.action = action
self.created_at = created_at

@classmethod
def from_aggregate(cls, aggregate: Aggregate) -> MinosRepositoryEntry:
"""Build a new instance from an ``Aggregate``.
:param aggregate: The aggregate instance.
:return: A new ``MinosRepositoryEntry`` instance.
"""
# noinspection PyTypeChecker
return cls(aggregate.id, aggregate.classname, aggregate.version, aggregate.avro_bytes)

def __eq__(self, other: "MinosRepositoryEntry") -> bool:
Expand All @@ -89,12 +95,20 @@ def __hash__(self) -> int:
return hash(tuple(self))

def __iter__(self) -> Iterable:
# noinspection PyRedundantParentheses
yield from (self.id, self.action, self.aggregate_name, self.version, self.data)
yield from (
self.aggregate_id,
self.aggregate_name,
self.version,
self.data,
self.id,
self.action,
self.created_at,
)

def __repr__(self):
return (
f"{type(self).__name__}(id={repr(self.id)}, action={repr(self.action)}, "
f"{type(self).__name__}("
f"aggregate_id={repr(self.aggregate_id)}, aggregate_name={repr(self.aggregate_name)}, "
f"version={repr(self.version)}, data={repr(self.data)})"
f"version={repr(self.version)}, data={repr(self.data)}, "
f"id={repr(self.id)}, action={repr(self.action)}, created_at={repr(self.created_at)})"
)
11 changes: 4 additions & 7 deletions minos/common/repository/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
count,
)
from typing import (
TYPE_CHECKING,
AsyncIterator,
NoReturn,
Optional,
Union,
)

from .abc import (
Expand All @@ -26,9 +25,6 @@
MinosRepositoryEntry,
)

if TYPE_CHECKING:
from ..model import Aggregate


class MinosInMemoryRepository(MinosRepository):
"""Memory-based implementation of the repository class in ``minos``."""
Expand Down Expand Up @@ -96,7 +92,7 @@ async def _select(
id_ge: Optional[int] = None,
*args,
**kwargs
) -> list[MinosRepositoryEntry]:
) -> AsyncIterator[MinosRepositoryEntry]:

# noinspection DuplicatedCode
def _fn_filter(entry: MinosRepositoryEntry) -> bool:
Expand Down Expand Up @@ -128,4 +124,5 @@ def _fn_filter(entry: MinosRepositoryEntry) -> bool:

iterable = iter(self._storage)
iterable = filter(_fn_filter, iterable)
return list(iterable)
for item in iterable:
yield item
Loading

0 comments on commit e1a4b63

Please sign in to comment.