|
1 | 1 | namespace SlimMessageBus.Host; |
2 | 2 |
|
3 | 3 | using System.Diagnostics; |
| 4 | +using System.Text.RegularExpressions; |
4 | 5 |
|
5 | 6 | using SlimMessageBus.Host.Consumer; |
6 | 7 |
|
@@ -187,56 +188,67 @@ protected Type GetMessageType(IReadOnlyDictionary<string, object> headers) |
187 | 188 |
|
188 | 189 | protected IEnumerable<IMessageTypeConsumerInvokerSettings> TryMatchConsumerInvoker(Type messageType, IReadOnlyDictionary<string, object> messageHeaders, object transportMessage) |
189 | 190 | { |
190 | | - |
191 | | - var invokers = _singleInvoker != null ? [_singleInvoker] : _invokers; |
192 | | - var found = false; |
193 | | - |
194 | 191 |
|
195 | | - foreach (var invoker in invokers.Where(x => RuntimeTypeCache.IsAssignableFrom(messageType, x.MessageType))) |
196 | | - { |
197 | | - // If either the invoker has a filter or the parent consumer has a filter evaluate it. |
198 | | - var invFilter = invoker.Filter ?? invoker.ParentSettings.Filter; |
199 | 192 |
|
200 | | - if (invFilter != null) |
| 193 | + if (_singleInvoker != null) |
| 194 | + { |
| 195 | + if (ApplyFilter(_singleInvoker, messageHeaders, transportMessage)) |
201 | 196 | { |
202 | | - bool matches; |
203 | | - try |
| 197 | + // fallback to the first one |
| 198 | + yield return _singleInvoker; |
| 199 | + } |
| 200 | + } |
| 201 | + else |
| 202 | + { |
| 203 | + var found = false; |
| 204 | + foreach (var invoker in _invokers.Where(x => RuntimeTypeCache.IsAssignableFrom(messageType, x.MessageType))) |
| 205 | + { |
| 206 | + if (ApplyFilter(invoker, messageHeaders, transportMessage)) |
204 | 207 | { |
205 | | - matches = invFilter(messageHeaders, transportMessage); |
| 208 | + found = true; |
| 209 | + yield return invoker; |
206 | 210 | } |
207 | | - catch (Exception ex) |
208 | | - { |
209 | | - // If filter throws, treat as non-match and log |
210 | | - FilterException(invoker.ConsumerType.Name, Path, ex); |
| 211 | + } |
211 | 212 |
|
212 | | - matches = false; |
| 213 | + if (!found) |
| 214 | + { |
| 215 | + if (_shouldLogWhenUnrecognizedMessageType) |
| 216 | + { |
| 217 | + var consumerTypes = string.Join(",", _invokers.Select(x => x.ConsumerType.Name)); |
| 218 | + LogNoConsumerTypeMatched(messageType, Path, MessageHeaders.MessageType, consumerTypes); |
213 | 219 | } |
214 | 220 |
|
215 | | - if (!matches) |
| 221 | + if (_shouldFailWhenUnrecognizedMessageType) |
216 | 222 | { |
217 | | - continue; |
| 223 | + throw new ConsumerMessageBusException($"The message on path {Path} declared {MessageHeaders.MessageType} header of type {messageType}, but none of the known consumer types {string.Join(",", _invokers.Select(x => x.ConsumerType.Name))} was able to handle that"); |
218 | 224 | } |
219 | 225 | } |
220 | | - |
221 | | - found = true; |
222 | | - yield return invoker; |
223 | 226 | } |
| 227 | + } |
| 228 | + |
| 229 | + private bool ApplyFilter(IMessageTypeConsumerInvokerSettings invoker, |
| 230 | + IReadOnlyDictionary<string, object> messageHeaders, |
| 231 | + object transportMessage) |
| 232 | + { |
| 233 | + // If either the invoker has a filter or the parent consumer has a filter evaluate it. |
| 234 | + var invFilter = invoker.Filter ?? invoker.ParentSettings.Filter; |
224 | 235 |
|
225 | | - if (!found) |
| 236 | + if (invFilter == null) |
226 | 237 | { |
227 | | - if (_shouldLogWhenUnrecognizedMessageType) |
228 | | - { |
229 | | - var consumerTypes = string.Join(",", _invokers.Select(x => x.ConsumerType.Name)); |
230 | | - LogNoConsumerTypeMatched(messageType, Path, MessageHeaders.MessageType, consumerTypes); |
231 | | - } |
| 238 | + return true; |
| 239 | + } |
232 | 240 |
|
233 | | - if (_shouldFailWhenUnrecognizedMessageType) |
234 | | - { |
235 | | - throw new ConsumerMessageBusException($"The message on path {Path} declared {MessageHeaders.MessageType} header of type {messageType}, but none of the known consumer types {string.Join(",", _invokers.Select(x => x.ConsumerType.Name))} was able to handle that"); |
236 | | - } |
| 241 | + try |
| 242 | + { |
| 243 | + return invFilter(messageHeaders, transportMessage); |
| 244 | + } |
| 245 | + catch (Exception ex) |
| 246 | + { |
| 247 | + // If filter throws, treat as non-match and log |
| 248 | + FilterException(_singleInvoker.ConsumerType.Name, Path, ex); |
| 249 | + return false; |
237 | 250 | } |
238 | 251 | } |
239 | | - |
240 | 252 | #region Logging |
241 | 253 |
|
242 | 254 | [LoggerMessage( |
|
0 commit comments