Skip to content

Commit

Permalink
Merge pull request #495 from Particular/hotfix-5.0.1
Browse files Browse the repository at this point in the history
Transport getting stuck with "System.Exception: Failed to add " in the logs
  • Loading branch information
Michał Wójcik authored Sep 19, 2018
2 parents 949cf94 + 60fc2a6 commit 3a6f82b
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void SetUp()
var config = ConnectionConfiguration.Create(connectionString, ReceiverQueue);

connectionFactory = new ConnectionFactory(config, null, false, false);
channelProvider = new ChannelProvider(connectionFactory, routingTopology, true);
channelProvider = new ChannelProvider(connectionFactory, config.RetryDelay, routingTopology, true);
channelProvider.CreateConnection();

messageDispatcher = new MessageDispatcher(channelProvider);
Expand Down
43 changes: 42 additions & 1 deletion src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ namespace NServiceBus.Transport.RabbitMQ
{
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using global::RabbitMQ.Client;
using Logging;

sealed class ChannelProvider : IChannelProvider, IDisposable
{
public ChannelProvider(ConnectionFactory connectionFactory, IRoutingTopology routingTopology, bool usePublisherConfirms)
public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay, IRoutingTopology routingTopology, bool usePublisherConfirms)
{
this.connectionFactory = connectionFactory;
this.retryDelay = retryDelay;

this.routingTopology = routingTopology;
this.usePublisherConfirms = usePublisherConfirms;
Expand All @@ -19,6 +22,41 @@ public ChannelProvider(ConnectionFactory connectionFactory, IRoutingTopology rou
public void CreateConnection()
{
connection = connectionFactory.CreatePublishConnection();
connection.ConnectionShutdown += Connection_ConnectionShutdown;
}

void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
if (e.Initiator != ShutdownInitiator.Application)
{
Logger.WarnFormat("Connection to the broker lost: {0}", e.ReplyText);

Task.Run(Reconnect).Ignore();
}
}

async Task Reconnect()
{
var reconnected = false;

while (!reconnected)
{
Logger.InfoFormat("Attempting to reconnect in {0} seconds.", retryDelay.TotalSeconds);

await Task.Delay(retryDelay).ConfigureAwait(false);

try
{
CreateConnection();
reconnected = true;

Logger.Info("Connection to the broker reestablished successfully.");
}
catch(Exception e)
{
Logger.Info("Reconnecting to the broker failed.", e);
}
}
}

public ConfirmsAwareChannel GetPublishChannel()
Expand Down Expand Up @@ -59,9 +97,12 @@ public void Dispose()
}

readonly ConnectionFactory connectionFactory;
readonly TimeSpan retryDelay;
readonly IRoutingTopology routingTopology;
readonly bool usePublisherConfirms;
readonly ConcurrentQueue<ConfirmsAwareChannel> channels;
IConnection connection;

static readonly ILog Logger = LogManager.GetLogger(typeof(ChannelProvider));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ Task GetConfirmationTask()
var tcs = new TaskCompletionSource<bool>();
var added = messages.TryAdd(channel.NextPublishSeqNo, tcs);

if (!added) //debug check, this shouldn't happen
if (!added)
{
throw new Exception($"Failed to add {channel.NextPublishSeqNo}");
throw new Exception($"Cannot publish a message with sequence number '{channel.NextPublishSeqNo}' on this channel. A message was already published on this channel with the same confirmation number.");
}

return tcs.Task;
Expand Down Expand Up @@ -200,13 +200,13 @@ void SetException(ulong key, string exceptionMessage)
}
}


public void Dispose()
{
channel?.Dispose();
}

IModel channel;

readonly IRoutingTopology routingTopology;
readonly bool usePublisherConfirms;
readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> messages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public ConnectionFactory(ConnectionConfiguration connectionConfiguration, X509Ce
UserName = connectionConfiguration.UserName,
Password = connectionConfiguration.Password,
RequestedHeartbeat = connectionConfiguration.RequestedHeartbeat,
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = connectionConfiguration.RetryDelay,
UseBackgroundThreadsForIO = true
};
Expand Down Expand Up @@ -63,14 +62,15 @@ public ConnectionFactory(ConnectionConfiguration connectionConfiguration, X509Ce
}
}

public IConnection CreatePublishConnection() => CreateConnection("Publish");
public IConnection CreatePublishConnection() => CreateConnection("Publish", false);

public IConnection CreateAdministrationConnection() => CreateConnection("Administration");
public IConnection CreateAdministrationConnection() => CreateConnection("Administration", false);

public IConnection CreateConnection(string connectionName)
public IConnection CreateConnection(string connectionName, bool automaticRecoveryEnabled = true)
{
lock (lockObject)
{
connectionFactory.AutomaticRecoveryEnabled = automaticRecoveryEnabled;
connectionFactory.ClientProperties["connected"] = DateTime.Now.ToString("G");

return connectionFactory.CreateConnection(connectionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public RabbitMQTransportInfrastructure(SettingsHolder settings, string connectio
usePublisherConfirms = true;
}

channelProvider = new ChannelProvider(connectionFactory, routingTopology, usePublisherConfirms);
channelProvider = new ChannelProvider(connectionFactory, connectionConfiguration.RetryDelay, routingTopology, usePublisherConfirms);
}

public override IEnumerable<Type> DeliveryConstraints
Expand Down

0 comments on commit 3a6f82b

Please sign in to comment.