diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..499c856 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,74 @@ +name: Tests + +on: + push: + branches: [ main, master ] + pull_request: + branches: [ main, master ] + +jobs: + lint: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.10", "3.11", "3.12"] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[dev,examples]" + + - name: Run ruff check + run: | + ruff check src tests examples + + - name: Run ruff format check + run: | + ruff format --check src tests examples + + - name: Run pyright + run: | + pyright --pythonversion ${{ matrix.python-version }} src tests examples + + - name: Check minimum Python version (vermin) + run: | + vermin --target=3.10- --violations --eval-annotations --backport typing_extensions --exclude=venv --exclude=build --exclude=.git --exclude=.venv src examples tests + + test: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.10", "3.11", "3.12"] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[dev]" + + - name: Run tests + run: | + pytest --cov=src --cov-report=xml ./tests/unit + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v4 + with: + token: ${{ secrets.CODECOV_TOKEN }} + fail_ci_if_error: false diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3e85db2..8086d97 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,13 +1,11 @@ repos: - hooks: - - id: check-toml - id: check-docstring-first - id: check-ast - exclude: (^tests/mock/|^tests/integration/|^tests/fixtures) id: trailing-whitespace - id: end-of-file-fixer - id: check-yaml - - id: check-toml - id: check-added-large-files - args: - --pytest-test-first @@ -21,25 +19,6 @@ repos: - id: add-trailing-comma repo: https://github.com/asottile/add-trailing-comma rev: v3.1.0 -- hooks: - - args: - - --autofix - - --indent - - '2' - files: ^.*\.yaml$ - id: pretty-format-yaml - - args: - - --autofix - - --indent - - '2' - id: pretty-format-toml - repo: https://github.com/macisamuele/language-formatters-pre-commit-hooks - rev: v2.12.0 -- hooks: - - id: toml-sort - - id: toml-sort-fix - repo: https://github.com/pappasam/toml-sort - rev: v0.23.1 - hooks: - id: pycln name: pycln @@ -61,6 +40,13 @@ repos: hooks: - id: pyright types: [python] +- repo: https://github.com/netromdk/vermin + rev: v1.6.0 + hooks: + - id: vermin + args: [--target=3.10-, --violations, --eval-annotations, --backport typing_extensions, --exclude=venv, --exclude=build, --exclude=.git, --exclude=.venv, src, examples, tests] + language: python + additional_dependencies: [vermin] - repo: local hooks: - id: pytest-unit diff --git a/README.md b/README.md index 5c6e613..8d122b3 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,9 @@

Python CQRS

Event-Driven Architecture Framework for Distributed Systems

