diff --git a/Sources/Contour/Contour.csproj b/Sources/Contour/Contour.csproj index bc10bdd..d00b97e 100644 --- a/Sources/Contour/Contour.csproj +++ b/Sources/Contour/Contour.csproj @@ -14,7 +14,7 @@ - + diff --git a/Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs b/Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs index fcc4219..e816fb1 100644 --- a/Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs +++ b/Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using Contour.Configuration; @@ -127,5 +127,14 @@ public static IBusConfigurator UseRabbitMq(this IBusConfigurator busConfigurator return busConfigurator; } + + public static bool RmqUseAsyncConsuming { get; private set; } + + // Костыль, чтобы не тянеть в общую конфигурацию специфичную для эксперимента настройку + public static IBusConfiguration ConfigureRabbitMq(this IBusConfiguration busConfigurator, bool asyncConsuming) + { + RmqUseAsyncConsuming = asyncConsuming; + return busConfigurator; + } } } diff --git a/Sources/Contour/Transport/RabbitMQ/Internal/Listener.cs b/Sources/Contour/Transport/RabbitMQ/Internal/Listener.cs index 68306ff..e371967 100644 --- a/Sources/Contour/Transport/RabbitMQ/Internal/Listener.cs +++ b/Sources/Contour/Transport/RabbitMQ/Internal/Listener.cs @@ -1,4 +1,4 @@ -using Contour.Transport.RabbitMQ.Topology; +using Contour.Transport.RabbitMQ.Topology; using RabbitMQ.Client; namespace Contour.Transport.RabbitMQ.Internal @@ -464,7 +464,32 @@ private async Task ConsumerTaskMethod(CancellationToken token) { try { - var consumer = this.InitializeConsumer(token, out var channel); + Func startConsuming; + if (BusConfigurationEx.RmqUseAsyncConsuming) + { + var consumer = this.InitializeAsyncConsumer(token, out var channel); + startConsuming = () => + { + this.StartConsumingAsync(consumer, channel, token); + return Task.CompletedTask; + }; + } + else + { + var consumer = this.InitializeConsumer(token, out var channel); + startConsuming = async () => + { + this.StartConsuming(consumer, channel); + + this.logger.Info($"Listner {this} start consuming."); + + while (!token.IsCancellationRequested) + { + var message = consumer.Dequeue(); + await this.Deliver(this.BuildDeliveryFrom(channel, message)); + } + }; + } var waitSecond = 0; // если шина так и не стала готова работать, то не смысла начинать слушать сообщения, что бы потом их потерять @@ -486,17 +511,9 @@ private async Task ConsumerTaskMethod(CancellationToken token) } } - this.StartConsuming(consumer, channel); - - this.logger.Info($"Listner {this} start consuming."); - - while (!token.IsCancellationRequested) - { - var message = consumer.Dequeue(); - await this.Deliver(this.BuildDeliveryFrom(channel, message)); - } + await startConsuming(); } - catch (OperationCanceledException) + catch (OperationCanceledException e) when (e.CancellationToken == token) { this.logger.Info("Consume operation of listener has been canceled"); } @@ -527,8 +544,8 @@ private Expectation CreateExpectation(string correlationId, Type expectedRespons } return new Expectation(d => this.BuildResponse(d, expectedResponseType), timeoutTicket); - } - + } + private CancellableQueueingConsumer InitializeConsumer(CancellationToken token, out RabbitChannel channel) { // Opening a new channel may lead to a new connection creation @@ -542,21 +559,97 @@ private CancellableQueueingConsumer InitializeConsumer(CancellationToken token, this.ReceiverOptions.GetQoS().Value); } - var consumer = channel.BuildCancellableConsumer(token); - + var consumer = channel.BuildCancellableConsumer(token); + + + return consumer; + } + + private void StartConsuming(IBasicConsumer consumer, RabbitChannel channel) + { + var tag = channel.StartConsuming( + this.endpoint.ListeningSource, + this.ReceiverOptions.IsAcceptRequired(), + consumer); + + this.logger.Trace( + $"A consumer tagged [{tag}] has been registered in listener of [{string.Join(",", this.AcceptedLabels)}]"); + } + + + private AsyncEventingBasicConsumer InitializeAsyncConsumer(CancellationToken token, out RabbitChannel channel) + { + // Opening a new channel may lead to a new connection creation + channel = this.connection.OpenChannel(token); + channel.Shutdown += this.OnChannelShutdown; + this.channels.Add(channel); + + if (this.ReceiverOptions.GetQoS().HasValue) + { + channel.SetQos( + this.ReceiverOptions.GetQoS().Value); + } + + var consumer = channel.BuildConsumer(); return consumer; } - private void StartConsuming(IBasicConsumer consumer, RabbitChannel channel) + private void StartConsumingAsync(AsyncEventingBasicConsumer consumer, RabbitChannel channel, CancellationToken token) { - var tag = channel.StartConsuming( + consumer.Received += (s, e) => HandleMessage(e); + + string tag = string.Empty; + + async Task HandleMessage(BasicDeliverEventArgs args) + { + RabbitDelivery delivery = null; + try + { + delivery = this.BuildDeliveryFrom(channel, args); + } + catch (Exception e) + { + this.logger.Fatal(x => x("Delivery object has not been constructed, fetch a follow messages of the consumer for '{0}' is unpossible.", this.endpoint.ListeningSource.Address), e); + throw; + } + + try + { + await this.Deliver(delivery); + } + catch (Exception e) + { + this.OnFailure(delivery, e); + } + } + + + CancellationTokenRegistration cancellationCallbackRegistration = default; + + void UnsubscribeConsumer() + { + channel.StopConsuming(tag); + cancellationCallbackRegistration.Dispose(); + } + + cancellationCallbackRegistration = token.Register(UnsubscribeConsumer); + + if (token.IsCancellationRequested) + { + UnsubscribeConsumer(); + return; + } + + tag = channel.StartConsuming( this.endpoint.ListeningSource, this.ReceiverOptions.IsAcceptRequired(), consumer); this.logger.Trace( $"A consumer tagged [{tag}] has been registered in listener of [{string.Join(",", this.AcceptedLabels)}]"); + + this.logger.Info($"Listner {this} start consuming."); } private void OnChannelShutdown(IChannel channel, ShutdownEventArgs args) diff --git a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBus.cs b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBus.cs index 3aefdb2..b7774a2 100644 --- a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBus.cs +++ b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBus.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -37,9 +37,10 @@ public RabbitBus(BusConfiguration configuration) this.cancellationTokenSource = new CancellationTokenSource(); var completion = new TaskCompletionSource(); completion.SetResult(new object()); - this.workTask = completion.Task; - - this.connectionPool = new RabbitConnectionPool(this); + this.workTask = completion.Task; + + var async = BusConfigurationEx.RmqUseAsyncConsuming; + this.connectionPool = new RabbitConnectionPool(this, async); } /// @@ -305,4 +306,4 @@ private void OnListenerCreated(object sender, ListenerCreatedEventArgs e) .ForEach(r => r.CheckIfCompatible(e.Listener)); } } -} \ No newline at end of file +} diff --git a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs index eda5b21..892bfae 100644 --- a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs +++ b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs @@ -1,442 +1,464 @@ -namespace Contour.Transport.RabbitMQ.Internal -{ - using System; - using System.Collections.Generic; - using System.IO; - using System.Threading; - using Common.Logging; - using Helpers; - using Receiving; - using Topology; - using global::RabbitMQ.Client; - using System.Diagnostics; - - /// - /// The rabbit channel. - /// - internal sealed class RabbitChannel : IChannel - { - private readonly object sync = new object(); - private readonly IBusContext busContext; - - private readonly ILog logger; - - /// - /// Initializes a new instance of the class. - /// - /// A connection identifier to which this channel belongs - /// A native transport channel - /// A bus context - /// Used connection string - /// Key by connection string - public RabbitChannel(Guid connectionId, IModel model, IBusContext busContext, string connectionString, string connectionKey) - { - this.ConnectionId = connectionId; - this.Model = model; - this.busContext = busContext; - this.ConnectionString = connectionString; - this.logger = LogManager.GetLogger($"{this.GetType().FullName}({this.ConnectionId}, {this.GetHashCode()})"); - this.ConnectionKey = connectionKey; - - this.Model.ModelShutdown += this.OnModelShutdown; - } - - /// - /// Строка подключения - /// - internal string ConnectionString { get; } - - public Guid ConnectionId { get; } - - public string ConnectionKey { get; } - - /// - /// The failed. - /// - [Obsolete("Channel failures are no longer propagated outside via events, instead an exception is thrown")] - public event Action Failed = (channel, args) => { }; - - /// - /// Is fired on channel shutdown. - /// - public event Action Shutdown = (channel, args) => { }; - - /// - /// Gets the native. - /// - protected IModel Model { get; } - - /// - /// Aborts the channel - /// - public void Abort() - { - this.Model?.Abort(); - } - - /// - /// The accept. - /// - /// - /// The delivery. - /// - public void Accept(RabbitDelivery delivery) - { - this.logger.Trace(m => m("Accepting message [{0}] ({1}).", delivery.Label, delivery.Args.DeliveryTag)); - - this.SafeNativeInvoke(n => n.BasicAck(delivery.Args.DeliveryTag, false)); - } - - /// - /// The bind. - /// - /// - /// The queue. - /// - /// - /// The exchange. - /// - /// - /// The routing key. - /// - public void Bind(Queue queue, Exchange exchange, string routingKey) - { - try - { - this.SafeNativeInvoke(n => n.QueueBind(queue.Name, exchange.Name, routingKey)); - } - catch (Exception e) - { - this.logger.Error(m => m("Failed to bind queue to exchange [{1}:{2}] on channel: {0}", this.ToString(), exchange.Name, queue.Name), e); - throw; - } - } - - /// - /// The build cancellable consumer. - /// - /// - /// The cancellation token. - /// - /// - /// The . - /// - public CancellableQueueingConsumer BuildCancellableConsumer(CancellationToken cancellationToken) - { - return new CancellableQueueingConsumer(this.Model, cancellationToken); - } - - /// - /// The declare. - /// - /// - /// The exchange. - /// - public void Declare(Exchange exchange) - { - try - { - this.SafeNativeInvoke(n => n.ExchangeDeclare(exchange.Name, exchange.Type, exchange.Durable, exchange.AutoDelete, new Dictionary())); - } - catch (Exception e) - { - this.logger.Error(m => m("Failed to Declare Exchange [{1}] on channel: {0}", this.ToString(), exchange.Name), e); - throw; - } - } - - /// - /// The declare. - /// - /// - /// The queue. - /// - public void Declare(Queue queue) - { - try - { - var arguments = new Dictionary(); - if (queue.Ttl.HasValue) - { - arguments.Add(Contour.Headers.QueueMessageTtl, (long)queue.Ttl.Value.TotalMilliseconds); - } - - if (queue.Limit.HasValue) - { - arguments.Add(Contour.Headers.QueueMaxLength, (int)queue.Limit); - } - - if (queue.MaxLengthBytes.HasValue) - { - arguments.Add(Contour.Headers.QueueMaxLengthBytes, (int)queue.MaxLengthBytes); - } - - this.SafeNativeInvoke(n => - n.QueueDeclare(queue.Name, queue.Durable, queue.Exclusive, queue.AutoDelete, arguments)); - } - catch (Exception e) - { - this.logger.Error(m => m("Failed to Declare Queue [{1}] on channel: {0}", this.ToString(), queue.Name), - e); - throw; - } - } - - /// - /// The declare default queue. - /// - /// - /// The . - /// - public string DeclareDefaultQueue() - { - try - { - var queueName = string.Empty; - - this.SafeNativeInvoke(n => queueName = n.QueueDeclare()); - - return queueName; - } - catch (Exception e) - { - this.logger.Error(m => m("Failed to Declare Default Queue on channel: {0}", this.ToString()), e); - throw; - } - } - - /// - /// The dispose. - /// - public void Dispose() - { - if (this.Model != null) - { - this.Model.ModelShutdown -= this.OnModelShutdown; - - try - { - this.Model.Abort(); - this.Model.Dispose(); - } - catch (Exception) - { - // Suppress all errors here - } - } - } - - /// - /// Enable publish confirmation. - /// - public void EnablePublishConfirmation() - { - this.SafeNativeInvoke(n => n.ConfirmSelect()); - } - - /// - /// Get next seq no. - /// - /// - /// The . - /// - public ulong GetNextSeqNo() - { - ulong seqNo = 0UL; - this.SafeNativeInvoke(n => seqNo = n.NextPublishSeqNo); - return seqNo; - } - - /// - /// The on confirmation. - /// - /// - /// The handle confirmation. - /// - public void OnConfirmation(ConfirmationHandler handleConfirmation) - { - this.SafeNativeInvoke( - n => - { - n.BasicAcks += (model, args) => handleConfirmation(true, args.DeliveryTag, args.Multiple); - n.BasicNacks += (model, args) => handleConfirmation(false, args.DeliveryTag, args.Multiple); - }); - } - - /// - /// The publish. - /// - /// - /// The route. - /// - /// - /// The message. - /// - /// - /// The props visitor. - /// - public void Publish(IRoute route, IMessage message, Func> propsVisitor = null) - { - var nativeRoute = (RabbitRoute)route; - - this.logger.Trace(m => m("Emitting message [{0}] ({1}) through [{2}].", message.Label, message.Payload, nativeRoute)); - - var props = this.Model.CreateBasicProperties(); - var body = this.busContext.PayloadConverter.FromObject(message.Payload); - - if (props.Headers == null) - { - props.Headers = new Dictionary(); - } - - props.ContentType = this.busContext.PayloadConverter.ContentType; - props.Timestamp = new AmqpTimestamp(DateTime.UtcNow.ToUnixTimestamp()); - - var headers = propsVisitor?.Invoke(props); - headers.ForEach(i => props.Headers.Add(i)); - - this.busContext.MessageLabelHandler.Inject(props, message.Label); - - DiagnosticProps.Store(DiagnosticProps.Names.LastPublishAttemptConnectionString, this.ConnectionString); - - this.SafeNativeInvoke(n => n.BasicPublish(nativeRoute.Exchange, nativeRoute.RoutingKey, false, props, body)); - } - - /// - /// The reject. - /// - /// - /// The delivery. - /// - /// - /// The requeue. - /// - public void Reject(RabbitDelivery delivery, bool requeue) - { - this.logger.Trace(m => m("Rejecting message [{0}] ({1}).", delivery.Label, delivery.Args.DeliveryTag)); - - this.SafeNativeInvoke(n => n.BasicNack(delivery.Args.DeliveryTag, false, requeue)); - } - - /// - /// The reply. - /// - /// - /// The message. - /// - /// - /// The reply to. - /// - /// - /// The correlation id. - /// - public void Reply(IMessage message, RabbitRoute replyTo, string correlationId) - { - Func> propsVisitor = p => - { - p.CorrelationId = correlationId; - return message.Headers; - }; - - this.Publish(new RabbitRoute(replyTo.Exchange, replyTo.RoutingKey), message, propsVisitor); - } - - /// - /// The set qos. - /// - /// - /// The qos. - /// - public void SetQos(QoSParams qos) - { - try - { - this.SafeNativeInvoke(n => n.BasicQos(qos.PrefetchSize, qos.PrefetchCount, false)); - } - catch (Exception e) - { - this.logger.Error(m => m("Failed to set Qos on channel: {0}", this.ToString()), e); - throw; - } - } - - /// - /// The start consuming. - /// - /// - /// The listening source. - /// - /// - /// The require accept. - /// - /// - /// The consumer. - /// - /// - /// The . - /// - public string StartConsuming(IListeningSource listeningSource, bool requireAccept, IBasicConsumer consumer) - { - try - { - var consumerTag = string.Empty; - - this.SafeNativeInvoke(n => consumerTag = n.BasicConsume(listeningSource.Address, !requireAccept, consumer)); - - return consumerTag; - } - catch (Exception e) - { - this.logger.Error(m => m("Failed start consuming on channel."), e); - throw; - } - } - - /// - /// The unpack as. - /// - /// - /// The type. - /// - /// - /// The delivery. - /// - /// - /// The . - /// - public IMessage UnpackAs(Type type, RabbitDelivery delivery) - { - var payload = this.busContext.PayloadConverter.ToObject(delivery.Args.Body.ToArray(), type); - return new Message(delivery.Label, delivery.Headers, payload); - } - - private void OnModelShutdown(object sender, ShutdownEventArgs args) - { - this.logger.Trace($"Channel is closed due to '{args.ReplyText}'"); - this.Shutdown(this, args); - } - - /// - /// The safe native invoke. - /// - /// - /// The invoke action. - /// - private void SafeNativeInvoke(Action invokeAction) - { - try - { - lock (this.sync) - { - this.logger.Trace($"Performing channel action [{invokeAction.Method.Name}]"); - invokeAction(this.Model); - } - } - catch (Exception ex) - { - this.logger.Error($"Channel action failed due to {ex.Message}, connection string: [{this.ConnectionString}]", ex); - throw; - } - } - } -} +using RabbitMQ.Client.Events; + +namespace Contour.Transport.RabbitMQ.Internal +{ + using System; + using System.Collections.Generic; + using System.IO; + using Common.Logging; + using Helpers; + using Receiving; + using Topology; + using global::RabbitMQ.Client; + using System.Threading; + + /// + /// The rabbit channel. + /// + internal sealed class RabbitChannel : IChannel + { + private readonly object sync = new object(); + private readonly IBusContext busContext; + + private readonly ILog logger; + + /// + /// Initializes a new instance of the class. + /// + /// A connection identifier to which this channel belongs + /// A native transport channel + /// A bus context + /// Used connection string + /// Key by connection string + public RabbitChannel(Guid connectionId, IModel model, IBusContext busContext, string connectionString, string connectionKey) + { + this.ConnectionId = connectionId; + this.Model = model; + this.busContext = busContext; + this.ConnectionString = connectionString; + this.logger = LogManager.GetLogger($"{this.GetType().FullName}({this.ConnectionId}, {this.GetHashCode()})"); + this.ConnectionKey = connectionKey; + + this.Model.ModelShutdown += this.OnModelShutdown; + } + + /// + /// Строка подключения + /// + internal string ConnectionString { get; } + + public Guid ConnectionId { get; } + + public string ConnectionKey { get; } + + /// + /// The failed. + /// + [Obsolete("Channel failures are no longer propagated outside via events, instead an exception is thrown")] + public event Action Failed = (channel, args) => { }; + + /// + /// Is fired on channel shutdown. + /// + public event Action Shutdown = (channel, args) => { }; + + /// + /// Gets the native. + /// + protected IModel Model { get; } + + /// + /// Aborts the channel + /// + public void Abort() + { + this.Model?.Abort(); + } + + /// + /// The accept. + /// + /// + /// The delivery. + /// + public void Accept(RabbitDelivery delivery) + { + this.logger.Trace(m => m("Accepting message [{0}] ({1}).", delivery.Label, delivery.Args.DeliveryTag)); + + this.SafeNativeInvoke(n => n.BasicAck(delivery.Args.DeliveryTag, false)); + } + + /// + /// The bind. + /// + /// + /// The queue. + /// + /// + /// The exchange. + /// + /// + /// The routing key. + /// + public void Bind(Queue queue, Exchange exchange, string routingKey) + { + try + { + this.SafeNativeInvoke(n => n.QueueBind(queue.Name, exchange.Name, routingKey)); + } + catch (Exception e) + { + this.logger.Error(m => m("Failed to bind queue to exchange [{1}:{2}] on channel: {0}", this.ToString(), exchange.Name, queue.Name), e); + throw; + } + } + + /// + /// The build AsyncEventingBasicConsumer consumer. + /// + /// + /// The . + /// + public AsyncEventingBasicConsumer BuildConsumer() + { + return new AsyncEventingBasicConsumer(this.Model); + } + + + /// + /// The build cancellable consumer. + /// + /// + /// The cancellation token. + /// + /// + /// The . + /// + public CancellableQueueingConsumer BuildCancellableConsumer(CancellationToken cancellationToken) + { + return new CancellableQueueingConsumer(this.Model, cancellationToken); + } + + /// + /// The declare. + /// + /// + /// The exchange. + /// + public void Declare(Exchange exchange) + { + try + { + this.SafeNativeInvoke(n => n.ExchangeDeclare(exchange.Name, exchange.Type, exchange.Durable, exchange.AutoDelete, new Dictionary())); + } + catch (Exception e) + { + this.logger.Error(m => m("Failed to Declare Exchange [{1}] on channel: {0}", this.ToString(), exchange.Name), e); + throw; + } + } + + /// + /// The declare. + /// + /// + /// The queue. + /// + public void Declare(Queue queue) + { + try + { + var arguments = new Dictionary(); + if (queue.Ttl.HasValue) + { + arguments.Add(Contour.Headers.QueueMessageTtl, (long)queue.Ttl.Value.TotalMilliseconds); + } + + if (queue.Limit.HasValue) + { + arguments.Add(Contour.Headers.QueueMaxLength, (int)queue.Limit); + } + + if (queue.MaxLengthBytes.HasValue) + { + arguments.Add(Contour.Headers.QueueMaxLengthBytes, (int)queue.MaxLengthBytes); + } + + this.SafeNativeInvoke(n => + n.QueueDeclare(queue.Name, queue.Durable, queue.Exclusive, queue.AutoDelete, arguments)); + } + catch (Exception e) + { + this.logger.Error(m => m("Failed to Declare Queue [{1}] on channel: {0}", this.ToString(), queue.Name), + e); + throw; + } + } + + /// + /// The declare default queue. + /// + /// + /// The . + /// + public string DeclareDefaultQueue() + { + try + { + var queueName = string.Empty; + + this.SafeNativeInvoke(n => queueName = n.QueueDeclare()); + + return queueName; + } + catch (Exception e) + { + this.logger.Error(m => m("Failed to Declare Default Queue on channel: {0}", this.ToString()), e); + throw; + } + } + + /// + /// The dispose. + /// + public void Dispose() + { + if (this.Model != null) + { + this.Model.ModelShutdown -= this.OnModelShutdown; + + try + { + this.Model.Abort(); + this.Model.Dispose(); + } + catch (Exception) + { + // Suppress all errors here + } + } + } + + /// + /// Enable publish confirmation. + /// + public void EnablePublishConfirmation() + { + this.SafeNativeInvoke(n => n.ConfirmSelect()); + } + + /// + /// Get next seq no. + /// + /// + /// The . + /// + public ulong GetNextSeqNo() + { + ulong seqNo = 0UL; + this.SafeNativeInvoke(n => seqNo = n.NextPublishSeqNo); + return seqNo; + } + + /// + /// The on confirmation. + /// + /// + /// The handle confirmation. + /// + public void OnConfirmation(ConfirmationHandler handleConfirmation) + { + this.SafeNativeInvoke( + n => + { + n.BasicAcks += (model, args) => handleConfirmation(true, args.DeliveryTag, args.Multiple); + n.BasicNacks += (model, args) => handleConfirmation(false, args.DeliveryTag, args.Multiple); + }); + } + + /// + /// The publish. + /// + /// + /// The route. + /// + /// + /// The message. + /// + /// + /// The props visitor. + /// + public void Publish(IRoute route, IMessage message, Func> propsVisitor = null) + { + var nativeRoute = (RabbitRoute)route; + + this.logger.Trace(m => m("Emitting message [{0}] ({1}) through [{2}].", message.Label, message.Payload, nativeRoute)); + + var props = this.Model.CreateBasicProperties(); + var body = this.busContext.PayloadConverter.FromObject(message.Payload); + + if (props.Headers == null) + { + props.Headers = new Dictionary(); + } + + props.ContentType = this.busContext.PayloadConverter.ContentType; + props.Timestamp = new AmqpTimestamp(DateTime.UtcNow.ToUnixTimestamp()); + + var headers = propsVisitor?.Invoke(props); + headers.ForEach(i => props.Headers.Add(i)); + + this.busContext.MessageLabelHandler.Inject(props, message.Label); + + DiagnosticProps.Store(DiagnosticProps.Names.LastPublishAttemptConnectionString, this.ConnectionString); + + this.SafeNativeInvoke(n => n.BasicPublish(nativeRoute.Exchange, nativeRoute.RoutingKey, false, props, body)); + } + + /// + /// The reject. + /// + /// + /// The delivery. + /// + /// + /// The requeue. + /// + public void Reject(RabbitDelivery delivery, bool requeue) + { + this.logger.Trace(m => m("Rejecting message [{0}] ({1}).", delivery.Label, delivery.Args.DeliveryTag)); + + this.SafeNativeInvoke(n => n.BasicNack(delivery.Args.DeliveryTag, false, requeue)); + } + + /// + /// The reply. + /// + /// + /// The message. + /// + /// + /// The reply to. + /// + /// + /// The correlation id. + /// + public void Reply(IMessage message, RabbitRoute replyTo, string correlationId) + { + Func> propsVisitor = p => + { + p.CorrelationId = correlationId; + return message.Headers; + }; + + this.Publish(new RabbitRoute(replyTo.Exchange, replyTo.RoutingKey), message, propsVisitor); + } + + /// + /// The set qos. + /// + /// + /// The qos. + /// + public void SetQos(QoSParams qos) + { + try + { + this.SafeNativeInvoke(n => n.BasicQos(qos.PrefetchSize, qos.PrefetchCount, false)); + } + catch (Exception e) + { + this.logger.Error(m => m("Failed to set Qos on channel: {0}", this.ToString()), e); + throw; + } + } + + /// + /// The start consuming. + /// + /// + /// The listening source. + /// + /// + /// The require accept. + /// + /// + /// The consumer. + /// + /// + /// The . + /// + public string StartConsuming(IListeningSource listeningSource, bool requireAccept, IBasicConsumer consumer) + { + try + { + var consumerTag = string.Empty; + + this.SafeNativeInvoke(n => consumerTag = n.BasicConsume(listeningSource.Address, !requireAccept, consumer)); + + return consumerTag; + } + catch (Exception e) + { + this.logger.Error(m => m("Failed start consuming on channel."), e); + throw; + } + } + + /// + /// Unsubscribe consumer from queue via subscription ID. + /// + /// Subscription ID + public void StopConsuming(string tag) + { + this.Model.BasicCancel(tag); + } + + /// + /// The unpack as. + /// + /// + /// The type. + /// + /// + /// The delivery. + /// + /// + /// The . + /// + public IMessage UnpackAs(Type type, RabbitDelivery delivery) + { + var payload = this.busContext.PayloadConverter.ToObject(delivery.Args.Body.ToArray(), type); + return new Message(delivery.Label, delivery.Headers, payload); + } + + private void OnModelShutdown(object sender, ShutdownEventArgs args) + { + this.logger.Trace($"Channel is closed due to '{args.ReplyText}'"); + this.Shutdown(this, args); + } + + /// + /// The safe native invoke. + /// + /// + /// The invoke action. + /// + private void SafeNativeInvoke(Action invokeAction) + { + try + { + lock (this.sync) + { + this.logger.Trace($"Performing channel action [{invokeAction.Method.Name}]"); + invokeAction(this.Model); + } + } + catch (Exception ex) + { + this.logger.Error($"Channel action failed due to {ex.Message}, connection string: [{this.ConnectionString}]", ex); + throw; + } + } + } +} diff --git a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitConnection.cs b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitConnection.cs index facc01a..ed61c7e 100644 --- a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitConnection.cs +++ b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitConnection.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.IO; using System.Linq; @@ -27,7 +27,7 @@ internal class RabbitConnection : IRabbitConnection private readonly ConnectionFactory connectionFactory; private INativeConnection connection; - public RabbitConnection(IEndpoint endpoint, string connectionString, IBusContext busContext) + public RabbitConnection(IEndpoint endpoint, string connectionString, IBusContext busContext, bool async = false) { this.Id = Guid.NewGuid(); this.endpoint = endpoint; @@ -49,8 +49,14 @@ public RabbitConnection(IEndpoint endpoint, string connectionString, IBusContext Uri = new Uri(this.ConnectionString), AutomaticRecoveryEnabled = false, ClientProperties = clientProperties, - RequestedConnectionTimeout = TimeSpan.FromMilliseconds(ConnectionTimeout) + RequestedConnectionTimeout = TimeSpan.FromMilliseconds(ConnectionTimeout), }; + + if (async) + { + this.connectionFactory.DispatchConsumersAsync = true; + this.connectionFactory.ConsumerDispatchConcurrency = 1; + } } public event EventHandler Opened; @@ -258,4 +264,4 @@ private static string GetConnectionKey(string connectionString) return $"{ip}:{uri.Port}:{uri.Segments.Last()}"; } } -} \ No newline at end of file +} diff --git a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitConnectionPool.cs b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitConnectionPool.cs index 976617e..c680e17 100644 --- a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitConnectionPool.cs +++ b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitConnectionPool.cs @@ -2,9 +2,9 @@ namespace Contour.Transport.RabbitMQ.Internal { internal class RabbitConnectionPool : ConnectionPool { - public RabbitConnectionPool(IBusContext context) + public RabbitConnectionPool(IBusContext context, bool asyncConsuming) { - this.Provider = new RabbitConnectionProvider(context); + this.Provider = new RabbitConnectionProvider(context, asyncConsuming); } } -} \ No newline at end of file +} diff --git a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitConnectionProvider.cs b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitConnectionProvider.cs index 01764d9..7454156 100644 --- a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitConnectionProvider.cs +++ b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitConnectionProvider.cs @@ -1,4 +1,4 @@ -using Common.Logging; +using Common.Logging; namespace Contour.Transport.RabbitMQ.Internal { @@ -7,17 +7,19 @@ internal class RabbitConnectionProvider : IConnectionProvider private readonly ILog logger = LogManager.GetLogger(); private readonly IEndpoint endpoint; private readonly IBusContext context; + private readonly bool asyncConsuming; - public RabbitConnectionProvider(IBusContext context) + public RabbitConnectionProvider(IBusContext context, bool asyncConsuming) { this.endpoint = context.Endpoint; - this.context = context; + this.context = context; + this.asyncConsuming = asyncConsuming; } public IRabbitConnection Create(string connectionString) { this.logger.Trace($"Creating a new connection for endpoint [{this.endpoint}] at [{connectionString}]"); - return new RabbitConnection(this.endpoint, connectionString, this.context); + return new RabbitConnection(this.endpoint, connectionString, this.context, asyncConsuming); } } } diff --git a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitDelivery.cs b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitDelivery.cs index 7a567f0..cafcc0b 100644 --- a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitDelivery.cs +++ b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitDelivery.cs @@ -1,7 +1,6 @@ -using System; +using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; -using System.Globalization; using System.Threading.Tasks; using Contour.Configuration; @@ -247,7 +246,9 @@ public IMessage UnpackAs(Type type) /// Заголовки сообщения. private IDictionary ExtractHeadersFrom(BasicDeliverEventArgs args) { - var h = new Dictionary(args.BasicProperties.Headers); + var h = args.BasicProperties.Headers != null + ? new Dictionary(args.BasicProperties.Headers) + : new Dictionary(); if (this.CorrelationId != null) { @@ -262,4 +263,4 @@ private IDictionary ExtractHeadersFrom(BasicDeliverEventArgs arg return h; } } -} \ No newline at end of file +}