Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions docs/intro.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@


# Introduction to SlimMessageBus <!-- omit in toc -->

- [Key elements of SlimMessageBus](#key-elements-of-slimmessagebus)
Expand All @@ -9,6 +11,7 @@
- [Consumer](#consumer)
- [Start or Stop message consumption](#start-or-stop-message-consumption)
- [Health check circuit breaker](#health-check-circuit-breaker)
- [Consumer filtering](#consumer-filtering)
- [Consumer Context (Accessing Additional Message Information)](#consumer-context-accessing-additional-message-information)
- [Recommended Approach: Constructor Injection](#recommended-approach-constructor-injection)
- [Obsolete Approaches](#obsolete-approaches)
Expand Down Expand Up @@ -350,6 +353,110 @@ Here’s a clearer, more structured rewrite of that section with improved readab

---

#### Consumer filtering


##### Consumer filtering by message headers

SlimMessageBus supports registering predicates that filter which consumer invokers should handle a given incoming message based on message headers (and optionally the transport message). This allows multiple consumers to subscribe to the same topic/queue and be selected at runtime using header values (for example `ResourceType`, `TenantId`, `MessageVersion`, etc).

This document describes:
- Where to configure filters (consumer-level and per-invoker).
- How filters are evaluated at runtime.
- Examples.

##### Motivation

When multiple logical message handlers share the same transport path (same queue or topic) it is useful to dispatch to a specific handler based on message metadata carried in headers. Filters are lightweight predicates executed during invoker selection and allow routing decisions without touching the transport topology.

##### API

Per-consumer filter (applies only to that derived-consumer). Use the `Filter` overload that accepts a predicate:

```csharp
bus.Consume<MyMessage>(...
.Filter(headers => headers != null && headers.TryGetValue("ResourceType", out var v) && (string)v == nameof(SomeMessageB))
.WithConsumer<MyMessageAConsumer2>());
```



##### Runtime behaviour and precedence

- When a message arrives, consumers are matched by message CLR type (as before).
- For each candidate consumer the runtime evaluates the predicate in this order:
1. `consumer.Filter` (per-consumer filter), if present.
2. Otherwise `consumer.ParentSettings.Filter` (consumer-level filter), if present.
- If no filter is present the invoker is considered a match as before.
- If a filter throws an exception it is treated as a non-match and a warning log entry is emitted.
- If none of the invokers match and the consumer configuration says to fail on unrecognized type, the message is handled according to existing undeclared-message rules (log / throw / ack etc).

This approach allows a default consumer-level filter and selective overrides per-invoker.

##### Example: two consumers on same queue filtered by header

Producer (publishes with header):
```csharp
public class SomeMessage
{
public string ResourceType { get; set; }
// other properties
}

await bus.Publish(new SomeMessage { ResourceType = "SomeMessageA" },
ctx => ctx.SetHeader("ResourceType", "SomeMessageA"));
```

Consumer registration:
```csharp
public class SomeMessageAConsumer : IConsumer<SomeMessage>
{
public Task Consume(SomeMessage message)
{
// handle SomeMessageA
}
}

public class SomeMessageBConsumer : IConsumer<SomeMessage>
{
public Task Consume(SomeMessage message)
{
// handle SomeMessageB
}
}

bus.Consume<SomeMessageA>(x => x.Topic(topic)
.Filter(headers => headers != null && headers.TryGetValue("ResourceType", out var v) && (string)v == nameof(SomeMessageA))
.WithConsumer<SomeMessageAConsumer>());

bus.Consume<SomeMessageA>(x => x.Topic(topic)
.Filter(headers => headers != null && headers.TryGetValue("ResourceType", out var v) && (string)v == nameof(SomeMessageB))
.WithConsumer<SomeMessageAConsumer2>());
```


If the incoming message has `ResourceType = "SomeMessageA"` only the first consumer will be executed.

##### Notes and recommendations

- Keep filter predicates cheap and deterministic — they are executed during invoker selection.
- Prefer header-only predicates for portability across providers. Use the transport message parameter only when you need provider-specific metadata.
- Consider logging or metrics when an arrival does not match any consumer so routing configuration problems are visible.
- Unit tests: there is (or should be) a unit test demonstrating two consumers on the same topic filtered by header under `Tests\SlimMessageBus.Host.Memory.Test\MemoryMessageBusTests.cs` (see `When_Publish_Given_TwoConsumersOnSameTopic_WithDifferentFilters_Then_OnlyMatchingConsumerInvoked`).

##### Troubleshooting

- If you expect a consumer to be invoked but it is not:
- Verify the header key and value types (strings vs other types).
- Verify whether the filter throws an exception (exceptions are treated as non-matches and logged).
- Confirm whether consumer-level filter is present — it will be used only if per-invoker filter is absent.

##### Summary

Filters let you route messages to specific invokers based on headers or transport-level metadata while keeping a single transport path for related messages. They are configured in the fluent builder and evaluated at runtime during invoker selection.

---

### Consumer Context (Accessing Additional Message Information)

Within a message consumer, you can access the [`IConsumerContext`](/src/SlimMessageBus/IConsumerContext.cs) to retrieve detailed metadata about the message being processed. This includes:
Expand Down Expand Up @@ -496,6 +603,8 @@ Each processing of a message resolves the `TConsumer` instance from the DI.

> Please note that anything higher than 1 will cause multiple messages to be consumed concurrently in one service instance. This will typically impact message processing order (ie 2nd message might get processed sooner than the 1st message).

#### Filtering

## Request-response communication

SMB provides an implementation of request-response over topics or queues - depending on what the underlying provider supports.
Expand Down
135 changes: 133 additions & 2 deletions docs/intro.t.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@


# Introduction to SlimMessageBus <!-- omit in toc -->

- [Key elements of SlimMessageBus](#key-elements-of-slimmessagebus)
Expand All @@ -9,6 +11,7 @@
- [Consumer](#consumer)
- [Start or Stop message consumption](#start-or-stop-message-consumption)
- [Health check circuit breaker](#health-check-circuit-breaker)
- [Consumer filtering](#consumer-filtering)
- [Consumer Context (Accessing Additional Message Information)](#consumer-context-accessing-additional-message-information)
- [Recommended Approach: Constructor Injection](#recommended-approach-constructor-injection)
- [Obsolete Approaches](#obsolete-approaches)
Expand Down Expand Up @@ -350,6 +353,110 @@ Here’s a clearer, more structured rewrite of that section with improved readab

---

#### Consumer filtering


##### Consumer filtering by message headers

SlimMessageBus supports registering predicates that filter which consumer invokers should handle a given incoming message based on message headers (and optionally the transport message). This allows multiple consumers to subscribe to the same topic/queue and be selected at runtime using header values (for example `ResourceType`, `TenantId`, `MessageVersion`, etc).

This document describes:
- Where to configure filters (consumer-level and per-invoker).
- How filters are evaluated at runtime.
- Examples.

##### Motivation

When multiple logical message handlers share the same transport path (same queue or topic) it is useful to dispatch to a specific handler based on message metadata carried in headers. Filters are lightweight predicates executed during invoker selection and allow routing decisions without touching the transport topology.

##### API

Per-consumer filter (applies only to that derived-consumer). Use the `Filter` overload that accepts a predicate:

```csharp
bus.Consume<MyMessage>(...
.Filter(headers => headers != null && headers.TryGetValue("ResourceType", out var v) && (string)v == nameof(SomeMessageB))
.WithConsumer<MyMessageAConsumer2>());
```



##### Runtime behaviour and precedence

- When a message arrives, consumers are matched by message CLR type (as before).
- For each candidate consumer the runtime evaluates the predicate in this order:
1. `consumer.Filter` (per-consumer filter), if present.
2. Otherwise `consumer.ParentSettings.Filter` (consumer-level filter), if present.
- If no filter is present the invoker is considered a match as before.
- If a filter throws an exception it is treated as a non-match and a warning log entry is emitted.
- If none of the invokers match and the consumer configuration says to fail on unrecognized type, the message is handled according to existing undeclared-message rules (log / throw / ack etc).

This approach allows a default consumer-level filter and selective overrides per-invoker.

##### Example: two consumers on same queue filtered by header

Producer (publishes with header):
```csharp
public class SomeMessage
{
public string ResourceType { get; set; }
// other properties
}

await bus.Publish(new SomeMessage { ResourceType = "SomeMessageA" },
ctx => ctx.SetHeader("ResourceType", "SomeMessageA"));
```

Consumer registration:
```csharp
public class SomeMessageAConsumer : IConsumer<SomeMessage>
{
public Task Consume(SomeMessage message)
{
// handle SomeMessageA
}
}

public class SomeMessageBConsumer : IConsumer<SomeMessage>
{
public Task Consume(SomeMessage message)
{
// handle SomeMessageB
}
}

bus.Consume<SomeMessageA>(x => x.Topic(topic)
.Filter(headers => headers != null && headers.TryGetValue("ResourceType", out var v) && (string)v == nameof(SomeMessageA))
.WithConsumer<SomeMessageAConsumer>());

bus.Consume<SomeMessageA>(x => x.Topic(topic)
.Filter(headers => headers != null && headers.TryGetValue("ResourceType", out var v) && (string)v == nameof(SomeMessageB))
.WithConsumer<SomeMessageAConsumer2>());
```


If the incoming message has `ResourceType = "SomeMessageA"` only the first consumer will be executed.

##### Notes and recommendations

- Keep filter predicates cheap and deterministic — they are executed during invoker selection.
- Prefer header-only predicates for portability across providers. Use the transport message parameter only when you need provider-specific metadata.
- Consider logging or metrics when an arrival does not match any consumer so routing configuration problems are visible.
- Unit tests: there is (or should be) a unit test demonstrating two consumers on the same topic filtered by header under `Tests\SlimMessageBus.Host.Memory.Test\MemoryMessageBusTests.cs` (see `When_Publish_Given_TwoConsumersOnSameTopic_WithDifferentFilters_Then_OnlyMatchingConsumerInvoked`).

##### Troubleshooting

- If you expect a consumer to be invoked but it is not:
- Verify the header key and value types (strings vs other types).
- Verify whether the filter throws an exception (exceptions are treated as non-matches and logged).
- Confirm whether consumer-level filter is present — it will be used only if per-invoker filter is absent.

##### Summary

Filters let you route messages to specific invokers based on headers or transport-level metadata while keeping a single transport path for related messages. They are configured in the fluent builder and evaluated at runtime during invoker selection.

---

### Consumer Context (Accessing Additional Message Information)

Within a message consumer, you can access the [`IConsumerContext`](/src/SlimMessageBus/IConsumerContext.cs) to retrieve detailed metadata about the message being processed. This includes:
Expand Down Expand Up @@ -496,6 +603,8 @@ Each processing of a message resolves the `TConsumer` instance from the DI.

> Please note that anything higher than 1 will cause multiple messages to be consumed concurrently in one service instance. This will typically impact message processing order (ie 2nd message might get processed sooner than the 1st message).

#### Filtering

## Request-response communication

SMB provides an implementation of request-response over topics or queues - depending on what the underlying provider supports.
Expand Down Expand Up @@ -1243,7 +1352,29 @@ public class LoggingConsumerInterceptor<TMessage> : IConsumerInterceptor<TMessag

Message processing by consumers or handlers may result in exceptions. The [IConsumerErrorHandler<T>](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) provides a standard way to integrate custom error handling logic across different transports.

@[:cs](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs,Interface)
```cs
public interface IConsumerErrorHandler<in T>
{
/// <summary>
/// <para>
/// Executed when the message consumer (or handler) errors out. The interface allows for interception of
/// exceptions to manipulate the processing pipeline (success/fail/retry).
/// </para>
/// <para>
/// The consumer context is available to apply transport specific operations (acknowledge/reject/dead letter/etc).
/// </para>
/// <para>
/// If message execution is to be re-attempted, any delays/jitter should be applied before the method returns.
/// </para>
/// </summary>
/// <param name="message">The message that failed to process.</param>
/// <param name="consumerContext">The consumer context for the message processing pipeline.</param>
/// <param name="exception">Exception that occurred during message processing.</param>
/// <param name="attempts">The number of times the message has been attempted to be processed.</param>
/// <returns>The error handling result.</returns>
Task<ProcessResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
}
```

The returned `ProcessResult` object is used to override the execution for the remainder of the execution pipeline. Some transports provide additional options.

Expand Down Expand Up @@ -1420,4 +1551,4 @@ This allows to recreate missing elements in the infrastructure without restartin
## Versions

- The v3 release [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/3.0.0).
- The v2 release [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/Host.Transport-2.0.0).
- The v2 release [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/Host.Transport-2.0.0).
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,14 @@ public TConsumerBuilder WhenUndeclaredMessageTypeArrives(Action<UndeclaredMessag
action(ConsumerSettings.UndeclaredMessageType);
return (TConsumerBuilder)this;
}

/// <summary>
/// More advanced overload where transport message is passed as well.
/// </summary>
public TConsumerBuilder Filter(ConsumerFilter<object> headerPredicateWithTransport)
{
if (headerPredicateWithTransport == null) throw new ArgumentNullException(nameof(headerPredicateWithTransport));
ConsumerSettings.Filter = headerPredicateWithTransport;
return (TConsumerBuilder)this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ private void CalculateResponseType()
/// Enables the disposal of consumer instance after the message has been consumed.
/// </summary>
public bool IsDisposeConsumerEnabled { get; set; }

/// <summary>
/// Optional predicate evaluated on arrival headers and transport message to decide if this invoker should be used.
/// </summary>
public ConsumerFilter<object> Filter { get; set; }
}
2 changes: 2 additions & 0 deletions src/SlimMessageBus.Host.Configuration/Settings/Delegates.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@

public delegate void MessageHeaderModifier<in T>(IDictionary<string, object> headers, T message);

public delegate bool ConsumerFilter<in T>(IReadOnlyDictionary<string, object> headers, T message);

public delegate Task ConsumerMethod(object consumer, object message, IConsumerContext consumerContext, CancellationToken cancellationToken);
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,10 @@ public interface IMessageTypeConsumerInvokerSettings
/// The consumer method.
/// </summary>
MethodInfo ConsumerMethodInfo { get; set; }

/// <summary>
/// Optional predicate to filter arriving messages by headers/transport message.
/// When set, the invoker is only considered if the predicate returns true.
/// </summary>
ConsumerFilter<object> Filter { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public class MessageTypeConsumerInvokerSettings : IMessageTypeConsumerInvokerSet
public ConsumerMethod ConsumerMethod { get; set; }
/// <inheritdoc/>
public MethodInfo ConsumerMethodInfo { get; set; }
/// <inheritdoc/>
public ConsumerFilter<object> Filter { get; set; }

public MessageTypeConsumerInvokerSettings(ConsumerSettings parentSettings, Type messageType, Type consumerType)
{
Expand Down
Loading
Loading