From 54dfb244241593f73c3d8861a73bf7eb8a5be844 Mon Sep 17 00:00:00 2001 From: jericho Date: Mon, 29 Jan 2024 21:41:20 -0500 Subject: [PATCH 01/15] Metric to track how long a message is queued, waiting to be processed Resolves #38 --- Source/Picton.Messaging/AsyncMessagePump.cs | 7 ++++++ Source/Picton.Messaging/MessagePumpOptions.cs | 2 +- Source/Picton.Messaging/Metrics.cs | 23 +++++++++++++------ 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index 046911d..f02fb48 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -349,6 +349,13 @@ private async Task ProcessMessagesAsync(CancellationToken cancellationToken) { if (_queueManagers.TryGetValue(result.QueueName, out (QueueConfig Config, QueueManager QueueManager, QueueManager PoisonQueueManager, DateTime LastFetched, TimeSpan FetchDelay) queueInfo)) { + if (result.Message.InsertedOn.HasValue) + { + var elapsed = DateTimeOffset.UtcNow.Subtract(result.Message.InsertedOn.Value); + var messageWaitTime = (long)elapsed.TotalSeconds; + _metrics.Measure.Timer.Time(Metrics.MessageWaitBeforeProcessTimer, messageWaitTime); + } + using (_metrics.Measure.Timer.Time(Metrics.MessageProcessingTimer)) { try diff --git a/Source/Picton.Messaging/MessagePumpOptions.cs b/Source/Picton.Messaging/MessagePumpOptions.cs index c9951d8..af6622e 100644 --- a/Source/Picton.Messaging/MessagePumpOptions.cs +++ b/Source/Picton.Messaging/MessagePumpOptions.cs @@ -103,7 +103,7 @@ public MessagePumpOptions(string connectionString, int? concurrentTasks, QueueCl /// The delay is reset to zero when at lest one messages is found in the queue. /// /// The pupose of the delay is to ensure we don't query a given queue too often when we know it to be empty. - /// + /// /// Default value is 5 seconds. /// public TimeSpan EmptyQueueFetchDelay { get; set; } = _defaultEmptyQueueFetchDelay; diff --git a/Source/Picton.Messaging/Metrics.cs b/Source/Picton.Messaging/Metrics.cs index 4a782ec..4c4c2ca 100644 --- a/Source/Picton.Messaging/Metrics.cs +++ b/Source/Picton.Messaging/Metrics.cs @@ -12,17 +12,26 @@ internal static class Metrics /// public static CounterOptions MessagesProcessedCounter => new() { - Context = "Picton", + Context = "Picton.Messaging", Name = "MessagesProcessedCount", MeasurementUnit = Unit.Items }; + /// + /// Gets the timer indicating how long a message was in queue, waiting to be processed. + /// + public static TimerOptions MessageWaitBeforeProcessTimer => new() + { + Context = "Picton.Messaging", + Name = "MessageWaitBeforeProcessTimer" + }; + /// /// Gets the timer indicating the time it takes to process a message. /// public static TimerOptions MessageProcessingTimer => new() { - Context = "Picton", + Context = "Picton.Messaging", Name = "MessageProcessingTime" }; @@ -31,7 +40,7 @@ internal static class Metrics /// public static TimerOptions MessagesFetchingTimer => new() { - Context = "Picton", + Context = "Picton.Messaging", Name = "MessagesFetchingTime" }; @@ -40,7 +49,7 @@ internal static class Metrics /// public static CounterOptions QueueEmptyCounter => new() { - Context = "Picton", + Context = "Picton.Messaging", Name = "QueueEmptyCount" }; @@ -49,7 +58,7 @@ internal static class Metrics /// public static CounterOptions AllQueuesEmptyCounter => new() { - Context = "Picton", + Context = "Picton.Messaging", Name = "AllQueuesEmptyCount" }; @@ -58,7 +67,7 @@ internal static class Metrics /// public static GaugeOptions QueuedCloudMessagesGauge => new() { - Context = "Picton", + Context = "Picton.Messaging", Name = "QueuedCloudMessages", MeasurementUnit = Unit.Items }; @@ -68,7 +77,7 @@ internal static class Metrics /// public static GaugeOptions QueuedMemoryMessagesGauge => new() { - Context = "Picton", + Context = "Picton.Messaging", Name = "QueuedMemoryMessages", MeasurementUnit = Unit.Items }; From 11a23f353f5885292401c1cb4d97e140ac7569f3 Mon Sep 17 00:00:00 2001 From: jericho Date: Tue, 30 Jan 2024 10:42:41 -0500 Subject: [PATCH 02/15] Raise distinct events when a given queue is empty vs. when all queues are empty Resolves #39 --- .../TestsRunner.cs | 21 ++++++------------- Source/Picton.Messaging/AsyncMessagePump.cs | 21 ++++++++++++++++--- .../AsyncMessagePumpWithHandlers.cs | 20 +++++++++++++++--- .../AsyncMultiTenantMessagePump.cs | 20 +++++++++++++++--- ...AsyncMultiTenantMessagePumpWithHandlers.cs | 20 +++++++++++++++--- 5 files changed, 75 insertions(+), 27 deletions(-) diff --git a/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs b/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs index f1659b2..1a6a780 100644 --- a/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs +++ b/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs @@ -114,13 +114,10 @@ private async Task RunAsyncMessagePumpTests(string connectionString, string queu _logger.LogInformation("{messageContent}", message.Content.ToString()); }, - // Stop the timer and the message pump when the queue is empty. - OnEmpty = cancellationToken => + // Stop the message pump when there are no more messages to process. + OnAllQueuesEmpty = cancellationToken => { - // Stop the timer if (sw.IsRunning) sw.Stop(); - - // Stop the message pump _logger.LogDebug("Asking the message pump to stop..."); cts.Cancel(); } @@ -158,13 +155,10 @@ private async Task RunAsyncMessagePumpWithHandlersTests(string connectionString, var options = new MessagePumpOptions(connectionString, concurrentTasks, null, null); var messagePump = new AsyncMessagePumpWithHandlers(options, _logger, metrics) { - // Stop the timer and the message pump when the queue is empty. - OnEmpty = cancellationToken => + // Stop the message pump when there are no more messages to process. + OnAllQueuesEmpty = cancellationToken => { - // Stop the timer if (sw.IsRunning) sw.Stop(); - - // Stop the message pump _logger.LogDebug("Asking the message pump with handlers to stop..."); cts.Cancel(); } @@ -211,13 +205,10 @@ private async Task RunMultiTenantAsyncMessagePumpTests(string connectionString, _logger.LogInformation("{tenantId} - {messageContent}", tenantId, message.Content.ToString()); }, - // Stop the timer and the message pump when all tenant queues are empty. - OnEmpty = cancellationToken => + // Stop the message pump when there are no more messages to process. + OnAllQueuesEmpty = cancellationToken => { - // Stop the timer if (sw.IsRunning) sw.Stop(); - - // Stop the message pump _logger.LogDebug("Asking the multi-tenant message pump to stop..."); cts.Cancel(); } diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index f02fb48..1ff2ca1 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -60,18 +60,31 @@ public class AsyncMessagePump /// public Action OnError { get; set; } + /// + /// Gets or sets the logic to execute when a queue is empty. + /// + /// + /// + /// OnQueueEmpty = (queueName, cancellationToken) => _logger.LogInformation("Queue {queueName} is empty", queueName); + /// + /// + /// + /// If this property is not set, the default logic is to do nothing. + /// + public Action OnQueueEmpty { get; set; } + /// /// Gets or sets the logic to execute when all queues are empty. /// /// /// - /// OnEmpty = cancellationToken => Task.Delay(2500, cancellationToken).Wait(); + /// OnAllQueuesEmpty = cancellationToken => _logger.LogInformation("All queues are empty"); /// /// /// /// If this property is not set, the default logic is to do nothing. /// - public Action OnEmpty { get; set; } + public Action OnAllQueuesEmpty { get; set; } #endregion @@ -503,6 +516,8 @@ private async Task ProcessMessagesAsync(CancellationToken cancellationToken) if (delay > _messagePumpOptions.EmptyQueueMaxFetchDelay) delay = _messagePumpOptions.EmptyQueueMaxFetchDelay; _queueManagers[queueName] = (queueInfo.Config, queueInfo.QueueManager, queueInfo.PoisonQueueManager, DateTime.UtcNow, delay); + + OnQueueEmpty?.Invoke(queueName, cancellationToken); } } } @@ -523,7 +538,7 @@ private async Task ProcessMessagesAsync(CancellationToken cancellationToken) { // All queues are empty _metrics.Measure.Counter.Increment(Metrics.AllQueuesEmptyCounter); - OnEmpty?.Invoke(cancellationToken); + OnAllQueuesEmpty?.Invoke(cancellationToken); } catch (Exception e) when (e is TaskCanceledException || e is OperationCanceledException) { diff --git a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs index e1e0a19..6dc3a5f 100644 --- a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs @@ -40,18 +40,31 @@ public class AsyncMessagePumpWithHandlers /// public Action OnError { get; set; } + /// + /// Gets or sets the logic to execute when a queue is empty. + /// + /// + /// + /// OnQueueEmpty = (queueName, cancellationToken) => _logger.LogInformation("Queue {queueName} is empty", queueName); + /// + /// + /// + /// If this property is not set, the default logic is to do nothing. + /// + public Action OnQueueEmpty { get; set; } + /// /// Gets or sets the logic to execute when all queues are empty. /// /// /// - /// OnEmpty = cancellationToken => Task.Delay(2500, cancellationToken).Wait(); + /// OnAllQueuesEmpty = (cancellationToken) => _logger.LogInformation("All queues are empty"); /// /// /// /// If this property is not set, the default logic is to do nothing. /// - public Action OnEmpty { get; set; } + public Action OnAllQueuesEmpty { get; set; } #endregion @@ -112,7 +125,8 @@ public void RemoveQueue(string queueName) /// A representing the asynchronous operation. public Task StartAsync(CancellationToken cancellationToken) { - _messagePump.OnEmpty = OnEmpty; + _messagePump.OnQueueEmpty = OnQueueEmpty; + _messagePump.OnAllQueuesEmpty = OnAllQueuesEmpty; _messagePump.OnError = OnError; _messagePump.OnMessage = (queueName, message, cancellationToken) => { diff --git a/Source/Picton.Messaging/AsyncMultiTenantMessagePump.cs b/Source/Picton.Messaging/AsyncMultiTenantMessagePump.cs index 8d6a1c8..9fa7ce9 100644 --- a/Source/Picton.Messaging/AsyncMultiTenantMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMultiTenantMessagePump.cs @@ -64,18 +64,31 @@ public class AsyncMultiTenantMessagePump /// public Action OnError { get; set; } + /// + /// Gets or sets the logic to execute when a queue is empty. + /// + /// + /// + /// OnQueueEmpty = (queueName, cancellationToken) => _logger.LogInformation("Queue {queueName} is empty", queueName); + /// + /// + /// + /// If this property is not set, the default logic is to do nothing. + /// + public Action OnQueueEmpty { get; set; } + /// /// Gets or sets the logic to execute when all queues are empty. /// /// /// - /// OnEmpty = cancellationToken => Task.Delay(2500, cancellationToken).Wait(); + /// OnAllQueuesEmpty = (cancellationToken) => _logger.LogInformation("All queues are empty"); /// /// /// /// If this property is not set, the default logic is to do nothing. /// - public Action OnEmpty { get; set; } + public Action OnAllQueuesEmpty { get; set; } #endregion @@ -118,7 +131,8 @@ public async Task StartAsync(CancellationToken cancellationToken) { if (OnMessage == null) throw new ArgumentNullException(nameof(OnMessage)); - _messagePump.OnEmpty = OnEmpty; + _messagePump.OnQueueEmpty = (queueName, cancellationToken) => OnQueueEmpty?.Invoke(queueName.TrimStart(_queueNamePrefix), cancellationToken); + _messagePump.OnAllQueuesEmpty = OnAllQueuesEmpty; _messagePump.OnError = (queueName, message, exception, isPoison) => OnError?.Invoke(queueName.TrimStart(_queueNamePrefix), message, exception, isPoison); _messagePump.OnMessage = (queueName, message, cancellationToken) => OnMessage?.Invoke(queueName.TrimStart(_queueNamePrefix), message, cancellationToken); diff --git a/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs index c32c727..1b7a41a 100644 --- a/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs @@ -40,18 +40,31 @@ public class AsyncMultiTenantMessagePumpWithHandlers /// public Action OnError { get; set; } + /// + /// Gets or sets the logic to execute when a queue is empty. + /// + /// + /// + /// OnQueueEmpty = (queueName, cancellationToken) => _logger.LogInformation("Queue {queueName} is empty", queueName); + /// + /// + /// + /// If this property is not set, the default logic is to do nothing. + /// + public Action OnQueueEmpty { get; set; } + /// /// Gets or sets the logic to execute when all queues are empty. /// /// /// - /// OnEmpty = cancellationToken => Task.Delay(2500, cancellationToken).Wait(); + /// OnAllQueuesEmpty = (cancellationToken) => _logger.LogInformation("All queues are empty"); /// /// /// /// If this property is not set, the default logic is to do nothing. /// - public Action OnEmpty { get; set; } + public Action OnAllQueuesEmpty { get; set; } #endregion @@ -89,7 +102,8 @@ public AsyncMultiTenantMessagePumpWithHandlers(MessagePumpOptions options, strin /// A representing the asynchronous operation. public Task StartAsync(CancellationToken cancellationToken) { - _messagePump.OnEmpty = OnEmpty; + _messagePump.OnQueueEmpty = (queueName, cancellationToken) => OnQueueEmpty?.Invoke(queueName.TrimStart(_queueNamePrefix), cancellationToken); + _messagePump.OnAllQueuesEmpty = OnAllQueuesEmpty; _messagePump.OnError = (queueName, message, exception, isPoison) => OnError?.Invoke(queueName.TrimStart(_queueNamePrefix), message, exception, isPoison); _messagePump.OnMessage = (queueName, message, cancellationToken) => { From e89fdb8d8ccb5c029d4223afe70a61a76fdd6b5e Mon Sep 17 00:00:00 2001 From: jericho Date: Tue, 30 Jan 2024 10:43:09 -0500 Subject: [PATCH 03/15] (GH-37) Add multiple queue that match a RegEx pattern --- .../AsyncMessagePumpWithHandlers.cs | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs index 6dc3a5f..8c738a2 100644 --- a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs @@ -117,6 +117,28 @@ public void RemoveQueue(string queueName) _messagePump.RemoveQueue(queueName); } + /// + /// Add queues that meet the specified RegEx pattern. + /// + /// + /// All the queues that match the specified pattern will share the same poison queue if you specify the name of the poison queue. + /// If you omit this value, each queue will get their own poison queue. + /// + /// Similarly, all the queues that match the specified pattern will share the same oversize messages storage if you specify the name of the blob storage container. + /// If you omit this value, each queue will get their own blob container. + /// + /// The RegEx pattern. + /// Optional. The name of the queue where poison messages are automatically moved. + /// Optional. Specifies the visibility timeout value. The default value is 30 seconds. + /// Optional. A nonzero integer value that specifies the number of time we try to process a message before giving up and declaring the message to be "poison". The default value is 3. + /// Name of the blob storage where messages that exceed the maximum size for a queue message are stored. + /// The cancellation token. + /// The async task. + public Task AddQueuesByPatternAsync(string queueNamePattern, string poisonQueueName = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, string oversizeMessagesBlobStorageName = null, CancellationToken cancellationToken = default) + { + return _messagePump.AddQueuesByPatternAsync(queueNamePattern, poisonQueueName, visibilityTimeout, maxDequeueCount, oversizeMessagesBlobStorageName, cancellationToken); + } + /// /// Starts the message pump. /// From fab02e980e21e228796b4ea2845a4250c0774bc0 Mon Sep 17 00:00:00 2001 From: jericho Date: Tue, 30 Jan 2024 14:10:41 -0500 Subject: [PATCH 04/15] (GH-39) Fix unit tests to work with the new 'OnAllQueuesEmpty' event --- .../AsyncMessagePumpTests.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs b/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs index 72c66d9..721d42d 100644 --- a/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs +++ b/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs @@ -116,7 +116,7 @@ public async Task No_message_processed_when_queue_is_empty() { Interlocked.Increment(ref onErrorInvokeCount); }, - OnEmpty = cancellationToken => + OnAllQueuesEmpty = cancellationToken => { Interlocked.Increment(ref OnEmptyInvokeCount); cts.Cancel(); @@ -212,7 +212,7 @@ public async Task Message_processed() } } }, - OnEmpty = cancellationToken => + OnAllQueuesEmpty = cancellationToken => { Interlocked.Increment(ref OnEmptyInvokeCount); cts.Cancel(); @@ -299,7 +299,7 @@ public async Task Poison_message_is_rejected() { Interlocked.Increment(ref onErrorInvokeCount); }, - OnEmpty = cancellationToken => + OnAllQueuesEmpty = cancellationToken => { Interlocked.Increment(ref OnEmptyInvokeCount); cts.Cancel(); @@ -397,7 +397,7 @@ public async Task Poison_message_is_moved() { Interlocked.Increment(ref onErrorInvokeCount); }, - OnEmpty = cancellationToken => + OnAllQueuesEmpty = cancellationToken => { Interlocked.Increment(ref OnEmptyInvokeCount); cts.Cancel(); @@ -455,7 +455,7 @@ public async Task Exceptions_in_OnEmpty_are_ignored() { Interlocked.Increment(ref onErrorInvokeCount); }, - OnEmpty = cancellationToken => + OnAllQueuesEmpty = cancellationToken => { Interlocked.Increment(ref OnEmptyInvokeCount); @@ -551,7 +551,7 @@ public async Task Exceptions_in_OnError_are_ignored() Interlocked.Increment(ref onErrorInvokeCount); throw new Exception("This dummy exception should be ignored"); }, - OnEmpty = cancellationToken => + OnAllQueuesEmpty = cancellationToken => { Interlocked.Increment(ref OnEmptyInvokeCount); cts.Cancel(); From 1db134089c9da12450cc0833b647b36a9be09e3b Mon Sep 17 00:00:00 2001 From: jericho Date: Tue, 30 Jan 2024 14:19:42 -0500 Subject: [PATCH 05/15] Allow developers to configure 'FetchCount' Resolves #40 --- Source/Picton.Messaging/AsyncMessagePump.cs | 2 +- Source/Picton.Messaging/MessagePumpOptions.cs | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index 1ff2ca1..438dd73 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -475,7 +475,7 @@ private async Task ProcessMessagesAsync(CancellationToken cancellationToken) try { - messages = await queueInfo.QueueManager.GetMessagesAsync(_messagePumpOptions.ConcurrentTasks, queueInfo.Config.VisibilityTimeout, cancellationToken).ConfigureAwait(false); + messages = await queueInfo.QueueManager.GetMessagesAsync(_messagePumpOptions.FetchCount, queueInfo.Config.VisibilityTimeout, cancellationToken).ConfigureAwait(false); } catch (Exception e) when (e is TaskCanceledException || e is OperationCanceledException) { diff --git a/Source/Picton.Messaging/MessagePumpOptions.cs b/Source/Picton.Messaging/MessagePumpOptions.cs index af6622e..72006d9 100644 --- a/Source/Picton.Messaging/MessagePumpOptions.cs +++ b/Source/Picton.Messaging/MessagePumpOptions.cs @@ -10,6 +10,7 @@ namespace Picton.Messaging public record MessagePumpOptions { private const int _defaultConcurrentTasks = 25; + private const int _defaultFetchCount = 10; private static readonly TimeSpan _defaultFetchMessagesInterval = TimeSpan.FromSeconds(1); private static readonly TimeSpan _defaultCountAzureMessagesInterval = TimeSpan.FromSeconds(5); private static readonly TimeSpan _defaultCountMemoryMessagesInterval = TimeSpan.FromSeconds(5); @@ -54,6 +55,11 @@ public MessagePumpOptions(string connectionString, int? concurrentTasks, QueueCl /// public int ConcurrentTasks { get; set; } = _defaultConcurrentTasks; + /// + /// Gets or sets the number of messages fetched per queue. + /// + public int FetchCount { get; set; } = _defaultFetchCount; + /// /// Gets or sets the optional client options that define the transport /// pipeline policies for authentication, retries, etc., that are applied From dcb6c87ffdd69c2e89aaf26742b5ece6f2649f1c Mon Sep 17 00:00:00 2001 From: jericho Date: Tue, 30 Jan 2024 14:20:19 -0500 Subject: [PATCH 06/15] Optimize integration tests --- Source/Picton.Messaging.IntegrationTests/TestsRunner.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs b/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs index 1a6a780..144f535 100644 --- a/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs +++ b/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs @@ -5,6 +5,7 @@ using System; using System.Diagnostics; using System.IO; +using System.Net; using System.Threading; using System.Threading.Tasks; @@ -23,6 +24,9 @@ private enum ResultCodes public async Task RunAsync() { + ServicePointManager.DefaultConnectionLimit = 1000; + ServicePointManager.UseNagleAlgorithm = false; + // Configure Console var cts = new CancellationTokenSource(); Console.CancelKeyPress += (s, e) => From 92d1dc15c3c3e43cf3ef4d287dc2abb824b33864 Mon Sep 17 00:00:00 2001 From: jericho Date: Tue, 30 Jan 2024 14:28:20 -0500 Subject: [PATCH 07/15] (GH-40) FetchCount --- Source/Picton.Messaging/MessagePumpOptions.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Source/Picton.Messaging/MessagePumpOptions.cs b/Source/Picton.Messaging/MessagePumpOptions.cs index 72006d9..a624474 100644 --- a/Source/Picton.Messaging/MessagePumpOptions.cs +++ b/Source/Picton.Messaging/MessagePumpOptions.cs @@ -28,15 +28,17 @@ public MessagePumpOptions() /// /// The connection string. /// The number of concurrent tasks. In other words: the number of messages that can be processed at a time. + /// The number of mesages fetch from a queue at a time. /// The client options that define the transport pipeline policies for authentication, retries, etc., that are applied to every request to the queue. /// The client options that define the transport pipeline policies for authentication, retries, etc., that are applied to every request to the blob storage. /// The frequency at which messages are fetched from the Azure queues. The default value is 1 second. /// The frequency at which we count how many messages are queue in Azure, waiting to be fetched. Default is 5 seconds. /// the frequency at which we count how many messages have been fetched from Azure and are queued in memory, waiting to be processed. Default is 5 seconds. - public MessagePumpOptions(string connectionString, int? concurrentTasks, QueueClientOptions queueClientOptions = null, BlobClientOptions blobClientOptions = null, TimeSpan? fetchMessagesInterval = null, TimeSpan? countAzureMessagesInterval = null, TimeSpan? countMemoryMessagesInterval = null) + public MessagePumpOptions(string connectionString, int? concurrentTasks = null, int? fetchCount = null, QueueClientOptions queueClientOptions = null, BlobClientOptions blobClientOptions = null, TimeSpan? fetchMessagesInterval = null, TimeSpan? countAzureMessagesInterval = null, TimeSpan? countMemoryMessagesInterval = null) { ConnectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString)); ConcurrentTasks = concurrentTasks ?? _defaultConcurrentTasks; + FetchCount = fetchCount ?? _defaultFetchCount; QueueClientOptions = queueClientOptions; BlobClientOptions = blobClientOptions; FetchMessagesInterval = fetchMessagesInterval ?? _defaultFetchMessagesInterval; From 18e31ad04aa670d0e9bd1cf31851184d9db29f30 Mon Sep 17 00:00:00 2001 From: jericho Date: Wed, 21 Feb 2024 12:26:46 -0500 Subject: [PATCH 08/15] RecurrentCancellableTask was not intended to be public --- Source/Picton.Messaging/Utilities/RecurrentCancellableTask.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/Picton.Messaging/Utilities/RecurrentCancellableTask.cs b/Source/Picton.Messaging/Utilities/RecurrentCancellableTask.cs index 80a1b45..c44970b 100644 --- a/Source/Picton.Messaging/Utilities/RecurrentCancellableTask.cs +++ b/Source/Picton.Messaging/Utilities/RecurrentCancellableTask.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Threading; using System.Threading.Tasks; @@ -10,7 +10,7 @@ namespace Picton.Messaging.Utilities /// /// From a StackOverflow discussion. /// - public static class RecurrentCancellableTask + internal static class RecurrentCancellableTask { /// /// Starts a new task in a recurrent manner repeating it according to the polling interval. From 6f1386814c477d78feed9358c6673152ea6a6675 Mon Sep 17 00:00:00 2001 From: jericho Date: Wed, 21 Feb 2024 12:39:55 -0500 Subject: [PATCH 09/15] Handle messages asynchronously Resolves #41 --- .../MyMessageHandler.cs | 9 +++++-- Source/Picton.Messaging/AsyncMessagePump.cs | 4 ++-- .../AsyncMessagePumpWithHandlers.cs | 24 +------------------ ...AsyncMultiTenantMessagePumpWithHandlers.cs | 2 +- .../Messages/IMessageHandler.cs | 11 ++++++--- 5 files changed, 19 insertions(+), 31 deletions(-) diff --git a/Source/Picton.Messaging.IntegrationTests/MyMessageHandler.cs b/Source/Picton.Messaging.IntegrationTests/MyMessageHandler.cs index bb3542a..8abc5a8 100644 --- a/Source/Picton.Messaging.IntegrationTests/MyMessageHandler.cs +++ b/Source/Picton.Messaging.IntegrationTests/MyMessageHandler.cs @@ -1,5 +1,7 @@ using Microsoft.Extensions.Logging; using Picton.Messaging.Messages; +using System.Threading; +using System.Threading.Tasks; namespace Picton.Messaging.IntegrationTests { @@ -11,9 +13,12 @@ public MyMessageHandler(ILogger log) { _log = log; } - public void Handle(MyMessage message) + + public Task HandleAsync(MyMessage message, CancellationToken cancellationToken) { - _log.LogInformation(message.MessageContent); + _log?.LogInformation(message.MessageContent); + + return Task.CompletedTask; } } } diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index 438dd73..6c56528 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -255,7 +255,7 @@ private async Task ProcessMessagesAsync(CancellationToken cancellationToken) var channel = Channel.CreateUnbounded<(string QueueName, CloudMessage Message)>(channelOptions); var channelCompleted = false; - // Define the task that fetches messages from the Azure queue + // Define the task that fetches messages from Azure RecurrentCancellableTask.StartNew( async () => { @@ -271,7 +271,7 @@ private async Task ProcessMessagesAsync(CancellationToken cancellationToken) } // Mark the channel as "complete" which means that no more messages will be written to it - else if (!channelCompleted) + else if (cancellationToken.IsCancellationRequested && !channelCompleted) { channelCompleted = channel.Writer.TryComplete(); } diff --git a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs index 8c738a2..c798eaa 100644 --- a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs @@ -150,7 +150,7 @@ public Task StartAsync(CancellationToken cancellationToken) _messagePump.OnQueueEmpty = OnQueueEmpty; _messagePump.OnAllQueuesEmpty = OnAllQueuesEmpty; _messagePump.OnError = OnError; - _messagePump.OnMessage = (queueName, message, cancellationToken) => + _messagePump.OnMessage = async (queueName, message, cancellationToken) => { var contentType = message.Content.GetType(); @@ -189,28 +189,6 @@ internal void AddQueue(QueueManager queueManager, QueueManager poisonQueueManage _messagePump.AddQueue(queueManager, poisonQueueManager, visibilityTimeout, maxDequeueCount); } - private void ValidateOptions(MessagePumpOptions options) - { - if (options == null) throw new ArgumentNullException(nameof(options)); - if (string.IsNullOrEmpty(options.ConnectionString)) throw new ArgumentNullException(nameof(options.ConnectionString)); - if (options.ConcurrentTasks < 1) throw new ArgumentOutOfRangeException(nameof(options.ConcurrentTasks), "Number of concurrent tasks must be greather than zero"); - } - - private void ValidateQueueConfig(QueueConfig queueConfig) - { - if (queueConfig == null) throw new ArgumentNullException(nameof(queueConfig)); - if (string.IsNullOrEmpty(queueConfig.QueueName)) throw new ArgumentNullException(nameof(queueConfig.QueueName)); - if (queueConfig.MaxDequeueCount < 1) throw new ArgumentOutOfRangeException(nameof(queueConfig.MaxDequeueCount), $"Number of retries for {queueConfig.QueueName} must be greater than zero."); - } - - private void ValidateQueueConfigs(IEnumerable queueConfigs) - { - foreach (var queueConfig in queueConfigs) - { - ValidateQueueConfig(queueConfig); - } - } - #endregion } } diff --git a/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs index 1b7a41a..d17ac71 100644 --- a/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs @@ -105,7 +105,7 @@ public Task StartAsync(CancellationToken cancellationToken) _messagePump.OnQueueEmpty = (queueName, cancellationToken) => OnQueueEmpty?.Invoke(queueName.TrimStart(_queueNamePrefix), cancellationToken); _messagePump.OnAllQueuesEmpty = OnAllQueuesEmpty; _messagePump.OnError = (queueName, message, exception, isPoison) => OnError?.Invoke(queueName.TrimStart(_queueNamePrefix), message, exception, isPoison); - _messagePump.OnMessage = (queueName, message, cancellationToken) => + _messagePump.OnMessage = async (queueName, message, cancellationToken) => { var contentType = message.Content.GetType(); diff --git a/Source/Picton.Messaging/Messages/IMessageHandler.cs b/Source/Picton.Messaging/Messages/IMessageHandler.cs index 4cd8d44..1ff1f07 100644 --- a/Source/Picton.Messaging/Messages/IMessageHandler.cs +++ b/Source/Picton.Messaging/Messages/IMessageHandler.cs @@ -1,4 +1,7 @@ -namespace Picton.Messaging.Messages +using System.Threading; +using System.Threading.Tasks; + +namespace Picton.Messaging.Messages { /// /// Message handler interface. @@ -8,9 +11,11 @@ public interface IMessageHandler where T : IMessage { /// - /// Handles the specified message. + /// Handles the specified message asynchronously. /// /// The message. - void Handle(T message); + /// The cancellation token. + /// The async task. + Task HandleAsync(T message, CancellationToken cancellationToken); } } From 4248e861a0518a307bf2ebf45bee275394f76e43 Mon Sep 17 00:00:00 2001 From: jericho Date: Wed, 21 Feb 2024 12:43:44 -0500 Subject: [PATCH 10/15] Fix logger type --- Source/Picton.Messaging.IntegrationTests/TestsRunner.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs b/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs index 144f535..241c3f6 100644 --- a/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs +++ b/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs @@ -20,7 +20,7 @@ private enum ResultCodes Cancelled = 1223 } - private readonly ILogger _logger = logger; + private readonly ILogger _logger = logger; public async Task RunAsync() { From 29e22c150baf0a0f3d7d0f934a4cf50bdbd04a37 Mon Sep 17 00:00:00 2001 From: jericho Date: Wed, 21 Feb 2024 12:46:45 -0500 Subject: [PATCH 11/15] Use DI to instantiate message handlers Resolves #42 --- .../MyMessageHandler.cs | 4 +- .../TestsRunner.cs | 5 +- .../AsyncMessagePumpWithHandlers.cs | 45 ++++++--------- ...AsyncMultiTenantMessagePumpWithHandlers.cs | 46 ++++++--------- .../Picton.Messaging/Picton.Messaging.csproj | 1 + ...rsDiscoverer.cs => CloudMessageHandler.cs} | 57 +++++++++++++++++-- 6 files changed, 93 insertions(+), 65 deletions(-) rename Source/Picton.Messaging/Utilities/{MessageHandlersDiscoverer.cs => CloudMessageHandler.cs} (55%) diff --git a/Source/Picton.Messaging.IntegrationTests/MyMessageHandler.cs b/Source/Picton.Messaging.IntegrationTests/MyMessageHandler.cs index 8abc5a8..395e64f 100644 --- a/Source/Picton.Messaging.IntegrationTests/MyMessageHandler.cs +++ b/Source/Picton.Messaging.IntegrationTests/MyMessageHandler.cs @@ -7,9 +7,9 @@ namespace Picton.Messaging.IntegrationTests { public class MyMessageHandler : IMessageHandler { - private readonly ILogger _log; + private readonly ILogger _log; - public MyMessageHandler(ILogger log) + public MyMessageHandler(ILogger log) { _log = log; } diff --git a/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs b/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs index 241c3f6..acc86bb 100644 --- a/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs +++ b/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs @@ -11,7 +11,7 @@ namespace Picton.Messaging.IntegrationTests { - internal class TestsRunner(ILogger logger) + internal class TestsRunner(ILogger logger, IServiceProvider serviceProvider) { private enum ResultCodes { @@ -21,6 +21,7 @@ private enum ResultCodes } private readonly ILogger _logger = logger; + private readonly IServiceProvider _serviceProvider = serviceProvider; public async Task RunAsync() { @@ -157,7 +158,7 @@ private async Task RunAsyncMessagePumpWithHandlersTests(string connectionString, Stopwatch sw = null; var cts = new CancellationTokenSource(); var options = new MessagePumpOptions(connectionString, concurrentTasks, null, null); - var messagePump = new AsyncMessagePumpWithHandlers(options, _logger, metrics) + var messagePump = new AsyncMessagePumpWithHandlers(options, _serviceProvider, _logger, metrics) { // Stop the message pump when there are no more messages to process. OnAllQueuesEmpty = cancellationToken => diff --git a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs index c798eaa..011b666 100644 --- a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs @@ -3,7 +3,6 @@ using Picton.Managers; using Picton.Messaging.Utilities; using System; -using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -18,10 +17,9 @@ public class AsyncMessagePumpWithHandlers { #region FIELDS - private static IDictionary _messageHandlers; - - private readonly AsyncMessagePump _messagePump; private readonly ILogger _logger; + private readonly CloudMessageHandler _cloudMessageHandler; + private readonly AsyncMessagePump _messagePump; #endregion @@ -77,10 +75,22 @@ public class AsyncMessagePumpWithHandlers /// The logger. /// The system where metrics are published. public AsyncMessagePumpWithHandlers(MessagePumpOptions options, ILogger logger = null, IMetrics metrics = null) + : this(options, null, logger, metrics) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// Options for the mesage pump. + /// DI. + /// The logger. + /// The system where metrics are published. + public AsyncMessagePumpWithHandlers(MessagePumpOptions options, IServiceProvider serviceProvider, ILogger logger = null, IMetrics metrics = null) { - _messageHandlers = MessageHandlersDiscoverer.GetMessageHandlers(logger); - _messagePump = new AsyncMessagePump(options, logger, metrics); _logger = logger; + _cloudMessageHandler = new CloudMessageHandler(serviceProvider); + _messagePump = new AsyncMessagePump(options, logger, metrics); } #endregion @@ -152,28 +162,7 @@ public Task StartAsync(CancellationToken cancellationToken) _messagePump.OnError = OnError; _messagePump.OnMessage = async (queueName, message, cancellationToken) => { - var contentType = message.Content.GetType(); - - if (!_messageHandlers.TryGetValue(contentType, out Type[] handlers)) - { - throw new Exception($"Received a message of type {contentType.FullName} but could not find a class implementing IMessageHandler<{contentType.FullName}>"); - } - - foreach (var handlerType in handlers) - { - object handler = null; - if (handlerType.GetConstructor([typeof(ILogger)]) != null) - { - handler = Activator.CreateInstance(handlerType, [(object)_logger]); - } - else - { - handler = Activator.CreateInstance(handlerType); - } - - var handlerMethod = handlerType.GetMethod("Handle", [contentType]); - handlerMethod.Invoke(handler, [message.Content]); - } + await _cloudMessageHandler.HandleMessageAsync(message, cancellationToken).ConfigureAwait(false); }; return _messagePump.StartAsync(cancellationToken); diff --git a/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs index d17ac71..03c996d 100644 --- a/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs @@ -2,7 +2,6 @@ using Microsoft.Extensions.Logging; using Picton.Messaging.Utilities; using System; -using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -16,11 +15,9 @@ public class AsyncMultiTenantMessagePumpWithHandlers { #region FIELDS - private static IDictionary _messageHandlers; - private readonly string _queueNamePrefix; private readonly ILogger _logger; - + private readonly CloudMessageHandler _cloudMessageHandler; private readonly AsyncMultiTenantMessagePump _messagePump; #endregion @@ -81,12 +78,26 @@ public class AsyncMultiTenantMessagePumpWithHandlers /// The logger. /// The system where metrics are published. public AsyncMultiTenantMessagePumpWithHandlers(MessagePumpOptions options, string queueNamePrefix, TimeSpan? discoverQueuesInterval = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, ILogger logger = null, IMetrics metrics = null) + : this(options, null, queueNamePrefix, discoverQueuesInterval, visibilityTimeout, maxDequeueCount, logger, metrics) { - _messageHandlers = MessageHandlersDiscoverer.GetMessageHandlers(logger); + } + /// + /// Initializes a new instance of the class. + /// + /// Options for the mesage pump. + /// DI. + /// The common prefix in the naming convention. + /// The frequency we check for queues in the Azure storage account matching the naming convention. Default is 30 seconds. + /// The visibility timeout. + /// The maximum dequeue count. + /// The logger. + /// The system where metrics are published. + public AsyncMultiTenantMessagePumpWithHandlers(MessagePumpOptions options, IServiceProvider serviceProvider, string queueNamePrefix, TimeSpan? discoverQueuesInterval = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, ILogger logger = null, IMetrics metrics = null) + { _queueNamePrefix = queueNamePrefix; _logger = logger; - + _cloudMessageHandler = new CloudMessageHandler(serviceProvider); _messagePump = new AsyncMultiTenantMessagePump(options, queueNamePrefix, discoverQueuesInterval, visibilityTimeout, maxDequeueCount, logger, metrics); } @@ -107,28 +118,7 @@ public Task StartAsync(CancellationToken cancellationToken) _messagePump.OnError = (queueName, message, exception, isPoison) => OnError?.Invoke(queueName.TrimStart(_queueNamePrefix), message, exception, isPoison); _messagePump.OnMessage = async (queueName, message, cancellationToken) => { - var contentType = message.Content.GetType(); - - if (!_messageHandlers.TryGetValue(contentType, out Type[] handlers)) - { - throw new Exception($"Received a message of type {contentType.FullName} but could not find a class implementing IMessageHandler<{contentType.FullName}>"); - } - - foreach (var handlerType in handlers) - { - object handler = null; - if (handlerType.GetConstructor([typeof(ILogger)]) != null) - { - handler = Activator.CreateInstance(handlerType, [(object)_logger]); - } - else - { - handler = Activator.CreateInstance(handlerType); - } - - var handlerMethod = handlerType.GetMethod("Handle", [contentType]); - handlerMethod.Invoke(handler, [message.Content]); - } + await _cloudMessageHandler.HandleMessageAsync(message, cancellationToken).ConfigureAwait(false); }; return _messagePump.StartAsync(cancellationToken); diff --git a/Source/Picton.Messaging/Picton.Messaging.csproj b/Source/Picton.Messaging/Picton.Messaging.csproj index cf46aa4..9bdd637 100644 --- a/Source/Picton.Messaging/Picton.Messaging.csproj +++ b/Source/Picton.Messaging/Picton.Messaging.csproj @@ -37,6 +37,7 @@ + diff --git a/Source/Picton.Messaging/Utilities/MessageHandlersDiscoverer.cs b/Source/Picton.Messaging/Utilities/CloudMessageHandler.cs similarity index 55% rename from Source/Picton.Messaging/Utilities/MessageHandlersDiscoverer.cs rename to Source/Picton.Messaging/Utilities/CloudMessageHandler.cs index 6c30b0c..8d40423 100644 --- a/Source/Picton.Messaging/Utilities/MessageHandlersDiscoverer.cs +++ b/Source/Picton.Messaging/Utilities/CloudMessageHandler.cs @@ -1,16 +1,62 @@ +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyModel; using Microsoft.Extensions.Logging; using Picton.Messaging.Messages; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Reflection; +using System.Threading; +using System.Threading.Tasks; namespace Picton.Messaging.Utilities { - internal static class MessageHandlersDiscoverer + internal class CloudMessageHandler { - public static IDictionary GetMessageHandlers(ILogger logger) + private static ConcurrentDictionary _messageHandlerTypes = GetMessageHandlers(null); + private static ConcurrentDictionary> _messageHandlerInstances = new(); + + private readonly IServiceProvider _serviceProvider; + private readonly ILogger _logger; + + public CloudMessageHandler(IServiceProvider serviceProvider = null) + { + _serviceProvider = serviceProvider; + } + + public async Task HandleMessageAsync(CloudMessage message, CancellationToken cancellationToken) + { + var contentType = message.Content.GetType(); + + if (!_messageHandlerTypes.TryGetValue(contentType, out Type[] handlerTypes)) + { + throw new Exception($"Received a message of type {contentType.FullName} but could not find a class implementing IMessageHandler<{contentType.FullName}>"); + } + + foreach (var handlerType in handlerTypes) + { + // Get the message handler from cache or instantiate a new one + var handler = _messageHandlerInstances.GetOrAdd(handlerType, type => new Lazy(() => + { + // Let the DI service provider resolve the dependencies expected by the type constructor OR + // invoke the parameterless constructor when the DI service provider was not provided + var newHandlerInstance = _serviceProvider != null ? + ActivatorUtilities.CreateInstance(_serviceProvider, type) : + Activator.CreateInstance(type); + + // Return the newly created instance + return newHandlerInstance; + })).Value; + + // Invoke the "HandleAsync" method asynchronously + var handlerMethod = handlerType.GetMethod("HandleAsync", [contentType, typeof(CancellationToken)]); + var result = (Task)handlerMethod.Invoke(handler, [message.Content, cancellationToken]); + await result.ConfigureAwait(false); + } + } + + private static ConcurrentDictionary GetMessageHandlers(ILogger logger) { logger?.LogTrace("Discovering message handlers."); @@ -52,10 +98,11 @@ public static IDictionary GetMessageHandlers(ILogger logger) var messageHandlers = oneTypePerMessageHandler .GroupBy(h => h.MessageType) - .ToDictionary(group => group.Key, group => group.Select(t => t.Type) - .ToArray()); + .ToDictionary( + group => group.Key, + group => group.Select(t => t.Type).ToArray()); - return messageHandlers; + return new ConcurrentDictionary(messageHandlers); } private static Assembly[] GetLocalAssemblies() From 39b8d34757ad2e2d317acaf6cdb81efe0ddf445d Mon Sep 17 00:00:00 2001 From: jericho Date: Wed, 21 Feb 2024 12:46:58 -0500 Subject: [PATCH 12/15] Refresh resource files --- build.cake | 4 ++-- global.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/build.cake b/build.cake index d021908..8b96958 100644 --- a/build.cake +++ b/build.cake @@ -2,8 +2,8 @@ #tool dotnet:?package=GitVersion.Tool&version=5.12.0 #tool dotnet:?package=coveralls.net&version=4.0.1 #tool nuget:https://f.feedz.io/jericho/jericho/nuget/?package=GitReleaseManager&version=0.17.0-collaborators0003 -#tool nuget:?package=ReportGenerator&version=5.2.0 -#tool nuget:?package=xunit.runner.console&version=2.6.6 +#tool nuget:?package=ReportGenerator&version=5.2.1 +#tool nuget:?package=xunit.runner.console&version=2.7.0 #tool nuget:?package=CodecovUploader&version=0.7.1 // Install addins. diff --git a/global.json b/global.json index f43378f..667094a 100644 --- a/global.json +++ b/global.json @@ -1,6 +1,6 @@ { "sdk": { - "version": "8.0.101", + "version": "8.0.201", "rollForward": "patch", "allowPrerelease": false } From 77de1dd5f1d7cdcff10bbc24b6167c4e28829ad9 Mon Sep 17 00:00:00 2001 From: jericho Date: Wed, 21 Feb 2024 13:08:35 -0500 Subject: [PATCH 13/15] Improve the README.md --- README.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 5f4e731..55217e1 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ ## About -Picton.Messaging is a C# library containing a high performance message processor (also known as a message "pump") designed to process messages from an Azure storage queue as efficiently as possible. +Picton.Messaging is a C# library containing a high performance message processor (also known as a message "pump") designed to process messages from Azure storage queues as efficiently as possible. I created Picton.Mesaging because I needed a way to process a large volume of messages from Azure storage queues as quickly and efficiently as possible. I searched for a long time, but I could never find a solution that met all my requirements. @@ -27,6 +27,8 @@ The sample code that Daniel shared during his webinars was very generic and not In December 2017 version 2.0 was released with a much more efficient method of fetching messages from the Azure queue: there is now a dedicated task for this pupose instead of allowing each individual concurent task to fetch their own messages. This means that the logic to increase/decrease the number of available slots in the SemaphoreSlim is no longer necessary and has ben removed. +In January 2024 version 9.0 was released with two major new features: the message pump is now able to monitor multiple queues and also a specialized version of the message pump was added to monitor queues that follow a naming convention. Additionaly, this specialized message pump queries the Azure storage at regular interval to detect if new queues have been created. This is, in my opinion, an ideal solution when you have a multi-tenant solution with one queue for each tenant. + ## Nuget @@ -66,7 +68,7 @@ namespace WorkerRole1 try { - this.RunAsync(this.cancellationTokenSource.Token).Wait(); + this.RunAsync(this.cancellationTokenSource.Token).Wait(); // <-- The cancellation token is important because it will be used to stop the message pump } finally { @@ -153,8 +155,11 @@ namespace WorkerRole1 messagePump.AddQueue("queue06", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob"); messagePump.AddQueue("queue07", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob"); + // You can add queues that match a given RegEx pattern + await messagePump.AddQueuesByPatternAsync("myqueue*", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob", cancellationToken).ConfigureAwait(false); + // Start the message pump - await messagePump.StartAsync(cancellationToken); + await messagePump.StartAsync(cancellationToken); // <-- Specifying the cancellation token is particularly important because that's how you later communicate to the message pump that you want it to stop } } } From 70c8fa008f04af3c3e45bd4f2bd5fd01369c97a7 Mon Sep 17 00:00:00 2001 From: jericho Date: Fri, 23 Feb 2024 11:10:20 -0500 Subject: [PATCH 14/15] (GH-42) The message handlers should be registered with the DI service --- .../Program.cs | 7 + .../TestsRunner.cs | 12 +- .../ExtensionsTests.cs | 2 +- .../AsyncMessagePumpWithHandlers.cs | 11 -- ...AsyncMultiTenantMessagePumpWithHandlers.cs | 15 --- .../{Extensions.cs => Extensions/Internal.cs} | 8 +- Source/Picton.Messaging/Extensions/Public.cs | 77 +++++++++++ .../Utilities/CloudMessageHandler.cs | 122 ++---------------- 8 files changed, 105 insertions(+), 149 deletions(-) rename Source/Picton.Messaging/{Extensions.cs => Extensions/Internal.cs} (95%) create mode 100644 Source/Picton.Messaging/Extensions/Public.cs diff --git a/Source/Picton.Messaging.IntegrationTests/Program.cs b/Source/Picton.Messaging.IntegrationTests/Program.cs index 2967d64..51e8e0f 100644 --- a/Source/Picton.Messaging.IntegrationTests/Program.cs +++ b/Source/Picton.Messaging.IntegrationTests/Program.cs @@ -23,6 +23,13 @@ private static void ConfigureServices(ServiceCollection services) { services .AddLogging(loggingBuilder => loggingBuilder.AddNLog(GetNLogConfiguration())) + + // You can either register each message handler by hand like this example: + //.AddSingleton, MyMessageHandler>() + + // Or you can allow Picton.Messaging to scan your assemblies and to register all message handlers like this: + .AddPictonMessageHandlers() + .AddTransient(); } diff --git a/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs b/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs index acc86bb..71b642b 100644 --- a/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs +++ b/Source/Picton.Messaging.IntegrationTests/TestsRunner.cs @@ -11,7 +11,7 @@ namespace Picton.Messaging.IntegrationTests { - internal class TestsRunner(ILogger logger, IServiceProvider serviceProvider) + internal class TestsRunner { private enum ResultCodes { @@ -20,8 +20,14 @@ private enum ResultCodes Cancelled = 1223 } - private readonly ILogger _logger = logger; - private readonly IServiceProvider _serviceProvider = serviceProvider; + private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; + + public TestsRunner(ILogger logger, IServiceProvider serviceProvider) + { + _logger = logger; + _serviceProvider = serviceProvider; + } public async Task RunAsync() { diff --git a/Source/Picton.Messaging.UnitTests/ExtensionsTests.cs b/Source/Picton.Messaging.UnitTests/ExtensionsTests.cs index 97a3ef2..475b322 100644 --- a/Source/Picton.Messaging.UnitTests/ExtensionsTests.cs +++ b/Source/Picton.Messaging.UnitTests/ExtensionsTests.cs @@ -1,4 +1,4 @@ -using Shouldly; +using Shouldly; using System; using Xunit; diff --git a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs index 011b666..cfd16a0 100644 --- a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs @@ -68,17 +68,6 @@ public class AsyncMessagePumpWithHandlers #region CONSTRUCTORS - /// - /// Initializes a new instance of the class. - /// - /// Options for the mesage pump. - /// The logger. - /// The system where metrics are published. - public AsyncMessagePumpWithHandlers(MessagePumpOptions options, ILogger logger = null, IMetrics metrics = null) - : this(options, null, logger, metrics) - { - } - /// /// Initializes a new instance of the class. /// diff --git a/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs index 03c996d..ba036b8 100644 --- a/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs @@ -67,21 +67,6 @@ public class AsyncMultiTenantMessagePumpWithHandlers #region CONSTRUCTOR - /// - /// Initializes a new instance of the class. - /// - /// Options for the mesage pump. - /// The common prefix in the naming convention. - /// The frequency we check for queues in the Azure storage account matching the naming convention. Default is 30 seconds. - /// The visibility timeout. - /// The maximum dequeue count. - /// The logger. - /// The system where metrics are published. - public AsyncMultiTenantMessagePumpWithHandlers(MessagePumpOptions options, string queueNamePrefix, TimeSpan? discoverQueuesInterval = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, ILogger logger = null, IMetrics metrics = null) - : this(options, null, queueNamePrefix, discoverQueuesInterval, visibilityTimeout, maxDequeueCount, logger, metrics) - { - } - /// /// Initializes a new instance of the class. /// diff --git a/Source/Picton.Messaging/Extensions.cs b/Source/Picton.Messaging/Extensions/Internal.cs similarity index 95% rename from Source/Picton.Messaging/Extensions.cs rename to Source/Picton.Messaging/Extensions/Internal.cs index 20d99c8..71947fb 100644 --- a/Source/Picton.Messaging/Extensions.cs +++ b/Source/Picton.Messaging/Extensions/Internal.cs @@ -5,12 +5,10 @@ namespace Picton.Messaging { /// - /// Extension methods. + /// Internal extension methods. /// - internal static class Extensions + internal static class Internal { - #region PUBLIC EXTENSION METHODS - /// /// The purpose of this extension method is to avoid a Visual Studio warning about async calls that are not awaited. /// @@ -68,7 +66,5 @@ public static string ToDurationString(this TimeSpan timeSpan) return result.ToString().Trim(); } - - #endregion } } diff --git a/Source/Picton.Messaging/Extensions/Public.cs b/Source/Picton.Messaging/Extensions/Public.cs new file mode 100644 index 0000000..75b1b3e --- /dev/null +++ b/Source/Picton.Messaging/Extensions/Public.cs @@ -0,0 +1,77 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyModel; +using Picton.Messaging.Messages; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; + +namespace Picton.Messaging +{ + /// + /// Public extension methods. + /// + public static class Public + { + public static IServiceCollection AddPictonMessageHandlers(this IServiceCollection services) + { + if (services == null) + { + throw new ArgumentNullException("services"); + } + + var assemblies = GetLocalAssemblies(); + + var typesWithMessageHandlerInterfaces = assemblies + .SelectMany(x => x.GetTypes()) + .Where(t => !t.GetTypeInfo().IsInterface) + .Select(type => new + { + HandlerType = type, + InterfaceTypes = type + .GetInterfaces() + .Where(i => i.GetTypeInfo().IsGenericType) + .Where(i => i.GetGenericTypeDefinition() == typeof(IMessageHandler<>)) + .ToArray() + }) + .Where(t => t.InterfaceTypes != null && t.InterfaceTypes.Any()) + .ToArray(); + + foreach (var handlerType in typesWithMessageHandlerInterfaces) + { + foreach (var interfaceType in handlerType.InterfaceTypes) + { + services.AddSingleton(interfaceType, handlerType.HandlerType); + } + } + + return services; + } + + private static Assembly[] GetLocalAssemblies() + { + var dependencies = DependencyContext.Default.RuntimeLibraries; + + var assemblies = new List(); + foreach (var library in dependencies) + { + if (IsCandidateLibrary(library)) + { + var assembly = Assembly.Load(new AssemblyName(library.Name)); + assemblies.Add(assembly); + } + } + + return [.. assemblies]; + } + + private static bool IsCandidateLibrary(RuntimeLibrary library) + { + return !library.Name.StartsWith("Microsoft.", StringComparison.OrdinalIgnoreCase) && + !library.Name.StartsWith("System.", StringComparison.OrdinalIgnoreCase) && + !library.Name.StartsWith("NetStandard.", StringComparison.OrdinalIgnoreCase) && + !string.Equals(library.Type, "package", StringComparison.OrdinalIgnoreCase) && + !string.Equals(library.Type, "referenceassembly", StringComparison.OrdinalIgnoreCase); + } + } +} diff --git a/Source/Picton.Messaging/Utilities/CloudMessageHandler.cs b/Source/Picton.Messaging/Utilities/CloudMessageHandler.cs index 8d40423..8d52e63 100644 --- a/Source/Picton.Messaging/Utilities/CloudMessageHandler.cs +++ b/Source/Picton.Messaging/Utilities/CloudMessageHandler.cs @@ -1,12 +1,5 @@ -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyModel; -using Microsoft.Extensions.Logging; using Picton.Messaging.Messages; using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; using System.Threading; using System.Threading.Tasks; @@ -14,121 +7,24 @@ namespace Picton.Messaging.Utilities { internal class CloudMessageHandler { - private static ConcurrentDictionary _messageHandlerTypes = GetMessageHandlers(null); - private static ConcurrentDictionary> _messageHandlerInstances = new(); - private readonly IServiceProvider _serviceProvider; - private readonly ILogger _logger; - public CloudMessageHandler(IServiceProvider serviceProvider = null) + public CloudMessageHandler(IServiceProvider serviceProvider) { - _serviceProvider = serviceProvider; + _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); } public async Task HandleMessageAsync(CloudMessage message, CancellationToken cancellationToken) { + // Get the message handler from the DI service provider var contentType = message.Content.GetType(); + var handlerType = typeof(IMessageHandler<>).MakeGenericType([contentType]); + var handler = _serviceProvider.GetService(handlerType); - if (!_messageHandlerTypes.TryGetValue(contentType, out Type[] handlerTypes)) - { - throw new Exception($"Received a message of type {contentType.FullName} but could not find a class implementing IMessageHandler<{contentType.FullName}>"); - } - - foreach (var handlerType in handlerTypes) - { - // Get the message handler from cache or instantiate a new one - var handler = _messageHandlerInstances.GetOrAdd(handlerType, type => new Lazy(() => - { - // Let the DI service provider resolve the dependencies expected by the type constructor OR - // invoke the parameterless constructor when the DI service provider was not provided - var newHandlerInstance = _serviceProvider != null ? - ActivatorUtilities.CreateInstance(_serviceProvider, type) : - Activator.CreateInstance(type); - - // Return the newly created instance - return newHandlerInstance; - })).Value; - - // Invoke the "HandleAsync" method asynchronously - var handlerMethod = handlerType.GetMethod("HandleAsync", [contentType, typeof(CancellationToken)]); - var result = (Task)handlerMethod.Invoke(handler, [message.Content, cancellationToken]); - await result.ConfigureAwait(false); - } - } - - private static ConcurrentDictionary GetMessageHandlers(ILogger logger) - { - logger?.LogTrace("Discovering message handlers."); - - var assemblies = GetLocalAssemblies(); - - var assembliesCount = assemblies.Length; - if (assembliesCount == 0) logger?.LogTrace($"Did not find any local assembly."); - else if (assembliesCount == 1) logger?.LogTrace("Found 1 local assembly."); - else logger?.LogTrace($"Found {assemblies.Count()} local assemblies."); - - var typesWithMessageHandlerInterfaces = assemblies - .SelectMany(x => x.GetTypes()) - .Where(t => !t.GetTypeInfo().IsInterface) - .Select(type => new - { - Type = type, - MessageTypes = type - .GetInterfaces() - .Where(i => i.GetTypeInfo().IsGenericType) - .Where(i => i.GetGenericTypeDefinition() == typeof(IMessageHandler<>)) - .SelectMany(i => i.GetGenericArguments()) - }) - .Where(t => t.MessageTypes != null && t.MessageTypes.Any()) - .ToArray(); - - var classesCount = typesWithMessageHandlerInterfaces.Length; - if (classesCount == 0) logger?.LogTrace("Did not find any class implementing the 'IMessageHandler' interface."); - else if (classesCount == 1) logger?.LogTrace("Found 1 class implementing the 'IMessageHandler' interface."); - else logger?.LogTrace($"Found {typesWithMessageHandlerInterfaces.Count()} classes implementing the 'IMessageHandler' interface."); - - var oneTypePerMessageHandler = typesWithMessageHandlerInterfaces - .SelectMany(t => t.MessageTypes, (t, messageType) => - new - { - t.Type, - MessageType = messageType - }) - .ToArray(); - - var messageHandlers = oneTypePerMessageHandler - .GroupBy(h => h.MessageType) - .ToDictionary( - group => group.Key, - group => group.Select(t => t.Type).ToArray()); - - return new ConcurrentDictionary(messageHandlers); - } - - private static Assembly[] GetLocalAssemblies() - { - var dependencies = DependencyContext.Default.RuntimeLibraries; - - var assemblies = new List(); - foreach (var library in dependencies) - { - if (IsCandidateLibrary(library)) - { - var assembly = Assembly.Load(new AssemblyName(library.Name)); - assemblies.Add(assembly); - } - } - - return [.. assemblies]; - } - - private static bool IsCandidateLibrary(RuntimeLibrary library) - { - return !library.Name.StartsWith("Microsoft.", StringComparison.OrdinalIgnoreCase) && - !library.Name.StartsWith("System.", StringComparison.OrdinalIgnoreCase) && - !library.Name.StartsWith("NetStandard.", StringComparison.OrdinalIgnoreCase) && - !string.Equals(library.Type, "package", StringComparison.OrdinalIgnoreCase) && - !string.Equals(library.Type, "referenceassembly", StringComparison.OrdinalIgnoreCase); + // Invoke the "HandleAsync" method asynchronously + var handlerMethod = handlerType.GetMethod("HandleAsync", [contentType, typeof(CancellationToken)]); + var result = (Task)handlerMethod.Invoke(handler, [message.Content, cancellationToken]); + await result.ConfigureAwait(false); } } } From 384db202f4f32d2f1bc95475ad107a5b514fbfc3 Mon Sep 17 00:00:00 2001 From: jericho Date: Fri, 23 Feb 2024 20:18:44 -0500 Subject: [PATCH 15/15] (doc) Better examples --- README.md | 154 +++++++++++------- .../Program.cs | 6 - 2 files changed, 98 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index 55217e1..8b9165d 100644 --- a/README.md +++ b/README.md @@ -45,71 +45,41 @@ The easiest way to include Picton.Messaging in your C# project is by grabing the PM> Install-Package Picton.Messaging ``` -Once you have the Picton.Messaging library properly referenced in your project, modify your RoleEntryPoint like this example: +## How to use -```csharp -using Microsoft.WindowsAzure.ServiceRuntime; -using Microsoft.WindowsAzure.Storage; -using Microsoft.WindowsAzure.Storage.RetryPolicies; -using Picton.Messaging; -using System; -using System.Diagnostics; - -namespace WorkerRole1 -{ - public class MyWorkerRole : RoleEntryPoint - { - private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); - private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false); - - public override void Run() - { - Trace.TraceInformation("WorkerRole is running"); +Once you have the Picton.Messaging library properly referenced in your project, you need to following two CSharp files: - try - { - this.RunAsync(this.cancellationTokenSource.Token).Wait(); // <-- The cancellation token is important because it will be used to stop the message pump - } - finally - { - this.runCompleteEvent.Set(); - } - } +### Program.cs - public override bool OnStart() - { - // Use TLS 1.2 for Service Bus connections - ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12; +```csharp +using WorkerService1; - // Set the maximum number of concurrent connections - ServicePointManager.DefaultConnectionLimit = 12; +var builder = Host.CreateApplicationBuilder(args); +builder.Services.AddHostedService(); - // For information on handling configuration changes - // see the MSDN topic at https://go.microsoft.com/fwlink/?LinkId=166357. +var host = builder.Build(); +host.Run(); +``` - bool result = base.OnStart(); +### Worker.cs - Trace.TraceInformation("WorkerRole has been started"); +```csharp +using Picton.Messaging; - return result; - } +namespace WorkerService1 +{ + public class Worker : BackgroundService + { + private readonly ILogger _logger; - public override void OnStop() + public Worker(ILogger logger) { - Trace.TraceInformation("WorkerRole is stopping"); - - // Invoking "Cancel()" will cause the AsyncMessagePump to stop - this.cancellationTokenSource.Cancel(); - this.runCompleteEvent.WaitOne(); - - base.OnStop(); - - Trace.TraceInformation("WorkerRole has stopped"); + _logger = logger; } - - private async Task RunAsync(CancellationToken cancellationToken) + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - var connectionString = "<-- insert connection string for your Azure account -->"; + var connectionString = "<-- connection string for your Azure storage account -->"; var concurrentTask = 10; // <-- this is the max number of messages that can be processed at a time // Configure the message pump @@ -155,11 +125,83 @@ namespace WorkerRole1 messagePump.AddQueue("queue06", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob"); messagePump.AddQueue("queue07", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob"); - // You can add queues that match a given RegEx pattern + // You can add all queues matching a given RegEx pattern await messagePump.AddQueuesByPatternAsync("myqueue*", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob", cancellationToken).ConfigureAwait(false); - // Start the message pump - await messagePump.StartAsync(cancellationToken); // <-- Specifying the cancellation token is particularly important because that's how you later communicate to the message pump that you want it to stop + // Start the mesage pump (the token is particularly important because that's how the message pump will be notified when the worker is stopping) + await messagePump.StartAsync(stoppingToken).ConfigureAwait(false); + } + } +} +``` + +## Message handlers + +As demonstrated in the previous code sample, you can define your own `OnMessage` delegate which gets invoked by the message pump when each message is processed. This is perfect for simple scenarios where your C# logic is rather simple. However, your C# code can become complicated pretty quickly as the complexity of your logic increases. + +The Picton.Messaging library includes a more advanced message pump that can simplify this situation for you: `AsyncMessagePumpWithHandlers`. You C# project must define so-called "handlers" for each of your message types. These handlers are simply c# classes that implement the `IMessageHandler` interface, where `T` is the type of the message. For example, if you expect to process messages of type `MyMessage`, you must define a class with signature similar to this: `public class MyMessageHandler : IMessageHandler`. + +Once you have created all the handlers you need, you must register them with your solution's DI service collection like in this example: + +### Program.cs + +```csharp +using Picton.Messaging; +using WorkerService1; + +var builder = Host.CreateApplicationBuilder(args); +builder.Services.AddHostedService(); + +/* + You can either register your message handlers one by one like this: + + builder.Services.AddSingleton, MyMessageHandler>() + builder.Services.AddSingleton, MyOtherMessageHandler>() + builder.Services.AddSingleton, AnotherMessageHandler>() +*/ + +// Or you can allow Picton.Messaging to scan your assemblies and to register all message handlers like this: +builder.Services.AddPictonMessageHandlers() + +var host = builder.Build(); +host.Run(); +``` + +### Worker.cs + +```csharp +using Picton.Messaging; + +namespace WorkerService1 +{ + public class Worker : BackgroundService + { + private readonly IServiceProvider _serviceProvider; + private readonly ILogger _logger; + + public Worker(IServiceProvider serviceProvider, ILogger logger) + { + _serviceProvider = serviceProvider; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var connectionString = "<-- connection string for your Azure storage account -->"; + var concurrentTask = 10; // <-- this is the max number of messages that can be processed at a time + + var options = new MessagePumpOptions(connectionString, concurrentTasks, null, null); + var messagePump = new AsyncMessagePumpWithHandlers(options, _serviceProvider, _logger) + { + OnError = (queueName, message, exception, isPoison) => + { + // Insert your custom error handling + } + }; + + messagePump.AddQueue("myqueue", null, TimeSpan.FromMinutes(1), 3); + + await messagePump.StartAsync(stoppingToken).ConfigureAwait(false); } } } diff --git a/Source/Picton.Messaging.IntegrationTests/Program.cs b/Source/Picton.Messaging.IntegrationTests/Program.cs index 51e8e0f..93d2dcc 100644 --- a/Source/Picton.Messaging.IntegrationTests/Program.cs +++ b/Source/Picton.Messaging.IntegrationTests/Program.cs @@ -23,13 +23,7 @@ private static void ConfigureServices(ServiceCollection services) { services .AddLogging(loggingBuilder => loggingBuilder.AddNLog(GetNLogConfiguration())) - - // You can either register each message handler by hand like this example: - //.AddSingleton, MyMessageHandler>() - - // Or you can allow Picton.Messaging to scan your assemblies and to register all message handlers like this: .AddPictonMessageHandlers() - .AddTransient(); }