Skip to content

Commit

Permalink
Merge branch 'release/3.2.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Jericho committed May 24, 2018
2 parents 3f34bb2 + dd1b7e0 commit 99620cd
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 88 deletions.
12 changes: 3 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,11 @@ namespace WorkerRole1
public override bool OnStart()
{
var storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
var cloudQueueClient = storageAccount.CreateCloudQueueClient();
cloudQueueClient.DefaultRequestOptions.RetryPolicy = new NoRetry();
var cloudQueue = cloudQueueClient.GetQueueReference("myqueue");
cloudQueue.CreateIfNotExists();
var queueName = "myqueue";
var poisonQueueName = "myqueue-poison";

// Configure the message pump
_messagePump = new AsyncMessagePump(cloudQueue, 1, 25, TimeSpan.FromMinutes(1), 3)
_messagePump = new AsyncMessagePump(queueName, storageAccount, 25, poisonQueueName, TimeSpan.FromMinutes(1), 3)
{
OnMessage = (message, cancellationToken) =>
{
Expand All @@ -82,10 +80,6 @@ namespace WorkerRole1
OnError = (message, exception, isPoison) =>
{
Trace.TraceInformation("An error occured: {0}", exception);
if (isPoison)
{
// Copy message to a poison queue otherwise it will be lost forever
}
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class ColoredConsoleLogProvider : ILogProvider
{LogLevel.Debug, ConsoleColor.Gray},
{LogLevel.Trace, ConsoleColor.DarkGray}
};
private LogLevel _minLevel = LogLevel.Trace;
private readonly LogLevel _minLevel = LogLevel.Trace;

public ColoredConsoleLogProvider(LogLevel minLevel = LogLevel.Trace)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TargetFramework>netcoreapp2.0</TargetFramework>
<AssemblyName>Picton.Messaging.IntegrationTests</AssemblyName>
<RootNamespace>Picton.Messaging.IntegrationTests</RootNamespace>
</PropertyGroup>

<PropertyGroup>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="App.Metrics.Reporting.Http" Version="2.0.0" />
</ItemGroup>
Expand Down
1 change: 0 additions & 1 deletion Source/Picton.Messaging.IntegrationTests/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using App.Metrics.Scheduling;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;
using Picton.Interfaces;
using Picton.Managers;
using Picton.Messaging.IntegrationTests.Datadog;
using Picton.Messaging.Logging;
Expand Down
130 changes: 70 additions & 60 deletions Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,22 @@ public void No_message_processed_when_queue_is_empty()

mockQueue.Setup(q => q.GetMessagesAsync(It.IsAny<int>(), It.IsAny<TimeSpan?>(), It.IsAny<QueueRequestOptions>(), It.IsAny<OperationContext>(), It.IsAny<CancellationToken>())).ReturnsAsync(Enumerable.Empty<CloudQueueMessage>());

var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3);
messagePump.OnMessage = (message, cancellationToken) =>
var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3)
{
Interlocked.Increment(ref onMessageInvokeCount);
OnMessage = (message, cancellationToken) =>
{
Interlocked.Increment(ref onMessageInvokeCount);
},
OnError = (message, exception, isPoison) =>
{
Interlocked.Increment(ref onErrorInvokeCount);
}
};
messagePump.OnQueueEmpty = cancellationToken =>
{
Interlocked.Increment(ref onQueueEmptyInvokeCount);
messagePump.Stop();
};
messagePump.OnError = (message, exception, isPoison) =>
{
Interlocked.Increment(ref onErrorInvokeCount);
};

// Act
messagePump.Start();
Expand Down Expand Up @@ -181,27 +183,29 @@ public void Message_processed()
return Task.FromResult(true);
});

var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3);
messagePump.OnMessage = (message, cancellationToken) =>
var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3)
{
Interlocked.Increment(ref onMessageInvokeCount);
OnMessage = (message, cancellationToken) =>
{
Interlocked.Increment(ref onMessageInvokeCount);
},
OnError = (message, exception, isPoison) =>
{
Interlocked.Increment(ref onErrorInvokeCount);
if (isPoison)
{
lock (lockObject)
{
cloudMessage = null;
}
}
}
};
messagePump.OnQueueEmpty = cancellationToken =>
{
Interlocked.Increment(ref onQueueEmptyInvokeCount);
messagePump.Stop();
};
messagePump.OnError = (message, exception, isPoison) =>
{
Interlocked.Increment(ref onErrorInvokeCount);
if (isPoison)
{
lock (lockObject)
{
cloudMessage = null;
}
}
};

