diff --git a/Sources/Contour/Configurator/AppConfigConfigurator.cs b/Sources/Contour/Configurator/AppConfigConfigurator.cs index aa33da4..c83b49a 100644 --- a/Sources/Contour/Configurator/AppConfigConfigurator.cs +++ b/Sources/Contour/Configurator/AppConfigConfigurator.cs @@ -240,6 +240,11 @@ public IBusConfigurator Configure(string endpointName, IBusConfigurator cfg) configurator.Delayed(); } + if (outgoingElement.Direct) + { + configurator.Direct(); + } + // Connection string var connectionString = endpointConfig.ConnectionString; if (!string.IsNullOrEmpty(outgoingElement.ConnectionString)) @@ -342,6 +347,11 @@ public IBusConfigurator Configure(string endpointName, IBusConfigurator cfg) configurator.Delayed(); } + if (incomingElement.Direct) + { + configurator.Direct(incomingElement.DirectId); + } + // Connection string var connectionString = endpointConfig.ConnectionString; if (!string.IsNullOrEmpty(incomingElement.ConnectionString)) diff --git a/Sources/Contour/Configurator/Configuration/IIncoming.cs b/Sources/Contour/Configurator/Configuration/IIncoming.cs index 8041911..e65a407 100644 --- a/Sources/Contour/Configurator/Configuration/IIncoming.cs +++ b/Sources/Contour/Configurator/Configuration/IIncoming.cs @@ -25,5 +25,9 @@ public interface IIncoming : IMessage int? QueueMaxLengthBytes { get; } bool Delayed { get; } + + bool Direct { get; } + + string DirectId { get; } } } \ No newline at end of file diff --git a/Sources/Contour/Configurator/Configuration/IOutgoing.cs b/Sources/Contour/Configurator/Configuration/IOutgoing.cs index 61209a6..e582347 100644 --- a/Sources/Contour/Configurator/Configuration/IOutgoing.cs +++ b/Sources/Contour/Configurator/Configuration/IOutgoing.cs @@ -20,6 +20,8 @@ public interface IOutgoing : IMessage bool Delayed { get; } + bool Direct { get; } + TimeSpan? ConfirmTimeout { get; } } } \ No newline at end of file diff --git a/Sources/Contour/Configurator/IncomingElement.cs b/Sources/Contour/Configurator/IncomingElement.cs index dbfb6c0..3d33197 100644 --- a/Sources/Contour/Configurator/IncomingElement.cs +++ b/Sources/Contour/Configurator/IncomingElement.cs @@ -117,6 +117,12 @@ public int? QueueMaxLengthBytes [ConfigurationProperty("delayed")] public bool Delayed => (bool)base["delayed"]; + [ConfigurationProperty("direct")] + public bool Direct => (bool)base["direct"]; + + [ConfigurationProperty("directId")] + public string DirectId => (string)base["directId"]; + IQos IIncoming.Qos => this.Qos; } } diff --git a/Sources/Contour/Configurator/OutgoingElement.cs b/Sources/Contour/Configurator/OutgoingElement.cs index b098b5b..a4c2aad 100644 --- a/Sources/Contour/Configurator/OutgoingElement.cs +++ b/Sources/Contour/Configurator/OutgoingElement.cs @@ -94,6 +94,9 @@ public bool? ReuseConnection [ConfigurationProperty("delayed", DefaultValue = false)] public bool Delayed => (bool)base["delayed"]; + [ConfigurationProperty("direct")] + public bool Direct => (bool)base["direct"]; + /// /// Gets confirmation timeout on publishing. /// diff --git a/Sources/Contour/Headers.cs b/Sources/Contour/Headers.cs index 8492c9b..3d567cd 100644 --- a/Sources/Contour/Headers.cs +++ b/Sources/Contour/Headers.cs @@ -53,6 +53,10 @@ public class Headers /// public static readonly string Delay = "x-delay"; + public static readonly string DirectId = "direct-id"; + + public static readonly string MatchHeaders = "x-match"; + /// /// Время жизни сообщений в очереди. /// @@ -219,7 +223,12 @@ public static void ApplyDelay(Dictionary headers, TimeSpan? dela headers[Delay] = delay.Value.TotalMilliseconds; } } - + + public static void ApplyDirectId(Dictionary headers, string directId) + { + headers[DirectId] = directId; + } + /// /// Применяет к коллекции заголовков установку заголовка Ttl. /// diff --git a/Sources/Contour/Receiving/IReceiverConfigurator.cs b/Sources/Contour/Receiving/IReceiverConfigurator.cs index 0e27fa6..cdc94b4 100644 --- a/Sources/Contour/Receiving/IReceiverConfigurator.cs +++ b/Sources/Contour/Receiving/IReceiverConfigurator.cs @@ -92,6 +92,8 @@ public interface IReceiverConfigurator /// /// Конфигуратор получателя. IReceiverConfigurator Delayed(); + + IReceiverConfigurator Direct(string id); } /// diff --git a/Sources/Contour/Receiving/ReceiverConfiguration.cs b/Sources/Contour/Receiving/ReceiverConfiguration.cs index 2785b10..dc061c6 100644 --- a/Sources/Contour/Receiving/ReceiverConfiguration.cs +++ b/Sources/Contour/Receiving/ReceiverConfiguration.cs @@ -205,5 +205,13 @@ public IReceiverConfigurator Delayed() return this; } + + public IReceiverConfigurator Direct(string id) + { + this.Options.Direct = true; + this.Options.DirectId = id; + + return this; + } } } diff --git a/Sources/Contour/Receiving/ReceiverOptions.cs b/Sources/Contour/Receiving/ReceiverOptions.cs index 93df706..bd4730a 100644 --- a/Sources/Contour/Receiving/ReceiverOptions.cs +++ b/Sources/Contour/Receiving/ReceiverOptions.cs @@ -76,6 +76,10 @@ public ReceiverOptions(BusOptions parent) public bool Delayed { get; set; } + public bool Direct { get; set; } + + public string DirectId { get; set; } + /// /// Создает новый экземпляр настроек как копию существующего. /// diff --git a/Sources/Contour/Receiving/TypedReceiverConfigurationDecorator.cs b/Sources/Contour/Receiving/TypedReceiverConfigurationDecorator.cs index 8974186..4d43db8 100644 --- a/Sources/Contour/Receiving/TypedReceiverConfigurationDecorator.cs +++ b/Sources/Contour/Receiving/TypedReceiverConfigurationDecorator.cs @@ -274,5 +274,10 @@ public IReceiverConfigurator Delayed() return this; } + + public IReceiverConfigurator Direct(string id) + { + return this.configuration.Direct(id); + } } } diff --git a/Sources/Contour/Sending/AbstractSender.cs b/Sources/Contour/Sending/AbstractSender.cs index b45a4d1..7ed46d4 100644 --- a/Sources/Contour/Sending/AbstractSender.cs +++ b/Sources/Contour/Sending/AbstractSender.cs @@ -319,6 +319,11 @@ private IDictionary ApplyOptions(PublishingOptions options) { Headers.ApplyDelay(outputHeaders, options.Delay); } + + if (this.Configuration.Options.Direct) + { + Headers.ApplyDirectId(outputHeaders, options.DirectId); + } Maybe ttl = BusOptions.Pick(options.Ttl, this.Configuration.Options.GetTtl()); Headers.ApplyTtl(outputHeaders, ttl); diff --git a/Sources/Contour/Sending/ISenderConfigurator.cs b/Sources/Contour/Sending/ISenderConfigurator.cs index f0dd078..0e46248 100644 --- a/Sources/Contour/Sending/ISenderConfigurator.cs +++ b/Sources/Contour/Sending/ISenderConfigurator.cs @@ -34,6 +34,8 @@ public interface ISenderConfigurator /// Sender configurator. ISenderConfiguration Delayed(); + ISenderConfigurator Direct(); + /// /// Устанавливает псевдоним метки отправляемого сообщения. /// diff --git a/Sources/Contour/Sending/PublishingOptions.cs b/Sources/Contour/Sending/PublishingOptions.cs index f29b732..2e4ab0c 100644 --- a/Sources/Contour/Sending/PublishingOptions.cs +++ b/Sources/Contour/Sending/PublishingOptions.cs @@ -37,6 +37,8 @@ public class PublishingOptions /// public TimeSpan? Delay { get; set; } + public string DirectId { get; set; } + /// /// Дополнительные заголовки /// diff --git a/Sources/Contour/Sending/SenderConfiguration.cs b/Sources/Contour/Sending/SenderConfiguration.cs index d4f5302..914980c 100644 --- a/Sources/Contour/Sending/SenderConfiguration.cs +++ b/Sources/Contour/Sending/SenderConfiguration.cs @@ -101,6 +101,13 @@ public ISenderConfiguration Delayed() return this; } + public ISenderConfigurator Direct() + { + this.Options.Direct = true; + + return this; + } + /// /// Проверяет корректность конфигурации. /// diff --git a/Sources/Contour/Sending/SenderOptions.cs b/Sources/Contour/Sending/SenderOptions.cs index 62db5d6..3f8dcd1 100644 --- a/Sources/Contour/Sending/SenderOptions.cs +++ b/Sources/Contour/Sending/SenderOptions.cs @@ -56,6 +56,8 @@ public SenderOptions(BusOptions parent) /// public bool Delayed { get; set; } + public bool Direct { get; set; } + /// /// Время ожидания подтверждения получения сообщения, null если требуется бесконечное ожидание. /// diff --git a/Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs b/Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs index 15951e9..e972d1e 100644 --- a/Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs +++ b/Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs @@ -60,7 +60,9 @@ public static IBusConfigurator UseRabbitMq(this IBusConfigurator busConfigurator Headers.Ttl, Headers.QueueMaxLength, Headers.QueueMaxLengthBytes, - Headers.Delay + Headers.Delay, + Headers.DirectId, + Headers.MatchHeaders }; var messageHeaderStorage = new MessageHeaderStorage(blockedHeaders); c.UseIncomingMessageHeaderStorage(messageHeaderStorage); diff --git a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBusDefaults.cs b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBusDefaults.cs index a4be6c2..abe5df7 100644 --- a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBusDefaults.cs +++ b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBusDefaults.cs @@ -2,6 +2,8 @@ using Contour.Receiving; using Contour.Sending; using Contour.Transport.RabbitMQ.Topology; +using System.Collections.Generic; +using Contour.Helpers; namespace Contour.Transport.RabbitMQ.Internal { @@ -35,9 +37,19 @@ private static IRouteResolver RouteResolverBuilderImpl(IRouteResolverBuilder bui { string label = builder.Sender.Label.Name; ExchangeBuilder exchangeBuilder = Exchange.Named(label).Durable; - exchangeBuilder = builder.Sender.Options.Delayed - ? exchangeBuilder.DelayedFanout - : exchangeBuilder.Fanout; + + if (builder.Sender.Options.Delayed) + { + exchangeBuilder = builder.Sender.Options.Direct + ? exchangeBuilder.DelayedHeaders + : exchangeBuilder.DelayedFanout; + } + else + { + exchangeBuilder = builder.Sender.Options.Direct + ? exchangeBuilder.Headers + : exchangeBuilder.Fanout; + } Exchange exchange = builder.Topology.Declare(exchangeBuilder); @@ -59,9 +71,22 @@ private static ISubscriptionEndpoint SubscriptionEndpointBuilderImpl(ISubscripti string queueName = builder.Endpoint.Address + "." + label; - var queueBuilder = Queue - .Named(queueName) - .Durable; + QueueBuilder queueBuilder; + + if (builder.Receiver.Options.Direct) + { + queueName += "." + builder.Receiver.Options.DirectId; + queueBuilder = Queue + .Named(queueName) + .AutoDelete + .Exclusive; + } + else + { + queueBuilder = Queue + .Named(queueName) + .Durable; + } var options = builder.Receiver.Options; @@ -77,13 +102,33 @@ private static ISubscriptionEndpoint SubscriptionEndpointBuilderImpl(ISubscripti Queue queue = builder.Topology.Declare(queueBuilder); ExchangeBuilder exchangeBuilder = Exchange.Named(label).Durable; - exchangeBuilder = builder.Receiver.Options.Delayed - ? exchangeBuilder.DelayedFanout - : exchangeBuilder.Fanout; + + if (builder.Receiver.Options.Delayed) + { + exchangeBuilder = builder.Receiver.Options.Direct + ? exchangeBuilder.DelayedHeaders + : exchangeBuilder.DelayedFanout; + } + else + { + exchangeBuilder = builder.Receiver.Options.Direct + ? exchangeBuilder.Headers + : exchangeBuilder.Fanout; + } Exchange exchange = builder.Topology.Declare(exchangeBuilder); - builder.Topology.Bind(exchange, queue); + Dictionary arguments = null; + if (builder.Receiver.Options.Direct) + { + arguments = new Dictionary() + { + { Headers.DirectId, builder.Receiver.Options.DirectId }, + { Headers.MatchHeaders, "all" }, + }; + } + + builder.Topology.Bind(exchange, queue, "", arguments); return builder.ListenTo(queue, exchange); } diff --git a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs index 86c1dee..7105354 100644 --- a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs +++ b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs @@ -99,11 +99,11 @@ public void Accept(RabbitDelivery delivery) /// /// The routing key. /// - public void Bind(Queue queue, Exchange exchange, string routingKey) + public void Bind(Queue queue, Exchange exchange, string routingKey, IDictionary arguments = null) { try { - this.SafeNativeInvoke(n => n.QueueBind(queue.Name, exchange.Name, routingKey)); + this.SafeNativeInvoke(n => n.QueueBind(queue.Name, exchange.Name, routingKey, arguments)); } catch (Exception e) { diff --git a/Sources/Contour/Transport/RabbitMQ/Topology/ExchangeBuilder.cs b/Sources/Contour/Transport/RabbitMQ/Topology/ExchangeBuilder.cs index b939897..577e67f 100644 --- a/Sources/Contour/Transport/RabbitMQ/Topology/ExchangeBuilder.cs +++ b/Sources/Contour/Transport/RabbitMQ/Topology/ExchangeBuilder.cs @@ -96,6 +96,15 @@ public ExchangeBuilder Topic } } + public ExchangeBuilder Headers + { + get + { + this.Instance.Type = ExchangeType.Headers; + return this; + } + } + public ExchangeBuilder DelayedDirect { get @@ -125,6 +134,16 @@ public ExchangeBuilder DelayedTopic return this; } } + + public ExchangeBuilder DelayedHeaders + { + get + { + this.Instance.Type = DelayedExchangeType; + this.Instance.Arguments[DelayedExchangeSubtypeArgumentName] = ExchangeType.Headers; + return this; + } + } #endregion diff --git a/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs b/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs index 3890a91..725e868 100644 --- a/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs +++ b/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs @@ -54,13 +54,13 @@ internal Queue(string name) /// public TimeSpan? Ttl { get; internal set; } - /// - /// Максимальное количество сообщений в очереди. + /// + /// Максимальное количество сообщений в очереди. /// public int? Limit { get; internal set; } - /// - /// Максимальное количество байт, которое занимают сообщения в очереди. + /// + /// Максимальное количество байт, которое занимают сообщения в очереди. /// public int? MaxLengthBytes { get; internal set; } diff --git a/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs b/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs index ceeeb19..d122f0d 100644 --- a/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs +++ b/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; namespace Contour.Transport.RabbitMQ.Topology { diff --git a/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilder.cs b/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilder.cs index c8ba382..f6dee0e 100644 --- a/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilder.cs +++ b/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilder.cs @@ -3,6 +3,7 @@ using Contour.Receiving; using Contour.Topology; using Contour.Transport.RabbitMQ.Internal; +using System.Collections.Generic; namespace Contour.Transport.RabbitMQ.Topology { @@ -49,11 +50,11 @@ public TopologyBuilder(IChannelProvider channelProvider) /// /// Ключ маршрутизации, используется для определения очереди, в которую должно быть отправлено сообщение. /// - public void Bind(Exchange exchange, Queue queue, string routingKey = "") + public void Bind(Exchange exchange, Queue queue, string routingKey = "", IDictionary arguments = null) { using (var channel = (RabbitChannel)this.channelProvider.OpenChannel(CancellationToken.None)) { - channel.Bind(queue, exchange, routingKey); + channel.Bind(queue, exchange, routingKey, arguments); } } diff --git a/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilderEx.cs b/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilderEx.cs index 0f964d4..b93c41d 100644 --- a/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilderEx.cs +++ b/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilderEx.cs @@ -7,6 +7,8 @@ // // -------------------------------------------------------------------------------------------------------------------- +using System.Collections.Generic; + namespace Contour.Transport.RabbitMQ.Topology { using Contour.Topology; @@ -33,9 +35,9 @@ public static class TopologyBuilderEx /// /// The routing key. /// - public static void Bind(this ITopologyBuilder topology, Exchange exchange, Queue queue, string routingKey = "") + public static void Bind(this ITopologyBuilder topology, Exchange exchange, Queue queue, string routingKey = "", IDictionary arguments = null) { - ((TopologyBuilder)topology).Bind(exchange, queue, routingKey); + ((TopologyBuilder)topology).Bind(exchange, queue, routingKey, arguments); } ///