Skip to content

Commit

Permalink
Merge pull request #541 from Particular/hotfix-4.4.4
Browse files Browse the repository at this point in the history
Exceptions thrown in recoverability policy are not propagated as critical error
  • Loading branch information
boblangley authored Apr 17, 2019
2 parents 0e0aed9 + e256573 commit 060e0da
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 12 deletions.
4 changes: 2 additions & 2 deletions src/NServiceBus.RabbitMQ/NServiceBus.RabbitMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@
</PropertyGroup>
<Error Condition="!Exists('..\packages\Particular.CodeRules.0.1.1\build\Particular.CodeRules.props')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\Particular.CodeRules.0.1.1\build\Particular.CodeRules.props'))" />
<Error Condition="!Exists('..\packages\GitVersionTask.3.6.5\build\dotnet\GitVersionTask.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\GitVersionTask.3.6.5\build\dotnet\GitVersionTask.targets'))" />
<Error Condition="!Exists('..\packages\Fody.3.3.3\build\Fody.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\Fody.3.3.3\build\Fody.targets'))" />
<Error Condition="!Exists('..\packages\Janitor.Fody.1.6.5\build\Janitor.Fody.props')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\Janitor.Fody.1.6.5\build\Janitor.Fody.props'))" />
<Error Condition="!Exists('..\packages\Obsolete.Fody.4.4.3\build\Obsolete.Fody.props')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\Obsolete.Fody.4.4.3\build\Obsolete.Fody.props'))" />
<Error Condition="!Exists('..\packages\NuGetPackager.0.6.5\build\NuGetPackager.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\NuGetPackager.0.6.5\build\NuGetPackager.targets'))" />
<Error Condition="!Exists('..\packages\Fody.4.2.1\build\Fody.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\Fody.4.2.1\build\Fody.targets'))" />
</Target>
<Import Project="..\packages\GitVersionTask.3.6.5\build\dotnet\GitVersionTask.targets" Condition="Exists('..\packages\GitVersionTask.3.6.5\build\dotnet\GitVersionTask.targets')" />
<Import Project="..\packages\Fody.3.3.3\build\Fody.targets" Condition="Exists('..\packages\Fody.3.3.3\build\Fody.targets')" />
<Import Project="..\packages\NuGetPackager.0.6.5\build\NuGetPackager.targets" Condition="Exists('..\packages\NuGetPackager.0.6.5\build\NuGetPackager.targets')" />
<Import Project="..\packages\Fody.4.2.1\build\Fody.targets" Condition="Exists('..\packages\Fody.4.2.1\build\Fody.targets')" />
</Project>
20 changes: 16 additions & 4 deletions src/NServiceBus.RabbitMQ/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class MessagePump : IPushMessages, IDisposable
PushSettings settings;
MessagePumpConnectionFailedCircuitBreaker circuitBreaker;
TaskScheduler exclusiveScheduler;
CriticalError criticalError;

// Start
int maxConcurrency;
Expand Down Expand Up @@ -58,6 +59,7 @@ public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<E
this.onMessage = onMessage;
this.onError = onError;
this.settings = settings;
this.criticalError = criticalError;

circuitBreaker = new MessagePumpConnectionFailedCircuitBreaker($"'{settings.InputQueue} MessagePump'", timeToWaitBeforeTriggeringCircuitBreaker, criticalError);