// Act
messagePump.Start();
Expand Down Expand Up @@ -261,29 +265,31 @@ public void Poison_message_is_rejected()
return Task.FromResult(true);
});

var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), retries);
messagePump.OnMessage = (message, cancellationToken) =>
var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), retries)
{
Interlocked.Increment(ref onMessageInvokeCount);
throw new Exception("An error occured when attempting to process the message");
OnMessage = (message, cancellationToken) =>
{
Interlocked.Increment(ref onMessageInvokeCount);
throw new Exception("An error occured when attempting to process the message");
},
OnError = (message, exception, isPoison) =>
{
Interlocked.Increment(ref onErrorInvokeCount);
if (isPoison)
{
lock (lockObject)
{
isRejected = true;
cloudMessage = null;
}
}
}
};
messagePump.OnQueueEmpty = cancellationToken =>
{
Interlocked.Increment(ref onQueueEmptyInvokeCount);
messagePump.Stop();
};
messagePump.OnError = (message, exception, isPoison) =>
{
Interlocked.Increment(ref onErrorInvokeCount);
if (isPoison)
{
lock (lockObject)
{
isRejected = true;
cloudMessage = null;
}
}
};

// Act
messagePump.Start();
Expand Down Expand Up @@ -352,29 +358,31 @@ public void Poison_message_is_moved()
return Task.FromResult(true);
});

var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, poisonQueueName, TimeSpan.FromMinutes(1), retries);
messagePump.OnMessage = (message, cancellationToken) =>
var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, poisonQueueName, TimeSpan.FromMinutes(1), retries)
{
Interlocked.Increment(ref onMessageInvokeCount);
throw new Exception("An error occured when attempting to process the message");
OnMessage = (message, cancellationToken) =>
{
Interlocked.Increment(ref onMessageInvokeCount);
throw new Exception("An error occured when attempting to process the message");
},
OnError = (message, exception, isPoison) =>
{
Interlocked.Increment(ref onErrorInvokeCount);
if (isPoison)
{
lock (lockObject)
{
isRejected = true;
cloudMessage = null;
}
}
}
};
messagePump.OnQueueEmpty = cancellationToken =>
{
Interlocked.Increment(ref onQueueEmptyInvokeCount);
messagePump.Stop();
};
messagePump.OnError = (message, exception, isPoison) =>
{
Interlocked.Increment(ref onErrorInvokeCount);
if (isPoison)
{
lock (lockObject)
{
isRejected = true;
cloudMessage = null;
}
}
};

// Act
messagePump.Start();
Expand Down Expand Up @@ -409,10 +417,16 @@ public void Exceptions_in_OnQueueEmpty_are_ignored()

mockQueue.Setup(q => q.GetMessagesAsync(It.IsAny<int>(), It.IsAny<TimeSpan?>(), It.IsAny<QueueRequestOptions>(), It.IsAny<OperationContext>(), It.IsAny<CancellationToken>())).ReturnsAsync(Enumerable.Empty<CloudQueueMessage>());

var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3);
messagePump.OnMessage = (message, cancellationToken) =>
var messagePump = new AsyncMessagePump(queueName, mockStorageAccount.Object, 1, null, TimeSpan.FromMinutes(1), 3)
{
Interlocked.Increment(ref onMessageInvokeCount);
OnMessage = (message, cancellationToken) =>
{
Interlocked.Increment(ref onMessageInvokeCount);
},
OnError = (message, exception, isPoison) =>
{
Interlocked.Increment(ref onErrorInvokeCount);
}
};
messagePump.OnQueueEmpty = cancellationToken =>
{
Expand All @@ -431,10 +445,6 @@ public void Exceptions_in_OnQueueEmpty_are_ignored()
// Stop the message pump
messagePump.Stop();
};
messagePump.OnError = (message, exception, isPoison) =>
{
Interlocked.Increment(ref onErrorInvokeCount);
};

// Act
messagePump.Start();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TargetFrameworks>net452;netcoreapp1.0;netcoreapp2.0</TargetFrameworks>
<AssemblyName>Picton.Messaging.UnitTests</AssemblyName>
<RootNamespace>Picton.Messaging.UnitTests</RootNamespace>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" />
<PackageReference Include="Moq" Version="4.8.2" />
<PackageReference Include="Shouldly" Version="3.0.0" />
<PackageReference Include="xunit" Version="2.3.1" />
Expand Down
11 changes: 5 additions & 6 deletions Source/Picton.Messaging/App_Packages/LibLog.4.2/LibLog.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
//------------------------------------------------------------------------------
// <auto-generated>
// This tag ensures the content of this file is not analyzed by StyleCop.Analyzers
// </auto-generated>
//------------------------------------------------------------------------------

