Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,22 @@ await broker.send_message(...)
The package implements the [Transactional Outbox](https://microservices.io/patterns/data/transactional-outbox.html)
pattern, which ensures that messages are produced to the broker according to the at-least-once semantics.

To use the outbox pattern with SQLAlchemy, you first need to define your outbox model using the provided mixin:

```python
from sqlalchemy.orm import DeclarativeBase
from cqrs.outbox import OutboxModelMixin

class Base(DeclarativeBase):
pass

class OutboxModel(Base, OutboxModelMixin):
__tablename__ = "outbox" # You can customize the table name
```
Then, you can use SqlAlchemyOutboxedEventRepository in your handlers:
```python
import cqrs

def do_some_logic(meeting_room_id: int, session: sql_session.AsyncSession):
"""
Make changes to the database
Expand Down
17 changes: 13 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@ dependencies = [
"pydantic==2.*",
"orjson==3.9.15",
"di[anyio]==0.79.2",
"sqlalchemy[asyncio]==2.0.*",
"retry-async==0.1.4",
"python-dotenv==1.0.1",
"dependency-injector>=4.48.2"
"dependency-injector>=4.48.2",
"uuid6>=2025.0.1",
"aio-pika>=9.3.0",
"confluent-kafka>=2.6.0",
]
description = "Python CQRS pattern implementation"
maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}]
name = "python-cqrs"
readme = "README.md"
requires-python = ">=3.10"
version = "4.4.5"
version = "4.4.6"

[project.optional-dependencies]
dev = [
Expand All @@ -44,8 +46,12 @@ dev = [
"pytest-asyncio~=0.21.1",
"pytest-env==0.6.2",
"cryptography==42.0.2",
"sqlalchemy[asyncio]==2.0.*",
"asyncmy==0.2.9",
"requests>=2.32.5"
"requests>=2.32.5",
"confluent-kafka==2.6.0",
"protobuf==4.25.5",
"aio-pika==9.3.0"
]
examples = [
"fastapi==0.109.*",
Expand All @@ -62,6 +68,9 @@ protobuf = ["protobuf==4.25.5"]
rabbit = [
"aio-pika==9.3.0"
]
sqlalchemy = [
"sqlalchemy[asyncio]==2.0.*"
]

[project.urls]
Documentation = "https://vadikko2.github.io/python-cqrs-mkdocs/"
Expand Down
6 changes: 1 addition & 5 deletions src/cqrs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
)
from cqrs.outbox.map import OutboxedEventMap
from cqrs.outbox.repository import OutboxedEventRepository
from cqrs.outbox.sqlalchemy import (
rebind_outbox_model,
SqlAlchemyOutboxedEventRepository,
)
from cqrs.outbox.sqlalchemy import SqlAlchemyOutboxedEventRepository
from cqrs.producer import EventProducer
from cqrs.requests.map import RequestMap, SagaMap
from cqrs.requests.request import Request
Expand Down Expand Up @@ -56,7 +53,6 @@
"DIContainer",
"Compressor",
"ZlibCompressor",
"rebind_outbox_model",
"Saga",
"SagaStepHandler",
"ContextT",
Expand Down
3 changes: 2 additions & 1 deletion src/cqrs/events/event.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import os
import typing
import uuid6
import uuid

import dotenv
Expand Down Expand Up @@ -30,7 +31,7 @@ class NotificationEvent(Event, typing.Generic[PayloadT], frozen=True):
The base class for notification events
"""

event_id: uuid.UUID = pydantic.Field(default_factory=uuid.uuid4)
event_id: uuid.UUID = pydantic.Field(default_factory=uuid6.uuid7)
event_timestamp: datetime.datetime = pydantic.Field(
default_factory=datetime.datetime.now,
)
Expand Down
20 changes: 20 additions & 0 deletions src/cqrs/outbox/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from cqrs.outbox.repository import OutboxedEventRepository, EventStatus
from cqrs.outbox.map import OutboxedEventMap

__all__ = [
"OutboxedEventRepository",
"EventStatus",
"OutboxedEventMap",
]

try:
from cqrs.outbox.sqlalchemy import (
SqlAlchemyOutboxedEventRepository,
OutboxModelMixin
)
__all__.extend([
"SqlAlchemyOutboxedEventRepository",
"OutboxModelMixin"
])
except ImportError:
pass
145 changes: 82 additions & 63 deletions src/cqrs/outbox/sqlalchemy.py
Original file line number Diff line number Diff line change
@@ -1,93 +1,125 @@
import datetime
import logging
import typing

import dotenv
import uuid
import orjson
import sqlalchemy
from sqlalchemy import func
from sqlalchemy.dialects import mysql
from sqlalchemy.ext.asyncio import session as sql_session
from sqlalchemy.orm import DeclarativeMeta, registry
import uuid6

import cqrs
from cqrs import compressors
from cqrs.outbox import map, repository

Base = registry().generate_base()

logger = logging.getLogger(__name__)
try:
import sqlalchemy
from sqlalchemy import func
from sqlalchemy.orm import Mapped, mapped_column, declared_attr
from sqlalchemy.ext.asyncio import session as sql_session
from sqlalchemy.dialects import postgresql
except ImportError:
raise ImportError(
"You are trying to use SQLAlchemy outbox implementation, "
"but 'sqlalchemy' is not installed. "
"Please install it using: pip install python-cqrs[sqlalchemy]"
)

dotenv.load_dotenv()

logger = logging.getLogger(__name__)
DEFAULT_OUTBOX_TABLE_NAME = "outbox"

MAX_FLUSH_COUNTER_VALUE = 5


class OutboxModel(Base):
__tablename__ = DEFAULT_OUTBOX_TABLE_NAME

__table_args__ = (
sqlalchemy.UniqueConstraint(
"event_id_bin",
"event_name",
name="event_id_unique_index",
),
)
id = sqlalchemy.Column(
sqlalchemy.BigInteger(),
class BinaryUUID(sqlalchemy.TypeDecorator):
"""Stores the UUID as a native UUID in Postgres and as BINARY(16) in other databases (MySQL)."""
impl = sqlalchemy.BINARY(16)
cache_ok = True

def load_dialect_impl(self, dialect):
if dialect.name == "postgresql":
return dialect.type_descriptor(postgresql.UUID())
else:
return dialect.type_descriptor(sqlalchemy.BINARY(16))

def process_bind_param(self, value, dialect):
if value is None:
return value
if dialect.name == "postgresql":
return value # asyncpg work with uuid.UUID
if isinstance(value, uuid.UUID):
return value.bytes # For MySQL return 16 bytes
return value

def process_result_value(self, value, dialect):
if value is None:
return value
if dialect.name == "postgresql":
return value # asyncpg return uuid.UUID
if isinstance(value, bytes):
return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID
return value


class OutboxModelMixin:
@declared_attr.directive
def __tablename__(self) -> str:
return DEFAULT_OUTBOX_TABLE_NAME

id: Mapped[int] = mapped_column(
sqlalchemy.BigInteger,
sqlalchemy.Identity(),
primary_key=True,
nullable=False,
autoincrement=True,
comment="Identity",
)
event_id = sqlalchemy.Column(
sqlalchemy.Uuid,
event_id: Mapped[uuid.UUID] = mapped_column(
BinaryUUID,
nullable=False,
comment="Event idempotency id",
)
event_id_bin = sqlalchemy.Column(
sqlalchemy.BINARY(16),
nullable=False,
comment="Event idempotency id in 16 bit presentation",
)
event_status = sqlalchemy.Column(
event_status: Mapped[repository.EventStatus] = mapped_column(
sqlalchemy.Enum(repository.EventStatus),
nullable=False,
default=repository.EventStatus.NEW,
comment="Event producing status",
)
flush_counter = sqlalchemy.Column(
sqlalchemy.SmallInteger(),
flush_counter: Mapped[int] = mapped_column(
sqlalchemy.SmallInteger,
nullable=False,
default=0,
comment="Event producing flush counter",
)
event_name = sqlalchemy.Column(
event_name: Mapped[typing.Text] = mapped_column(
sqlalchemy.String(255),
nullable=False,
comment="Event name",
)
topic = sqlalchemy.Column(
topic: Mapped[typing.Text] = mapped_column(
sqlalchemy.String(255),
nullable=False,
comment="Event topic",
default="",
comment="Event topic",
)
created_at = sqlalchemy.Column(
created_at: Mapped[datetime.datetime] = mapped_column(
sqlalchemy.DateTime,
nullable=False,
server_default=func.now(),
comment="Event creation timestamp",
)
payload = sqlalchemy.Column(
mysql.BLOB,
payload: Mapped[bytes] = mapped_column(
sqlalchemy.LargeBinary,
nullable=False,
default={},
comment="Event payload",
)

@declared_attr
def __table_args__(self):
return (
sqlalchemy.UniqueConstraint(
"event_id",
"event_name",
name="event_id_unique_index",
),
)

def row_to_dict(self) -> typing.Dict[typing.Text, typing.Any]:
return {
column.name: getattr(self, column.name) for column in self.__table__.columns
Expand Down Expand Up @@ -153,10 +185,12 @@ class SqlAlchemyOutboxedEventRepository(repository.OutboxedEventRepository):
def __init__(
self,
session: sql_session.AsyncSession,
outbox_model: type[OutboxModelMixin],
compressor: compressors.Compressor | None = None,
):
self.session = session
self._compressor = compressor
self._outbox_model = outbox_model

def add(
self,
Expand All @@ -176,22 +210,21 @@ def add(
bytes_payload = self._compressor.compress(bytes_payload)

self.session.add(
OutboxModel(
self._outbox_model(
event_id=event.event_id,
event_id_bin=func.UUID_TO_BIN(event.event_id),
event_name=event.event_name,
created_at=event.event_timestamp,
payload=bytes_payload,
topic=event.topic,
),
)

def _process_events(self, model: OutboxModel) -> repository.OutboxedEvent | None:
def _process_events(self, model: OutboxModelMixin) -> repository.OutboxedEvent | None:
event_dict = model.row_to_dict()

event_model = map.OutboxedEventMap.get(event_dict["event_name"])
if event_model is None:
return
return None

if self._compressor is not None:
event_dict["payload"] = self._compressor.decompress(event_dict["payload"])
Expand All @@ -209,8 +242,8 @@ async def get_many(
batch_size: int = 100,
topic: typing.Text | None = None,
) -> typing.List[repository.OutboxedEvent]:
events: typing.Sequence[OutboxModel] = (
(await self.session.execute(OutboxModel.get_batch_query(batch_size, topic)))
events: typing.Sequence[OutboxModelMixin] = (
(await self.session.execute(self._outbox_model.get_batch_query(batch_size, topic)))
.scalars()
.all()
)
Expand All @@ -231,25 +264,11 @@ async def update_status(
new_status: repository.EventStatus,
) -> None:
await self.session.execute(
statement=OutboxModel.update_status_query(outboxed_event_id, new_status),
statement=self._outbox_model.update_status_query(outboxed_event_id, new_status),
)

async def commit(self):
await self.session.commit()

async def rollback(self):
await self.session.rollback()


def rebind_outbox_model(
model: typing.Any,
new_base: DeclarativeMeta,
table_name: typing.Text | None = None,
):
model.__bases__ = (new_base,)
model.__table__.name = table_name or model.__table__.name
new_base.metadata._add_table(
model.__table__.name,
model.__table__.schema,
model.__table__,
)
Loading