+ + Python Versions + PyPI version @@ -18,6 +21,9 @@ Downloads per month + + Coverage + Documentation @@ -52,7 +58,8 @@ project ([documentation](https://akhundmurad.github.io/diator/)) with several en 11. Parallel event processing with configurable concurrency limits; 12. Chain of Responsibility pattern support with `CORRequestHandler` for processing requests through multiple handlers in sequence; 13. Orchestrated Saga pattern support for managing distributed transactions with automatic compensation and recovery mechanisms; -14. Built-in Mermaid diagram generation, enabling automatic generation of Sequence and Class diagrams for documentation and visualization. +14. Built-in Mermaid diagram generation, enabling automatic generation of Sequence and Class diagrams for documentation and visualization; +15. Flexible Request and Response types support - use Pydantic-based or Dataclass-based implementations, with the ability to mix and match types based on your needs. ## Request Handlers @@ -236,6 +243,61 @@ class_diagram = generator.class_diagram() Complete example: [CoR Mermaid Diagrams](https://github.com/vadikko2/cqrs/blob/master/examples/cor_mermaid.py) +## Request and Response Types + +The library supports both Pydantic-based (`PydanticRequest`/`PydanticResponse`, aliased as `Request`/`Response`) and Dataclass-based (`DCRequest`/`DCResponse`) implementations. You can also implement custom classes by implementing the `IRequest`/`IResponse` interfaces directly. + +```python +import dataclasses + +# Pydantic-based (default) +class CreateUserCommand(cqrs.Request): + username: str + email: str + +class UserResponse(cqrs.Response): + user_id: str + username: str + +# Dataclass-based +@dataclasses.dataclass +class CreateProductCommand(cqrs.DCRequest): + name: str + price: float + +@dataclasses.dataclass +class ProductResponse(cqrs.DCResponse): + product_id: str + name: str + +# Custom implementation +class CustomRequest(cqrs.IRequest): + def __init__(self, user_id: str, action: str): + self.user_id = user_id + self.action = action + + def to_dict(self) -> dict: + return {"user_id": self.user_id, "action": self.action} + + @classmethod + def from_dict(cls, **kwargs) -> "CustomRequest": + return cls(user_id=kwargs["user_id"], action=kwargs["action"]) + +class CustomResponse(cqrs.IResponse): + def __init__(self, result: str, status: int): + self.result = result + self.status = status + + def to_dict(self) -> dict: + return {"result": self.result, "status": self.status} + + @classmethod + def from_dict(cls, **kwargs) -> "CustomResponse": + return cls(result=kwargs["result"], status=kwargs["status"]) +``` + +A complete example can be found in [request_response_types.py](https://github.com/vadikko2/cqrs/blob/master/examples/request_response_types.py) + ## Saga Pattern The package implements the Orchestrated Saga pattern for managing distributed transactions across multiple services or operations. @@ -871,7 +933,7 @@ async def process_files_stream( async for result in mediator.stream(command): sse_data = { "type": "progress", - "data": result.model_dump(), + "data": result.to_dict(), } yield f"data: {json.dumps(sse_data)}\n\n" diff --git a/examples/dependency_injector_integration_practical_example.py b/examples/dependency_injector_integration_practical_example.py index 4e28673..8db017e 100644 --- a/examples/dependency_injector_integration_practical_example.py +++ b/examples/dependency_injector_integration_practical_example.py @@ -68,7 +68,13 @@ from abc import ABC, abstractmethod from collections.abc import AsyncGenerator from contextlib import asynccontextmanager -from typing import Generic, Optional, Self, TypeVar +import sys +from typing import Generic, Optional, TypeVar + +if sys.version_info >= (3, 11): + from typing import Self # novm +else: + from typing_extensions import Self import uvicorn diff --git a/examples/kafka_event_consuming.py b/examples/kafka_event_consuming.py index 2a0543b..989b033 100644 --- a/examples/kafka_event_consuming.py +++ b/examples/kafka_event_consuming.py @@ -177,6 +177,6 @@ async def hello_world_event_handler( ) print( f"1. Run kafka infrastructure with: `docker compose -f ./docker-compose-dev.yml up -d`\n" - f"2. Send to kafka topic `hello_world` event: {orjson.dumps(ev.model_dump(mode='json')).decode()}", + f"2. Send to kafka topic `hello_world` event: {orjson.dumps(ev.to_dict()).decode()}", ) asyncio.run(app.run()) diff --git a/examples/kafka_proto_event_consuming.py b/examples/kafka_proto_event_consuming.py deleted file mode 100644 index 686a433..0000000 --- a/examples/kafka_proto_event_consuming.py +++ /dev/null @@ -1,163 +0,0 @@ -""" -Example: Consuming Protobuf Events from Kafka - -This example demonstrates how to consume Protobuf-serialized events from Kafka -and process them using CQRS event handlers. The system shows how to use Protobuf -for efficient binary serialization in event-driven systems. - -Use case: High-throughput event processing with efficient serialization. Protobuf -provides compact binary format, faster serialization/deserialization, and schema -evolution support compared to JSON. This is ideal for systems processing large -volumes of events. - -================================================================================ -HOW TO RUN THIS EXAMPLE -================================================================================ - -Step 1: Start Kafka Infrastructure ------------------------------------ - docker compose -f ./docker-compose-dev.yml up -d - -Wait for Kafka to be ready (usually takes 30-60 seconds). - -Step 2: Send Protobuf Events to Kafka --------------------------------------- -In a separate terminal, run the producer: - python examples/kafka_proto_event_producing.py - -This will send a Protobuf-serialized UserJoinedECST event to the "user_joined_proto" topic. - -Step 3: Run the Consumer -------------------------- - python examples/kafka_proto_event_consuming.py - -The consumer will: -- Connect to Kafka broker at localhost:9092 -- Subscribe to "user_joined_proto" topic -- Deserialize Protobuf messages into UserJoinedECST events -- Process events through event handlers -- Print event details for each received event - -================================================================================ -WHAT THIS EXAMPLE DEMONSTRATES -================================================================================ - -1. Protobuf Deserialization: - - Use ProtobufValueDeserializer to deserialize Kafka messages - - Deserialize binary Protobuf data into typed event objects - - Map Protobuf messages to domain event models - -2. Protobuf Schema Integration: - - Use generated Protobuf classes (UserJoinedECSTProtobuf) - - Convert Protobuf messages to domain events - - Handle schema evolution and versioning - -3. Event Handler Processing: - - Register event handlers for Protobuf events - - EventMediator dispatches events to handlers - - Handlers process events asynchronously - -4. Error Handling: - - Check for DeserializeProtobufError before processing - - Acknowledge messages only after successful processing - - Handle deserialization failures gracefully - -================================================================================ -REQUIREMENTS -================================================================================ - -Make sure you have installed: - - cqrs (this package) - - di (dependency injection) - - faststream (Kafka integration) - - protobuf (Protobuf support) - -Make sure Kafka is running: - - Use docker-compose-dev.yml to start Kafka locally - - Or configure connection to existing Kafka cluster - -For more information about Protobuf deserialization: - https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/protobuf_consumer.py - -================================================================================ -""" - -import asyncio -import logging - -import cqrs -import di -import faststream -from cqrs.deserializers import protobuf -from cqrs.events import bootstrap -from faststream import kafka - -from examples import kafka_proto_event_producing -from examples.proto.user_joined_pb2 import UserJoinedECST as UserJoinedECSTProtobuf # type: ignore - -logging.basicConfig(level=logging.DEBUG) -logging.getLogger("aiokafka").setLevel(logging.ERROR) -logger = logging.getLogger("cqrs") - -broker = kafka.KafkaBroker(bootstrap_servers=["localhost:9092"]) -app = faststream.FastStream(broker, logger=logger) - -TOPIC_NAME = "user_joined_proto" - - -class UserJoinedECSTEventHandler( - cqrs.EventHandler[kafka_proto_event_producing.UserJoinedECST], -): - async def handle( - self, - event: kafka_proto_event_producing.UserJoinedECST, - ) -> None: - print( - f"Handle user {event.payload.user_id} joined meeting {event.payload.meeting_id} event", - ) - - -def events_mapper(mapper: cqrs.EventMap) -> None: - """Maps events to handlers.""" - mapper.bind( - kafka_proto_event_producing.UserJoinedECST, - UserJoinedECSTEventHandler, - ) - - -def mediator_factory() -> cqrs.EventMediator: - return bootstrap.bootstrap( - di_container=di.Container(), - events_mapper=events_mapper, - ) - - -@broker.subscriber( - TOPIC_NAME, - group_id="protobuf_consumers", - auto_commit=False, - auto_offset_reset="earliest", - value_deserializer=protobuf.ProtobufValueDeserializer( - model=kafka_proto_event_producing.UserJoinedECST, - protobuf_model=UserJoinedECSTProtobuf, - ), -) -async def consumer( - body: kafka_proto_event_producing.UserJoinedECST - | protobuf.DeserializeProtobufError, - msg: kafka.KafkaMessage, - mediator: cqrs.EventMediator = faststream.Depends(mediator_factory), -) -> None: - if not isinstance(body, protobuf.DeserializeProtobufError): - await mediator.send(body) - await msg.ack() - - -if __name__ == "__main__": - # More information about deserialization: - # https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/protobuf_consumer.py - print( - "1. Run kafka infrastructure with: `docker compose -f ./docker-compose-dev.yml up -d`\n" - "2. Send event to kafka topic via `python examples/kafka_proto_event_producing.py`", - ) - asyncio.run(app.run()) diff --git a/examples/kafka_proto_event_producing.py b/examples/kafka_proto_event_producing.py deleted file mode 100644 index 6c88f1d..0000000 --- a/examples/kafka_proto_event_producing.py +++ /dev/null @@ -1,163 +0,0 @@ -""" -Example: Producing Protobuf Events to Kafka - -This example demonstrates how to produce Protobuf-serialized events to Kafka. -The system shows how to use Protobuf for efficient binary serialization in -event-driven systems. - -Use case: High-throughput event publishing with efficient serialization. Protobuf -provides compact binary format, faster serialization/deserialization, and schema -evolution support compared to JSON. This is ideal for systems publishing large -volumes of events. - -================================================================================ -HOW TO RUN THIS EXAMPLE -================================================================================ - -Step 1: Start Kafka Infrastructure ------------------------------------ - docker compose -f ./docker-compose-dev.yml up -d - -Wait for Kafka to be ready (usually takes 30-60 seconds). - -Step 2: Run the Producer -------------------------- - python examples/kafka_proto_event_producing.py - -The producer will: -- Create a UserJoinedECST event with Protobuf payload -- Convert the event to Protobuf format -- Publish the event to Kafka topic "user_joined_proto" -- Use Protobuf serialization for efficient binary encoding - -Step 3: Verify Event (Optional) ---------------------------------- -Run the consumer example to verify the event was published: - python examples/kafka_proto_event_consuming.py - -================================================================================ -WHAT THIS EXAMPLE DEMONSTRATES -================================================================================ - -1. Protobuf Event Definition: - - Create NotificationEvent with typed payloads (Pydantic models) - - Implement proto() method to convert events to Protobuf format - - Map domain events to Protobuf schema - -2. Protobuf Serialization: - - Configure Kafka producer with protobuf_value_serializer - - Serialize events to compact binary format - - Reduce message size compared to JSON - -3. Kafka Producer Configuration: - - Set up Kafka producer with connection settings - - Configure security protocols (PLAINTEXT or SASL_SSL) - - Support for SSL/TLS and SASL authentication - -4. Event Publishing: - - Create OutboxedEvent wrapper for publishing - - Send events to Kafka topics using message broker - - Events are serialized and published asynchronously - -================================================================================ -REQUIREMENTS -================================================================================ - -Make sure you have installed: - - cqrs (this package) - - pydantic (for typed payloads) - - protobuf (Protobuf support) - -Make sure Kafka is running: - - Use docker-compose-dev.yml to start Kafka locally - - Or configure connection to existing Kafka cluster - -For more information about Protobuf serialization: - https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/protobuf_producer.py - -================================================================================ -""" - -import asyncio -import ssl - -import pydantic - -import cqrs -from cqrs.adapters import kafka as kafka_adapters -from cqrs.message_brokers import kafka, protocol as broker_protocol -from cqrs.outbox import repository -from cqrs.serializers import protobuf -from examples.proto.user_joined_pb2 import UserJoinedECST as UserJoinedECSTProtobuf # type: ignore - - -class UserJoinedECSTPayload(pydantic.BaseModel, frozen=True): - user_id: str - meeting_id: str - - model_config = pydantic.ConfigDict(from_attributes=True) - - -class UserJoinedECST(cqrs.NotificationEvent[UserJoinedECSTPayload], frozen=True): - def proto(self) -> UserJoinedECSTProtobuf: - return UserJoinedECSTProtobuf( - event_id=str(self.event_id), - event_timestamp=str(self.event_timestamp), - event_name=self.event_name, - payload=UserJoinedECSTProtobuf.Payload( - user_id=self.payload.user_id, # type: ignore - meeting_id=self.payload.meeting_id, # type: ignore - ), - ) - - -def create_kafka_producer( - ssl_context: ssl.SSLContext | None = None, -) -> kafka_adapters.KafkaProducer: - dsn = "localhost:9092" - value_serializer = protobuf.protobuf_value_serializer - if ssl_context is None: - return kafka_adapters.kafka_producer_factory( - security_protocol="PLAINTEXT", - sasl_mechanism="PLAIN", - dsn=dsn, - value_serializer=value_serializer, - ) - return kafka_adapters.kafka_producer_factory( - security_protocol="SASL_SSL", - sasl_mechanism="SCRAM-SHA-256", - ssl_context=ssl_context, - dsn=dsn, - value_serializer=value_serializer, - ) - - -async def main(): - event = UserJoinedECST( - event_name="user_joined_ecst", - topic="user_joined_proto", - payload=UserJoinedECSTPayload(user_id="123", meeting_id="456"), - ) - kafka_producer = create_kafka_producer(ssl_context=None) - broker = kafka.KafkaMessageBroker( - producer=kafka_producer, - ) - await broker.send_message( - message=broker_protocol.Message( - message_name=event.event_name, - message_id=event.event_id, - topic=event.topic, - payload=repository.OutboxedEvent( - id=1, - event=event, - status=repository.EventStatus.NEW, - topic=event.topic, - ), - ), - ) - - -if __name__ == "__main__": - # More information about serialization: - # https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/protobuf_producer.py - asyncio.run(main()) diff --git a/examples/request_response_types.py b/examples/request_response_types.py new file mode 100644 index 0000000..55ff351 --- /dev/null +++ b/examples/request_response_types.py @@ -0,0 +1,388 @@ +""" +Example: Different Request and Response Types + +This example demonstrates the flexibility of the CQRS library in supporting different +types of Request and Response implementations. The library supports both Pydantic-based +and Dataclass-based implementations, allowing you to choose the best fit for your needs. + +Use case: Flexibility in choosing request/response implementations. You can use: +- PydanticRequest/PydanticResponse for validation and serialization features +- DCRequest/DCResponse for lightweight implementations without Pydantic dependency +- Mix and match different types based on your requirements + +================================================================================ +HOW TO RUN THIS EXAMPLE +================================================================================ + +Run the example: + python examples/request_response_types.py + +The example will: +- Demonstrate Pydantic-based requests and responses +- Demonstrate Dataclass-based requests and responses +- Show mixed usage (Pydantic request with Dataclass response, etc.) +- Verify that all types work correctly with the mediator + +================================================================================ +WHAT THIS EXAMPLE DEMONSTRATES +================================================================================ + +1. PydanticRequest and PydanticResponse: + - Use Pydantic models for automatic validation + - Benefit from Pydantic's serialization features + - Type-safe with runtime validation + +2. DCRequest and DCResponse: + - Use Python dataclasses for lightweight implementations + - No Pydantic dependency required + - Simple and straightforward + +3. Mixed Usage: + - Combine Pydantic requests with Dataclass responses + - Combine Dataclass requests with Pydantic responses + - Flexibility to choose the best type for each use case + +4. Type Compatibility: + - All request types implement IRequest interface + - All response types implement IResponse interface + - Mediator works seamlessly with all types + +================================================================================ +REQUIREMENTS +================================================================================ + +Make sure you have installed: + - cqrs (this package) + - di (dependency injection) + - pydantic (for PydanticRequest/PydanticResponse) + +================================================================================ +""" + +import asyncio +import dataclasses +import logging +import typing + +import di +import pydantic + +import cqrs +from cqrs.requests import bootstrap + +logging.basicConfig(level=logging.INFO) + +# Storage for demonstration +USER_STORAGE: typing.Dict[str, typing.Dict[str, typing.Any]] = {} +PRODUCT_STORAGE: typing.Dict[str, typing.Dict[str, typing.Any]] = {} +ORDER_STORAGE: typing.Dict[str, typing.Dict[str, typing.Any]] = {} + +# ============================================================================ +# Pydantic-based Request and Response +# ============================================================================ + + +class CreateUserCommand(cqrs.PydanticRequest): + """Pydantic-based command with automatic validation.""" + + username: str + email: str + age: int = pydantic.Field(gt=0, le=120) + + +class UserResponse(cqrs.PydanticResponse): + """Pydantic-based response with validation.""" + + user_id: str + username: str + email: str + age: int + + +class CreateUserCommandHandler(cqrs.RequestHandler[CreateUserCommand, UserResponse]): + """Handler using Pydantic request and response.""" + + @property + def events(self) -> typing.Sequence[cqrs.IEvent]: + return [] + + async def handle(self, request: CreateUserCommand) -> UserResponse: + user_id = f"user_{len(USER_STORAGE) + 1}" + user_data = { + "user_id": user_id, + "username": request.username, + "email": request.email, + "age": request.age, + } + USER_STORAGE[user_id] = user_data + print(f"Created user with Pydantic: {user_data}") + return UserResponse(**user_data) + + +# ============================================================================ +# Dataclass-based Request and Response +# ============================================================================ + + +@dataclasses.dataclass +class CreateProductCommand(cqrs.DCRequest): + """Dataclass-based command - lightweight, no Pydantic dependency.""" + + name: str + price: float + category: str + + +@dataclasses.dataclass +class ProductResponse(cqrs.DCResponse): + """Dataclass-based response - simple and straightforward.""" + + product_id: str + name: str + price: float + category: str + + +class CreateProductCommandHandler( + cqrs.RequestHandler[CreateProductCommand, ProductResponse], +): + """Handler using Dataclass request and response.""" + + @property + def events(self) -> typing.Sequence[cqrs.IEvent]: + return [] + + async def handle(self, request: CreateProductCommand) -> ProductResponse: + product_id = f"product_{len(PRODUCT_STORAGE) + 1}" + product_data = { + "product_id": product_id, + "name": request.name, + "price": request.price, + "category": request.category, + } + PRODUCT_STORAGE[product_id] = product_data + print(f"Created product with Dataclass: {product_data}") + return ProductResponse(**product_data) + + +# ============================================================================ +# Mixed: Pydantic Request with Dataclass Response +# ============================================================================ + + +class CreateOrderCommand(cqrs.PydanticRequest): + """Pydantic request with validation.""" + + user_id: str + product_id: str + quantity: int = pydantic.Field(gt=0) + + +@dataclasses.dataclass +class OrderResponse(cqrs.DCResponse): + """Dataclass response - lightweight.""" + + order_id: str + user_id: str + product_id: str + quantity: int + total_price: float + + +class CreateOrderCommandHandler( + cqrs.RequestHandler[CreateOrderCommand, OrderResponse], +): + """Handler mixing Pydantic request with Dataclass response.""" + + @property + def events(self) -> typing.Sequence[cqrs.IEvent]: + return [] + + async def handle(self, request: CreateOrderCommand) -> OrderResponse: + if request.user_id not in USER_STORAGE: + raise ValueError(f"User {request.user_id} not found") + if request.product_id not in PRODUCT_STORAGE: + raise ValueError(f"Product {request.product_id} not found") + + order_id = f"order_{len(ORDER_STORAGE) + 1}" + product = PRODUCT_STORAGE[request.product_id] + total_price = product["price"] * request.quantity + + order_data = { + "order_id": order_id, + "user_id": request.user_id, + "product_id": request.product_id, + "quantity": request.quantity, + "total_price": total_price, + } + ORDER_STORAGE[order_id] = order_data + print(f"Created order (Pydantic request + Dataclass response): {order_data}") + return OrderResponse(**order_data) + + +# ============================================================================ +# Mixed: Dataclass Request with Pydantic Response +# ============================================================================ + + +@dataclasses.dataclass +class GetUserQuery(cqrs.DCRequest): + """Dataclass query - simple and lightweight.""" + + user_id: str + + +class UserDetailsResponse(cqrs.PydanticResponse): + """Pydantic response with validation.""" + + user_id: str + username: str + email: str + age: int + total_orders: int = 0 + + +class GetUserQueryHandler( + cqrs.RequestHandler[GetUserQuery, UserDetailsResponse], +): + """Handler mixing Dataclass request with Pydantic response.""" + + @property + def events(self) -> typing.Sequence[cqrs.IEvent]: + return [] + + async def handle(self, request: GetUserQuery) -> UserDetailsResponse: + if request.user_id not in USER_STORAGE: + raise ValueError(f"User {request.user_id} not found") + + user = USER_STORAGE[request.user_id] + total_orders = sum( + 1 for order in ORDER_STORAGE.values() if order["user_id"] == request.user_id + ) + + return UserDetailsResponse( + user_id=user["user_id"], + username=user["username"], + email=user["email"], + age=user["age"], + total_orders=total_orders, + ) + + +# ============================================================================ +# Mapping and Bootstrap +# ============================================================================ + + +def commands_mapper(mapper: cqrs.RequestMap) -> None: + """Register all command handlers.""" + mapper.bind(CreateUserCommand, CreateUserCommandHandler) + mapper.bind(CreateProductCommand, CreateProductCommandHandler) + mapper.bind(CreateOrderCommand, CreateOrderCommandHandler) + + +def queries_mapper(mapper: cqrs.RequestMap) -> None: + """Register all query handlers.""" + mapper.bind(GetUserQuery, GetUserQueryHandler) + + +# ============================================================================ +# Main Execution +# ============================================================================ + + +async def main(): + """Demonstrate different request/response type combinations.""" + mediator = bootstrap.bootstrap( + di_container=di.Container(), + commands_mapper=commands_mapper, + queries_mapper=queries_mapper, + ) + + print("=" * 80) + print("Demonstrating Different Request/Response Types") + print("=" * 80) + print() + + # 1. Pydantic Request + Pydantic Response + print("1. Pydantic Request + Pydantic Response") + print("-" * 80) + user_response = await mediator.send( + CreateUserCommand(username="john_doe", email="john@example.com", age=30), + ) + print(f"Response type: {type(user_response).__name__}") + print(f"Response data: {user_response.to_dict()}") + print() + + # 2. Dataclass Request + Dataclass Response + print("2. Dataclass Request + Dataclass Response") + print("-" * 80) + product_response = await mediator.send( + CreateProductCommand(name="Laptop", price=999.99, category="Electronics"), + ) + print(f"Response type: {type(product_response).__name__}") + print(f"Response data: {product_response.to_dict()}") + print() + + # 3. Pydantic Request + Dataclass Response + print("3. Pydantic Request + Dataclass Response") + print("-" * 80) + order_response = await mediator.send( + CreateOrderCommand( + user_id=user_response.user_id, + product_id=product_response.product_id, + quantity=2, + ), + ) + print(f"Response type: {type(order_response).__name__}") + print(f"Response data: {order_response.to_dict()}") + print() + + # 4. Dataclass Request + Pydantic Response + print("4. Dataclass Request + Pydantic Response") + print("-" * 80) + user_details = await mediator.send(GetUserQuery(user_id=user_response.user_id)) + print(f"Response type: {type(user_details).__name__}") + print(f"Response data: {user_details.to_dict()}") + print() + + # Demonstrate serialization/deserialization + print("=" * 80) + print("Serialization/Deserialization Demo") + print("=" * 80) + print() + + # Serialize Pydantic response + user_dict = user_response.to_dict() + print(f"Pydantic response serialized: {user_dict}") + restored_user = UserResponse.from_dict(**user_dict) + print(f"Pydantic response restored: {restored_user}") + print() + + # Serialize Dataclass response + product_dict = product_response.to_dict() + print(f"Dataclass response serialized: {product_dict}") + restored_product = ProductResponse.from_dict(**product_dict) + print(f"Dataclass response restored: {restored_product}") + print() + + # Validation example with Pydantic + print("=" * 80) + print("Pydantic Validation Example") + print("=" * 80) + try: + # This should fail validation (age > 120) + await mediator.send( + CreateUserCommand(username="invalid", email="test@example.com", age=150), + ) + except pydantic.ValidationError as e: + print(f"Validation error caught (expected): {e}") + print() + + print("=" * 80) + print("All examples completed successfully!") + print("=" * 80) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/saga_fastapi_sse.py b/examples/saga_fastapi_sse.py index 08fe7fd..32c4ef0 100644 --- a/examples/saga_fastapi_sse.py +++ b/examples/saga_fastapi_sse.py @@ -291,7 +291,7 @@ def __init__(self, inventory_service: InventoryService) -> None: self._events: list[cqrs.Event] = [] @property - def events(self) -> list[cqrs.Event]: + def events(self) -> typing.Sequence[cqrs.IEvent]: return self._events.copy() async def act( @@ -325,7 +325,7 @@ def __init__(self, payment_service: PaymentService) -> None: self._events: list[cqrs.Event] = [] @property - def events(self) -> list[cqrs.Event]: + def events(self) -> typing.Sequence[cqrs.IEvent]: return self._events.copy() async def act( @@ -356,7 +356,7 @@ def __init__(self, shipping_service: ShippingService) -> None: self._events: list[cqrs.Event] = [] @property - def events(self) -> list[cqrs.Event]: + def events(self) -> typing.Sequence[cqrs.IEvent]: return self._events.copy() async def act( @@ -449,9 +449,7 @@ def mediator_factory() -> cqrs.SagaMediator: def serialize_response(response: Response | None) -> dict[str, typing.Any]: if response is None: return {} - if isinstance(response, pydantic.BaseModel): - return response.model_dump() - return {"response": str(response)} + return response.to_dict() @app.post("/process-order") diff --git a/pyproject.toml b/pyproject.toml index 77144af..7abcd8e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,65 +9,62 @@ authors = [ ] classifiers = [ "Development Status :: 4 - Beta", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", - "Programming Language :: Python :: 3.12", - "License :: OSI Approved :: MIT License", - "Operating System :: OS Independent" + "Programming Language :: Python :: 3.12" ] dependencies = [ - "pydantic==2.*", - "orjson==3.9.15", "di[anyio]==0.79.2", - "sqlalchemy[asyncio]==2.0.*", - "retry-async==0.1.4", + "dependency-injector>=4.48.2", + "orjson==3.9.15", + "pydantic==2.*", "python-dotenv==1.0.1", - "dependency-injector>=4.48.2" + "retry-async==0.1.4", + "sqlalchemy[asyncio]==2.0.*", + "typing-extensions>=4.0.0" ] description = "Python CQRS pattern implementation" maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}] name = "python-cqrs" readme = "README.md" requires-python = ">=3.10" -version = "4.5.1" +version = "4.6.0" [project.optional-dependencies] -aiobreaker = [ - "aiobreaker>=0.3.0" -] +aiobreaker = ["aiobreaker>=0.3.0"] dev = [ # Develope tools "pycln==2.5.0", "pre-commit==3.8.0", - "pyright==1.1.377", + "pyright==1.1.408", "ruff==0.6.2", + "vermin>=1.6.0", + "pytest-cov>=4.0.0", # Tests - "aiokafka==0.10.0", + "aio-pika==9.3.0", # from rabbit + "aiokafka==0.10.0", # from kafka + "requests==2.*", # from aiokafka "pytest~=7.4.2", "pytest-asyncio~=0.21.1", "pytest-env==0.6.2", "cryptography==42.0.2", "asyncmy==0.2.9", - "requests>=2.32.5", "redis>=5.0.0", # Circuit breaker for tests - "aiobreaker>=0.3.0" + "aiobreaker>=0.3.0" # from aiobreaker ] examples = [ "fastapi==0.109.*", - "uvicorn==0.32.0", "faststream[kafka]==0.5.28", - "faker>=37.12.0" -] -kafka = [ - "aiokafka==0.10.0", - # for SchemaRegistry - "confluent-kafka==2.6.0" + "faker>=37.12.0", + "uvicorn==0.32.0", + "aiohttp==3.13.2" ] +kafka = ["aiokafka==0.10.0"] protobuf = ["protobuf==4.25.5"] -rabbit = [ - "aio-pika==9.3.0" -] +rabbit = ["aio-pika==9.3.0"] [project.urls] Documentation = "https://mkdocs.python-cqrs.dev/" @@ -80,5 +77,8 @@ asyncio_mode = "auto" junit_family = "xunit1" testpaths = ["tests"] +[tool.ruff] +target-version = "py310" + [tool.setuptools.packages.find] where = ["src"] diff --git a/pyrightconfig.json b/pyrightconfig.json index 5fae9df..3662dee 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -8,14 +8,20 @@ "defineConstant": { "DEBUG": true }, - "pythonVersion": "3.12", + "pythonVersion": "3.10", "pythonPlatform": "Linux", "executionEnvironments": [ { "root": "./", - "pythonVersion": "3.12", + "pythonVersion": "3.10", "pythonPlatform": "Linux", "reportMissingImports": "error" + }, + { + "root": "./examples", + "pythonVersion": "3.10", + "pythonPlatform": "Linux", + "reportMissingImports": "warning" } ] } diff --git a/scripts/upload_proto.sh b/scripts/upload_proto.sh deleted file mode 100644 index fed6f11..0000000 --- a/scripts/upload_proto.sh +++ /dev/null @@ -1,11 +0,0 @@ -# This script is required to upload proto schema to Schema Registry -# !/bin/bash -user_joined_proto=$(cat ./examples/proto/user_joined.proto | jq -Rs .) -echo "Upload schema: $user_joined_proto" - -curl -X POST http://localhost:8085/subjects/user_joined_proto-value/versions \ --H "Content-Type: application/vnd.schemaregistry.v1+json" \ --d '{ - "schemaType": "PROTOBUF", - "schema": "syntax = \"proto3\";\n\nmessage UserJoinedECST {\n message Payload {\n string user_id = 1;\n string meeting_id = 2;\n }\n string event_id = 1;\n string event_timestamp = 2;\n string event_name = 3;\n Payload payload = 6;\n}\n\nmessage UserJoinedNotification {\n message Payload {\n string user_id = 1;\n string meeting_id = 2;\n }\n string event_id = 1;\n string event_timestamp = 2;\n string event_name = 3;\n Payload payload = 6;\n}\n" -}' diff --git a/src/cqrs/__init__.py b/src/cqrs/__init__.py index 9060890..319c7a1 100644 --- a/src/cqrs/__init__.py +++ b/src/cqrs/__init__.py @@ -2,7 +2,20 @@ from cqrs.container.di import DIContainer from cqrs.container.protocol import Container from cqrs.events import EventMap -from cqrs.events.event import DomainEvent, Event, NotificationEvent +from cqrs.events.event import ( + DCEvent, + DCDomainEvent, + DCNotificationEvent, + DomainEvent, + Event, + IDomainEvent, + IEvent, + INotificationEvent, + NotificationEvent, + PydanticDomainEvent, + PydanticEvent, + PydanticNotificationEvent, +) from cqrs.events.event_emitter import EventEmitter from cqrs.events.event_handler import EventHandler from cqrs.mediator import ( @@ -12,24 +25,32 @@ StreamingRequestMediator, ) from cqrs.outbox.map import OutboxedEventMap -from cqrs.outbox.repository import OutboxedEventRepository +from cqrs.outbox.repository import ( + EventStatus, + OutboxedEvent, + OutboxedEventRepository, +) from cqrs.outbox.sqlalchemy import ( rebind_outbox_model, SqlAlchemyOutboxedEventRepository, ) from cqrs.producer import EventProducer from cqrs.requests.map import RequestMap, SagaMap -from cqrs.requests.request import Request +from cqrs.requests.mermaid import CoRMermaid +from cqrs.requests.request import DCRequest, IRequest, PydanticRequest, Request from cqrs.requests.request_handler import ( RequestHandler, StreamingRequestHandler, ) -from cqrs.response import Response -from cqrs.requests.mermaid import CoRMermaid +from cqrs.response import DCResponse, IResponse, PydanticResponse, Response from cqrs.saga.mermaid import SagaMermaid from cqrs.saga.models import ContextT from cqrs.saga.saga import Saga -from cqrs.saga.step import SagaStepHandler +from cqrs.saga.step import ( + Resp, + SagaStepHandler, + SagaStepResult, +) __all__ = ( "RequestMediator", @@ -37,18 +58,35 @@ "StreamingRequestMediator", "EventMediator", "DomainEvent", + "IDomainEvent", + "DCDomainEvent", + "PydanticDomainEvent", "NotificationEvent", + "INotificationEvent", + "DCNotificationEvent", + "PydanticNotificationEvent", "Event", + "IEvent", + "DCEvent", + "PydanticEvent", "EventEmitter", "EventHandler", "EventMap", "OutboxedEventMap", + "EventStatus", + "OutboxedEvent", "Request", + "IRequest", + "DCRequest", + "PydanticRequest", "RequestHandler", "StreamingRequestHandler", "RequestMap", "SagaMap", "Response", + "IResponse", + "DCResponse", + "PydanticResponse", "OutboxedEventRepository", "SqlAlchemyOutboxedEventRepository", "EventProducer", @@ -59,6 +97,8 @@ "rebind_outbox_model", "Saga", "SagaStepHandler", + "SagaStepResult", + "Resp", "ContextT", "SagaMermaid", "CoRMermaid", diff --git a/src/cqrs/adapters/amqp.py b/src/cqrs/adapters/amqp.py index 512e692..7501af9 100644 --- a/src/cqrs/adapters/amqp.py +++ b/src/cqrs/adapters/amqp.py @@ -23,10 +23,19 @@ class AMQPPublisher(protocol.AMQPPublisher): def __init__(self, channel_pool: pool.Pool[aio_pika.abc.AbstractChannel]): self.channel_pool = channel_pool - async def publish(self, message: abc.AbstractMessage, queue_name: str, exchange_name: str) -> None: + async def publish( + self, + message: abc.AbstractMessage, + queue_name: str, + exchange_name: str, + ) -> None: async with self.channel_pool.acquire() as channel: queue = await channel.declare_queue(queue_name) - exchange = await channel.declare_exchange(exchange_name, type="direct", auto_delete=True) + exchange = await channel.declare_exchange( + exchange_name, + type="direct", + auto_delete=True, + ) await queue.bind(exchange=exchange, routing_key=queue_name) await exchange.publish(message=message, routing_key=queue_name) diff --git a/src/cqrs/adapters/protocol.py b/src/cqrs/adapters/protocol.py index 86a1a1d..7886909 100644 --- a/src/cqrs/adapters/protocol.py +++ b/src/cqrs/adapters/protocol.py @@ -1,6 +1,7 @@ import typing -import aio_pika +if typing.TYPE_CHECKING: + import aio_pika class KafkaProducer(typing.Protocol): @@ -14,7 +15,7 @@ async def produce( class AMQPPublisher(typing.Protocol): async def publish( self, - message: aio_pika.abc.AbstractMessage, + message: "aio_pika.abc.AbstractMessage", queue_name: str, exchange_name: str, ) -> None: ... @@ -23,6 +24,9 @@ async def publish( class AMQPConsumer(typing.Protocol): async def consume( self, - handler: typing.Callable[[aio_pika.abc.AbstractIncomingMessage], typing.Awaitable[None]], + handler: typing.Callable[ + ["aio_pika.abc.AbstractIncomingMessage"], + typing.Awaitable[None], + ], queue_name: str, ) -> None: ... diff --git a/src/cqrs/container/dependency_injector.py b/src/cqrs/container/dependency_injector.py index e7fd7ab..3373e64 100644 --- a/src/cqrs/container/dependency_injector.py +++ b/src/cqrs/container/dependency_injector.py @@ -1,4 +1,4 @@ -from typing import TypeVar, Type, Optional, cast +from typing import TypeVar, Optional, cast import inspect import functools from dependency_injector import providers @@ -9,7 +9,7 @@ class DependencyInjectorCQRSContainer( - CQRSContainerProtocol[DependencyInjectorContainer] + CQRSContainerProtocol[DependencyInjectorContainer], ): """ Adapter bridging dependency-injector containers with CQRS framework. @@ -136,7 +136,7 @@ def attach_external_container(self, container: DependencyInjectorContainer) -> N self._get_provider.cache_clear() self._traverse_container(container) - async def resolve(self, type_: Type[T]) -> T: + async def resolve(self, type_: type[T]) -> T: """ Resolve and instantiate a dependency by its type. @@ -268,7 +268,7 @@ def _traverse_container( def _get_provider_by_path_segments( self, path_segments: tuple[str, ...], - ) -> providers.Provider[T]: + ) -> providers.Provider[object]: """ Navigate container hierarchy to retrieve a provider by its access path. @@ -309,7 +309,7 @@ def _get_provider_by_path_segments( @functools.cache def _get_provider( self, - requested_type: Type[T], + requested_type: type[T], ) -> providers.Provider[T]: """ Find and return the provider for a requested type with caching. @@ -370,8 +370,11 @@ def _get_provider( """ # Strategy 1: Exact type match if requested_type in self._type_to_provider_path_map: - return self._get_provider_by_path_segments( - self._type_to_provider_path_map[requested_type] + return cast( + providers.Provider[T], + self._get_provider_by_path_segments( + self._type_to_provider_path_map[requested_type], + ), ) # Strategy 2: Inheritance-based match @@ -379,12 +382,15 @@ def _get_provider( # This enables resolving abstract base classes to their concrete implementations for registered_type in self._type_to_provider_path_map: if issubclass(registered_type, requested_type): - return self._get_provider_by_path_segments( - self._type_to_provider_path_map[registered_type] + return cast( + providers.Provider[T], + self._get_provider_by_path_segments( + self._type_to_provider_path_map[registered_type], + ), ) # No provider found for the requested type raise ValueError( f"Provider for type {requested_type.__name__} not found. " - f"Ensure the type is registered in the dependency-injector container." + f"Ensure the type is registered in the dependency-injector container.", ) diff --git a/src/cqrs/deserializers/__init__.py b/src/cqrs/deserializers/__init__.py index f7ea6eb..115a56b 100644 --- a/src/cqrs/deserializers/__init__.py +++ b/src/cqrs/deserializers/__init__.py @@ -1,13 +1,8 @@ -from cqrs.deserializers.exceptions import ( - DeserializeJsonError, - DeserializeProtobufError, -) -from cqrs.deserializers.json import JsonDeserializer -from cqrs.deserializers.protobuf import ProtobufValueDeserializer +from cqrs.deserializers.exceptions import DeserializeJsonError +from cqrs.deserializers.json import Deserializable, JsonDeserializer __all__ = ( + "Deserializable", "JsonDeserializer", "DeserializeJsonError", - "ProtobufValueDeserializer", - "DeserializeProtobufError", ) diff --git a/src/cqrs/deserializers/exceptions.py b/src/cqrs/deserializers/exceptions.py index 3152059..4cb2d0a 100644 --- a/src/cqrs/deserializers/exceptions.py +++ b/src/cqrs/deserializers/exceptions.py @@ -1,15 +1,18 @@ +import dataclasses import typing -import pydantic +@dataclasses.dataclass(frozen=True) +class DeserializeJsonError: + """ + Error that occurred during JSON deserialization. -class DeserializeJsonError(pydantic.BaseModel): - error_message: str - error_type: typing.Type[Exception] - message_data: str | bytes | None + Args: + error_message: Human-readable error message + error_type: Type of the exception that occurred + message_data: The original message data that failed to deserialize + """ - -class DeserializeProtobufError(pydantic.BaseModel): error_message: str error_type: typing.Type[Exception] - message_data: bytes + message_data: str | bytes | None diff --git a/src/cqrs/deserializers/json.py b/src/cqrs/deserializers/json.py index 27fe056..765b8cb 100644 --- a/src/cqrs/deserializers/json.py +++ b/src/cqrs/deserializers/json.py @@ -1,25 +1,104 @@ import logging import typing +import sys -import pydantic +import orjson from cqrs.deserializers.exceptions import DeserializeJsonError -_T = typing.TypeVar("_T", bound=pydantic.BaseModel) +if sys.version_info >= (3, 11): + from typing import Self # novm +else: + from typing_extensions import Self logger = logging.getLogger("cqrs") +class Deserializable(typing.Protocol): + """ + Protocol for objects that can be deserialized from a dictionary. + + Objects implementing this protocol must have a classmethod `from_dict` + that creates an instance from keyword arguments. + """ + + @classmethod + def from_dict(cls, **kwargs) -> Self: + """ + Create an instance from keyword arguments. + + Args: + **kwargs: Keyword arguments matching the object fields. + + Returns: + A new instance of the class. + """ + ... + + +_T = typing.TypeVar("_T", bound=Deserializable) + + class JsonDeserializer(typing.Generic[_T]): - def __init__(self, model: typing.Type[_T]): - self._model: typing.Type[_T] = model + """ + Deserializer for JSON messages. + + Converts JSON strings or bytes into Python objects using the `from_dict` + classmethod of the target model. + + Example:: + + deserializer = JsonDeserializer(MyEvent) + result = deserializer('{"field": "value"}') + if isinstance(result, DeserializeJsonError): + # Handle error + else: + # Use result + """ + + def __init__(self, model: typing.Type[typing.Any]): + """ + Initialize JSON deserializer. + + Args: + model: Class that implements Deserializable protocol (has a from_dict classmethod). + Can be a regular type or a parameterized generic type + (e.g., NotificationEvent[PayloadType]). + + Note: + The model type must implement the Deserializable protocol (have a from_dict + classmethod). This is verified at runtime. For proper type inference, + specify the generic parameter: JsonDeserializer[ConcreteType](model=...) + """ + # Runtime check: verify that model implements Deserializable protocol + if not hasattr(model, "from_dict") or not callable( + getattr(model, "from_dict", None), + ): + raise TypeError( + f"Model {model} does not implement Deserializable protocol: " + "missing 'from_dict' classmethod", + ) + # Store model - type is preserved through generic parameter _T for return type + self._model: typing.Type[typing.Any] = model def __call__(self, data: str | bytes | None) -> _T | None | DeserializeJsonError: - if data is None: - return + """ + Deserialize JSON data into model instance. + Args: + data: JSON string, bytes, or None + + Returns: + Instance of the model, None if data is None, or DeserializeJsonError on failure. + """ + if data is None: + return None try: - return self._model.model_validate_json(data) + json_dict = orjson.loads(data) + # Safe cast: model is Type[_T] where _T bound=Deserializable, + # so from_dict is guaranteed to return _T + result = self._model.from_dict(**json_dict) + return typing.cast(_T, result) except Exception as e: logger.error( f"Error while deserializing json message: {e}", diff --git a/src/cqrs/deserializers/protobuf.py b/src/cqrs/deserializers/protobuf.py deleted file mode 100644 index 31f5532..0000000 --- a/src/cqrs/deserializers/protobuf.py +++ /dev/null @@ -1,66 +0,0 @@ -import logging -import typing - -import cqrs -import pydantic -from confluent_kafka.schema_registry import protobuf -from google.protobuf.message import Message - -from cqrs.deserializers.exceptions import DeserializeProtobufError - -logger = logging.getLogger("cqrs") - - -class ProtobufValueDeserializer: - """ - Deserialize protobuf message into CQRS event model. - """ - - def __init__( - self, - model: typing.Type[cqrs.NotificationEvent], - protobuf_model: typing.Type[Message], - ): - self._model = model - self._protobuf_model = protobuf_model - - def __call__( - self, - msg: typing.ByteString, - ) -> cqrs.NotificationEvent | DeserializeProtobufError: - protobuf_deserializer = protobuf.ProtobufDeserializer( - self._protobuf_model, - {"use.deprecated.format": False}, - ) - try: - proto_event = protobuf_deserializer(msg, None) - except Exception as error: - logger.error( - f"Error while deserializing protobuf message: {error}", - ) - return DeserializeProtobufError( - error_message=str(error), - error_type=type(error), - message_data=bytes(msg), - ) - - if proto_event is None: - logger.debug("Protobuf message is empty") - empty_error = ValueError("Protobuf message is empty") - return DeserializeProtobufError( - error_message=str(empty_error), - error_type=type(empty_error), - message_data=bytes(msg), - ) - - try: - return self._model.model_validate(proto_event) - except pydantic.ValidationError as error: - logger.error( - f"Error while validate proto event into model {self._model.__name__}: {error}", - ) - return DeserializeProtobufError( - error_message=str(error), - error_type=type(error), - message_data=bytes(msg), - ) diff --git a/src/cqrs/dispatcher/event.py b/src/cqrs/dispatcher/event.py index 5408372..da10198 100644 --- a/src/cqrs/dispatcher/event.py +++ b/src/cqrs/dispatcher/event.py @@ -2,7 +2,7 @@ import typing from cqrs.container.protocol import Container -from cqrs.events.event import Event +from cqrs.events.event import IEvent from cqrs.events.event_handler import EventHandler from cqrs.events.map import EventMap from cqrs.middlewares.base import MiddlewareChain @@ -25,13 +25,13 @@ def __init__( async def _handle_event( self, - event: Event, + event: IEvent, handle_type: typing.Type[_EventHandler], ): handler: _EventHandler = await self._container.resolve(handle_type) await handler.handle(event) - async def dispatch(self, event: Event) -> None: + async def dispatch(self, event: IEvent) -> None: handler_types = self._event_map.get(type(event), []) if not handler_types: logger.warning( diff --git a/src/cqrs/dispatcher/models.py b/src/cqrs/dispatcher/models.py index 4c82cb9..5db8e44 100644 --- a/src/cqrs/dispatcher/models.py +++ b/src/cqrs/dispatcher/models.py @@ -2,13 +2,13 @@ import logging import typing -from cqrs.events.event import Event -from cqrs.response import Response +from cqrs.events.event import IEvent +from cqrs.response import IResponse from cqrs.saga.step import SagaStepResult logger = logging.getLogger("cqrs") -_ResponseT = typing.TypeVar("_ResponseT", Response, None, covariant=True) +_ResponseT = typing.TypeVar("_ResponseT", IResponse, None, covariant=True) @dataclasses.dataclass @@ -16,7 +16,7 @@ class RequestDispatchResult(typing.Generic[_ResponseT]): """Result of request dispatch execution.""" response: _ResponseT - events: typing.List[Event] = dataclasses.field(default_factory=list) + events: typing.Sequence[IEvent] = dataclasses.field(default_factory=list) @dataclasses.dataclass @@ -24,5 +24,5 @@ class SagaDispatchResult: """Result of saga dispatch execution for a single step.""" step_result: SagaStepResult - events: typing.List[Event] = dataclasses.field(default_factory=list) + events: typing.List[IEvent] = dataclasses.field(default_factory=list) saga_id: str | None = None diff --git a/src/cqrs/dispatcher/request.py b/src/cqrs/dispatcher/request.py index eb4e9ed..3f94f8f 100644 --- a/src/cqrs/dispatcher/request.py +++ b/src/cqrs/dispatcher/request.py @@ -16,7 +16,7 @@ CORRequestHandlerT as CORRequestHandlerType, ) from cqrs.requests.map import RequestMap, HandlerType -from cqrs.requests.request import Request +from cqrs.requests.request import IRequest from cqrs.requests.request_handler import RequestHandler logger = logging.getLogger("cqrs") @@ -57,18 +57,15 @@ async def _resolve_handler( "COR handler must be type CORRequestHandler", ) - async with asyncio.TaskGroup() as tg: - tasks = [ - tg.create_task(self._container.resolve(h)) for h in handler_type - ] - handlers = [task.result() for task in tasks] + tasks = [self._container.resolve(h) for h in handler_type] + handlers = await asyncio.gather(*tasks) return build_chain( typing.cast(typing.List[CORRequestHandlerType], handlers), ) return typing.cast(_RequestHandler, await self._container.resolve(handler_type)) - async def dispatch(self, request: Request) -> RequestDispatchResult: + async def dispatch(self, request: IRequest) -> RequestDispatchResult: handler_type = self._request_map.get(type(request), None) if not handler_type: raise RequestHandlerDoesNotExist( diff --git a/src/cqrs/dispatcher/streaming.py b/src/cqrs/dispatcher/streaming.py index a985e01..716f83b 100644 --- a/src/cqrs/dispatcher/streaming.py +++ b/src/cqrs/dispatcher/streaming.py @@ -6,7 +6,7 @@ from cqrs.dispatcher.models import RequestDispatchResult from cqrs.middlewares.base import MiddlewareChain from cqrs.requests.map import RequestMap -from cqrs.requests.request import Request +from cqrs.requests.request import IRequest from cqrs.requests.request_handler import StreamingRequestHandler @@ -30,7 +30,7 @@ def __init__( async def dispatch( self, - request: Request, + request: IRequest, ) -> typing.AsyncIterator[RequestDispatchResult]: """ Dispatch a request to a streaming handler and yield results. @@ -72,9 +72,8 @@ async def dispatch( async_gen = handler.handle(request) async for response in async_gen: - events = handler.events.copy() - if hasattr(handler, "clear_events"): - handler.clear_events() + events = list(handler.events) + handler.clear_events() yield RequestDispatchResult( response=response, events=events, diff --git a/src/cqrs/events/__init__.py b/src/cqrs/events/__init__.py index 3587773..82bb518 100644 --- a/src/cqrs/events/__init__.py +++ b/src/cqrs/events/__init__.py @@ -1,12 +1,34 @@ -from cqrs.events.event import DomainEvent, Event, NotificationEvent +from cqrs.events.event import ( + DCEvent, + DCDomainEvent, + DCNotificationEvent, + DomainEvent, + Event, + IDomainEvent, + IEvent, + INotificationEvent, + NotificationEvent, + PydanticDomainEvent, + PydanticEvent, + PydanticNotificationEvent, +) from cqrs.events.event_emitter import EventEmitter from cqrs.events.event_handler import EventHandler from cqrs.events.map import EventMap __all__ = ( "Event", + "IEvent", + "DCEvent", + "PydanticEvent", "DomainEvent", + "IDomainEvent", + "DCDomainEvent", + "PydanticDomainEvent", "NotificationEvent", + "INotificationEvent", + "DCNotificationEvent", + "PydanticNotificationEvent", "EventEmitter", "EventHandler", "EventMap", diff --git a/src/cqrs/events/event.py b/src/cqrs/events/event.py index 3364f0f..62b5bd2 100644 --- a/src/cqrs/events/event.py +++ b/src/cqrs/events/event.py @@ -1,11 +1,19 @@ +import abc +import dataclasses import datetime import os import typing import uuid +import sys import dotenv import pydantic +if sys.version_info >= (3, 11): + from typing import Self # novm +else: + from typing_extensions import Self + dotenv.load_dotenv() DEFAULT_OUTPUT_TOPIC = os.getenv("DEFAULT_OUTPUT_TOPIC", "output_topic") @@ -13,23 +21,320 @@ PayloadT = typing.TypeVar("PayloadT", bound=typing.Any) -class Event(pydantic.BaseModel, frozen=True): +class IEvent(abc.ABC): + """ + Interface for event-type objects. + + This abstract base class defines the contract that all event implementations + must follow. Events represent domain events or notification events in the + CQRS pattern and are used for communication between different parts of the system. + + All event implementations must provide: + - `to_dict()`: Convert the event instance to a dictionary representation + - `from_dict()`: Create an event instance from a dictionary + """ + + @abc.abstractmethod + def to_dict(self) -> dict: + """ + Convert the event instance to a dictionary representation. + + Returns: + A dictionary containing all fields of the event instance. + """ + raise NotImplementedError + + @classmethod + @abc.abstractmethod + def from_dict(cls, **kwargs) -> Self: + """ + Create an event instance from keyword arguments. + + Args: + **kwargs: Keyword arguments matching the event fields. + + Returns: + A new instance of the event class. + """ + raise NotImplementedError + + +@dataclasses.dataclass(frozen=True) +class DCEvent(IEvent): + """ + Dataclass-based implementation of the event interface. + + This class provides an event implementation using Python's frozen dataclasses. + Events are immutable (frozen=True) to ensure they cannot be modified after creation. + It's useful when you want to avoid pydantic dependency or prefer dataclasses + for event definitions. + + Example:: + + @dataclasses.dataclass(frozen=True) + class UserCreatedEvent(DCEvent): + user_id: str + username: str + + event = UserCreatedEvent(user_id="123", username="john") + data = event.to_dict() # {"user_id": "123", "username": "john"} + restored = UserCreatedEvent.from_dict(**data) + """ + + @classmethod + def from_dict(cls, **kwargs) -> Self: + """ + Create an event instance from keyword arguments. + + Args: + **kwargs: Keyword arguments matching the dataclass fields. + + Returns: + A new instance of the event class. + """ + return cls(**kwargs) + + def to_dict(self) -> dict: + """ + Convert the event instance to a dictionary representation. + + Returns: + A dictionary containing all fields of the dataclass instance. + """ + return dataclasses.asdict(self) + + +class PydanticEvent(pydantic.BaseModel, IEvent, frozen=True): """ - The base class for events + Pydantic-based implementation of the event interface. + + This class provides an event implementation using Pydantic models with + frozen=True to ensure immutability. It offers data validation, serialization, + and other Pydantic features. This is the default event implementation used + by the library. + + Events are immutable to ensure they cannot be modified after creation, + which is important for event sourcing and event-driven architectures. + + Example:: + + class UserCreatedEvent(PydanticEvent): + user_id: str + username: str + + event = UserCreatedEvent(user_id="123", username="john") + data = event.to_dict() # {"user_id": "123", "username": "john"} + restored = UserCreatedEvent.from_dict(**data) """ + @classmethod + def from_dict(cls, **kwargs) -> Self: + """ + Create an event instance from keyword arguments. + + Validates and converts types (UUID strings to UUID objects, + datetime strings to datetime objects, nested objects like payload). + + Args: + **kwargs: Keyword arguments matching the event fields. + + Returns: + A new instance of the event class. + """ + return cls.model_validate(kwargs) + + def to_dict(self) -> dict: + """ + Convert the event instance to a dictionary representation. + + Returns: + A dictionary containing all fields of the event instance. + """ + return self.model_dump(mode="python") -class DomainEvent(Event, frozen=True): + +Event = PydanticEvent + + +class IDomainEvent(IEvent): """ - The base class for domain events + Interface for domain event objects. + + Domain events represent something that happened in the domain that domain experts + care about. They are typically used for in-process event handling within the + same bounded context. + + This interface extends IEvent and is implemented by DCDomainEvent and + PydanticDomainEvent. """ -class NotificationEvent(Event, typing.Generic[PayloadT], frozen=True): +@dataclasses.dataclass(frozen=True) +class DCDomainEvent(DCEvent, IDomainEvent): """ - The base class for notification events + Dataclass-based implementation of domain events. + + Domain events represent something that happened in the domain that domain experts + care about. They are typically used for in-process event handling within the + same bounded context. + + This is the dataclass implementation. For Pydantic-based implementation, + use PydanticDomainEvent. + + Example:: + + @dataclasses.dataclass(frozen=True) + class OrderCreatedEvent(DCDomainEvent): + order_id: str + customer_id: str + total_amount: float """ + +class PydanticDomainEvent(PydanticEvent, IDomainEvent, frozen=True): + """ + Pydantic-based implementation of domain events. + + Domain events represent something that happened in the domain that domain experts + care about. They are typically used for in-process event handling within the + same bounded context. + + This is the default domain event implementation used by the library. + + Example:: + + class OrderCreatedEvent(PydanticDomainEvent): + order_id: str + customer_id: str + total_amount: float + """ + + +DomainEvent = PydanticDomainEvent + + +class INotificationEvent(IEvent, typing.Generic[PayloadT]): + """ + Interface for notification event objects. + + Notification events are used for cross-service communication and are typically + published to message brokers (Kafka, RabbitMQ, etc.). They include metadata + like event_id, event_timestamp, event_name, and topic for routing. + + This interface extends IEvent and is implemented by DCNotificationEvent and + PydanticNotificationEvent. It requires specific attributes that notification + events must have. + + All notification event implementations must provide the following attributes: + - `event_id`: uuid.UUID - Unique identifier for the event + - `event_timestamp`: datetime.datetime - Timestamp when the event occurred + - `event_name`: str - Name of the event type + - `topic`: str - Message broker topic where the event should be published + - `payload`: PayloadT - Generic payload data of type PayloadT + """ + + # These attributes must be implemented by subclasses: + # - event_id: uuid.UUID - Unique identifier for the event + # - event_timestamp: datetime.datetime - Timestamp when the event occurred + # - event_name: str - Name of the event type + # - topic: str - Message broker topic where the event should be published + # - payload: PayloadT - Generic payload data of type PayloadT + # + # Type stubs for type checkers: + if typing.TYPE_CHECKING: + event_id: uuid.UUID + event_timestamp: datetime.datetime + event_name: str + topic: str + payload: PayloadT + + def proto(self) -> typing.Any: ... # Method for protobuf representation + + +@dataclasses.dataclass(frozen=True) +class DCNotificationEvent( + DCEvent, + INotificationEvent[PayloadT], + typing.Generic[PayloadT], +): + """ + Dataclass-based implementation of notification events. + + Notification events are used for cross-service communication and are typically + published to message brokers (Kafka, RabbitMQ, etc.). They include metadata + like event_id, event_timestamp, event_name, and topic for routing. + + This is the dataclass implementation. For Pydantic-based implementation, + use PydanticNotificationEvent. + + Args: + event_id: Unique identifier for the event (auto-generated if not provided) + event_timestamp: Timestamp when the event occurred (auto-generated if not provided) + event_name: Name of the event type + topic: Message broker topic where the event should be published + payload: Generic payload data of type PayloadT + + Example:: + + @dataclasses.dataclass(frozen=True) + class UserRegisteredEvent(DCNotificationEvent[dict]): + event_name: str = "user.registered" + payload: dict = dataclasses.field(default_factory=lambda: {"user_id": "123"}) + """ + + event_name: str + payload: PayloadT + + event_id: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4) + event_timestamp: datetime.datetime = dataclasses.field( + default_factory=datetime.datetime.now, + ) + topic: str = dataclasses.field(default=DEFAULT_OUTPUT_TOPIC) + + def proto(self) -> typing.Any: + """ + Return protobuf representation of the event. + + Raises: + NotImplementedError: This method must be implemented by subclasses + that need protobuf serialization. + """ + raise NotImplementedError("Method not implemented for dataclass events") + + def __hash__(self) -> int: + """ + Return the hash of the event based on its event_id. + + Returns: + Hash value of the event_id. + """ + return hash(self.event_id) + + +class PydanticNotificationEvent( + PydanticEvent, + INotificationEvent[PayloadT], + typing.Generic[PayloadT], + frozen=True, +): + """ + Pydantic-based implementation of notification events. + + Notification events are used for cross-service communication and are typically + published to message brokers (Kafka, RabbitMQ, etc.). They include metadata + like event_id, event_timestamp, event_name, and topic for routing. + + This is the default notification event implementation used by the library. + + Example:: + + class UserRegisteredEvent(PydanticNotificationEvent[dict]): + event_name: str = "user.registered" + payload: dict = pydantic.Field(default_factory=lambda: {"user_id": "123"}) + """ + + payload: PayloadT + event_id: uuid.UUID = pydantic.Field(default_factory=uuid.uuid4) event_timestamp: datetime.datetime = pydantic.Field( default_factory=datetime.datetime.now, @@ -37,8 +342,6 @@ class NotificationEvent(Event, typing.Generic[PayloadT], frozen=True): event_name: typing.Text topic: typing.Text = pydantic.Field(default=DEFAULT_OUTPUT_TOPIC) - payload: PayloadT = pydantic.Field(default=None) - model_config = pydantic.ConfigDict(from_attributes=True) def proto(self): @@ -48,4 +351,4 @@ def __hash__(self): return hash(self.event_id) -__all__ = ("Event", "DomainEvent", "NotificationEvent") +NotificationEvent = PydanticNotificationEvent diff --git a/src/cqrs/events/event_emitter.py b/src/cqrs/events/event_emitter.py index b931638..99f3346 100644 --- a/src/cqrs/events/event_emitter.py +++ b/src/cqrs/events/event_emitter.py @@ -3,7 +3,7 @@ import typing from cqrs import container as di_container, message_brokers -from cqrs.events.event import DomainEvent, Event, NotificationEvent +from cqrs.events.event import IDomainEvent, IEvent, INotificationEvent from cqrs.events import event_handler, map logger = logging.getLogger("cqrs") @@ -28,12 +28,12 @@ def __init__( self._message_broker = message_broker @functools.singledispatchmethod - async def emit(self, event: Event) -> None: + async def emit(self, event: IEvent) -> None: pass async def _send_to_broker( self, - event: NotificationEvent, + event: INotificationEvent, ) -> None: """ Sends event to the message broker. @@ -47,7 +47,7 @@ async def _send_to_broker( message_name=type(event).__name__, message_id=event.event_id, topic=event.topic, - payload=event.model_dump(mode="json"), + payload=event.to_dict(), ) logger.debug( @@ -59,7 +59,7 @@ async def _send_to_broker( await self._message_broker.send_message(message) @emit.register - async def _(self, event: DomainEvent) -> None: + async def _(self, event: IDomainEvent) -> None: handlers_types = self._event_map.get(type(event), []) if not handlers_types: logger.warning( @@ -78,5 +78,5 @@ async def _(self, event: DomainEvent) -> None: await handler.handle(event) @emit.register - async def _(self, event: NotificationEvent) -> None: + async def _(self, event: INotificationEvent) -> None: await self._send_to_broker(event) diff --git a/src/cqrs/events/event_handler.py b/src/cqrs/events/event_handler.py index 97813c2..5bdaf06 100644 --- a/src/cqrs/events/event_handler.py +++ b/src/cqrs/events/event_handler.py @@ -1,9 +1,9 @@ import abc import typing -from cqrs.events.event import Event +from cqrs.events.event import IEvent -E = typing.TypeVar("E", bound=Event, contravariant=True) +E = typing.TypeVar("E", bound=IEvent, contravariant=True) class EventHandler(abc.ABC, typing.Generic[E]): diff --git a/src/cqrs/events/event_processor.py b/src/cqrs/events/event_processor.py index 66656ff..f0703ea 100644 --- a/src/cqrs/events/event_processor.py +++ b/src/cqrs/events/event_processor.py @@ -1,7 +1,7 @@ import asyncio import typing -from cqrs.events.event import Event +from cqrs.events.event import IEvent from cqrs.events.event_emitter import EventEmitter from cqrs.events.map import EventMap @@ -36,7 +36,7 @@ def __init__( self._concurrent_event_handle_enable = concurrent_event_handle_enable self._event_semaphore = asyncio.Semaphore(max_concurrent_event_handlers) - async def emit_events(self, events: typing.List[Event]) -> None: + async def emit_events(self, events: typing.Sequence[IEvent]) -> None: """ Emit events via event emitter. @@ -58,7 +58,7 @@ async def emit_events(self, events: typing.List[Event]) -> None: for event in events: asyncio.create_task(self._emit_event_with_semaphore(event)) - async def _emit_event_with_semaphore(self, event: Event) -> None: + async def _emit_event_with_semaphore(self, event: IEvent) -> None: """Process a single event with semaphore limit.""" if not self._event_emitter: return diff --git a/src/cqrs/events/map.py b/src/cqrs/events/map.py index 2d191bc..5f772ff 100644 --- a/src/cqrs/events/map.py +++ b/src/cqrs/events/map.py @@ -1,9 +1,9 @@ import typing -from cqrs.events.event import Event +from cqrs.events.event import IEvent from cqrs.events import event_handler -_KT = typing.TypeVar("_KT", bound=typing.Type[Event]) +_KT = typing.TypeVar("_KT", bound=typing.Type[IEvent]) _VT: typing.TypeAlias = typing.List[typing.Type[event_handler.EventHandler]] diff --git a/src/cqrs/mediator.py b/src/cqrs/mediator.py index b3ad875..fb80476 100644 --- a/src/cqrs/mediator.py +++ b/src/cqrs/mediator.py @@ -6,20 +6,20 @@ from cqrs.dispatcher.request import RequestDispatcher from cqrs.dispatcher.saga import SagaDispatcher from cqrs.dispatcher.streaming import StreamingRequestDispatcher -from cqrs.events.event import Event +from cqrs.events.event import IEvent from cqrs.events.event_emitter import EventEmitter from cqrs.events.event_processor import EventProcessor from cqrs.events.map import EventMap from cqrs.middlewares.base import MiddlewareChain from cqrs.requests.map import RequestMap, SagaMap -from cqrs.requests.request import Request -from cqrs.response import Response +from cqrs.requests.request import IRequest +from cqrs.response import IResponse from cqrs.saga.models import SagaContext from cqrs.saga.step import SagaStepResult from cqrs.saga.storage.memory import MemorySagaStorage from cqrs.saga.storage.protocol import ISagaStorage -_ResponseT = typing.TypeVar("_ResponseT", Response, None, covariant=True) +_ResponseT = typing.TypeVar("_ResponseT", IResponse, None, covariant=True) class RequestMediator: @@ -81,7 +81,7 @@ def __init__( middleware_chain=middleware_chain, # type: ignore ) - async def send(self, request: Request) -> _ResponseT: + async def send(self, request: IRequest) -> _ResponseT: """ Send a request and return the response. @@ -126,7 +126,7 @@ def __init__( middleware_chain=middleware_chain, # type: ignore ) - async def send(self, event: Event) -> None: + async def send(self, event: IEvent) -> None: await self._dispatcher.dispatch(event) @@ -190,8 +190,8 @@ def __init__( async def stream( self, - request: Request, - ) -> typing.AsyncIterator[Response | None]: + request: IRequest, + ) -> typing.AsyncIterator[IResponse | None]: """ Stream results from a generator-based handler. diff --git a/src/cqrs/message_brokers/amqp.py b/src/cqrs/message_brokers/amqp.py index 28e0063..311138b 100644 --- a/src/cqrs/message_brokers/amqp.py +++ b/src/cqrs/message_brokers/amqp.py @@ -8,7 +8,12 @@ class AMQPMessageBroker(protocol.MessageBroker): - def __init__(self, publisher: adapters_protocol.AMQPPublisher, exchange_name: str, pika_log_level: str = "ERROR"): + def __init__( + self, + publisher: adapters_protocol.AMQPPublisher, + exchange_name: str, + pika_log_level: str = "ERROR", + ): self.publisher = publisher self.exchange_name = exchange_name logging.getLogger("aiormq").setLevel(pika_log_level) @@ -17,6 +22,6 @@ def __init__(self, publisher: adapters_protocol.AMQPPublisher, exchange_name: st async def send_message(self, message: protocol.Message) -> None: await self.publisher.publish( message=aio_pika.Message(body=orjson.dumps(message.payload)), - exchange_name=self.exchange_name, queue_name=message.topic, + exchange_name=self.exchange_name, ) diff --git a/src/cqrs/message_brokers/protocol.py b/src/cqrs/message_brokers/protocol.py index 1423280..7267ae1 100644 --- a/src/cqrs/message_brokers/protocol.py +++ b/src/cqrs/message_brokers/protocol.py @@ -1,15 +1,34 @@ import abc +import dataclasses import typing import uuid -import pydantic +@dataclasses.dataclass +class Message: + """ + Internal message structure for message broker communication. + + Args: + message_name: Name of the message type + message_id: Unique identifier for the message (auto-generated if not provided) + topic: Message broker topic where the message should be sent + payload: Message payload data + """ -class Message(pydantic.BaseModel): - message_name: typing.Text = pydantic.Field() - message_id: uuid.UUID = pydantic.Field(default_factory=uuid.uuid4) + message_name: typing.Text topic: typing.Text payload: typing.Any + message_id: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4) + + def to_dict(self) -> dict[str, typing.Any]: + """ + Convert the message instance to a dictionary representation. + + Returns: + A dictionary containing all fields of the message instance. + """ + return dataclasses.asdict(self) class MessageBroker(abc.ABC): diff --git a/src/cqrs/middlewares/logging.py b/src/cqrs/middlewares/logging.py index f54f69b..d1750b2 100644 --- a/src/cqrs/middlewares/logging.py +++ b/src/cqrs/middlewares/logging.py @@ -2,29 +2,32 @@ from cqrs.middlewares import base from cqrs.middlewares.base import HandleType -from cqrs.requests.request import Request -from cqrs.response import Response +from cqrs.requests.request import IRequest +from cqrs.response import IResponse logger = logging.getLogger("cqrs") class LoggingMiddleware(base.Middleware): - async def __call__(self, request: Request, handle: HandleType) -> Response | None: + async def __call__(self, request: IRequest, handle: HandleType) -> IResponse | None: logger.debug( "Handle %s request", type(request).__name__, extra={ - "request_json_fields": {"request": request.model_dump(mode="json")}, + "request_json_fields": {"request": request.to_dict()}, "to_mask": True, }, ) resp = await handle(request) + resp_dict = {} + if resp: + resp_dict = resp.to_dict() logger.debug( "Request %s handled", type(request).__name__, extra={ "request_json_fields": { - "response": resp.model_dump(mode="json") if resp else {}, + "response": resp_dict, }, "to_mask": True, }, diff --git a/src/cqrs/outbox/map.py b/src/cqrs/outbox/map.py index 4b52834..5cb41d4 100644 --- a/src/cqrs/outbox/map.py +++ b/src/cqrs/outbox/map.py @@ -1,16 +1,16 @@ import typing -from cqrs.events.event import NotificationEvent +from cqrs.events.event import INotificationEvent class OutboxedEventMap: - _registry: typing.Dict[typing.Text, typing.Type[NotificationEvent]] = {} + _registry: typing.Dict[typing.Text, typing.Type[INotificationEvent]] = {} @classmethod def register( cls, event_name: typing.Text, - event_type: typing.Type[NotificationEvent], + event_type: typing.Type[INotificationEvent], ) -> None: if event_name in cls._registry: raise KeyError(f"Event with {event_name} already registered") @@ -20,5 +20,5 @@ def register( def get( cls, event_name: typing.Text, - ) -> typing.Type[NotificationEvent] | None: + ) -> typing.Type[INotificationEvent] | None: return cls._registry.get(event_name) diff --git a/src/cqrs/outbox/mock.py b/src/cqrs/outbox/mock.py index 1d5329d..b2fec1c 100644 --- a/src/cqrs/outbox/mock.py +++ b/src/cqrs/outbox/mock.py @@ -16,7 +16,7 @@ async def __aenter__(self) -> typing.Dict: async def __aexit__(self, exc_type, exc_val, exc_tb): pass - def add(self, event: cqrs.NotificationEvent) -> None: + def add(self, event: cqrs.INotificationEvent) -> None: MockOutboxedEventRepository.COUNTER += 1 self.session[MockOutboxedEventRepository.COUNTER] = repository.OutboxedEvent( id=MockOutboxedEventRepository.COUNTER, diff --git a/src/cqrs/outbox/repository.py b/src/cqrs/outbox/repository.py index b6f7029..474b864 100644 --- a/src/cqrs/outbox/repository.py +++ b/src/cqrs/outbox/repository.py @@ -1,23 +1,59 @@ import abc +import dataclasses import enum +import sys import typing -import pydantic - import cqrs -from cqrs.events.event import NotificationEvent +from cqrs.events.event import INotificationEvent + +if sys.version_info >= (3, 11): + StrEnum = enum.StrEnum # novm +else: + # For Python 3.10 compatibility, use regular Enum with string values + class StrEnum(str, enum.Enum): # type: ignore[misc] + """Compatible StrEnum for Python 3.10.""" + + def __str__(self) -> str: + return self.value -class EventStatus(enum.StrEnum): +class EventStatus(StrEnum): NEW = "new" PRODUCED = "produced" NOT_PRODUCED = "not_produced" -class OutboxedEvent(pydantic.BaseModel, frozen=True): - id: pydantic.PositiveInt - event: cqrs.NotificationEvent - topic: typing.Text +@dataclasses.dataclass(frozen=True) +class OutboxedEvent: + """ + Outboxed event dataclass. + + Outboxed events represent notification events that are stored in an outbox + pattern for reliable message delivery. They include metadata about the event + and its processing status. + + This is an internal data structure used by the outbox pattern implementation. + + Args: + id: Unique identifier for the outboxed event + event: The notification event being stored + topic: Message broker topic where the event should be published + status: Current processing status of the event + + Example:: + + outboxed_event = OutboxedEvent( + id=1, + event=notification_event, + topic="user.events", + status=EventStatus.NEW + ) + """ + + id: int + event: cqrs.INotificationEvent + topic: str status: EventStatus @@ -25,7 +61,7 @@ class OutboxedEventRepository(abc.ABC): @abc.abstractmethod def add( self, - event: NotificationEvent, + event: INotificationEvent, ) -> None: """Add an event to the repository.""" @@ -52,3 +88,10 @@ async def commit(self): @abc.abstractmethod async def rollback(self): pass + + +__all__ = ( + "EventStatus", + "OutboxedEvent", + "OutboxedEventRepository", +) diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index aeb126f..1e533f7 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -160,7 +160,7 @@ def __init__( def add( self, - event: cqrs.NotificationEvent, + event: cqrs.INotificationEvent, ) -> None: registered_event = map.OutboxedEventMap.get(event.event_name) if registered_event is None: @@ -171,7 +171,7 @@ def add( f"Event type {type(event)} does not match registered event type {registered_event}", ) - bytes_payload = orjson.dumps(event.model_dump(mode="json")) + bytes_payload = orjson.dumps(event.to_dict()) if self._compressor is not None: bytes_payload = self._compressor.compress(bytes_payload) @@ -191,17 +191,19 @@ def _process_events(self, model: OutboxModel) -> repository.OutboxedEvent | None event_model = map.OutboxedEventMap.get(event_dict["event_name"]) if event_model is None: - return + return None if self._compressor is not None: event_dict["payload"] = self._compressor.decompress(event_dict["payload"]) - event_dict["payload"] = orjson.loads(event_dict["payload"]) + event_payload_dict = orjson.loads(event_dict["payload"]) + # Use from_dict interface method for validation and type conversion + # This works through the interface without exposing implementation details return repository.OutboxedEvent( id=event_dict["id"], topic=event_dict["topic"], status=event_dict["event_status"], - event=event_model.model_validate(event_dict["payload"]), + event=event_model.from_dict(**event_payload_dict), ) async def get_many( diff --git a/src/cqrs/producer.py b/src/cqrs/producer.py index 47bb7b0..f1dbaf8 100644 --- a/src/cqrs/producer.py +++ b/src/cqrs/producer.py @@ -37,7 +37,7 @@ async def send_message(self, event: repository_protocol.OutboxedEvent): message_name=event.event.event_name, message_id=event.event.event_id, topic=event.topic, - payload=event.event.model_dump(), + payload=event.event.to_dict(), ), ) except Exception as error: @@ -48,12 +48,12 @@ async def send_message(self, event: repository_protocol.OutboxedEvent): return await self.repository.update_status( event.id, - repository_protocol.EventStatus.NOT_PRODUCED, + repository_protocol.EventStatus.NOT_PRODUCED, # type: ignore[arg-type] ) else: if not self.repository: return await self.repository.update_status( event.id, - repository_protocol.EventStatus.PRODUCED, + repository_protocol.EventStatus.PRODUCED, # type: ignore[arg-type] ) diff --git a/src/cqrs/requests/cor_request_handler.py b/src/cqrs/requests/cor_request_handler.py index e6b0adb..6fee707 100644 --- a/src/cqrs/requests/cor_request_handler.py +++ b/src/cqrs/requests/cor_request_handler.py @@ -4,7 +4,7 @@ import functools import typing -from cqrs.events.event import Event +from cqrs.events.event import IEvent from cqrs.types import ReqT, ResT @@ -20,7 +20,7 @@ class CORRequestHandler(abc.ABC, typing.Generic[ReqT, ResT]): class AuthenticationHandler(CORRequestHandler[LoginCommand, None]): def __init__(self, auth_service: AuthServiceProtocol) -> None: self._auth_service = auth_service - self.events: typing.List[Event] = [] + self.events: typing.List[IEvent] = [] async def handle(self, request: LoginCommand) -> None | None: if self._auth_service.can_authenticate(request): @@ -47,7 +47,7 @@ async def next(self, request: ReqT) -> ResT | None: @property @abc.abstractmethod - def events(self) -> typing.List[Event]: + def events(self) -> typing.Sequence[IEvent]: raise NotImplementedError @abc.abstractmethod diff --git a/src/cqrs/requests/map.py b/src/cqrs/requests/map.py index 4f6291f..faea78c 100644 --- a/src/cqrs/requests/map.py +++ b/src/cqrs/requests/map.py @@ -1,7 +1,7 @@ import typing from cqrs.requests.cor_request_handler import CORRequestHandler -from cqrs.requests.request import Request +from cqrs.requests.request import IRequest from cqrs.requests.request_handler import ( RequestHandler, StreamingRequestHandler, @@ -9,7 +9,7 @@ from cqrs.saga.models import SagaContext from cqrs.saga.saga import Saga -_KT = typing.TypeVar("_KT", bound=typing.Type[Request]) +_KT = typing.TypeVar("_KT", bound=typing.Type[IRequest]) # Type alias for handler types that can be bound to requests HandlerType = ( diff --git a/src/cqrs/requests/mermaid.py b/src/cqrs/requests/mermaid.py index 0a58498..d3e6d13 100644 --- a/src/cqrs/requests/mermaid.py +++ b/src/cqrs/requests/mermaid.py @@ -244,7 +244,7 @@ def class_diagram(self) -> str: for response_type in sorted(response_types, key=lambda x: x.__name__): class_name = response_type.__name__ lines.append(f" class {class_name} {{") - # Try to get fields if it's a Pydantic model or dataclass + # Try to get fields from dataclass or model if hasattr(response_type, "__dataclass_fields__"): fields = response_type.__dataclass_fields__ for field_name, field_info in fields.items(): diff --git a/src/cqrs/requests/request.py b/src/cqrs/requests/request.py index efee98b..b220d3e 100644 --- a/src/cqrs/requests/request.py +++ b/src/cqrs/requests/request.py @@ -1,13 +1,143 @@ +import abc +import dataclasses +import sys + import pydantic +if sys.version_info >= (3, 11): + from typing import Self # novm +else: + from typing_extensions import Self + + +class IRequest(abc.ABC): + """ + Interface for request-type objects. + + This abstract base class defines the contract that all request implementations + must follow. Requests are input objects passed to request handlers and are used + for defining queries or commands in the CQRS pattern. -class Request(pydantic.BaseModel): + All request implementations must provide: + - `to_dict()`: Convert the request instance to a dictionary representation + - `from_dict()`: Create a request instance from a dictionary """ - Base class for request-type objects. + + @abc.abstractmethod + def to_dict(self) -> dict: + """ + Convert the request instance to a dictionary representation. + + Returns: + A dictionary containing all fields of the request instance. + """ + raise NotImplementedError + + @classmethod + @abc.abstractmethod + def from_dict(cls, **kwargs) -> Self: + """ + Create a request instance from keyword arguments. + + Args: + **kwargs: Keyword arguments matching the request fields. + + Returns: + A new instance of the request class. + """ + raise NotImplementedError + + +@dataclasses.dataclass +class DCRequest(IRequest): + """ + Dataclass-based implementation of the request interface. + + This class provides a request implementation using Python's dataclasses. + It's useful when you want to avoid pydantic dependency or prefer dataclasses + for request definitions. + + Example:: + + @dataclasses.dataclass + class GetUserQuery(DCRequest): + user_id: str + + query = GetUserQuery(user_id="123") + data = query.to_dict() # {"user_id": "123"} + restored = GetUserQuery.from_dict(**data) + """ + + @classmethod + def from_dict(cls, **kwargs) -> Self: + """ + Create a request instance from keyword arguments. + + Args: + **kwargs: Keyword arguments matching the dataclass fields. + + Returns: + A new instance of the request class. + """ + return cls(**kwargs) + + def to_dict(self) -> dict: + """ + Convert the request instance to a dictionary representation. + + Returns: + A dictionary containing all fields of the dataclass instance. + """ + return dataclasses.asdict(self) + + +class PydanticRequest(pydantic.BaseModel, IRequest): + """ + Pydantic-based implementation of the request interface. + + This class provides a request implementation using Pydantic models. + It offers data validation, serialization, and other Pydantic features. + This is the default request implementation used by the library. The request is an input of the request handler. Often Request is used for defining queries or commands. + + Example:: + + class CreateUserCommand(PydanticRequest): + username: str + email: str + + command = CreateUserCommand(username="john", email="john@example.com") + data = command.to_dict() # {"username": "john", "email": "john@example.com"} + restored = CreateUserCommand.from_dict(**data) """ + @classmethod + def from_dict(cls, **kwargs) -> Self: + """ + Create a request instance from keyword arguments. + + Validates and converts types, ensuring required fields are present. + + Args: + **kwargs: Keyword arguments matching the request fields. + + Returns: + A new instance of the request class. + """ + return cls.model_validate(kwargs) + + def to_dict(self) -> dict: + """ + Convert the request instance to a dictionary representation. + + Returns: + A dictionary containing all fields of the request instance. + """ + return self.model_dump(mode="python") + + +Request = PydanticRequest -__all__ = ("Request",) +__all__ = ("Request", "IRequest", "DCRequest", "PydanticRequest") diff --git a/src/cqrs/requests/request_handler.py b/src/cqrs/requests/request_handler.py index 0e2d761..739b21e 100644 --- a/src/cqrs/requests/request_handler.py +++ b/src/cqrs/requests/request_handler.py @@ -1,7 +1,7 @@ import abc import typing -from cqrs.events.event import Event +from cqrs.events.event import IEvent from cqrs.types import ReqT, ResT @@ -16,7 +16,7 @@ class RequestHandler(abc.ABC, typing.Generic[ReqT, ResT]): class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]) def __init__(self, meetings_api: MeetingAPIProtocol) -> None: self._meetings_api = meetings_api - self.events: list[Event] = [] + self.events: list[IEvent] = [] async def handle(self, request: JoinMeetingCommand) -> None: await self._meetings_api.join_user(request.user_id, request.meeting_id) @@ -26,7 +26,7 @@ async def handle(self, request: JoinMeetingCommand) -> None: class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]) def __init__(self, meetings_api: MeetingAPIProtocol) -> None: self._meetings_api = meetings_api - self.events: list[Event] = [] + self.events: list[IEvent] = [] async def handle(self, request: ReadMeetingQuery) -> ReadMeetingQueryResult: link = await self._meetings_api.get_link(request.meeting_id) @@ -36,7 +36,7 @@ async def handle(self, request: ReadMeetingQuery) -> ReadMeetingQueryResult: @property @abc.abstractmethod - def events(self) -> typing.List[Event]: + def events(self) -> typing.Sequence[IEvent]: raise NotImplementedError @abc.abstractmethod @@ -75,7 +75,7 @@ async def handle(self, request: ProcessItemsCommand) -> typing.AsyncIterator[Pro @property @abc.abstractmethod - def events(self) -> typing.List[Event]: + def events(self) -> typing.Sequence[IEvent]: raise NotImplementedError @abc.abstractmethod diff --git a/src/cqrs/response.py b/src/cqrs/response.py index 31871dc..8f9a6f3 100644 --- a/src/cqrs/response.py +++ b/src/cqrs/response.py @@ -1,11 +1,146 @@ +import abc +import dataclasses +import sys + import pydantic +if sys.version_info >= (3, 11): + from typing import Self # novm +else: + from typing_extensions import Self + + +class IResponse(abc.ABC): + """ + Interface for response-type objects. + + This abstract base class defines the contract that all response implementations + must follow. Responses are result objects returned by request handlers and are + typically used for defining the result of queries in the CQRS pattern. + + All response implementations must provide: + - `to_dict()`: Convert the response instance to a dictionary representation + - `from_dict()`: Create a response instance from a dictionary + """ + + @abc.abstractmethod + def to_dict(self) -> dict: + """ + Convert the response instance to a dictionary representation. + + Returns: + A dictionary containing all fields of the response instance. + """ + raise NotImplementedError + + @classmethod + @abc.abstractmethod + def from_dict(cls, **kwargs) -> Self: + """ + Create a response instance from keyword arguments. + + Args: + **kwargs: Keyword arguments matching the response fields. + + Returns: + A new instance of the response class. + """ + raise NotImplementedError + + +@dataclasses.dataclass +class DCResponse(IResponse): + """ + Dataclass-based implementation of the response interface. + + This class provides a response implementation using Python's dataclasses. + It's useful when you want to avoid pydantic dependency or prefer dataclasses + for response definitions. + + Example:: + + @dataclasses.dataclass + class UserResponse(DCResponse): + user_id: str + username: str + email: str + + response = UserResponse(user_id="123", username="john", email="john@example.com") + data = response.to_dict() # {"user_id": "123", "username": "john", "email": "john@example.com"} + restored = UserResponse.from_dict(**data) + """ + + @classmethod + def from_dict(cls, **kwargs) -> Self: + """ + Create a response instance from keyword arguments. + + Args: + **kwargs: Keyword arguments matching the dataclass fields. + + Returns: + A new instance of the response class. + """ + return cls(**kwargs) + + def to_dict(self) -> dict: + """ + Convert the response instance to a dictionary representation. -class Response(pydantic.BaseModel): + Returns: + A dictionary containing all fields of the dataclass instance. + """ + return dataclasses.asdict(self) + + +class PydanticResponse(pydantic.BaseModel, IResponse): """ - Base class for response type objects. + Pydantic-based implementation of the response interface. - The response is a result of the request handling, which hold by RequestHandler. + This class provides a response implementation using Pydantic models. + It offers data validation, serialization, and other Pydantic features. + This is the default response implementation used by the library. + The response is a result of the request handling, which is held by RequestHandler. Often the response is used for defining the result of the query. + + Example:: + + class UserResponse(PydanticResponse): + user_id: str + username: str + email: str + + response = UserResponse(user_id="123", username="john", email="john@example.com") + data = response.to_dict() # {"user_id": "123", "username": "john", "email": "john@example.com"} + restored = UserResponse.from_dict(**data) """ + + @classmethod + def from_dict(cls, **kwargs) -> Self: + """ + Create a response instance from keyword arguments. + + Validates and converts types, ensuring required fields are present. + + Args: + **kwargs: Keyword arguments matching the response fields. + + Returns: + A new instance of the response class. + """ + return cls.model_validate(kwargs) + + def to_dict(self) -> dict: + """ + Convert the response instance to a dictionary representation. + + Returns: + A dictionary containing all fields of the response instance. + """ + return self.model_dump(mode="python") + + +Response = PydanticResponse + +__all__ = ("Response", "IResponse", "DCResponse", "PydanticResponse") diff --git a/src/cqrs/saga/mermaid.py b/src/cqrs/saga/mermaid.py index b01ff06..719ed0f 100644 --- a/src/cqrs/saga/mermaid.py +++ b/src/cqrs/saga/mermaid.py @@ -375,7 +375,7 @@ def class_diagram(self) -> str: for response_type in sorted(response_types, key=lambda x: x.__name__): class_name = response_type.__name__ lines.append(f" class {class_name} {{") - # Try to get fields if it's a Pydantic model or dataclass + # Try to get fields from dataclass or model if hasattr(response_type, "__dataclass_fields__"): fields = response_type.__dataclass_fields__ for field_name, field_info in fields.items(): diff --git a/src/cqrs/saga/recovery.py b/src/cqrs/saga/recovery.py index cff3354..9e0f28f 100644 --- a/src/cqrs/saga/recovery.py +++ b/src/cqrs/saga/recovery.py @@ -37,7 +37,7 @@ async def recover_saga( (assuming the constructor accepts kwargs). If a function is provided, it will be called with the data dict. Examples: - - MyPydanticModel.model_validate + - MyContextClass.from_dict (if implements from_dict interface) - lambda d: MyDataClass(**d) - MyClass (if __init__ accepts **kwargs) container: DI container for resolving step handlers. diff --git a/src/cqrs/saga/step.py b/src/cqrs/saga/step.py index 1abeb39..cee2d9b 100644 --- a/src/cqrs/saga/step.py +++ b/src/cqrs/saga/step.py @@ -1,29 +1,46 @@ from __future__ import annotations import abc +import dataclasses import typing -import pydantic - -from cqrs.events.event import Event -from cqrs.response import Response +from cqrs.events.event import IEvent +from cqrs.response import IResponse from cqrs.saga.models import ContextT -Resp = typing.TypeVar("Resp", bound=Response | None, covariant=True) +Resp = typing.TypeVar("Resp", bound=IResponse | None, covariant=True) -class SagaStepResult(pydantic.BaseModel, typing.Generic[ContextT, Resp]): +@dataclasses.dataclass(frozen=True) +class SagaStepResult(typing.Generic[ContextT, Resp]): """ Result of a saga step execution. Contains the response from the step's act method and metadata about the step. - The step_type field uses typing.Any for Pydantic validation compatibility, + The step_type field uses typing.Any for compatibility, but the actual runtime type is Type[SagaStepHandler[ContextT, Resp]]. + + This is an internal data structure used by the saga pattern implementation. + + Args: + response: The response object from the step (can be None) + step_type: Type of the step handler that produced this result + with_error: Whether the step resulted in an error + error_message: Error message if with_error is True + error_traceback: Error traceback lines if with_error is True + error_type: Type of exception if with_error is True + + Example:: + + result = SagaStepResult( + response=response, + step_type=ReserveInventoryStep, + with_error=False + ) """ response: Resp step_type: typing.Any # type: ignore[assignment] # Actual type: Type[SagaStepHandler[ContextT, Resp]] - with_error: bool = False error_message: str | None = None error_traceback: list[str] | None = None @@ -93,7 +110,7 @@ async def compensate(self, context: OrderContext) -> None: def _generate_step_result( self, - response: Response | None, + response: IResponse | None, with_error: bool = False, error_message: str | None = None, error_traceback: list[str] | None = None, @@ -126,12 +143,12 @@ def _generate_step_result( @property @abc.abstractmethod - def events(self) -> typing.List[Event]: + def events(self) -> typing.Sequence[IEvent]: """ Get the list of domain events produced by this step. Returns: - A list of domain events that were generated during the execution + A sequence of domain events that were generated during the execution of the act method. These events can be emitted after the step completes successfully. @@ -177,3 +194,10 @@ async def compensate(self, context: ContextT) -> None: """ raise NotImplementedError + + +__all__ = ( + "SagaStepResult", + "SagaStepHandler", + "Resp", +) diff --git a/src/cqrs/saga/storage/sqlalchemy.py b/src/cqrs/saga/storage/sqlalchemy.py index 3be685d..fbeded7 100644 --- a/src/cqrs/saga/storage/sqlalchemy.py +++ b/src/cqrs/saga/storage/sqlalchemy.py @@ -164,7 +164,8 @@ async def update_context( result = await session.execute(stmt) - if result.rowcount == 0: + # Type ignore: SQLAlchemy Result from update() has rowcount attribute + if result.rowcount == 0: # type: ignore[attr-defined] # Check if saga exists to distinguish between "not found" and "concurrency error" # But for now, we assume if rowcount is 0 and we checked version, it's concurrency if current_version is not None: diff --git a/src/cqrs/serializers/__init__.py b/src/cqrs/serializers/__init__.py index 9017591..b80c01d 100644 --- a/src/cqrs/serializers/__init__.py +++ b/src/cqrs/serializers/__init__.py @@ -1,7 +1,3 @@ from cqrs.serializers.default import default_serializer -from cqrs.serializers.protobuf import protobuf_value_serializer -__all__ = ( - "protobuf_value_serializer", - "default_serializer", -) \ No newline at end of file +__all__ = ("default_serializer",) diff --git a/src/cqrs/serializers/default.py b/src/cqrs/serializers/default.py index a549d68..deb4eb7 100644 --- a/src/cqrs/serializers/default.py +++ b/src/cqrs/serializers/default.py @@ -1,8 +1,24 @@ import typing import orjson -import pydantic -def default_serializer(message: pydantic.BaseModel) -> typing.ByteString: - return orjson.dumps(message.model_dump(mode="json")) +def default_serializer(message: typing.Any) -> typing.ByteString: + """ + Default serializer for messages. + + Works with any object that has a to_dict() method (interface-based approach). + Falls back to model_dump() if available, otherwise serializes as-is. + + Args: + message: Object to serialize. Should implement to_dict() method. + + Returns: + Serialized message as bytes. + """ + if hasattr(message, "to_dict"): + return orjson.dumps(message.to_dict()) + elif hasattr(message, "model_dump"): + return orjson.dumps(message.model_dump(mode="json")) + else: + return orjson.dumps(message) diff --git a/src/cqrs/serializers/protobuf.py b/src/cqrs/serializers/protobuf.py deleted file mode 100644 index 4f94f0c..0000000 --- a/src/cqrs/serializers/protobuf.py +++ /dev/null @@ -1,39 +0,0 @@ -import os -import typing - -import dotenv -from confluent_kafka import schema_registry, serialization -from confluent_kafka.schema_registry import protobuf - -import cqrs - -dotenv.load_dotenv() - -KAFKA_SCHEMA_REGISTRY_URL = os.getenv( - "KAFKA_SCHEMA_REGISTRY_URL", - "http://localhost:8085", -) - - -def protobuf_value_serializer( - event: cqrs.NotificationEvent, -) -> typing.ByteString | None: - """ - Serialize CQRS event model into protobuf message. - """ - protobuf_event = event.proto() - schema_registry_client = schema_registry.SchemaRegistryClient( - {"url": KAFKA_SCHEMA_REGISTRY_URL}, - ) - protobuf_serializer = protobuf.ProtobufSerializer( - protobuf_event.__class__, - schema_registry_client, - {"use.deprecated.format": False}, - ) - - context = serialization.SerializationContext( - event.topic, - serialization.MessageField.VALUE, - ) - - return protobuf_serializer(protobuf_event, context) diff --git a/src/cqrs/types.py b/src/cqrs/types.py index 6fb76cc..6fdbc20 100644 --- a/src/cqrs/types.py +++ b/src/cqrs/types.py @@ -7,12 +7,12 @@ import typing -from cqrs.requests.request import Request -from cqrs.response import Response +from cqrs.requests.request import IRequest +from cqrs.response import IResponse # Type variable for request types (contravariant - can accept subtypes) -ReqT = typing.TypeVar("ReqT", bound=Request, contravariant=True) +ReqT = typing.TypeVar("ReqT", bound=IRequest, contravariant=True) # Type variable for response types (covariant - can return subtypes) -# Can be Response or None -ResT = typing.TypeVar("ResT", bound=Response | None, covariant=True) +# Can be IResponse or None +ResT = typing.TypeVar("ResT", bound=IResponse | None, covariant=True) diff --git a/tests/integration/test_event_outbox.py b/tests/integration/test_event_outbox.py index bb0f542..648df38 100644 --- a/tests/integration/test_event_outbox.py +++ b/tests/integration/test_event_outbox.py @@ -33,7 +33,7 @@ def __init__(self, repository: cqrs.OutboxedEventRepository): self.repository = repository @property - def events(self) -> list[events.Event]: + def events(self) -> typing.Sequence[events.IEvent]: return [] async def handle(self, request: OutboxRequest) -> None: @@ -109,7 +109,7 @@ async def test_get_new_events_negative(self, session): events_list = await repository.get_many(3) await repository.update_status( events_list[-1].id, - repository_protocol.EventStatus.PRODUCED, + repository_protocol.EventStatus.PRODUCED, # type: ignore[arg-type] ) await session.commit() @@ -156,7 +156,7 @@ async def test_get_new_event_negative(self, session): [event_over_get_all_events_method] = await repository.get_many(1) await repository.update_status( event_over_get_all_events_method.id, - repository_protocol.EventStatus.PRODUCED, + repository_protocol.EventStatus.PRODUCED, # type: ignore[arg-type] ) await session.commit() @@ -180,7 +180,7 @@ async def test_mark_as_failure_positive(self, session): # mark FIRST event as failure await repository.update_status( failure_event.id, - repository_protocol.EventStatus.NOT_PRODUCED, + repository_protocol.EventStatus.NOT_PRODUCED, # type: ignore[arg-type] ) await session.commit() @@ -204,7 +204,7 @@ async def test_mark_as_failure_negative(self, session): for _ in range(sqlalchemy.MAX_FLUSH_COUNTER_VALUE): await repository.update_status( failure_event.id, - repository_protocol.EventStatus.NOT_PRODUCED, + repository_protocol.EventStatus.NOT_PRODUCED, # type: ignore[arg-type] ) await session.commit() diff --git a/tests/integration/test_streaming_mediator.py b/tests/integration/test_streaming_mediator.py index 8861c92..943b291 100644 --- a/tests/integration/test_streaming_mediator.py +++ b/tests/integration/test_streaming_mediator.py @@ -28,7 +28,7 @@ def __init__(self) -> None: self._processed_count = 0 @property - def events(self) -> typing.List[events.Event]: + def events(self) -> typing.Sequence[events.IEvent]: return self._events.copy() def clear_events(self) -> None: diff --git a/tests/unit/test_cor_request_handler.py b/tests/unit/test_cor_request_handler.py index 77b7b19..da180fa 100644 --- a/tests/unit/test_cor_request_handler.py +++ b/tests/unit/test_cor_request_handler.py @@ -24,7 +24,7 @@ class TestHandlerA(CORRequestHandler[TRequest, TResult | None]): call_count: int = 0 @property - def events(self) -> typing.List[cqrs.Event]: + def events(self) -> typing.Sequence[cqrs.IEvent]: return [] async def handle(self, request: TRequest) -> TResult | None: @@ -46,7 +46,7 @@ class TestHandlerB(CORRequestHandler[TRequest, TResult | None]): call_count: int = 0 @property - def events(self) -> typing.List[cqrs.Event]: + def events(self) -> typing.Sequence[cqrs.IEvent]: return [] async def handle(self, request: TRequest) -> TResult | None: @@ -68,7 +68,7 @@ class TestHandlerC(CORRequestHandler[TRequest, TResult | None]): call_count: int = 0 @property - def events(self) -> typing.List[cqrs.Event]: + def events(self) -> typing.Sequence[cqrs.IEvent]: return [] async def handle(self, request: TRequest) -> TResult | None: @@ -90,7 +90,7 @@ class DefaultTestHandler(CORRequestHandler[TRequest, TResult | None]): call_count: int = 0 @property - def events(self) -> typing.List[cqrs.Event]: + def events(self) -> typing.Sequence[cqrs.IEvent]: return [] async def handle(self, request: TRequest) -> TResult | None: diff --git a/tests/unit/test_deserializers.py b/tests/unit/test_deserializers.py index f0d7bdb..119f927 100644 --- a/tests/unit/test_deserializers.py +++ b/tests/unit/test_deserializers.py @@ -1,12 +1,10 @@ import typing -import uuid -from unittest.mock import Mock, patch import orjson import pydantic import cqrs -from cqrs.deserializers import json, protobuf +from cqrs.deserializers import json class DeserializedModelPayload(pydantic.BaseModel): @@ -41,7 +39,7 @@ def test_json_deserializer_from_bytes_positive(): def test_json_deserializer_from_str_positive(): - deserializer = json.JsonDeserializer[cqrs.NotificationEvent]( + deserializer = json.JsonDeserializer( model=cqrs.NotificationEvent[DeserializedModelPayload], ) @@ -54,7 +52,7 @@ def test_json_deserializer_from_str_positive(): def test_json_deserializer_invalid_json_negative(): - deserializer = json.JsonDeserializer[cqrs.NotificationEvent]( + deserializer = json.JsonDeserializer( model=cqrs.NotificationEvent[DeserializedModelPayload], ) @@ -67,7 +65,7 @@ def test_json_deserializer_invalid_json_negative(): def test_json_deserializer_invalid_structure_negative(): - deserializer = json.JsonDeserializer[cqrs.NotificationEvent]( + deserializer = json.JsonDeserializer( model=cqrs.NotificationEvent[DeserializedModelPayload], ) @@ -82,7 +80,7 @@ def test_json_deserializer_invalid_structure_negative(): def test_json_deserializer_missing_required_fields_negative(): - deserializer = json.JsonDeserializer[cqrs.NotificationEvent]( + deserializer = json.JsonDeserializer( model=cqrs.NotificationEvent[DeserializedModelPayload], ) @@ -100,7 +98,7 @@ def test_json_deserializer_missing_required_fields_negative(): def test_json_deserializer_empty_string_negative(): - deserializer = json.JsonDeserializer[cqrs.NotificationEvent]( + deserializer = json.JsonDeserializer( model=cqrs.NotificationEvent[DeserializedModelPayload], ) @@ -113,7 +111,7 @@ def test_json_deserializer_empty_string_negative(): def test_json_deserializer_empty_json_object_negative(): - deserializer = json.JsonDeserializer[cqrs.NotificationEvent]( + deserializer = json.JsonDeserializer( model=cqrs.NotificationEvent[DeserializedModelPayload], ) @@ -123,234 +121,3 @@ def test_json_deserializer_empty_json_object_negative(): assert result.error_message is not None assert result.error_type is not None assert result.message_data == "{}" - - -# ============================================================================ -# ProtobufValueDeserializer Tests -# ============================================================================ - - -class MockProtobufMessage: - """Mock protobuf message for testing.""" - - def __init__(self, event_id: str, event_name: str, payload: dict | None = None): - self.event_id = event_id - self.event_name = event_name - self.event_timestamp = "2024-01-01T00:00:00" - if payload: - self.payload = Mock() - self.payload.user_id = payload.get("user_id", "") - self.payload.meeting_id = payload.get("meeting_id", "") - - -def test_protobuf_deserializer_success(): - """Test successful protobuf deserialization.""" - # Use a mock class that can be used as a type - mock_protobuf_model = type("MockProtobufModel", (), {}) # type: ignore[assignment] - mock_event_model = cqrs.NotificationEvent[DeserializedModelPayload] - - deserializer = protobuf.ProtobufValueDeserializer( - model=mock_event_model, - protobuf_model=mock_protobuf_model, # type: ignore[arg-type] - ) - - mock_proto_message = MockProtobufMessage( - event_id="123", - event_name="test_event", - ) - - # Mock the ProtobufDeserializer class to return a callable that returns our mock message - mock_protobuf_deserializer_instance = Mock(return_value=mock_proto_message) - - with patch( - "cqrs.deserializers.protobuf.protobuf.ProtobufDeserializer", - return_value=mock_protobuf_deserializer_instance, - ): - # Mock model_validate to return a proper event - expected_event = cqrs.NotificationEvent[DeserializedModelPayload]( - event_id=uuid.UUID("12345678-1234-5678-1234-567812345678"), - event_name="test_event", - payload=DeserializedModelPayload(foo="foo", bar=1), - ) - - with patch.object( - mock_event_model, - "model_validate", - return_value=expected_event, - ): - result = deserializer(b"test_bytes") - - assert isinstance(result, cqrs.NotificationEvent) - assert result.event_name == "test_event" - # Verify that ProtobufDeserializer was called correctly - mock_protobuf_deserializer_instance.assert_called_once_with( - b"test_bytes", - None, - ) - - -def test_protobuf_deserializer_protobuf_deserialization_error(): - """Test error during protobuf deserialization.""" - mock_protobuf_model = type("MockProtobufModel", (), {}) # type: ignore[assignment] - mock_event_model = cqrs.NotificationEvent[DeserializedModelPayload] - - deserializer = protobuf.ProtobufValueDeserializer( - model=mock_event_model, - protobuf_model=mock_protobuf_model, # type: ignore[arg-type] - ) - - # Mock ProtobufDeserializer to raise an exception - mock_protobuf_deserializer_instance = Mock( - side_effect=ValueError("Invalid protobuf data"), - ) - - with patch( - "cqrs.deserializers.protobuf.protobuf.ProtobufDeserializer", - return_value=mock_protobuf_deserializer_instance, - ): - test_bytes = b"invalid_protobuf_data" - result = deserializer(test_bytes) - - assert isinstance(result, protobuf.DeserializeProtobufError) - assert result.error_message == "Invalid protobuf data" - assert result.error_type is ValueError - assert result.message_data == test_bytes - - -def test_protobuf_deserializer_empty_message(): - """Test handling of empty protobuf message.""" - mock_protobuf_model = type("MockProtobufModel", (), {}) # type: ignore[assignment] - mock_event_model = cqrs.NotificationEvent[DeserializedModelPayload] - - deserializer = protobuf.ProtobufValueDeserializer( - model=mock_event_model, - protobuf_model=mock_protobuf_model, # type: ignore[arg-type] - ) - - # Mock ProtobufDeserializer to return None (empty message) - mock_protobuf_deserializer_instance = Mock(return_value=None) - - with patch( - "cqrs.deserializers.protobuf.protobuf.ProtobufDeserializer", - return_value=mock_protobuf_deserializer_instance, - ): - test_bytes = b"empty_message" - result = deserializer(test_bytes) - - assert isinstance(result, protobuf.DeserializeProtobufError) - assert "empty" in result.error_message.lower() - assert result.error_type is ValueError - assert result.message_data == test_bytes - - -def test_protobuf_deserializer_validation_error(): - """Test pydantic validation error during model conversion.""" - mock_protobuf_model = type("MockProtobufModel", (), {}) # type: ignore[assignment] - mock_event_model = cqrs.NotificationEvent[DeserializedModelPayload] - - deserializer = protobuf.ProtobufValueDeserializer( - model=mock_event_model, - protobuf_model=mock_protobuf_model, # type: ignore[arg-type] - ) - - mock_proto_message = MockProtobufMessage( - event_id="123", - event_name="test_event", - ) - - mock_protobuf_deserializer_instance = Mock(return_value=mock_proto_message) - - with patch( - "cqrs.deserializers.protobuf.protobuf.ProtobufDeserializer", - return_value=mock_protobuf_deserializer_instance, - ): - # Create a validation error - validation_error = pydantic.ValidationError.from_exception_data( - "TestModel", - [{"type": "missing", "loc": ("payload",), "input": {}}], - ) - - with patch.object( - mock_event_model, - "model_validate", - side_effect=validation_error, - ): - test_bytes = b"test_bytes" - result = deserializer(test_bytes) - - assert isinstance(result, protobuf.DeserializeProtobufError) - assert result.error_message is not None - assert result.error_type == pydantic.ValidationError - assert result.message_data == test_bytes - - -def test_protobuf_deserializer_generic_exception(): - """Test handling of generic exceptions during protobuf deserialization.""" - mock_protobuf_model = type("MockProtobufModel", (), {}) # type: ignore[assignment] - mock_event_model = cqrs.NotificationEvent[DeserializedModelPayload] - - deserializer = protobuf.ProtobufValueDeserializer( - model=mock_event_model, - protobuf_model=mock_protobuf_model, # type: ignore[arg-type] - ) - - # Mock ProtobufDeserializer to raise a RuntimeError - mock_protobuf_deserializer_instance = Mock( - side_effect=RuntimeError("Unexpected error"), - ) - - with patch( - "cqrs.deserializers.protobuf.protobuf.ProtobufDeserializer", - return_value=mock_protobuf_deserializer_instance, - ): - test_bytes = b"test_bytes" - result = deserializer(test_bytes) - - assert isinstance(result, protobuf.DeserializeProtobufError) - assert result.error_message == "Unexpected error" - assert result.error_type is RuntimeError - assert result.message_data == test_bytes - - -def test_protobuf_deserializer_byte_string_input(): - """Test that deserializer accepts ByteString types.""" - mock_protobuf_model = type("MockProtobufModel", (), {}) # type: ignore[assignment] - mock_event_model = cqrs.NotificationEvent[DeserializedModelPayload] - - deserializer = protobuf.ProtobufValueDeserializer( - model=mock_event_model, - protobuf_model=mock_protobuf_model, # type: ignore[arg-type] - ) - - mock_proto_message = MockProtobufMessage( - event_id="123", - event_name="test_event", - ) - - mock_protobuf_deserializer_instance = Mock(return_value=mock_proto_message) - - with patch( - "cqrs.deserializers.protobuf.protobuf.ProtobufDeserializer", - return_value=mock_protobuf_deserializer_instance, - ): - expected_event = cqrs.NotificationEvent[DeserializedModelPayload]( - event_id=uuid.UUID("12345678-1234-5678-1234-567812345678"), - event_name="test_event", - payload=DeserializedModelPayload(foo="foo", bar=1), - ) - - with patch.object( - mock_event_model, - "model_validate", - return_value=expected_event, - ): - # Test with bytes - result_bytes = deserializer(b"test_bytes") - assert isinstance(result_bytes, cqrs.NotificationEvent) - - # Reset mock for next call - mock_protobuf_deserializer_instance.reset_mock() - - # Test with bytearray - result_bytearray = deserializer(bytearray(b"test_bytes")) - assert isinstance(result_bytearray, cqrs.NotificationEvent) diff --git a/tests/unit/test_event_processor.py b/tests/unit/test_event_processor.py index c6a2968..8825762 100644 --- a/tests/unit/test_event_processor.py +++ b/tests/unit/test_event_processor.py @@ -67,7 +67,6 @@ async def test_event_processor_processes_events_parallel() -> None: _TestDomainEvent(item_id="2"), _TestDomainEvent(item_id="3"), ] - await processor.emit_events(events) # Wait for background tasks to complete diff --git a/tests/unit/test_request_mediator_parallel_events.py b/tests/unit/test_request_mediator_parallel_events.py index cd01d3a..0101e44 100644 --- a/tests/unit/test_request_mediator_parallel_events.py +++ b/tests/unit/test_request_mediator_parallel_events.py @@ -1,7 +1,9 @@ import asyncio +import typing import pydantic +import cqrs from cqrs.events import ( DomainEvent, Event, @@ -25,7 +27,7 @@ def __init__(self) -> None: self._events: list[Event] = [] @property - def events(self) -> list[Event]: + def events(self) -> typing.Sequence[cqrs.IEvent]: return self._events.copy() def clear_events(self) -> None: diff --git a/tests/unit/test_streaming_dispatcher.py b/tests/unit/test_streaming_dispatcher.py index 184859d..145a693 100644 --- a/tests/unit/test_streaming_dispatcher.py +++ b/tests/unit/test_streaming_dispatcher.py @@ -4,6 +4,7 @@ import pydantic import pytest +import cqrs from cqrs.dispatcher import StreamingRequestDispatcher from cqrs.events import Event, NotificationEvent from cqrs.requests.map import RequestMap @@ -29,7 +30,7 @@ def __init__(self) -> None: self._events: list[Event] = [] @property - def events(self) -> list[Event]: + def events(self) -> typing.Sequence[cqrs.IEvent]: return self._events.copy() def clear_events(self) -> None: diff --git a/tests/unit/test_streaming_mediator.py b/tests/unit/test_streaming_mediator.py index 644833d..589d022 100644 --- a/tests/unit/test_streaming_mediator.py +++ b/tests/unit/test_streaming_mediator.py @@ -12,6 +12,7 @@ EventMap, NotificationEvent, ) +from cqrs.events.event import IEvent from cqrs.mediator import StreamingRequestMediator from cqrs.message_brokers import devnull from cqrs.requests.map import RequestMap @@ -35,7 +36,7 @@ def __init__(self) -> None: self._events: list[Event] = [] @property - def events(self) -> list[Event]: + def events(self) -> typing.Sequence[IEvent]: return self._events.copy() def clear_events(self) -> None: @@ -193,7 +194,7 @@ def __init__(self) -> None: self._events: list[Event] = [] @property - def events(self) -> list[Event]: + def events(self) -> typing.Sequence[IEvent]: return self._events.copy() def clear_events(self) -> None: