Skip to content

Conversation

@t11omas
Copy link

@t11omas t11omas commented Dec 2, 2025

This pull request introduces a new feature for consumer message filtering based on message headers and, optionally, the raw transport message.

Motivation and Goal:
The primary goal is to provide a flexible and efficient way to route incoming messages to specific consumer invokers when multiple consumers subscribe to the same topic or queue. This allows for fine-grained message dispatching based on metadata in headers (e.g., ResourceType, TenantId, MessageVersion), avoiding the need to modify the underlying transport topology.

New Features:

  • Introduces new Filter overloads in the consumer configuration builder (AbstractConsumerBuilder) to define predicates for message filtering.
  • Supports filtering at both the consumer-level (ConsumerSettings.Filter) and the more specific per-invoker level (IMessageTypeConsumerInvokerSettings.Filter), with per-invoker filters taking precedence.
  • Implements runtime evaluation of these filter predicates during message processing and invoker selection in MessageProcessor.
  • Includes error handling for filter predicates: if a filter throws an exception, it is treated as a non-match, and a warning is logged.

Noteworthy changes in usage for the end user:
Users can now configure .Filter() on consumer registrations to conditionally execute consumers based on header values. This enables patterns like having multiple specialized consumers for the same message type on a single topic, each handling a subset of messages defined by header criteria.

Documentation:

  • The /docs/intro.md documentation has been updated with a comprehensive section detailing "Consumer filtering," including its motivation, API usage, runtime behavior, precedence rules, examples, recommendations, and troubleshooting tips.

Tests:

  • A new unit test, When_Publish_Given_TwoConsumersOnSameTopic_WithDifferentFilters_Then_OnlyMatchingConsumerInvoked, has been added to MemoryMessageBusTests.cs to validate the header-based consumer filtering functionality.

@t11omas t11omas mentioned this pull request Dec 2, 2025
@t11omas
Copy link
Author

t11omas commented Dec 2, 2025

@zarusz I added the Signed off message on my commit but its still complaining, plus the build is failing, saying it can't find a file. unfortunately I don't have time to investigate these :(

@zarusz
Copy link
Owner

zarusz commented Dec 3, 2025

Hey @t11omas, thanks for the contribution! I’ll review it shortly — this looks like a great addition to the library.

In the meantime, could you please squash your commits into a single one and include the required sign-off as described in the contributing guidelines?

Signed-off-by: Joe Smith <joe.smith@email.com>

Thanks!

@t11omas t11omas force-pushed the master branch 2 times, most recently from 5715afd to 9775cce Compare December 3, 2025 09:09
@zarusz
Copy link
Owner

zarusz commented Dec 3, 2025

@t11omas I've finished my review, good job there!

Please check one more comment around memory alloc consideration.

Also, I still see two commits, so if you could squash it all into 1 commit - that keeps the history clean.

Enables advanced routing and dispatching of messages to specific consumers or invokers based on message headers. This allows multiple logical consumers to share the same transport path (topic/queue).

- Introduces new `.Filter()` methods on the consumer builder for configuration.
- Implements runtime evaluation of filter predicates during invoker selection, supporting both consumer-level and per-invoker filters with clear precedence.
- Adds robust error handling for filter predicates, treating exceptions as non-matches and logging warnings.
- Updates documentation with a dedicated section, API details, examples, and troubleshooting.
- Includes a unit test to verify correct message dispatching with header-based filters.

Adds consumer message filtering by headers

Introduces the ability to filter incoming messages for specific consumers or invokers based on message headers. This enables advanced routing scenarios where multiple logical consumers share the same transport topic or queue.

*   Allows configuring filter predicates via the `.Filter()` method on the consumer builder.
*   Filters are evaluated at runtime during invoker selection, supporting both consumer-level and per-invoker predicates with clear precedence.
*   Includes robust error handling for filter predicates, treating exceptions as non-matches and logging warnings.
*   Updates the documentation with a dedicated section, API details, examples, and troubleshooting.
*   Adds a unit test to ensure correct message dispatching with header-based filters.

Signed-off-by: Thomas Anderson <t11omas1983@live.co.uk>

Enhances consumer matching with message context

The `TryMatchConsumerInvoker` method now includes `messageHeaders` and the raw `transportMessage` as parameters.
This allows for more dynamic and context-aware selection of message consumers, enabling routing decisions based on runtime header information or transport-specific details, rather than solely on message type.

Signed-off-by: Thomas Anderson <t11omas1983@live.co.uk>

Adjusts test filter signature

Aligns the message consumer filter delegate in tests with the updated API that now provides access to the message object alongside headers. Ensures compatibility with the new filter signature.

Signed-off-by: Thomas Anderson <t11omas1983@live.co.uk>

Refactors consumer filter delegate type

Introduces a new generic `ConsumerFilter` delegate to provide a more explicit and type-safe contract for filtering messages based on headers and the transport message.

Updates `ConsumerSettings` and related interfaces to use this new delegate type for the `Filter` property, improving consistency and clarity across the configuration.

Removes a less specific `Filter` builder overload from `AbstractConsumerBuilder` to streamline the API towards the more comprehensive `(headers, message)` filter signature, which now leverages the new delegate.

Adjusts filter selection logic in `MessageProcessor`. If a specific invoker has a filter configured, that filter is now bypassed, and no fallback to a parent consumer filter occurs. If the invoker has no specific filter, it defaults to the parent consumer's filter.

Signed-off-by: Thomas Andoerson <t11omas1983@live.co.uk>

Fixed test

refactored message processor as per PR comments to reduce the memory allocation on the single invoker scenario

Signed-off-by: Thomas Andoerson <t11omas1983@live.co.uk>
@sonarqubecloud
Copy link

sonarqubecloud bot commented Dec 4, 2025

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants