From a0c711ef4706f825c252368877a5d6120f00a285 Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Thu, 8 Jan 2026 18:25:58 +0300 Subject: [PATCH 1/2] sqlalchemy as optional deps and correction of sqlalchemy models of outbox --- pyproject.toml | 6 +- src/cqrs/__init__.py | 6 +- src/cqrs/outbox/__init__.py | 20 +++++ src/cqrs/outbox/sqlalchemy.py | 110 +++++++++++-------------- src/cqrs/saga/storage/sqlalchemy.py | 20 +++-- tests/integration/test_event_outbox.py | 9 -- 6 files changed, 86 insertions(+), 85 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2a09a6d..6c7b1f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,6 @@ dependencies = [ "pydantic==2.*", "orjson==3.9.15", "di[anyio]==0.79.2", - "sqlalchemy[asyncio]==2.0.*", "retry-async==0.1.4", "python-dotenv==1.0.1", "dependency-injector>=4.48.2" @@ -29,7 +28,7 @@ maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}] name = "python-cqrs" readme = "README.md" requires-python = ">=3.10" -version = "4.4.5" +version = "4.4.6" [project.optional-dependencies] dev = [ @@ -62,6 +61,9 @@ protobuf = ["protobuf==4.25.5"] rabbit = [ "aio-pika==9.3.0" ] +sqlalchemy = [ + "sqlalchemy[asyncio]==2.0.*" +] [project.urls] Documentation = "https://vadikko2.github.io/python-cqrs-mkdocs/" diff --git a/src/cqrs/__init__.py b/src/cqrs/__init__.py index 9060890..de17693 100644 --- a/src/cqrs/__init__.py +++ b/src/cqrs/__init__.py @@ -13,10 +13,7 @@ ) from cqrs.outbox.map import OutboxedEventMap from cqrs.outbox.repository import OutboxedEventRepository -from cqrs.outbox.sqlalchemy import ( - rebind_outbox_model, - SqlAlchemyOutboxedEventRepository, -) +from cqrs.outbox.sqlalchemy import SqlAlchemyOutboxedEventRepository from cqrs.producer import EventProducer from cqrs.requests.map import RequestMap, SagaMap from cqrs.requests.request import Request @@ -56,7 +53,6 @@ "DIContainer", "Compressor", "ZlibCompressor", - "rebind_outbox_model", "Saga", "SagaStepHandler", "ContextT", diff --git a/src/cqrs/outbox/__init__.py b/src/cqrs/outbox/__init__.py index e69de29..59fb75a 100644 --- a/src/cqrs/outbox/__init__.py +++ b/src/cqrs/outbox/__init__.py @@ -0,0 +1,20 @@ +from cqrs.outbox.repository import OutboxedEventRepository, EventStatus +from cqrs.outbox.map import OutboxedEventMap + +__all__ = [ + "OutboxedEventRepository", + "EventStatus", + "OutboxedEventMap", +] + +try: + from cqrs.outbox.sqlalchemy import ( + SqlAlchemyOutboxedEventRepository, + OutboxModelMixin + ) + __all__.extend([ + "SqlAlchemyOutboxedEventRepository", + "OutboxModelMixin" + ]) +except ImportError: + pass diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index aeb126f..a251a81 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -1,93 +1,92 @@ +import datetime import logging import typing - -import dotenv +import uuid import orjson -import sqlalchemy -from sqlalchemy import func -from sqlalchemy.dialects import mysql -from sqlalchemy.ext.asyncio import session as sql_session -from sqlalchemy.orm import DeclarativeMeta, registry - import cqrs from cqrs import compressors from cqrs.outbox import map, repository -Base = registry().generate_base() - -logger = logging.getLogger(__name__) +try: + import sqlalchemy + from sqlalchemy import func + from sqlalchemy.orm import Mapped, mapped_column, declared_attr + from sqlalchemy.ext.asyncio import session as sql_session +except ImportError: + raise ImportError( + "You are trying to use SQLAlchemy outbox implementation, " + "but 'sqlalchemy' is not installed. " + "Please install it using: pip install python-cqrs[sqlalchemy]" + ) -dotenv.load_dotenv() +logger = logging.getLogger(__name__) DEFAULT_OUTBOX_TABLE_NAME = "outbox" - MAX_FLUSH_COUNTER_VALUE = 5 -class OutboxModel(Base): - __tablename__ = DEFAULT_OUTBOX_TABLE_NAME +class OutboxModelMixin: + @declared_attr.directive + def __tablename__(self) -> str: + return DEFAULT_OUTBOX_TABLE_NAME - __table_args__ = ( - sqlalchemy.UniqueConstraint( - "event_id_bin", - "event_name", - name="event_id_unique_index", - ), - ) - id = sqlalchemy.Column( - sqlalchemy.BigInteger(), + id: Mapped[int] = mapped_column( + sqlalchemy.BigInteger, sqlalchemy.Identity(), primary_key=True, nullable=False, - autoincrement=True, comment="Identity", ) - event_id = sqlalchemy.Column( + event_id: Mapped[uuid.UUID] = mapped_column( sqlalchemy.Uuid, nullable=False, comment="Event idempotency id", ) - event_id_bin = sqlalchemy.Column( - sqlalchemy.BINARY(16), - nullable=False, - comment="Event idempotency id in 16 bit presentation", - ) - event_status = sqlalchemy.Column( + event_status: Mapped[repository.EventStatus] = mapped_column( sqlalchemy.Enum(repository.EventStatus), nullable=False, default=repository.EventStatus.NEW, comment="Event producing status", ) - flush_counter = sqlalchemy.Column( - sqlalchemy.SmallInteger(), + flush_counter: Mapped[int] = mapped_column( + sqlalchemy.SmallInteger, nullable=False, default=0, comment="Event producing flush counter", ) - event_name = sqlalchemy.Column( + event_name: Mapped[typing.Text] = mapped_column( sqlalchemy.String(255), nullable=False, comment="Event name", ) - topic = sqlalchemy.Column( + topic: Mapped[typing.Text] = mapped_column( sqlalchemy.String(255), nullable=False, - comment="Event topic", default="", + comment="Event topic", ) - created_at = sqlalchemy.Column( + created_at: Mapped[datetime.datetime] = mapped_column( sqlalchemy.DateTime, nullable=False, server_default=func.now(), comment="Event creation timestamp", ) - payload = sqlalchemy.Column( - mysql.BLOB, + payload: Mapped[bytes] = mapped_column( + sqlalchemy.LargeBinary, nullable=False, - default={}, comment="Event payload", ) + @declared_attr + def __table_args__(self): + return ( + sqlalchemy.UniqueConstraint( + "event_id", + "event_name", + name="event_id_unique_index", + ), + ) + def row_to_dict(self) -> typing.Dict[typing.Text, typing.Any]: return { column.name: getattr(self, column.name) for column in self.__table__.columns @@ -153,10 +152,12 @@ class SqlAlchemyOutboxedEventRepository(repository.OutboxedEventRepository): def __init__( self, session: sql_session.AsyncSession, + outbox_model: type[OutboxModelMixin], compressor: compressors.Compressor | None = None, ): self.session = session self._compressor = compressor + self._outbox_model = outbox_model def add( self, @@ -176,9 +177,8 @@ def add( bytes_payload = self._compressor.compress(bytes_payload) self.session.add( - OutboxModel( + self._outbox_model( event_id=event.event_id, - event_id_bin=func.UUID_TO_BIN(event.event_id), event_name=event.event_name, created_at=event.event_timestamp, payload=bytes_payload, @@ -186,12 +186,12 @@ def add( ), ) - def _process_events(self, model: OutboxModel) -> repository.OutboxedEvent | None: + def _process_events(self, model: OutboxModelMixin) -> repository.OutboxedEvent | None: event_dict = model.row_to_dict() event_model = map.OutboxedEventMap.get(event_dict["event_name"]) if event_model is None: - return + return None if self._compressor is not None: event_dict["payload"] = self._compressor.decompress(event_dict["payload"]) @@ -209,8 +209,8 @@ async def get_many( batch_size: int = 100, topic: typing.Text | None = None, ) -> typing.List[repository.OutboxedEvent]: - events: typing.Sequence[OutboxModel] = ( - (await self.session.execute(OutboxModel.get_batch_query(batch_size, topic))) + events: typing.Sequence[OutboxModelMixin] = ( + (await self.session.execute(self._outbox_model.get_batch_query(batch_size, topic))) .scalars() .all() ) @@ -231,7 +231,7 @@ async def update_status( new_status: repository.EventStatus, ) -> None: await self.session.execute( - statement=OutboxModel.update_status_query(outboxed_event_id, new_status), + statement=self._outbox_model.update_status_query(outboxed_event_id, new_status), ) async def commit(self): @@ -239,17 +239,3 @@ async def commit(self): async def rollback(self): await self.session.rollback() - - -def rebind_outbox_model( - model: typing.Any, - new_base: DeclarativeMeta, - table_name: typing.Text | None = None, -): - model.__bases__ = (new_base,) - model.__table__.name = table_name or model.__table__.name - new_base.metadata._add_table( - model.__table__.name, - model.__table__.schema, - model.__table__, - ) diff --git a/src/cqrs/saga/storage/sqlalchemy.py b/src/cqrs/saga/storage/sqlalchemy.py index 3be685d..b3776b2 100644 --- a/src/cqrs/saga/storage/sqlalchemy.py +++ b/src/cqrs/saga/storage/sqlalchemy.py @@ -2,18 +2,24 @@ import logging import typing import uuid - -import sqlalchemy -from sqlalchemy import func -from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker -from sqlalchemy.orm import registry - from cqrs.dispatcher.exceptions import SagaConcurrencyError from cqrs.saga.storage.enums import SagaStatus, SagaStepStatus from cqrs.saga.storage.models import SagaLogEntry from cqrs.saga.storage.protocol import ISagaStorage +try: + import sqlalchemy + from sqlalchemy import func + from sqlalchemy.exc import SQLAlchemyError + from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + from sqlalchemy.orm import registry +except ImportError: + raise ImportError( + "You are trying to use SQLAlchemy saga storage implementation, " + "but 'sqlalchemy' is not installed. " + "Please install it using: pip install python-cqrs[sqlalchemy]" + ) + Base = registry().generate_base() logger = logging.getLogger(__name__) diff --git a/tests/integration/test_event_outbox.py b/tests/integration/test_event_outbox.py index bb0f542..971262e 100644 --- a/tests/integration/test_event_outbox.py +++ b/tests/integration/test_event_outbox.py @@ -212,12 +212,3 @@ async def test_mark_as_failure_negative(self, session): produce_candidates = await repository.get_many(batch_size=1) assert not len(produce_candidates) - - -async def test_rebind_outbox_model_positive(init_orm, session): - custom_base = registry().generate_base() - sqlalchemy.rebind_outbox_model(sqlalchemy.OutboxModel, custom_base, "rebind_outbox") - await init_orm.run_sync(custom_base.metadata.create_all) - - async with session.begin(): - await session.execute(sqla.text("SELECT * FROM rebind_outbox WHERE True;")) From befd9292cdc774b3e521fbe7126435064822b6d0 Mon Sep 17 00:00:00 2001 From: "nikitakunov8@gmail.com" Date: Fri, 9 Jan 2026 13:33:53 +0300 Subject: [PATCH 2/2] tests and readme fix --- README.md | 15 +++ pyproject.toml | 11 +- src/cqrs/events/event.py | 3 +- src/cqrs/outbox/sqlalchemy.py | 35 +++++- tests/conftest.py | 17 +-- tests/integration/fixtures.py | 94 -------------- tests/integration/sqlalchemy/__init__.py | 0 tests/integration/sqlalchemy/conftest.py | 51 ++++++++ .../{ => sqlalchemy}/test_event_outbox.py | 116 +++++++++++------- .../test_saga_mediator_sqlalchemy.py | 0 .../test_saga_storage_sqlalchemy.py | 2 - 11 files changed, 182 insertions(+), 162 deletions(-) delete mode 100644 tests/integration/fixtures.py create mode 100644 tests/integration/sqlalchemy/__init__.py create mode 100644 tests/integration/sqlalchemy/conftest.py rename tests/integration/{ => sqlalchemy}/test_event_outbox.py (60%) rename tests/integration/{ => sqlalchemy}/test_saga_mediator_sqlalchemy.py (100%) rename tests/integration/{ => sqlalchemy}/test_saga_storage_sqlalchemy.py (98%) diff --git a/README.md b/README.md index e5664b6..68464d6 100644 --- a/README.md +++ b/README.md @@ -467,7 +467,22 @@ await broker.send_message(...) The package implements the [Transactional Outbox](https://microservices.io/patterns/data/transactional-outbox.html) pattern, which ensures that messages are produced to the broker according to the at-least-once semantics. +To use the outbox pattern with SQLAlchemy, you first need to define your outbox model using the provided mixin: + ```python +from sqlalchemy.orm import DeclarativeBase +from cqrs.outbox import OutboxModelMixin + +class Base(DeclarativeBase): + pass + +class OutboxModel(Base, OutboxModelMixin): + __tablename__ = "outbox" # You can customize the table name +``` +Then, you can use SqlAlchemyOutboxedEventRepository in your handlers: +```python +import cqrs + def do_some_logic(meeting_room_id: int, session: sql_session.AsyncSession): """ Make changes to the database diff --git a/pyproject.toml b/pyproject.toml index 6c7b1f6..12c3a3b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,10 @@ dependencies = [ "di[anyio]==0.79.2", "retry-async==0.1.4", "python-dotenv==1.0.1", - "dependency-injector>=4.48.2" + "dependency-injector>=4.48.2", + "uuid6>=2025.0.1", + "aio-pika>=9.3.0", + "confluent-kafka>=2.6.0", ] description = "Python CQRS pattern implementation" maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}] @@ -43,8 +46,12 @@ dev = [ "pytest-asyncio~=0.21.1", "pytest-env==0.6.2", "cryptography==42.0.2", + "sqlalchemy[asyncio]==2.0.*", "asyncmy==0.2.9", - "requests>=2.32.5" + "requests>=2.32.5", + "confluent-kafka==2.6.0", + "protobuf==4.25.5", + "aio-pika==9.3.0" ] examples = [ "fastapi==0.109.*", diff --git a/src/cqrs/events/event.py b/src/cqrs/events/event.py index 3364f0f..e31a646 100644 --- a/src/cqrs/events/event.py +++ b/src/cqrs/events/event.py @@ -1,6 +1,7 @@ import datetime import os import typing +import uuid6 import uuid import dotenv @@ -30,7 +31,7 @@ class NotificationEvent(Event, typing.Generic[PayloadT], frozen=True): The base class for notification events """ - event_id: uuid.UUID = pydantic.Field(default_factory=uuid.uuid4) + event_id: uuid.UUID = pydantic.Field(default_factory=uuid6.uuid7) event_timestamp: datetime.datetime = pydantic.Field( default_factory=datetime.datetime.now, ) diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index a251a81..d20ea10 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -3,6 +3,8 @@ import typing import uuid import orjson +import uuid6 + import cqrs from cqrs import compressors from cqrs.outbox import map, repository @@ -12,6 +14,7 @@ from sqlalchemy import func from sqlalchemy.orm import Mapped, mapped_column, declared_attr from sqlalchemy.ext.asyncio import session as sql_session + from sqlalchemy.dialects import postgresql except ImportError: raise ImportError( "You are trying to use SQLAlchemy outbox implementation, " @@ -25,6 +28,36 @@ MAX_FLUSH_COUNTER_VALUE = 5 +class BinaryUUID(sqlalchemy.TypeDecorator): + """Stores the UUID as a native UUID in Postgres and as BINARY(16) in other databases (MySQL).""" + impl = sqlalchemy.BINARY(16) + cache_ok = True + + def load_dialect_impl(self, dialect): + if dialect.name == "postgresql": + return dialect.type_descriptor(postgresql.UUID()) + else: + return dialect.type_descriptor(sqlalchemy.BINARY(16)) + + def process_bind_param(self, value, dialect): + if value is None: + return value + if dialect.name == "postgresql": + return value # asyncpg work with uuid.UUID + if isinstance(value, uuid.UUID): + return value.bytes # For MySQL return 16 bytes + return value + + def process_result_value(self, value, dialect): + if value is None: + return value + if dialect.name == "postgresql": + return value # asyncpg return uuid.UUID + if isinstance(value, bytes): + return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID + return value + + class OutboxModelMixin: @declared_attr.directive def __tablename__(self) -> str: @@ -38,7 +71,7 @@ def __tablename__(self) -> str: comment="Identity", ) event_id: Mapped[uuid.UUID] = mapped_column( - sqlalchemy.Uuid, + BinaryUUID, nullable=False, comment="Event idempotency id", ) diff --git a/tests/conftest.py b/tests/conftest.py index b76341b..ce5f4bf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,26 +1,13 @@ -import asyncio -from unittest import mock - import pytest - +from unittest import mock from cqrs.adapters import kafka + TEST_TOPIC = "TestCqrsTopic" pytest_plugins = ["tests.integration.fixtures"] -@pytest.fixture(scope="session") -def event_loop(): - """Create event loop for session-scoped fixtures.""" - loop = asyncio.new_event_loop() - try: - yield loop - finally: - if not loop.is_closed(): - loop.close() - - @pytest.fixture(scope="function") async def kafka_producer() -> kafka.KafkaProducer: return mock.create_autospec(kafka.KafkaProducer) diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py deleted file mode 100644 index 8965e73..0000000 --- a/tests/integration/fixtures.py +++ /dev/null @@ -1,94 +0,0 @@ -import contextlib -import functools -import os - -import dotenv -import pytest -from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine - -from cqrs.outbox import sqlalchemy - -dotenv.load_dotenv() -DATABASE_DSN = os.environ.get("DATABASE_DSN", "") - - -@pytest.fixture(scope="function") -async def init_orm(): - """Initialize outbox tables - drops and creates tables before each test.""" - engine = create_async_engine( - DATABASE_DSN, - pool_pre_ping=True, - pool_size=10, - max_overflow=30, - echo=False, - ) - async with engine.begin() as connect: - await connect.run_sync(sqlalchemy.Base.metadata.drop_all) - await connect.run_sync(sqlalchemy.Base.metadata.create_all) - yield connect - - -@pytest.fixture(scope="function") -async def session(init_orm): - engine_factory = functools.partial( - create_async_engine, - DATABASE_DSN, - isolation_level="REPEATABLE READ", - ) - session = async_sessionmaker(engine_factory())() - async with contextlib.aclosing(session): - yield session - - -# Saga storage fixtures -@pytest.fixture(scope="session") -async def init_saga_orm(): - """Initialize saga storage tables - drops and creates tables BEFORE test only.""" - from cqrs.saga.storage.sqlalchemy import Base - - engine = create_async_engine( - DATABASE_DSN, - pool_pre_ping=True, - pool_size=10, - max_overflow=30, - echo=False, - ) - # Drop and create tables BEFORE test (not after) - # Use begin() to ensure tables are created, but don't keep transaction open - async with engine.begin() as connect: - await connect.run_sync(Base.metadata.drop_all) - await connect.run_sync(Base.metadata.create_all) - - # Yield engine so it can be used for sessions - # Data will persist after test because we don't drop tables in cleanup - yield engine - - # Cleanup: dispose engine but DON'T drop tables - keep data in DB - await engine.dispose() - - -@pytest.fixture(scope="session") -def saga_session_factory(init_saga_orm): - """Create a session factory for saga storage tests.""" - engine = init_saga_orm - return async_sessionmaker(engine, expire_on_commit=False, autocommit=False) - - -@pytest.fixture(scope="session") -async def saga_session(saga_session_factory): - """Create a session for saga storage tests - commits data to persist.""" - # Use autocommit=False but ensure we commit explicitly - session = saga_session_factory() - - async with contextlib.aclosing(session): - try: - yield session - # Final commit before closing to ensure data persists - if session.in_transaction(): - await session.commit() - except Exception: - # Only rollback on exception - if session.in_transaction(): - await session.rollback() - raise - # No cleanup that would delete data - data persists in DB diff --git a/tests/integration/sqlalchemy/__init__.py b/tests/integration/sqlalchemy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/sqlalchemy/conftest.py b/tests/integration/sqlalchemy/conftest.py new file mode 100644 index 0000000..44208a6 --- /dev/null +++ b/tests/integration/sqlalchemy/conftest.py @@ -0,0 +1,51 @@ +import pytest +import os +import asyncio +from sqlalchemy.ext import asyncio as sqla_async +import dotenv + +dotenv.load_dotenv() +DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///:memory:") + +@pytest.fixture(scope="session") +def event_loop(): + loop = asyncio.new_event_loop() + yield loop + loop.close() + + +@pytest.fixture(scope="session") +async def engine(): + engine = sqla_async.create_async_engine(url=DATABASE_URL, pool_pre_ping=True) + yield engine + await engine.dispose() + + +@pytest.fixture(scope="function") +async def session(engine): + connection = await engine.connect() + transaction = await connection.begin() + + session_maker = sqla_async.async_sessionmaker(bind=connection, expire_on_commit=False) + session = session_maker() + + yield session + + await session.close() + await transaction.rollback() + await connection.close() + + +@pytest.fixture(scope="session") +async def init_saga_orm(engine): + """Initialize saga storage tables.""" + from cqrs.saga.storage.sqlalchemy import Base + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await conn.run_sync(Base.metadata.create_all) + yield + + +@pytest.fixture(scope="session") +def saga_session_factory(saga_engine, init_saga_orm): + return sqla_async.async_sessionmaker(saga_engine, expire_on_commit=False) \ No newline at end of file diff --git a/tests/integration/test_event_outbox.py b/tests/integration/sqlalchemy/test_event_outbox.py similarity index 60% rename from tests/integration/test_event_outbox.py rename to tests/integration/sqlalchemy/test_event_outbox.py index 971262e..829180a 100644 --- a/tests/integration/test_event_outbox.py +++ b/tests/integration/sqlalchemy/test_event_outbox.py @@ -1,8 +1,9 @@ import typing import pydantic +import pytest +from sqlalchemy import orm as sqla_orm import sqlalchemy as sqla -from sqlalchemy.orm import registry import cqrs from cqrs import events, Request, RequestHandler @@ -13,6 +14,14 @@ ) +class Base(sqla_orm.DeclarativeBase): + pass + + +class TestOutboxModel(Base, sqlalchemy.OutboxModelMixin): + __tablename__ = "test_outbox" + + class OutboxRequest(Request): message: typing.Text count: int @@ -52,19 +61,38 @@ async def handle(self, request: OutboxRequest) -> None: await self.repository.commit() +@pytest.fixture +def outbox_repo(session): + return sqlalchemy.SqlAlchemyOutboxedEventRepository( + session, + outbox_model=TestOutboxModel + ) + + class TestOutbox: - async def test_outbox_add_3_event_positive(self, session): + + @pytest.fixture(autouse=True) + async def setup_db(self, engine): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + @pytest.fixture(autouse=True) + async def cleanup_table(self, session, setup_db): + await session.execute(sqla.text(f"TRUNCATE TABLE {TestOutboxModel.__tablename__}")) + await session.commit() + yield + + async def test_outbox_add_3_event_positive(self, outbox_repo): """ checks positive save events to outbox case """ - repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) request = OutboxRequest(message="test_outbox_add_3_event_positive", count=3) - await OutboxRequestHandler(repository).handle(request) + await OutboxRequestHandler(outbox_repo).handle(request) not_produced_events: typing.List[ outbox_repository.OutboxedEvent - ] = await repository.get_many(3) - await session.commit() + ] = await outbox_repo.get_many(3) + await outbox_repo.commit() assert len(not_produced_events) == 3 assert all( @@ -80,135 +108,129 @@ async def test_outbox_add_3_event_positive(self, session): ), ) - async def test_get_new_events_positive(self, session): + async def test_get_new_events_positive(self, outbox_repo): """ checks getting many new events """ - repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) request = OutboxRequest( message="test_outbox_mark_event_as_produced_positive", count=3, ) - await OutboxRequestHandler(repository).handle(request) + await OutboxRequestHandler(outbox_repo).handle(request) - events_list = await repository.get_many(3) - await session.commit() + events_list = await outbox_repo.get_many(3) + await outbox_repo.commit() assert len(events_list) == 3 - async def test_get_new_events_negative(self, session): + async def test_get_new_events_negative(self, outbox_repo): """ checks getting many new events, but not produced """ - repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) request = OutboxRequest( message="test_outbox_mark_event_as_produced_positive", count=3, ) - await OutboxRequestHandler(repository).handle(request) - events_list = await repository.get_many(3) - await repository.update_status( + await OutboxRequestHandler(outbox_repo).handle(request) + events_list = await outbox_repo.get_many(3) + await outbox_repo.update_status( events_list[-1].id, repository_protocol.EventStatus.PRODUCED, ) - await session.commit() + await outbox_repo.commit() - new_events_list = await repository.get_many(3) - await session.commit() + new_events_list = await outbox_repo.get_many(3) + await outbox_repo.commit() assert len(new_events_list) == 2 - async def test_get_new_event_positive(self, session): + async def test_get_new_event_positive(self, outbox_repo): """ checks getting one event positive """ - repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) request = OutboxRequest( message="test_outbox_mark_event_as_produced_positive", count=1, ) - await OutboxRequestHandler(repository).handle(request) - [event_over_get_all_events_method] = await repository.get_many(1) + await OutboxRequestHandler(outbox_repo).handle(request) + [event_over_get_all_events_method] = await outbox_repo.get_many(1) event: outbox_repository.OutboxedEvent | None = next( iter( - await repository.get_many( + await outbox_repo.get_many( batch_size=1, ), ), None, ) - await session.commit() + await outbox_repo.commit() assert event assert event.id == event_over_get_all_events_method.id # noqa - async def test_get_new_event_negative(self, session): + async def test_get_new_event_negative(self, outbox_repo): """ checks getting one event positive, but not produced """ - repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) request = OutboxRequest( message="test_outbox_mark_event_as_produced_positive", count=1, ) - await OutboxRequestHandler(repository).handle(request) - [event_over_get_all_events_method] = await repository.get_many(1) - await repository.update_status( + await OutboxRequestHandler(outbox_repo).handle(request) + [event_over_get_all_events_method] = await outbox_repo.get_many(1) + await outbox_repo.update_status( event_over_get_all_events_method.id, repository_protocol.EventStatus.PRODUCED, ) - await session.commit() + await outbox_repo.commit() - event = await repository.get_many( + event = await outbox_repo.get_many( batch_size=1, ) - await session.commit() + await outbox_repo.commit() assert not event - async def test_mark_as_failure_positive(self, session): + async def test_mark_as_failure_positive(self, outbox_repo): """checks reading failure produced event successfully""" - repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) request = OutboxRequest( message="test_outbox_mark_event_as_produced_positive", count=2, ) - await OutboxRequestHandler(repository).handle(request) - [failure_event, success_event] = await repository.get_many(2) + await OutboxRequestHandler(outbox_repo).handle(request) + [failure_event, success_event] = await outbox_repo.get_many(2) # mark FIRST event as failure - await repository.update_status( + await outbox_repo.update_status( failure_event.id, repository_protocol.EventStatus.NOT_PRODUCED, ) - await session.commit() + await outbox_repo.commit() - produce_candidates = await repository.get_many(batch_size=2) + produce_candidates = await outbox_repo.get_many(batch_size=2) assert len(produce_candidates) == 2 # check events order by status assert produce_candidates[0].id == success_event.id assert produce_candidates[1].id == failure_event.id - async def test_mark_as_failure_negative(self, session): + async def test_mark_as_failure_negative(self, outbox_repo): """checks reading failure produced events with flush_counter speeding""" - repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) request = OutboxRequest( message="test_outbox_mark_event_as_produced_positive", count=1, ) - await OutboxRequestHandler(repository).handle(request) - [failure_event] = await repository.get_many(1) + await OutboxRequestHandler(outbox_repo).handle(request) + [failure_event] = await outbox_repo.get_many(1) for _ in range(sqlalchemy.MAX_FLUSH_COUNTER_VALUE): - await repository.update_status( + await outbox_repo.update_status( failure_event.id, repository_protocol.EventStatus.NOT_PRODUCED, ) - await session.commit() + await outbox_repo.commit() - produce_candidates = await repository.get_many(batch_size=1) + produce_candidates = await outbox_repo.get_many(batch_size=1) assert not len(produce_candidates) diff --git a/tests/integration/test_saga_mediator_sqlalchemy.py b/tests/integration/sqlalchemy/test_saga_mediator_sqlalchemy.py similarity index 100% rename from tests/integration/test_saga_mediator_sqlalchemy.py rename to tests/integration/sqlalchemy/test_saga_mediator_sqlalchemy.py diff --git a/tests/integration/test_saga_storage_sqlalchemy.py b/tests/integration/sqlalchemy/test_saga_storage_sqlalchemy.py similarity index 98% rename from tests/integration/test_saga_storage_sqlalchemy.py rename to tests/integration/sqlalchemy/test_saga_storage_sqlalchemy.py index a4d92c6..6050057 100644 --- a/tests/integration/test_saga_storage_sqlalchemy.py +++ b/tests/integration/sqlalchemy/test_saga_storage_sqlalchemy.py @@ -9,8 +9,6 @@ from cqrs.saga.storage.enums import SagaStatus, SagaStepStatus from cqrs.saga.storage.sqlalchemy import SqlAlchemySagaStorage -# Fixtures init_saga_orm and saga_session_factory are imported from tests/integration/fixtures.py - @pytest.fixture def storage(