Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Sources/Contour/Contour.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<ItemGroup>
<PackageReference Include="Common.Logging" Version="3.4.1" />
<PackageReference Include="RabbitMQ.Client" Version="6.1.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.2.4" />
<PackageReference Include="System.Collections.Immutable" Version="1.7.0" />
<PackageReference Include="System.Configuration.ConfigurationManager" Version="4.5.0" />
</ItemGroup>
Expand Down
11 changes: 10 additions & 1 deletion Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;

using Contour.Configuration;
Expand Down Expand Up @@ -127,5 +127,14 @@ public static IBusConfigurator UseRabbitMq(this IBusConfigurator busConfigurator

return busConfigurator;
}

public static bool RmqUseAsyncConsuming { get; private set; }

// Костыль, чтобы не тянеть в общую конфигурацию специфичную для эксперимента настройку
public static IBusConfiguration ConfigureRabbitMq(this IBusConfiguration busConfigurator, bool asyncConsuming)
{
RmqUseAsyncConsuming = asyncConsuming;
return busConfigurator;
}
}
}
129 changes: 111 additions & 18 deletions Sources/Contour/Transport/RabbitMQ/Internal/Listener.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Contour.Transport.RabbitMQ.Topology;
using Contour.Transport.RabbitMQ.Topology;
using RabbitMQ.Client;

namespace Contour.Transport.RabbitMQ.Internal
Expand Down Expand Up @@ -464,7 +464,32 @@ private async Task ConsumerTaskMethod(CancellationToken token)
{
try
{
var consumer = this.InitializeConsumer(token, out var channel);
Func<Task> startConsuming;
if (BusConfigurationEx.RmqUseAsyncConsuming)
{
var consumer = this.InitializeAsyncConsumer(token, out var channel);
startConsuming = () =>
{
this.StartConsumingAsync(consumer, channel, token);
return Task.CompletedTask;
};
}
else
{
var consumer = this.InitializeConsumer(token, out var channel);
startConsuming = async () =>
{
this.StartConsuming(consumer, channel);

this.logger.Info($"Listner {this} start consuming.");

while (!token.IsCancellationRequested)
{
var message = consumer.Dequeue();
await this.Deliver(this.BuildDeliveryFrom(channel, message));
}
};
}

var waitSecond = 0;
// если шина так и не стала готова работать, то не смысла начинать слушать сообщения, что бы потом их потерять
Expand All @@ -486,17 +511,9 @@ private async Task ConsumerTaskMethod(CancellationToken token)
}
}

this.StartConsuming(consumer, channel);

this.logger.Info($"Listner {this} start consuming.");

while (!token.IsCancellationRequested)
{
var message = consumer.Dequeue();
await this.Deliver(this.BuildDeliveryFrom(channel, message));
}
await startConsuming();
}
catch (OperationCanceledException)
catch (OperationCanceledException e) when (e.CancellationToken == token)
{
this.logger.Info("Consume operation of listener has been canceled");
}
Expand Down Expand Up @@ -527,8 +544,8 @@ private Expectation CreateExpectation(string correlationId, Type expectedRespons
}

return new Expectation(d => this.BuildResponse(d, expectedResponseType), timeoutTicket);
}

}
private CancellableQueueingConsumer InitializeConsumer(CancellationToken token, out RabbitChannel channel)
{
// Opening a new channel may lead to a new connection creation
Expand All @@ -542,21 +559,97 @@ private CancellableQueueingConsumer InitializeConsumer(CancellationToken token,
this.ReceiverOptions.GetQoS().Value);
}

var consumer = channel.BuildCancellableConsumer(token);

var consumer = channel.BuildCancellableConsumer(token);


return consumer;
}

private void StartConsuming(IBasicConsumer consumer, RabbitChannel channel)
{
var tag = channel.StartConsuming(
this.endpoint.ListeningSource,
this.ReceiverOptions.IsAcceptRequired(),
consumer);

this.logger.Trace(
$"A consumer tagged [{tag}] has been registered in listener of [{string.Join(",", this.AcceptedLabels)}]");
}


private AsyncEventingBasicConsumer InitializeAsyncConsumer(CancellationToken token, out RabbitChannel channel)
{
// Opening a new channel may lead to a new connection creation
channel = this.connection.OpenChannel(token);
channel.Shutdown += this.OnChannelShutdown;
this.channels.Add(channel);

if (this.ReceiverOptions.GetQoS().HasValue)
{
channel.SetQos(
this.ReceiverOptions.GetQoS().Value);
}

var consumer = channel.BuildConsumer();

return consumer;
}

private void StartConsuming(IBasicConsumer consumer, RabbitChannel channel)
private void StartConsumingAsync(AsyncEventingBasicConsumer consumer, RabbitChannel channel, CancellationToken token)
{
var tag = channel.StartConsuming(
consumer.Received += (s, e) => HandleMessage(e);

string tag = string.Empty;

async Task HandleMessage(BasicDeliverEventArgs args)
{
RabbitDelivery delivery = null;
try
{
delivery = this.BuildDeliveryFrom(channel, args);
}
catch (Exception e)
{
this.logger.Fatal(x => x("Delivery object has not been constructed, fetch a follow messages of the consumer for '{0}' is unpossible.", this.endpoint.ListeningSource.Address), e);
throw;
}

try
{
await this.Deliver(delivery);
}
catch (Exception e)
{
this.OnFailure(delivery, e);
}
}


CancellationTokenRegistration cancellationCallbackRegistration = default;

void UnsubscribeConsumer()
{
channel.StopConsuming(tag);
cancellationCallbackRegistration.Dispose();
}

cancellationCallbackRegistration = token.Register(UnsubscribeConsumer);

if (token.IsCancellationRequested)
{
UnsubscribeConsumer();
return;
}

tag = channel.StartConsuming(
this.endpoint.ListeningSource,
this.ReceiverOptions.IsAcceptRequired(),
consumer);

this.logger.Trace(
$"A consumer tagged [{tag}] has been registered in listener of [{string.Join(",", this.AcceptedLabels)}]");

this.logger.Info($"Listner {this} start consuming.");
}

private void OnChannelShutdown(IChannel channel, ShutdownEventArgs args)
Expand Down
11 changes: 6 additions & 5 deletions Sources/Contour/Transport/RabbitMQ/Internal/RabbitBus.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -37,9 +37,10 @@ public RabbitBus(BusConfiguration configuration)
this.cancellationTokenSource = new CancellationTokenSource();
var completion = new TaskCompletionSource<object>();
completion.SetResult(new object());
this.workTask = completion.Task;

this.connectionPool = new RabbitConnectionPool(this);
this.workTask = completion.Task;

var async = BusConfigurationEx.RmqUseAsyncConsuming;
this.connectionPool = new RabbitConnectionPool(this, async);
}

/// <summary>
Expand Down Expand Up @@ -305,4 +306,4 @@ private void OnListenerCreated(object sender, ListenerCreatedEventArgs e)
.ForEach(r => r.CheckIfCompatible(e.Listener));
}
}
}
}
Loading