Releases: vadikko2/python-cqrs
Decouple Interfaces from Pydantic Types
Release Notes - Version 4.6.0
π Overview
This release introduces significant improvements to the framework's architecture, including decoupling interfaces from Pydantic types, bug fixes for event processing, and enhanced saga pattern support with circuit breaker fallback mechanisms.
β¨ Features
Decouple Interfaces from Pydantic Types
The framework now supports decoupling interfaces from Pydantic types, providing more flexibility in type definitions and reducing tight coupling with Pydantic. This change allows for better separation of concerns and improved extensibility.
Changes:
- Updated
DependencyInjectorCQRSContainerto work with decoupled interfaces - Modified event type handling to support non-Pydantic types
- Updated examples to demonstrate the new approach
π Bug Fixes
Fixed Duplicate Event Processing (#36)
Resolved an issue where events were being processed multiple times, which could lead to unintended side effects and duplicate operations.
Changes:
- Fixed event processor to prevent duplicate event emission
- Updated mediator logic to ensure events are processed only once
- Enhanced test coverage for event processing scenarios
Files Modified:
src/cqrs/events/event_processor.pysrc/cqrs/mediator.py- Multiple test files updated for better coverage
π§ Improvements
CI/CD Enhancements
- Updated GitHub Actions workflows for tests and linting
- Improved linting stages configuration
- Enhanced test action configurations
- Added code coverage tracking and badges
Documentation Updates
- Updated README with new badges and coverage information
- Enhanced documentation URLs
- Improved code examples
Code Quality
- Fixed linter errors across the codebase
- Improved type hints and annotations
- Enhanced code documentation
π¦ Dependencies
No breaking dependency changes in this release. All existing dependencies remain compatible.
π Migration Guide
If you're using Pydantic-based request/response types, no changes are required. The framework maintains backward compatibility while now supporting decoupled interfaces.
π Testing
- Added comprehensive tests for saga fallback mechanisms
- Enhanced event processing tests to prevent regressions
- Improved integration test coverage
- Added tests for circuit breaker adapter
π Bug Fix: Duplicate Event Processing
π Bug Fix: Duplicate Event Processing
Fixed critical bug where events were being processed twice, causing duplicate event handler executions.
What Changed
- Before: Events were processed through both
process_events()andemit_events(), causing duplicate execution - After: Events are now processed exactly once through
EventEmitter, which handles routing internally
Impact
β
Events processed exactly once
β
Improved performance
β
Simplified internal API
β
No breaking changes for end users
Technical Details
- Removed duplicate processing logic from
EventProcessor - Simplified
EventProcessorAPI (removedcontainerandmiddleware_chainparameters) EventEmitternow handles all event routing (DomainEvent β handlers, NotificationEvent β broker)
Saga Fallback Pattern with Circuit Breaker Support
π Saga Fallback Pattern with Circuit Breaker Support
This release introduces a powerful Fallback mechanism for Saga steps with integrated Circuit Breaker protection, enabling more resilient distributed transaction handling.
β¨ Key Features
- Automatic Fallback Execution: Define alternative steps that execute automatically when primary steps fail
- Circuit Breaker Integration: Prevent cascading failures with configurable thresholds and fail-fast protection
- Context Snapshot/Restore: Context modifications by failing steps are automatically rolled back before fallback execution
- Exception-Based Triggers: Configure specific exception types that trigger fallback immediately
- Full Compensation Support: Fallback steps are properly compensated if subsequent steps fail
- Per-Step Isolation: Each step gets its own isolated circuit breaker instance
π Quick Example
from cqrs.saga.fallback import Fallback
from cqrs.adapters.circuit_breaker import AioBreakerAdapter
class OrderSagaWithFallback(Saga[OrderContext]):
steps = [
Fallback(
step=PrimaryStep,
fallback=FallbackStep,
failure_exceptions=(ConnectionError, TimeoutError),
circuit_breaker=AioBreakerAdapter(
fail_max=3,
timeout_duration=60,
),
),
]π Installation
Circuit breaker functionality requires the optional aiobreaker dependency:
pip install python-cqrs[aiobreaker]π Documentation
- See
examples/saga_fallback.pyfor a complete working example - Check
README.mdfor detailed usage documentation
π― Benefits
- Improved Resilience: Automatic fallback to alternative steps when services fail
- Reduced Load: Circuit breaker prevents unnecessary calls to failing services
- Better UX: Graceful degradation instead of complete failure
- Production Ready: Battle-tested patterns for distributed systems
π Migration
No breaking changes! This is a fully backward-compatible addition. Existing sagas continue to work without modification.
Improve the reliability, consistency, and concurrency control of the Saga implementation
π Highlights
This release significantly improves the reliability, consistency, and concurrency control of the Saga implementation. We have redesigned the SqlAlchemySagaStorage to handle transactions internally, ensuring crash safety. Additionally, we introduced Optimistic Locking to prevent race conditions during context updates and Row Locking to ensure safe saga recovery in distributed environments.
π₯ Breaking Changes
Storage Interface Updates
The ISagaStorage protocol has been updated. Custom storage implementations must be adapted:
- load_saga_state signature changed: Now returns a tuple of 3 values: (status, context, version) instead of 2. It also accepts a keyword-only argument read_for_update: bool.
- update_context signature changed: Now accepts an optional current_version: int argument for optimistic locking.
SqlAlchemySagaStorage Initialization
- The SqlAlchemySagaStorage class now requires an async_sessionmaker (session factory) instead of a single external AsyncSession.
- The storage now manages its own short-lived sessions for every operation (Autocommit/Checkpointing pattern).
β¨ New Features
π Concurrency Control
- Optimistic Locking: Added a version column to the SagaExecutionModel. When updating the saga context, the storage now checks if the version matches. If a concurrent update occurred, a SagaConcurrencyError is raised.
- Row Locking for Recovery: Implemented SELECT ... FOR UPDATE support in load_saga_state. When recovering sagas, the system now locks the specific saga row, preventing multiple recovery workers from picking up the same saga simultaneously.
π‘οΈ Robust Transaction Management
- Checkpointing: Every saga step transition and state change is now committed immediately to the database using isolated transactions. This ensures that if the application crashes, the saga can resume from the exact last completed step without data loss.
- Connection Pooling: The new design supports efficient connection pooling by returning connections to the pool immediately after each operation.
π οΈ Improvements & Fixes
- Memory Storage: Updated MemorySagaStorage to support versioning and the new protocol signature, ensuring parity between testing and production environments.
- Type Safety: Fixed pyright type checking errors regarding method overrides in storage implementations.
- Tests: Updated integration tests to verify locking mechanisms and transaction isolation.
π Documentation
Updated Saga Storage documentation to reflect the new architecture, database schema changes (version column), and connection pool best practices.
Updated Saga Recovery documentation to explain how Row Locking ensures safe distributed recovery.
Added comprehensive examples for setting up SQLAlchemy storage with connection pooling.
Add SagaMediator supporting
Add SagaMediator supporting
Add Saga and CoR mermaid diagrams generating
Add Saga and CoR mermaid diagrams generating
Saga Pattern & Distributed Transactions
π New Features
Saga Pattern & Distributed Transactions
- Saga Orchestrator: Introduced Saga and SagaTransaction to manage long-running distributed transactions across multiple services 2. or internal operations.
- Step-Based Execution: Added SagaStepHandler interface. Each step defines an act (forward action) and compensate (rollback action) method.
- Automatic Compensation: If any step fails, the Saga automatically triggers compensation for all previously completed steps in reverse order (LIFO).
- State Persistence: Introduced ISagaStorage protocol with MemorySagaStorage implementation. Sagas now persist their state (RUNNING, COMPENSATING, FAILED, COMPLETED) and execution history, enabling crash recovery.
- Recovery Tooling: Added recover_saga() utility to resume interrupted sagas from their last known consistent state.
π‘ Reliability & Recovery Strategy
This release implements a Strict Backward Recovery strategy to guarantee data consistency:
- Point of No Return: Once a Saga enters COMPENSATING or FAILED status, forward execution is permanently disabled. Recovery will only complete pending compensations.
- Zombie State Prevention: Prevents scenarios where a partial rollback conflicts with a retried forward execution.
- Idempotency: The recovery mechanism automatically skips steps that were already successfully completed (or compensated) to prevent double-execution.
- Resilient Compensation: Added configurable exponential backoff and retry mechanisms specifically for compensation steps (compensation_retry_count, compensation_retry_delay).
β‘οΈ Implementation Details
- Type Safety: Full generic type support for SagaContext. The context is strictly typed throughout the transaction.
- DI Integration: Step handlers are resolved dynamically via the Dependency Injection container, allowing seamless injection of repositories and API clients.
- Middleware Support: Saga execution supports the existing middleware chain for logging, context propagation, etc.
- Flexible Context Serialization: Support for Pydantic models, DataClasses, and standard dictionaries for state persistence.
π Documentation & Examples
- Core Examples: Added examples/saga.py demonstrating success and failure scenarios.
- FastAPI Integration: Added examples/fastapi_saga_sse.py showing how to stream Saga progress to clients via Server-Sent Events (SSE).
- Recovery Guide: Added comprehensive documentation on how the recovery strategy works and how to implement storage backends.
π§ͺ Testing
- Extensive test suite added covering complex edge cases:
- Recovery from crashes during forward execution.
- Recovery from crashes during compensation (ensuring the rollback completes).
- Verification of the "Point of No Return" logic.
- Idempotency checks for step re-execution.
Chain of Responsibility Pattern Support
π New Features
Chain of Responsibility Pattern Support
- CORRequestHandler & SyncCORRequestHandler: Added new handler types implementing the chain of responsibility pattern for processing requests through multiple handlers in sequence.
- Chain Building: Introduced build_chain() function to automatically link handlers together from a list.
- Chain Navigation: Added next() method to handlers for passing requests to the next handler in the chain when current handler cannot process the request.
- Automatic Chain Resolution: Updated RequestDispatcher to automatically resolve and build chains when multiple handler types are registered for a single request.
- Handler Registration: Enhanced RequestMap to support binding request types to lists of handler classes that will be automatically chained together.
π§ͺ Testing
Comprehensive Test Coverage: Added extensive unit tests in tests/unit/test_cor_request_handler.py covering:
- Chain termination after successful processing
- Sequential handler execution
- Call count verification
- Multiple request independence
- Fallback handler behavior
π Documentation
- Updated README with comprehensive Chain of Responsibility section including:
- Pattern explanation and use cases
- Complete code examples with payment processing handlers
- Chain registration examples
- Integration with existing CQRS infrastructure
- Added detailed example in examples/cor_request_handler.py demonstrating real-world usage
- Enhanced docstrings with proper examples and API documentation
β‘οΈ Implementation Details
- Type Safety: Full generic type support for request and response types in chain handlers
- DI Container Integration: Seamless integration with existing dependency injection containers
- Middleware Compatibility: Chain handlers work with existing middleware infrastructure
- Event Support: Chain handlers maintain full event generation and publishing capabilities
What's Changed
Full Changelog: 4.2.1...4.3.1
Refactor clear events logic for Stream Handler
Refactor clear events logic for Stream Handler
Streaming and Parallel Event Processing
π New Features
Streaming Support & SSE
- StreamingRequestMediator: Added a new mediator designed for handling streaming requests that yield results incrementally.
- StreamingRequestHandler: Introduced a new handler type for processing large batches or long-running operations with real-time progress updates.
- FastAPI SSE Integration: Added native support and examples for using StreamingRequestMediator with Server-Sent Events (SSE) in FastAPI applications.
Parallel Event Processing
- Concurrent Event Handling: Both RequestMediator and StreamingRequestMediator now support processing domain events in parallel.
- Concurrency Control: Added max_concurrent_event_handlers parameter to limit the number of simultaneously running event handlers.
- Configuration: Added concurrent_event_handle_enable flag to toggle between sequential and parallel execution.
Dependency Injection
- dependency-injector Support: Added explicit support and documentation for the dependency-injector library using the DependencyInjectorCQRSContainer adapter.
π Documentation
- Updated README with comprehensive examples for:
- Streaming Request Handlers.
- Parallel Event Processing configuration.
- FastAPI SSE implementation.
- DI container setup (both di and dependency-injector).