-
Notifications
You must be signed in to change notification settings - Fork 7
12. Mediator Design Pattern
Cortex.Mediator is an implementation of the Mediator design pattern tailored for CQRS-based systems. It centralizes the sending of commands, querying for data, and publishing of notifications, while supporting pluggable pipeline behaviors (e.g., transactions, logging, validation).
-
Purpose: Change system state with or without (
Cortex.Mediator.Unit) returning a value (e.g., create or update actions). -
Marker Interface:
ICommand<TResult>,ICommand -
Handler Interface:
ICommandHandler<TCommand, TResult>,ICommandHandler<TCommand> -
Pipeline Behavior:
ICommandPipelineBehavior<TCommand, TResult>,ICommandPipelineBehavior<TCommand>
- Purpose: Retrieve or compute data without modifying system state.
-
Marker Interface:
IQuery<TResult> -
Handler Interface:
IQueryHandler<TQuery, TResult> -
Pipeline Behavior:
IQueryPipelineBehavior<TQuery, TResult>
- Purpose: Broadcast events to one or more handlers without returning values.
-
Marker Interface:
INotification -
Handler Interface:
INotificationHandler<TNotification>
- Core Interface:
IMediatorTask<TResult> SendCommandAsync<TCommand, TResult>(TCommand command, CancellationToken cancellationToken = default)Task SendCommandAsync<TCommand>(TCommand command, CancellationToken cancellationToken = default)Task<TResult> SendQueryAsync<TQuery, TResult>(TQuery query, CancellationToken cancellationToken = default)Task PublishAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
-
Use Cases: Cross-cutting concerns (logging, validation, transactions, etc.) around command and query execution.
-
Examples in this library:
- LoggingCommandBehavior – logs command handling and exceptions.
Coming with version v1.8
- ValidationCommandBehavior – runs FluentValidation checks on the command/query before handling.
sequenceDiagram
participant Client as Client Code
participant Mediator as IMediator
participant Behaviors as Pipeline Behaviors
participant Handler as ICommandHandler
Client->>Mediator: SendAsync(TCommand)
activate Mediator
Mediator->>Behaviors: Invoke pipeline behaviors
activate Behaviors
Behaviors->>Handler: handler.Handle(command)
activate Handler
Handler-->>Behaviors: Execution completes
deactivate Handler
Behaviors-->>Mediator: Return from behaviors
deactivate Behaviors
Mediator-->>Client: Return (task completes)
deactivate Mediator
-
Add Reference: Include the Cortex.Mediator NuGet package or add the source to your project.
-
Register in DI: Invoke
AddCortexMediator(...)inStartup.csorProgram.cs(for .NET 6 minimal APIs) to scan your assemblies for handlers.
// Example in Startup.cs
public void ConfigureServices(IServiceCollection services)
{
// 1. Standard .NET setup
services.AddControllers();
// 2. Register your DB connection or IDbConnection
// services.AddScoped<IDbConnection>(...);
// 3. Add Cortex.Mediator
services.AddCortexMediator(
configuration: Configuration, // your app config
handlerAssemblyMarkerTypes: new[] { typeof(Startup) }, // assemblies to scan
configure: options =>
{
// Optionally add default behaviors such as Logging
options.AddDefaultBehaviors();
}
);
// ...
}
// Example in Program.cs
builder.Services.AddControllers();
builder.Services.AddCortexMediator(
configuration: builder.Configuration, // your app config
handlerAssemblyMarkerTypes: new[] { typeof(Program) }, // assemblies to scan
configure: options =>
{
// Optionally add default behaviors such as Logging
options
.AddDefaultBehaviors();
}
);In the above example:
- We scan the assembly containing
Startupfor any command/query/notification handlers. -
AddDefaultBehaviors()registers standard pipeline behaviors (validation, logging, transaction) out of the box.
Commands typically change system state.
// 1. Define a simple command
public class CreateUserCommand : ICommand
{
public string UserName { get; set; }
public string Email { get; set; }
}
// 2. Implement the handler
public class CreateUserCommandHandler : ICommandHandler<CreateUserCommand, Guid>
{
public async Task Handle(CreateUserCommand command, CancellationToken cancellationToken)
{
// Example: Persist user to the database
// using EF Core, raw SQL, etc.
// e.g. _dbContext.Users.Add(new User { ... });
// await _dbContext.SaveChangesAsync();
Console.WriteLine($"User '{command.UserName}' created successfully!");
}
}Usage:
// Suppose we inject IMediator in a controller or service:
public class UserController : ControllerBase
{
private readonly IMediator _mediator;
public UserController(IMediator mediator)
{
_mediator = mediator;
}
[HttpPost("create")]
public async Task<IActionResult> CreateUser([FromBody] CreateUserCommand command)
{
await _mediator.SendCommandAsync(command);
return Ok("User creation requestd");
}
}Queries read data without side effects.
// 1. A query that returns a DTO
public class GetUserDetailsQuery : IQuery<UserDto>
{
public int UserId { get; set; }
}
// 2. Query handler with a result type
public class GetUserDetailsQueryHandler : IQueryHandler<GetUserDetailsQuery, UserDto>
{
public async Task<UserDto> Handle(GetUserDetailsQuery query, CancellationToken cancellationToken)
{
// Fetch user by query.UserId
// return new UserDto { ... };
return new UserDto { UserId = query.UserId, UserName = "Sample", Email = "sample@domain.com" };
}
}
// Sample result model
public class UserDto
{
public int UserId { get; set; }
public string UserName { get; set; }
public string Email { get; set; }
}Usage:
public class UserQueriesController : ControllerBase
{
private readonly IMediator _mediator;
public UserQueriesController(IMediator mediator)
{
_mediator = mediator;
}
[HttpGet("user/{id}")]
public async Task<IActionResult> GetUser(int id)
{
var userDetails = await _mediator.SendQueryAsync<GetUserDetailsQuery, UserDto>(
new GetUserDetailsQuery { UserId = id });
return Ok(userDetails);
}
}Notifications allow broadcasting events to multiple handlers.
// 1. A notification representing an event
public class UserCreatedNotification : INotification
{
public string UserName { get; set; }
public string Email { get; set; }
}
// 2. Handler(s) that listen for this notification
public class SendWelcomeEmailHandler : INotificationHandler<UserCreatedNotification>
{
public async Task Handle(UserCreatedNotification notification, CancellationToken cancellationToken)
{
// e.g. send an email
Console.WriteLine($"Welcome email sent to {notification.Email}");
}
}
public class AnalyticsUpdateHandler : INotificationHandler<UserCreatedNotification>
{
public async Task Handle(UserCreatedNotification notification, CancellationToken cancellationToken)
{
// e.g. log an analytics event
Console.WriteLine($"Analytics updated for new user {notification.UserName}");
}
}Usage:
public class UserRegistrationService
{
private readonly IMediator _mediator;
public UserRegistrationService(IMediator mediator)
{
_mediator = mediator;
}
public async Task RegisterUserAsync(string userName, string email)
{
// 1. Create user in DB ...
// 2. Publish notification
await _mediator.PublishAsync(new UserCreatedNotification
{
UserName = userName,
Email = email
});
}
}Pipeline behaviors are optional modules that run before and after your command/query/notification handlers. They can:
- Validate input (
ValidationCommandBehavior) - Log operations (
LoggingCommandBehavior)
By default, when you call .AddDefaultBehaviors() in your AddCortexMediator(...) registration:
- Validation uses FluentValidation to validate each command/query.
-
Logging logs the command name and any exceptions to an
ILogger.
services.AddCortexMediator(
Configuration,
new[] { typeof(Program) },
options =>
{
// Register default behaviors: Logging, Validation, Transaction
options.AddDefaultBehaviors();
}
);Custom pipeline behaviors can also be added:
// A custom pipeline behavior that measures execution time
public class TimingBehavior<TCommand, TResult> : ICommandPipelineBehavior<TCommand, TResult>
where TCommand : ICommand<TResult>
{
public async Task<TResult> Handle(TCommand command, CommandHandlerDelegate next, CancellationToken cancellationToken)
{
var start = DateTime.UtcNow;
var result = await next();
var duration = DateTime.UtcNow - start;
Console.WriteLine($"Command {typeof(TCommand).Name} took {duration.TotalMilliseconds}ms");
return result;
}
}
// Register custom open generic pipeline
services.AddCortexMediator(Configuration, new[] { typeof(Program) }, options =>
{
options.AddOpenCommandPipelineBehavior(typeof(TimingBehavior<,>));
});CQRS (Command Query Responsibility Segregation) is an architectural pattern splitting read and write operations:
- Commands: Change system state; do not return data.
- Queries: Return data without changing state.
Cortex.Mediator naturally supports CQRS:
- Implement commands (
ICommand<TResult>) + handlers for writes. - Implement queries (
IQuery<TResult>) + handlers for reads. - Keep them in separate classes/modules for clarity.
Notifications further extend event-driven designs by letting different parts of the system react asynchronously.
Below is a simplified example combining commands, queries, and notifications with pipeline behaviors:
using Cortex.Mediator;
// 1. Register services in your Startup
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
// Suppose you have an IDbConnection or EF DbContext to manage
// services.AddScoped<IDbConnection>(...);
// 2. Add Cortex.Mediator, scanning your assembly
services.AddCortexMediator(Configuration, new[] { typeof(Startup) }, options =>
{
// Add default pipeline behaviors (Validation, Logging, Transaction)
options.AddDefaultBehaviors();
});
// 3. Register your other dependencies
}
// 2. Example command and handler
public class CreateInvoiceCommand : ICommand<Unit>
{
public string CustomerId { get; set; }
public decimal Amount { get; set; }
}
public class CreateInvoiceCommandHandler : ICommandHandler<CreateInvoiceCommand, Unit>
{
public async Task<Unit> Handle(CreateInvoiceCommand command, CancellationToken cancellationToken)
{
// Insert into DB, e.g.
// _dbContext.Invoices.Add(new Invoice { ... });
// await _dbContext.SaveChangesAsync();
Console.WriteLine($"Invoice created for Customer: {command.CustomerId} Amount: {command.Amount}");
return Unit.Value;
}
}
// 3. Example query and handler
public class GetInvoiceQuery : IQuery<InvoiceDto>
{
public int InvoiceId { get; set; }
}
public class GetInvoiceQueryHandler : IQueryHandler<GetInvoiceQuery, InvoiceDto>
{
public async Task<InvoiceDto> Handle(GetInvoiceQuery query, CancellationToken cancellationToken)
{
// e.g. retrieve from DB
return new InvoiceDto
{
InvoiceId = query.InvoiceId,
CustomerId = "cust123",
Amount = 199.99m
};
}
}
public class InvoiceDto
{
public int InvoiceId { get; set; }
public string CustomerId { get; set; }
public decimal Amount { get; set; }
}
// 4. Example usage in a controller
[ApiController]
[Route("api/[controller]")]
public class InvoicesController : ControllerBase
{
private readonly IMediator _mediator;
public InvoicesController(IMediator mediator)
{
_mediator = mediator;
}
[HttpPost("create")]
public async Task<IActionResult> CreateInvoice([FromBody] CreateInvoiceCommand command)
{
await _mediator.SendCommandAsync<CreateInvoiceCommand, Unit>(command); // triggers CreateInvoiceCommandHandler
return Ok("Invoice created.");
}
[HttpGet("{id}")]
public async Task<ActionResult<InvoiceDto>> GetInvoice(int id)
{
var dto = await _mediator.SendQueryAsync<GetInvoiceQuery, InvoiceDto>(new GetInvoiceQuery { InvoiceId = id });
return Ok(dto);
}
}Cortex.Mediator elegantly integrates with .NET’s DI container and fosters a clean CQRS approach:
-
Commands and Queries keep your read/write operations separate and explicit.
-
Notifications enable event-based architectures.
-
Pipeline Behaviors give you flexible ways to add cross-cutting concerns like logging, transactions, and validation.
For smaller projects, the separation may be minimal — but as your application grows, Cortex.Mediator’s structured approach will help keep the code organized, testable, and scalable.
Cortex Data Framework WIKI