From 7dee9f4ccd4993c559462ff0976acd30a5190849 Mon Sep 17 00:00:00 2001
From: tio98 <>
Date: Tue, 24 Sep 2024 12:24:35 +0300
Subject: [PATCH 1/4] add-direct-opt: add direct opt
---
.../Configurator/AppConfigConfigurator.cs | 10 ++++
.../Configurator/Configuration/IIncoming.cs | 4 ++
.../Configurator/Configuration/IOutgoing.cs | 2 +
.../Contour/Configurator/IncomingElement.cs | 6 ++
.../Contour/Configurator/OutgoingElement.cs | 3 +
Sources/Contour/Headers.cs | 9 ++-
.../Receiving/IReceiverConfigurator.cs | 2 +
.../Receiving/ReceiverConfiguration.cs | 8 +++
Sources/Contour/Receiving/ReceiverOptions.cs | 4 ++
.../TypedReceiverConfigurationDecorator.cs | 5 ++
Sources/Contour/Sending/AbstractSender.cs | 5 ++
.../Contour/Sending/ISenderConfigurator.cs | 2 +
Sources/Contour/Sending/PublishingOptions.cs | 2 +
.../Contour/Sending/SenderConfiguration.cs | 7 +++
Sources/Contour/Sending/SenderOptions.cs | 2 +
.../RabbitMQ/Internal/RabbitBusDefaults.cs | 57 ++++++++++++++++---
.../RabbitMQ/Internal/RabbitChannel.cs | 2 +-
.../RabbitMQ/Topology/ExchangeBuilder.cs | 19 +++++++
.../Transport/RabbitMQ/Topology/Queue.cs | 11 ++--
.../RabbitMQ/Topology/QueueBuilder.cs | 8 +++
20 files changed, 153 insertions(+), 15 deletions(-)
diff --git a/Sources/Contour/Configurator/AppConfigConfigurator.cs b/Sources/Contour/Configurator/AppConfigConfigurator.cs
index aa33da4..c83b49a 100644
--- a/Sources/Contour/Configurator/AppConfigConfigurator.cs
+++ b/Sources/Contour/Configurator/AppConfigConfigurator.cs
@@ -240,6 +240,11 @@ public IBusConfigurator Configure(string endpointName, IBusConfigurator cfg)
configurator.Delayed();
}
+ if (outgoingElement.Direct)
+ {
+ configurator.Direct();
+ }
+
// Connection string
var connectionString = endpointConfig.ConnectionString;
if (!string.IsNullOrEmpty(outgoingElement.ConnectionString))
@@ -342,6 +347,11 @@ public IBusConfigurator Configure(string endpointName, IBusConfigurator cfg)
configurator.Delayed();
}
+ if (incomingElement.Direct)
+ {
+ configurator.Direct(incomingElement.DirectId);
+ }
+
// Connection string
var connectionString = endpointConfig.ConnectionString;
if (!string.IsNullOrEmpty(incomingElement.ConnectionString))
diff --git a/Sources/Contour/Configurator/Configuration/IIncoming.cs b/Sources/Contour/Configurator/Configuration/IIncoming.cs
index 8041911..e65a407 100644
--- a/Sources/Contour/Configurator/Configuration/IIncoming.cs
+++ b/Sources/Contour/Configurator/Configuration/IIncoming.cs
@@ -25,5 +25,9 @@ public interface IIncoming : IMessage
int? QueueMaxLengthBytes { get; }
bool Delayed { get; }
+
+ bool Direct { get; }
+
+ string DirectId { get; }
}
}
\ No newline at end of file
diff --git a/Sources/Contour/Configurator/Configuration/IOutgoing.cs b/Sources/Contour/Configurator/Configuration/IOutgoing.cs
index 61209a6..e582347 100644
--- a/Sources/Contour/Configurator/Configuration/IOutgoing.cs
+++ b/Sources/Contour/Configurator/Configuration/IOutgoing.cs
@@ -20,6 +20,8 @@ public interface IOutgoing : IMessage
bool Delayed { get; }
+ bool Direct { get; }
+
TimeSpan? ConfirmTimeout { get; }
}
}
\ No newline at end of file
diff --git a/Sources/Contour/Configurator/IncomingElement.cs b/Sources/Contour/Configurator/IncomingElement.cs
index dbfb6c0..3d33197 100644
--- a/Sources/Contour/Configurator/IncomingElement.cs
+++ b/Sources/Contour/Configurator/IncomingElement.cs
@@ -117,6 +117,12 @@ public int? QueueMaxLengthBytes
[ConfigurationProperty("delayed")]
public bool Delayed => (bool)base["delayed"];
+ [ConfigurationProperty("direct")]
+ public bool Direct => (bool)base["direct"];
+
+ [ConfigurationProperty("directId")]
+ public string DirectId => (string)base["directId"];
+
IQos IIncoming.Qos => this.Qos;
}
}
diff --git a/Sources/Contour/Configurator/OutgoingElement.cs b/Sources/Contour/Configurator/OutgoingElement.cs
index b098b5b..a4c2aad 100644
--- a/Sources/Contour/Configurator/OutgoingElement.cs
+++ b/Sources/Contour/Configurator/OutgoingElement.cs
@@ -94,6 +94,9 @@ public bool? ReuseConnection
[ConfigurationProperty("delayed", DefaultValue = false)]
public bool Delayed => (bool)base["delayed"];
+ [ConfigurationProperty("direct")]
+ public bool Direct => (bool)base["direct"];
+
///
/// Gets confirmation timeout on publishing.
///
diff --git a/Sources/Contour/Headers.cs b/Sources/Contour/Headers.cs
index 8492c9b..3788de8 100644
--- a/Sources/Contour/Headers.cs
+++ b/Sources/Contour/Headers.cs
@@ -53,6 +53,8 @@ public class Headers
///
public static readonly string Delay = "x-delay";
+ public static readonly string DirectId = "direct-id";
+
///
/// Время жизни сообщений в очереди.
///
@@ -219,7 +221,12 @@ public static void ApplyDelay(Dictionary headers, TimeSpan? dela
headers[Delay] = delay.Value.TotalMilliseconds;
}
}
-
+
+ public static void ApplyDirectId(Dictionary headers, string directId)
+ {
+ headers[DirectId] = directId;
+ }
+
///
/// Применяет к коллекции заголовков установку заголовка Ttl.
///
diff --git a/Sources/Contour/Receiving/IReceiverConfigurator.cs b/Sources/Contour/Receiving/IReceiverConfigurator.cs
index 0e27fa6..cdc94b4 100644
--- a/Sources/Contour/Receiving/IReceiverConfigurator.cs
+++ b/Sources/Contour/Receiving/IReceiverConfigurator.cs
@@ -92,6 +92,8 @@ public interface IReceiverConfigurator
///
/// Конфигуратор получателя.
IReceiverConfigurator Delayed();
+
+ IReceiverConfigurator Direct(string id);
}
///
diff --git a/Sources/Contour/Receiving/ReceiverConfiguration.cs b/Sources/Contour/Receiving/ReceiverConfiguration.cs
index 2785b10..dc061c6 100644
--- a/Sources/Contour/Receiving/ReceiverConfiguration.cs
+++ b/Sources/Contour/Receiving/ReceiverConfiguration.cs
@@ -205,5 +205,13 @@ public IReceiverConfigurator Delayed()
return this;
}
+
+ public IReceiverConfigurator Direct(string id)
+ {
+ this.Options.Direct = true;
+ this.Options.DirectId = id;
+
+ return this;
+ }
}
}
diff --git a/Sources/Contour/Receiving/ReceiverOptions.cs b/Sources/Contour/Receiving/ReceiverOptions.cs
index 93df706..bd4730a 100644
--- a/Sources/Contour/Receiving/ReceiverOptions.cs
+++ b/Sources/Contour/Receiving/ReceiverOptions.cs
@@ -76,6 +76,10 @@ public ReceiverOptions(BusOptions parent)
public bool Delayed { get; set; }
+ public bool Direct { get; set; }
+
+ public string DirectId { get; set; }
+
///
/// Создает новый экземпляр настроек как копию существующего.
///
diff --git a/Sources/Contour/Receiving/TypedReceiverConfigurationDecorator.cs b/Sources/Contour/Receiving/TypedReceiverConfigurationDecorator.cs
index 8974186..4d43db8 100644
--- a/Sources/Contour/Receiving/TypedReceiverConfigurationDecorator.cs
+++ b/Sources/Contour/Receiving/TypedReceiverConfigurationDecorator.cs
@@ -274,5 +274,10 @@ public IReceiverConfigurator Delayed()
return this;
}
+
+ public IReceiverConfigurator Direct(string id)
+ {
+ return this.configuration.Direct(id);
+ }
}
}
diff --git a/Sources/Contour/Sending/AbstractSender.cs b/Sources/Contour/Sending/AbstractSender.cs
index b45a4d1..7ed46d4 100644
--- a/Sources/Contour/Sending/AbstractSender.cs
+++ b/Sources/Contour/Sending/AbstractSender.cs
@@ -319,6 +319,11 @@ private IDictionary ApplyOptions(PublishingOptions options)
{
Headers.ApplyDelay(outputHeaders, options.Delay);
}
+
+ if (this.Configuration.Options.Direct)
+ {
+ Headers.ApplyDirectId(outputHeaders, options.DirectId);
+ }
Maybe ttl = BusOptions.Pick(options.Ttl, this.Configuration.Options.GetTtl());
Headers.ApplyTtl(outputHeaders, ttl);
diff --git a/Sources/Contour/Sending/ISenderConfigurator.cs b/Sources/Contour/Sending/ISenderConfigurator.cs
index f0dd078..0e46248 100644
--- a/Sources/Contour/Sending/ISenderConfigurator.cs
+++ b/Sources/Contour/Sending/ISenderConfigurator.cs
@@ -34,6 +34,8 @@ public interface ISenderConfigurator
/// Sender configurator.
ISenderConfiguration Delayed();
+ ISenderConfigurator Direct();
+
///
/// Устанавливает псевдоним метки отправляемого сообщения.
///
diff --git a/Sources/Contour/Sending/PublishingOptions.cs b/Sources/Contour/Sending/PublishingOptions.cs
index f29b732..2e4ab0c 100644
--- a/Sources/Contour/Sending/PublishingOptions.cs
+++ b/Sources/Contour/Sending/PublishingOptions.cs
@@ -37,6 +37,8 @@ public class PublishingOptions
///
public TimeSpan? Delay { get; set; }
+ public string DirectId { get; set; }
+
///
/// Дополнительные заголовки
///
diff --git a/Sources/Contour/Sending/SenderConfiguration.cs b/Sources/Contour/Sending/SenderConfiguration.cs
index d4f5302..914980c 100644
--- a/Sources/Contour/Sending/SenderConfiguration.cs
+++ b/Sources/Contour/Sending/SenderConfiguration.cs
@@ -101,6 +101,13 @@ public ISenderConfiguration Delayed()
return this;
}
+ public ISenderConfigurator Direct()
+ {
+ this.Options.Direct = true;
+
+ return this;
+ }
+
///
/// Проверяет корректность конфигурации.
///
diff --git a/Sources/Contour/Sending/SenderOptions.cs b/Sources/Contour/Sending/SenderOptions.cs
index 62db5d6..3f8dcd1 100644
--- a/Sources/Contour/Sending/SenderOptions.cs
+++ b/Sources/Contour/Sending/SenderOptions.cs
@@ -56,6 +56,8 @@ public SenderOptions(BusOptions parent)
///
public bool Delayed { get; set; }
+ public bool Direct { get; set; }
+
///
/// Время ожидания подтверждения получения сообщения, null если требуется бесконечное ожидание.
///
diff --git a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBusDefaults.cs b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBusDefaults.cs
index a4be6c2..7f793b5 100644
--- a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBusDefaults.cs
+++ b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBusDefaults.cs
@@ -2,6 +2,8 @@
using Contour.Receiving;
using Contour.Sending;
using Contour.Transport.RabbitMQ.Topology;
+using System.Collections.Generic;
+using Contour.Helpers;
namespace Contour.Transport.RabbitMQ.Internal
{
@@ -35,9 +37,19 @@ private static IRouteResolver RouteResolverBuilderImpl(IRouteResolverBuilder bui
{
string label = builder.Sender.Label.Name;
ExchangeBuilder exchangeBuilder = Exchange.Named(label).Durable;
- exchangeBuilder = builder.Sender.Options.Delayed
- ? exchangeBuilder.DelayedFanout
- : exchangeBuilder.Fanout;
+
+ if (builder.Sender.Options.Delayed)
+ {
+ exchangeBuilder = builder.Sender.Options.Direct
+ ? exchangeBuilder.DelayedHeaders
+ : exchangeBuilder.DelayedFanout;
+ }
+ else
+ {
+ exchangeBuilder = builder.Sender.Options.Direct
+ ? exchangeBuilder.Headers
+ : exchangeBuilder.Fanout;
+ }
Exchange exchange = builder.Topology.Declare(exchangeBuilder);
@@ -59,9 +71,26 @@ private static ISubscriptionEndpoint SubscriptionEndpointBuilderImpl(ISubscripti
string queueName = builder.Endpoint.Address + "." + label;
- var queueBuilder = Queue
- .Named(queueName)
- .Durable;
+ QueueBuilder queueBuilder;
+
+ if (builder.Receiver.Options.Direct)
+ {
+ queueName += "." + builder.Receiver.Options.DirectId;
+ queueBuilder = Queue
+ .Named(queueName)
+ .AutoDelete
+ .Exclusive
+ .WithHeaders(new Dictionary()
+ {
+ { Headers.DirectId, builder.Receiver.Options.DirectId }
+ });
+ }
+ else
+ {
+ queueBuilder = Queue
+ .Named(queueName)
+ .Durable;
+ }
var options = builder.Receiver.Options;
@@ -77,9 +106,19 @@ private static ISubscriptionEndpoint SubscriptionEndpointBuilderImpl(ISubscripti
Queue queue = builder.Topology.Declare(queueBuilder);
ExchangeBuilder exchangeBuilder = Exchange.Named(label).Durable;
- exchangeBuilder = builder.Receiver.Options.Delayed
- ? exchangeBuilder.DelayedFanout
- : exchangeBuilder.Fanout;
+
+ if (builder.Receiver.Options.Delayed)
+ {
+ exchangeBuilder = builder.Receiver.Options.Direct
+ ? exchangeBuilder.DelayedHeaders
+ : exchangeBuilder.DelayedFanout;
+ }
+ else
+ {
+ exchangeBuilder = builder.Receiver.Options.Direct
+ ? exchangeBuilder.Headers
+ : exchangeBuilder.Fanout;
+ }
Exchange exchange = builder.Topology.Declare(exchangeBuilder);
diff --git a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs
index 86c1dee..56066d2 100644
--- a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs
+++ b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs
@@ -103,7 +103,7 @@ public void Bind(Queue queue, Exchange exchange, string routingKey)
{
try
{
- this.SafeNativeInvoke(n => n.QueueBind(queue.Name, exchange.Name, routingKey));
+ this.SafeNativeInvoke(n => n.QueueBind(queue.Name, exchange.Name, routingKey, queue.Arguments));
}
catch (Exception e)
{
diff --git a/Sources/Contour/Transport/RabbitMQ/Topology/ExchangeBuilder.cs b/Sources/Contour/Transport/RabbitMQ/Topology/ExchangeBuilder.cs
index b939897..577e67f 100644
--- a/Sources/Contour/Transport/RabbitMQ/Topology/ExchangeBuilder.cs
+++ b/Sources/Contour/Transport/RabbitMQ/Topology/ExchangeBuilder.cs
@@ -96,6 +96,15 @@ public ExchangeBuilder Topic
}
}
+ public ExchangeBuilder Headers
+ {
+ get
+ {
+ this.Instance.Type = ExchangeType.Headers;
+ return this;
+ }
+ }
+
public ExchangeBuilder DelayedDirect
{
get
@@ -125,6 +134,16 @@ public ExchangeBuilder DelayedTopic
return this;
}
}
+
+ public ExchangeBuilder DelayedHeaders
+ {
+ get
+ {
+ this.Instance.Type = DelayedExchangeType;
+ this.Instance.Arguments[DelayedExchangeSubtypeArgumentName] = ExchangeType.Headers;
+ return this;
+ }
+ }
#endregion
diff --git a/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs b/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs
index 3890a91..524a5bd 100644
--- a/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs
+++ b/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs
@@ -2,6 +2,7 @@
using Contour.Receiving;
using Contour.Topology;
+using System.Collections.Generic;
namespace Contour.Transport.RabbitMQ.Topology
{
@@ -24,6 +25,8 @@ internal Queue(string name)
this.AutoDelete = false;
}
+ public IDictionary Arguments { get; internal set; }
+
///
/// Адрес очереди.
///
@@ -54,13 +57,13 @@ internal Queue(string name)
///
public TimeSpan? Ttl { get; internal set; }
- ///
- /// Максимальное количество сообщений в очереди.
+ ///
+ /// Максимальное количество сообщений в очереди.
///
public int? Limit { get; internal set; }
- ///
- /// Максимальное количество байт, которое занимают сообщения в очереди.
+ ///
+ /// Максимальное количество байт, которое занимают сообщения в очереди.
///
public int? MaxLengthBytes { get; internal set; }
diff --git a/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs b/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs
index ceeeb19..94c8320 100644
--- a/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs
+++ b/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs
@@ -1,4 +1,5 @@
using System;
+using System.Collections.Generic;
namespace Contour.Transport.RabbitMQ.Topology
{
@@ -97,5 +98,12 @@ public QueueBuilder WithMaxLengthBytes(int bytes)
this.Instance.MaxLengthBytes = bytes;
return this;
}
+
+ public QueueBuilder WithHeaders(IDictionary arguments)
+ {
+ this.Instance.Arguments = arguments;
+ this.Instance.Arguments["x-match"] = "all";
+ return this;
+ }
}
}
From 072aa3f5b967dacdef66c4f0f0c48d9caa7270ef Mon Sep 17 00:00:00 2001
From: tio98 <>
Date: Tue, 24 Sep 2024 12:39:48 +0300
Subject: [PATCH 2/4] add-direct-opt: fixes
---
Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs | 1 +
1 file changed, 1 insertion(+)
diff --git a/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs b/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs
index 524a5bd..bdeed84 100644
--- a/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs
+++ b/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs
@@ -23,6 +23,7 @@ internal Queue(string name)
this.Durable = false;
this.Exclusive = false;
this.AutoDelete = false;
+ this.Arguments = new Dictionary();
}
public IDictionary Arguments { get; internal set; }
From 5dc0477c5df36cb8bf5a11d9ba52a0d950600672 Mon Sep 17 00:00:00 2001
From: tio98 <>
Date: Tue, 24 Sep 2024 12:50:12 +0300
Subject: [PATCH 3/4] add-direct-opt: fixes
---
Sources/Contour/Headers.cs | 2 ++
Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs | 4 +++-
Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs | 3 ++-
3 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/Sources/Contour/Headers.cs b/Sources/Contour/Headers.cs
index 3788de8..3d567cd 100644
--- a/Sources/Contour/Headers.cs
+++ b/Sources/Contour/Headers.cs
@@ -55,6 +55,8 @@ public class Headers
public static readonly string DirectId = "direct-id";
+ public static readonly string MatchHeaders = "x-match";
+
///
/// Время жизни сообщений в очереди.
///
diff --git a/Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs b/Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs
index 15951e9..e972d1e 100644
--- a/Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs
+++ b/Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs
@@ -60,7 +60,9 @@ public static IBusConfigurator UseRabbitMq(this IBusConfigurator busConfigurator
Headers.Ttl,
Headers.QueueMaxLength,
Headers.QueueMaxLengthBytes,
- Headers.Delay
+ Headers.Delay,
+ Headers.DirectId,
+ Headers.MatchHeaders
};
var messageHeaderStorage = new MessageHeaderStorage(blockedHeaders);
c.UseIncomingMessageHeaderStorage(messageHeaderStorage);
diff --git a/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs b/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs
index 94c8320..b1d1657 100644
--- a/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs
+++ b/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using Contour.Helpers;
namespace Contour.Transport.RabbitMQ.Topology
{
@@ -102,7 +103,7 @@ public QueueBuilder WithMaxLengthBytes(int bytes)
public QueueBuilder WithHeaders(IDictionary arguments)
{
this.Instance.Arguments = arguments;
- this.Instance.Arguments["x-match"] = "all";
+ this.Instance.Arguments[Headers.MatchHeaders] = "all";
return this;
}
}
From a1545873655b6adc3d02be2d9ae990f5b4841157 Mon Sep 17 00:00:00 2001
From: tio98 <>
Date: Wed, 25 Sep 2024 19:06:00 +0300
Subject: [PATCH 4/4] add-direct-opt: fixes
---
.../RabbitMQ/Internal/RabbitBusDefaults.cs | 18 ++++++++++++------
.../RabbitMQ/Internal/RabbitChannel.cs | 4 ++--
.../Transport/RabbitMQ/Topology/Queue.cs | 4 ----
.../RabbitMQ/Topology/QueueBuilder.cs | 8 --------
.../RabbitMQ/Topology/TopologyBuilder.cs | 5 +++--
.../RabbitMQ/Topology/TopologyBuilderEx.cs | 6 ++++--
6 files changed, 21 insertions(+), 24 deletions(-)
diff --git a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBusDefaults.cs b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBusDefaults.cs
index 7f793b5..abe5df7 100644
--- a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBusDefaults.cs
+++ b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitBusDefaults.cs
@@ -79,11 +79,7 @@ private static ISubscriptionEndpoint SubscriptionEndpointBuilderImpl(ISubscripti
queueBuilder = Queue
.Named(queueName)
.AutoDelete
- .Exclusive
- .WithHeaders(new Dictionary()
- {
- { Headers.DirectId, builder.Receiver.Options.DirectId }
- });
+ .Exclusive;
}
else
{
@@ -122,7 +118,17 @@ private static ISubscriptionEndpoint SubscriptionEndpointBuilderImpl(ISubscripti
Exchange exchange = builder.Topology.Declare(exchangeBuilder);
- builder.Topology.Bind(exchange, queue);
+ Dictionary arguments = null;
+ if (builder.Receiver.Options.Direct)
+ {
+ arguments = new Dictionary()
+ {
+ { Headers.DirectId, builder.Receiver.Options.DirectId },
+ { Headers.MatchHeaders, "all" },
+ };
+ }
+
+ builder.Topology.Bind(exchange, queue, "", arguments);
return builder.ListenTo(queue, exchange);
}
diff --git a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs
index 56066d2..7105354 100644
--- a/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs
+++ b/Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs
@@ -99,11 +99,11 @@ public void Accept(RabbitDelivery delivery)
///
/// The routing key.
///
- public void Bind(Queue queue, Exchange exchange, string routingKey)
+ public void Bind(Queue queue, Exchange exchange, string routingKey, IDictionary arguments = null)
{
try
{
- this.SafeNativeInvoke(n => n.QueueBind(queue.Name, exchange.Name, routingKey, queue.Arguments));
+ this.SafeNativeInvoke(n => n.QueueBind(queue.Name, exchange.Name, routingKey, arguments));
}
catch (Exception e)
{
diff --git a/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs b/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs
index bdeed84..725e868 100644
--- a/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs
+++ b/Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs
@@ -2,7 +2,6 @@
using Contour.Receiving;
using Contour.Topology;
-using System.Collections.Generic;
namespace Contour.Transport.RabbitMQ.Topology
{
@@ -23,11 +22,8 @@ internal Queue(string name)
this.Durable = false;
this.Exclusive = false;
this.AutoDelete = false;
- this.Arguments = new Dictionary();
}
- public IDictionary Arguments { get; internal set; }
-
///
/// Адрес очереди.
///
diff --git a/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs b/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs
index b1d1657..d122f0d 100644
--- a/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs
+++ b/Sources/Contour/Transport/RabbitMQ/Topology/QueueBuilder.cs
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
-using Contour.Helpers;
namespace Contour.Transport.RabbitMQ.Topology
{
@@ -99,12 +98,5 @@ public QueueBuilder WithMaxLengthBytes(int bytes)
this.Instance.MaxLengthBytes = bytes;
return this;
}
-
- public QueueBuilder WithHeaders(IDictionary arguments)
- {
- this.Instance.Arguments = arguments;
- this.Instance.Arguments[Headers.MatchHeaders] = "all";
- return this;
- }
}
}
diff --git a/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilder.cs b/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilder.cs
index c8ba382..f6dee0e 100644
--- a/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilder.cs
+++ b/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilder.cs
@@ -3,6 +3,7 @@
using Contour.Receiving;
using Contour.Topology;
using Contour.Transport.RabbitMQ.Internal;
+using System.Collections.Generic;
namespace Contour.Transport.RabbitMQ.Topology
{
@@ -49,11 +50,11 @@ public TopologyBuilder(IChannelProvider channelProvider)
///
/// Ключ маршрутизации, используется для определения очереди, в которую должно быть отправлено сообщение.
///
- public void Bind(Exchange exchange, Queue queue, string routingKey = "")
+ public void Bind(Exchange exchange, Queue queue, string routingKey = "", IDictionary arguments = null)
{
using (var channel = (RabbitChannel)this.channelProvider.OpenChannel(CancellationToken.None))
{
- channel.Bind(queue, exchange, routingKey);
+ channel.Bind(queue, exchange, routingKey, arguments);
}
}
diff --git a/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilderEx.cs b/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilderEx.cs
index 0f964d4..b93c41d 100644
--- a/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilderEx.cs
+++ b/Sources/Contour/Transport/RabbitMQ/Topology/TopologyBuilderEx.cs
@@ -7,6 +7,8 @@
//
// --------------------------------------------------------------------------------------------------------------------
+using System.Collections.Generic;
+
namespace Contour.Transport.RabbitMQ.Topology
{
using Contour.Topology;
@@ -33,9 +35,9 @@ public static class TopologyBuilderEx
///
/// The routing key.
///
- public static void Bind(this ITopologyBuilder topology, Exchange exchange, Queue queue, string routingKey = "")
+ public static void Bind(this ITopologyBuilder topology, Exchange exchange, Queue queue, string routingKey = "", IDictionary arguments = null)
{
- ((TopologyBuilder)topology).Bind(exchange, queue, routingKey);
+ ((TopologyBuilder)topology).Bind(exchange, queue, routingKey, arguments);
}
///