// <auto-generated />
//===============================================================================
// LibLog
//
Expand Down Expand Up @@ -504,6 +499,7 @@ interface ILogProvider
/// </summary>
/// <param name="key">A key.</param>
/// <param name="value">A value.</param>
/// <param name="destructure">???</param>
/// <returns>A disposable that when disposed removes the map from the context.</returns>
IDisposable OpenMappedContext(string key, object value, bool destructure = false);
}
Expand Down Expand Up @@ -669,6 +665,7 @@ static IDisposable OpenNestedContext(string message)
/// </summary>
/// <param name="key">A key.</param>
/// <param name="value">A value.</param>
/// <param name="destructure">???</param>
/// <returns>An <see cref="IDisposable"/> that closes context when disposed.</returns>
[SuppressMessage("Microsoft.Naming", "CA2204:Literals should be spelled correctly", MessageId = "SetCurrentLogProvider")]
#if LIBLOG_PUBLIC
Expand Down Expand Up @@ -785,7 +782,9 @@ internal class LoggerExecutionWrapper : ILog
private readonly Func<bool> _getIsDisabled;
internal const string FailedToGenerateLogMessage = "Failed to generate log message";

#if !LIBLOG_PORTABLE
Func<string> _lastExtensionMethod;
#endif

internal LoggerExecutionWrapper(Logger logger, Func<bool> getIsDisabled = null)
{
Expand Down
2 changes: 1 addition & 1 deletion Source/Picton.Messaging/AsyncMessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class AsyncMessagePump
/// <param name="queueName">Name of the queue.</param>
/// <param name="storageAccount">The cloud storage account.</param>
/// <param name="concurrentTasks">The number of concurrent tasks.</param>
/// <param name="poisonQueueName">Name of the queue where messages are automatically moved to when they fail to be processed after 'maxDequeueCount' attempts. You can indicate that you do not want messages to be automatically moved by leaving this value empty. In such a scenario, you are responsible for handling so called 'poinson' messages.</param>
/// <param name="poisonQueueName">Name of the queue where messages are automatically moved to when they fail to be processed after 'maxDequeueCount' attempts. You can indicate that you do not want messages to be automatically moved by leaving this value empty. In such a scenario, you are responsible for handling so called 'poison' messages.</param>
/// <param name="visibilityTimeout">The visibility timeout.</param>
/// <param name="maxDequeueCount">The maximum dequeue count.</param>
/// <param name="metrics">The system where metrics are published</param>
Expand Down
18 changes: 12 additions & 6 deletions Source/Picton.Messaging/Picton.Messaging.csproj
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net452;netstandard1.6</TargetFrameworks>
<TargetFrameworks>net452;netstandard1.6;netstandard2.0</TargetFrameworks>
<PlatformTarget>anycpu</PlatformTarget>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<OutputType>Library</OutputType>
<PackageTargetFallback Condition=" '$(TargetFramework)' == 'netstandard1.6' ">$(PackageTargetFallback);dnxcore50</PackageTargetFallback>
<NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.6' ">1.6.1</NetStandardImplicitPackageVersion>
<Version>$(SemVer)</Version>
<DebugType>full</DebugType>
</PropertyGroup>
Expand All @@ -28,7 +26,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="App.Metrics" Version="2.0.0" />
<PackageReference Include="App.Metrics" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.DependencyModel" Version="2.0.4" />
<PackageReference Include="Picton" Version="3.0.0" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
Expand All @@ -41,14 +39,22 @@
<Reference Include="Microsoft.CSharp" />
</ItemGroup>

<PropertyGroup Condition=" '$(TargetFramework)' == 'net452' ">
<ItemGroup Condition="'$(TargetFramework)' != 'net452'">
<PackageReference Include="Microsoft.CSharp" Version="4.4.1" />
</ItemGroup>

<PropertyGroup Condition="'$(TargetFramework)' == 'net452' ">
<DefineConstants>$(DefineConstants);NETFULL;LIBLOG_PORTABLE</DefineConstants>
</PropertyGroup>

<PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard1.6' ">
<PropertyGroup Condition="'$(TargetFramework)' != 'net452' ">
<DefineConstants>$(DefineConstants);NETSTANDARD;LIBLOG_PORTABLE</DefineConstants>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)' == 'Release' ">
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>

<ItemGroup>
<AdditionalFiles Include="$(SolutionDir)\stylecop.json" Link="stylecop.json" />
</ItemGroup>
Expand Down

0 comments on commit 99620cd

Please sign in to comment.