Expand Down Expand Up @@ -185,7 +187,7 @@ async void Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
// E.g. the client may raise the event on a new, explicitly created thread each time.
// 2) we cannot tell whether we are in scenario (a) or scenario (b).
// E.g. the client may raise the event on a thread pool thread.
//
//
// In both cases, we cannot tell whether we need to yield or not, so we must yield.
await Task.Yield();
}
Expand Down Expand Up @@ -247,11 +249,21 @@ async Task Process(BasicDeliverEventArgs message)
await onMessage(messageContext).ConfigureAwait(false);
processed = true;
}
catch (Exception ex)
catch (Exception exception)
{
++numberOfDeliveryAttempts;
var errorContext = new ErrorContext(ex, headers, messageId, message.Body ?? new byte[0], transportTransaction, numberOfDeliveryAttempts);
errorHandled = await onError(errorContext).ConfigureAwait(false) == ErrorHandleResult.Handled;
var errorContext = new ErrorContext(exception, headers, messageId, message.Body ?? new byte[0], transportTransaction, numberOfDeliveryAttempts);
try
{
errorHandled = await onError(errorContext).ConfigureAwait(false) == ErrorHandleResult.Handled;
}
catch (Exception ex)
{
criticalError.Raise($"Failed to execute recoverability policy for message with native ID: `{messageId}`", ex);
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);

return;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/NServiceBus.RabbitMQ/packages.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Fody" version="3.3.3" targetFramework="net452" developmentDependency="true" />
<package id="Fody" version="4.2.1" targetFramework="net452" developmentDependency="true" />
<package id="GitVersionTask" version="3.6.5" targetFramework="net452" developmentDependency="true" />
<package id="Janitor.Fody" version="1.6.5" targetFramework="net452" developmentDependency="true" />
<package id="Microsoft.Diagnostics.Tracing.EventSource.Redist" version="1.1.28" targetFramework="net452" />
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
namespace NServiceBus.TransportTests
{
using System;
using System.Linq;
using System.Threading.Tasks;
using NServiceBus.Logging;
using NUnit.Framework;
using Transport;

Expand All @@ -13,16 +15,23 @@ public class When_on_error_throws : NServiceBusTransportTest
[TestCase(TransportTransactionMode.TransactionScope)]
public async Task Should_reinvoke_on_error_with_original_exception(TransportTransactionMode transactionMode)
{
var loggerFactory = new TransportTestLoggerFactory();
LogManager.UseFactory(loggerFactory);

var onErrorCalled = new TaskCompletionSource<ErrorContext>();
Exception criticalError = null;
var criticalErrorCalled = false;
string criticalErrorMessage = null;

OnTestTimeout(() => onErrorCalled.SetCanceled());

var firstInvocation = true;
string nativeMessageId = null;

await StartPump(
context =>
{
nativeMessageId = context.MessageId;

throw new Exception("Simulated exception");
},
context =>
Expand All @@ -39,15 +48,20 @@ await StartPump(
return Task.FromResult(ErrorHandleResult.Handled);
},
transactionMode,
(s, exception) => criticalError = exception);
(message, exception) =>
{
criticalErrorCalled = true;
criticalErrorMessage = message;
});

await SendMessage(InputQueueName);

var errorContext = await onErrorCalled.Task;

Assert.AreEqual("Simulated exception", errorContext.Exception.Message);
//Assert.AreEqual(2, errorContext.ImmediateProcessingFailures);
//Assert.IsNotNull(criticalError);
Assert.AreEqual("Simulated exception", errorContext.Exception.Message, "Should retry the message");
Assert.True(criticalErrorCalled, "Should invoke critical error");
Assert.AreEqual($"Failed to execute recoverability policy for message with native ID: `{nativeMessageId}`", criticalErrorMessage, "Incorrect critical error message.");
Assert.False(loggerFactory.LogItems.Any(item => item.Level > LogLevel.Info));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
<Compile Include="App_Packages\NSB.TransportTests.6.1.1\When_user_aborts_processing.cs" />
<Compile Include="App_Packages\NSB.TransportTests.6.1.1\When_using_non_durable_delivery.cs" />
<Compile Include="ConfigureRabbitMQTransportInfrastructure.cs" />
<Compile Include="TransportTestLoggerFactory.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\NServiceBus.RabbitMQ\NServiceBus.RabbitMQ.csproj">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
namespace NServiceBus.TransportTests
{
using System;
using System.Collections.Generic;
using Logging;
using NUnit.Framework;

public class TransportTestLoggerFactory : ILoggerFactory
{
public ILog GetLogger(Type type)
{
return GetLogger(type.FullName);
}

public ILog GetLogger(string name)
{
return new TransportTestLogger(name, LogItems);
}

public List<LogItem> LogItems { get; } = new List<LogItem>();

public class LogItem
{
// ReSharper disable NotAccessedField.Global
public LogLevel Level;
public string Message;
// ReSharper restore NotAccessedField.Global
}

class TransportTestLogger : ILog
{
public TransportTestLogger(string name, List<LogItem> logItems)
{
this.name = name;
this.logItems = logItems;
}

public bool IsDebugEnabled { get; } = true;
public bool IsInfoEnabled { get; } = true;
public bool IsWarnEnabled { get; } = true;
public bool IsErrorEnabled { get; } = true;
public bool IsFatalEnabled { get; } = true;

public void Debug(string message)
{
Log(LogLevel.Debug, message);
}

public void Debug(string message, Exception exception)
{
Log(LogLevel.Debug, $"{message} {exception}");
}

public void DebugFormat(string format, params object[] args)
{
Log(LogLevel.Debug, string.Format(format, args));
}

public void Info(string message)
{
Log(LogLevel.Info, message);
}

public void Info(string message, Exception exception)
{
Log(LogLevel.Info, $"{message} {exception}");
}

public void InfoFormat(string format, params object[] args)
{
Log(LogLevel.Info, string.Format(format, args));
}

public void Warn(string message)
{
Log(LogLevel.Warn, message);
}

public void Warn(string message, Exception exception)
{
Log(LogLevel.Warn, $"{message} {exception}");
}

public void WarnFormat(string format, params object[] args)
{
Log(LogLevel.Warn, string.Format(format, args));
}

public void Error(string message)
{
Log(LogLevel.Error, message);
}

public void Error(string message, Exception exception)
{
Log(LogLevel.Error, $"{message} {exception}");
}

public void ErrorFormat(string format, params object[] args)
{
Log(LogLevel.Error, string.Format(format, args));
}

public void Fatal(string message)
{
Log(LogLevel.Fatal, message);
}

public void Fatal(string message, Exception exception)
{
Log(LogLevel.Fatal, $"{message} {exception}");
}

public void FatalFormat(string format, params object[] args)
{
Log(LogLevel.Fatal, string.Format(format, args));
}

void Log(LogLevel level, string message)
{
logItems.Add(new LogItem
{
Level = level,
Message = message
});

TestContext.WriteLine($"{DateTime.Now:T} {level} {name}: {message}");
}

string name;
List<LogItem> logItems;
}
}
}

0 comments on commit 060e0da

Please sign in to comment.