diff --git a/README.md b/README.md index f0333f2..62871b8 100644 --- a/README.md +++ b/README.md @@ -67,13 +67,11 @@ namespace WorkerRole1 public override bool OnStart() { var storageAccount = CloudStorageAccount.DevelopmentStorageAccount; - var cloudQueueClient = storageAccount.CreateCloudQueueClient(); - cloudQueueClient.DefaultRequestOptions.RetryPolicy = new NoRetry(); - var cloudQueue = cloudQueueClient.GetQueueReference("myqueue"); - cloudQueue.CreateIfNotExists(); + var queueName = "myqueue"; + var poisonQueueName = "myqueue-poison"; // Configure the message pump - _messagePump = new AsyncMessagePump(cloudQueue, 1, 25, TimeSpan.FromMinutes(1), 3) + _messagePump = new AsyncMessagePump(queueName, storageAccount, 25, poisonQueueName, TimeSpan.FromMinutes(1), 3) { OnMessage = (message, cancellationToken) => { @@ -82,10 +80,6 @@ namespace WorkerRole1 OnError = (message, exception, isPoison) => { Trace.TraceInformation("An error occured: {0}", exception); - if (isPoison) - { - // Copy message to a poison queue otherwise it will be lost forever - } } }; diff --git a/Source/Picton.Messaging.IntegrationTests/ColoredConsoleLogProvider.cs b/Source/Picton.Messaging.IntegrationTests/ColoredConsoleLogProvider.cs index 2b49128..19e631b 100644 --- a/Source/Picton.Messaging.IntegrationTests/ColoredConsoleLogProvider.cs +++ b/Source/Picton.Messaging.IntegrationTests/ColoredConsoleLogProvider.cs @@ -16,7 +16,7 @@ public class ColoredConsoleLogProvider : ILogProvider {LogLevel.Debug, ConsoleColor.Gray}, {LogLevel.Trace, ConsoleColor.DarkGray} }; - private LogLevel _minLevel = LogLevel.Trace; + private readonly LogLevel _minLevel = LogLevel.Trace; public ColoredConsoleLogProvider(LogLevel minLevel = LogLevel.Trace) { diff --git a/Source/Picton.Messaging.IntegrationTests/Picton.Messaging.IntegrationTests.csproj b/Source/Picton.Messaging.IntegrationTests/Picton.Messaging.IntegrationTests.csproj index e26b001..4ec71a5 100644 --- a/Source/Picton.Messaging.IntegrationTests/Picton.Messaging.IntegrationTests.csproj +++ b/Source/Picton.Messaging.IntegrationTests/Picton.Messaging.IntegrationTests.csproj @@ -1,12 +1,16 @@ - + Exe - netcoreapp1.1 + netcoreapp2.0 Picton.Messaging.IntegrationTests Picton.Messaging.IntegrationTests + + latest + + diff --git a/Source/Picton.Messaging.IntegrationTests/Program.cs b/Source/Picton.Messaging.IntegrationTests/Program.cs index 2fbf1cb..d895e42 100644 --- a/Source/Picton.Messaging.IntegrationTests/Program.cs +++ b/Source/Picton.Messaging.IntegrationTests/Program.cs @@ -2,7 +2,6 @@ using App.Metrics.Scheduling; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Queue; -using Picton.Interfaces; using Picton.Managers; using Picton.Messaging.IntegrationTests.Datadog; using Picton.Messaging.Logging; diff --git a/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs b/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs index 668c837..d4df310 100644 --- a/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs +++ b/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs @@ -103,20 +103,22 @@ public void No_message_processed_when_queue_is_empty() mockQueue.Setup(q => q.GetMessagesAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(Enumerable.Empty()); - var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3); - messagePump.OnMessage = (message, cancellationToken) => + var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3) { - Interlocked.Increment(ref onMessageInvokeCount); + OnMessage = (message, cancellationToken) => + { + Interlocked.Increment(ref onMessageInvokeCount); + }, + OnError = (message, exception, isPoison) => + { + Interlocked.Increment(ref onErrorInvokeCount); + } }; messagePump.OnQueueEmpty = cancellationToken => { Interlocked.Increment(ref onQueueEmptyInvokeCount); messagePump.Stop(); }; - messagePump.OnError = (message, exception, isPoison) => - { - Interlocked.Increment(ref onErrorInvokeCount); - }; // Act messagePump.Start(); @@ -181,27 +183,29 @@ public void Message_processed() return Task.FromResult(true); }); - var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3); - messagePump.OnMessage = (message, cancellationToken) => + var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3) { - Interlocked.Increment(ref onMessageInvokeCount); + OnMessage = (message, cancellationToken) => + { + Interlocked.Increment(ref onMessageInvokeCount); + }, + OnError = (message, exception, isPoison) => + { + Interlocked.Increment(ref onErrorInvokeCount); + if (isPoison) + { + lock (lockObject) + { + cloudMessage = null; + } + } + } }; messagePump.OnQueueEmpty = cancellationToken => { Interlocked.Increment(ref onQueueEmptyInvokeCount); messagePump.Stop(); }; - messagePump.OnError = (message, exception, isPoison) => - { - Interlocked.Increment(ref onErrorInvokeCount); - if (isPoison) - { - lock (lockObject) - { - cloudMessage = null; - } - } - }; // Act messagePump.Start(); @@ -261,29 +265,31 @@ public void Poison_message_is_rejected() return Task.FromResult(true); }); - var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), retries); - messagePump.OnMessage = (message, cancellationToken) => + var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), retries) { - Interlocked.Increment(ref onMessageInvokeCount); - throw new Exception("An error occured when attempting to process the message"); + OnMessage = (message, cancellationToken) => + { + Interlocked.Increment(ref onMessageInvokeCount); + throw new Exception("An error occured when attempting to process the message"); + }, + OnError = (message, exception, isPoison) => + { + Interlocked.Increment(ref onErrorInvokeCount); + if (isPoison) + { + lock (lockObject) + { + isRejected = true; + cloudMessage = null; + } + } + } }; messagePump.OnQueueEmpty = cancellationToken => { Interlocked.Increment(ref onQueueEmptyInvokeCount); messagePump.Stop(); }; - messagePump.OnError = (message, exception, isPoison) => - { - Interlocked.Increment(ref onErrorInvokeCount); - if (isPoison) - { - lock (lockObject) - { - isRejected = true; - cloudMessage = null; - } - } - }; // Act messagePump.Start(); @@ -352,29 +358,31 @@ public void Poison_message_is_moved() return Task.FromResult(true); }); - var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, poisonQueueName, TimeSpan.FromMinutes(1), retries); - messagePump.OnMessage = (message, cancellationToken) => + var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, poisonQueueName, TimeSpan.FromMinutes(1), retries) { - Interlocked.Increment(ref onMessageInvokeCount); - throw new Exception("An error occured when attempting to process the message"); + OnMessage = (message, cancellationToken) => + { + Interlocked.Increment(ref onMessageInvokeCount); + throw new Exception("An error occured when attempting to process the message"); + }, + OnError = (message, exception, isPoison) => + { + Interlocked.Increment(ref onErrorInvokeCount); + if (isPoison) + { + lock (lockObject) + { + isRejected = true; + cloudMessage = null; + } + } + } }; messagePump.OnQueueEmpty = cancellationToken => { Interlocked.Increment(ref onQueueEmptyInvokeCount); messagePump.Stop(); }; - messagePump.OnError = (message, exception, isPoison) => - { - Interlocked.Increment(ref onErrorInvokeCount); - if (isPoison) - { - lock (lockObject) - { - isRejected = true; - cloudMessage = null; - } - } - }; // Act messagePump.Start(); @@ -409,10 +417,16 @@ public void Exceptions_in_OnQueueEmpty_are_ignored() mockQueue.Setup(q => q.GetMessagesAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(Enumerable.Empty()); - var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3); - messagePump.OnMessage = (message, cancellationToken) => + var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3) { - Interlocked.Increment(ref onMessageInvokeCount); + OnMessage = (message, cancellationToken) => + { + Interlocked.Increment(ref onMessageInvokeCount); + }, + OnError = (message, exception, isPoison) => + { + Interlocked.Increment(ref onErrorInvokeCount); + } }; messagePump.OnQueueEmpty = cancellationToken => { @@ -431,10 +445,6 @@ public void Exceptions_in_OnQueueEmpty_are_ignored() // Stop the message pump messagePump.Stop(); }; - messagePump.OnError = (message, exception, isPoison) => - { - Interlocked.Increment(ref onErrorInvokeCount); - }; // Act messagePump.Start(); diff --git a/Source/Picton.Messaging.UnitTests/Picton.Messaging.UnitTests.csproj b/Source/Picton.Messaging.UnitTests/Picton.Messaging.UnitTests.csproj index ab34c04..21d8717 100644 --- a/Source/Picton.Messaging.UnitTests/Picton.Messaging.UnitTests.csproj +++ b/Source/Picton.Messaging.UnitTests/Picton.Messaging.UnitTests.csproj @@ -1,13 +1,13 @@  - netcoreapp1.1 + net452;netcoreapp1.0;netcoreapp2.0 Picton.Messaging.UnitTests Picton.Messaging.UnitTests - + diff --git a/Source/Picton.Messaging/App_Packages/LibLog.4.2/LibLog.cs b/Source/Picton.Messaging/App_Packages/LibLog.4.2/LibLog.cs index dab3e68..9ae843d 100644 --- a/Source/Picton.Messaging/App_Packages/LibLog.4.2/LibLog.cs +++ b/Source/Picton.Messaging/App_Packages/LibLog.4.2/LibLog.cs @@ -1,9 +1,4 @@ -//------------------------------------------------------------------------------ -// -// This tag ensures the content of this file is not analyzed by StyleCop.Analyzers -// -//------------------------------------------------------------------------------ - +// //=============================================================================== // LibLog // @@ -504,6 +499,7 @@ interface ILogProvider /// /// A key. /// A value. + /// ??? /// A disposable that when disposed removes the map from the context. IDisposable OpenMappedContext(string key, object value, bool destructure = false); } @@ -669,6 +665,7 @@ static IDisposable OpenNestedContext(string message) /// /// A key. /// A value. + /// ??? /// An that closes context when disposed. [SuppressMessage("Microsoft.Naming", "CA2204:Literals should be spelled correctly", MessageId = "SetCurrentLogProvider")] #if LIBLOG_PUBLIC @@ -785,7 +782,9 @@ internal class LoggerExecutionWrapper : ILog private readonly Func _getIsDisabled; internal const string FailedToGenerateLogMessage = "Failed to generate log message"; +#if !LIBLOG_PORTABLE Func _lastExtensionMethod; +#endif internal LoggerExecutionWrapper(Logger logger, Func getIsDisabled = null) { diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index 42e5f3b..69c0e90 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -78,7 +78,7 @@ public class AsyncMessagePump /// Name of the queue. /// The cloud storage account. /// The number of concurrent tasks. - /// Name of the queue where messages are automatically moved to when they fail to be processed after 'maxDequeueCount' attempts. You can indicate that you do not want messages to be automatically moved by leaving this value empty. In such a scenario, you are responsible for handling so called 'poinson' messages. + /// Name of the queue where messages are automatically moved to when they fail to be processed after 'maxDequeueCount' attempts. You can indicate that you do not want messages to be automatically moved by leaving this value empty. In such a scenario, you are responsible for handling so called 'poison' messages. /// The visibility timeout. /// The maximum dequeue count. /// The system where metrics are published diff --git a/Source/Picton.Messaging/Picton.Messaging.csproj b/Source/Picton.Messaging/Picton.Messaging.csproj index cfae4f1..f37eea5 100644 --- a/Source/Picton.Messaging/Picton.Messaging.csproj +++ b/Source/Picton.Messaging/Picton.Messaging.csproj @@ -1,12 +1,10 @@  - net452;netstandard1.6 + net452;netstandard1.6;netstandard2.0 anycpu true Library - $(PackageTargetFallback);dnxcore50 - 1.6.1 $(SemVer) full @@ -28,7 +26,7 @@ - + @@ -41,14 +39,22 @@ - + + + + + $(DefineConstants);NETFULL;LIBLOG_PORTABLE - + $(DefineConstants);NETSTANDARD;LIBLOG_PORTABLE + + true + +