Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Sources/Contour/Configurator/AppConfigConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ public IBusConfigurator Configure(string endpointName, IBusConfigurator cfg)
configurator.Delayed();
}

if (outgoingElement.Direct)
{
configurator.Direct();
}

// Connection string
var connectionString = endpointConfig.ConnectionString;
if (!string.IsNullOrEmpty(outgoingElement.ConnectionString))
Expand Down Expand Up @@ -342,6 +347,11 @@ public IBusConfigurator Configure(string endpointName, IBusConfigurator cfg)
configurator.Delayed();
}

if (incomingElement.Direct)
{
configurator.Direct(incomingElement.DirectId);
}

// Connection string
var connectionString = endpointConfig.ConnectionString;
if (!string.IsNullOrEmpty(incomingElement.ConnectionString))
Expand Down
4 changes: 4 additions & 0 deletions Sources/Contour/Configurator/Configuration/IIncoming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,9 @@ public interface IIncoming : IMessage
int? QueueMaxLengthBytes { get; }

bool Delayed { get; }

bool Direct { get; }

string DirectId { get; }
}
}
2 changes: 2 additions & 0 deletions Sources/Contour/Configurator/Configuration/IOutgoing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public interface IOutgoing : IMessage

bool Delayed { get; }

bool Direct { get; }

TimeSpan? ConfirmTimeout { get; }
}
}
6 changes: 6 additions & 0 deletions Sources/Contour/Configurator/IncomingElement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ public int? QueueMaxLengthBytes
[ConfigurationProperty("delayed")]
public bool Delayed => (bool)base["delayed"];

[ConfigurationProperty("direct")]
public bool Direct => (bool)base["direct"];

[ConfigurationProperty("directId")]
public string DirectId => (string)base["directId"];

IQos IIncoming.Qos => this.Qos;
}
}
3 changes: 3 additions & 0 deletions Sources/Contour/Configurator/OutgoingElement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public bool? ReuseConnection
[ConfigurationProperty("delayed", DefaultValue = false)]
public bool Delayed => (bool)base["delayed"];

[ConfigurationProperty("direct")]
public bool Direct => (bool)base["direct"];

/// <summary>
/// Gets confirmation timeout on publishing.
/// </summary>
Expand Down
11 changes: 10 additions & 1 deletion Sources/Contour/Headers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public class Headers
/// </summary>
public static readonly string Delay = "x-delay";

public static readonly string DirectId = "direct-id";

public static readonly string MatchHeaders = "x-match";

/// <summary>
/// Время жизни сообщений в очереди.
/// </summary>
Expand Down Expand Up @@ -219,7 +223,12 @@ public static void ApplyDelay(Dictionary<string, object> headers, TimeSpan? dela
headers[Delay] = delay.Value.TotalMilliseconds;
}
}


public static void ApplyDirectId(Dictionary<string, object> headers, string directId)
{
headers[DirectId] = directId;
}

/// <summary>
/// Применяет к коллекции заголовков установку заголовка <c>Ttl</c>.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions Sources/Contour/Receiving/IReceiverConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public interface IReceiverConfigurator
/// </summary>
/// <returns>Конфигуратор получателя.</returns>
IReceiverConfigurator Delayed();

IReceiverConfigurator Direct(string id);
}

/// <summary>
Expand Down
8 changes: 8 additions & 0 deletions Sources/Contour/Receiving/ReceiverConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -205,5 +205,13 @@ public IReceiverConfigurator Delayed()

return this;
}

public IReceiverConfigurator Direct(string id)
{
this.Options.Direct = true;
this.Options.DirectId = id;

return this;
}
}
}
4 changes: 4 additions & 0 deletions Sources/Contour/Receiving/ReceiverOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public ReceiverOptions(BusOptions parent)

public bool Delayed { get; set; }

public bool Direct { get; set; }

public string DirectId { get; set; }

/// <summary>
/// Создает новый экземпляр настроек как копию существующего.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,5 +274,10 @@ public IReceiverConfigurator Delayed()

return this;
}

public IReceiverConfigurator Direct(string id)
{
return this.configuration.Direct(id);
}
}
}
5 changes: 5 additions & 0 deletions Sources/Contour/Sending/AbstractSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,11 @@ private IDictionary<string, object> ApplyOptions(PublishingOptions options)
{
Headers.ApplyDelay(outputHeaders, options.Delay);
}

if (this.Configuration.Options.Direct)
{
Headers.ApplyDirectId(outputHeaders, options.DirectId);
}

Maybe<TimeSpan?> ttl = BusOptions.Pick(options.Ttl, this.Configuration.Options.GetTtl());
Headers.ApplyTtl(outputHeaders, ttl);
Expand Down
2 changes: 2 additions & 0 deletions Sources/Contour/Sending/ISenderConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public interface ISenderConfigurator
/// <returns>Sender configurator.</returns>
ISenderConfiguration Delayed();

ISenderConfigurator Direct();

/// <summary>
/// Устанавливает псевдоним метки отправляемого сообщения.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions Sources/Contour/Sending/PublishingOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class PublishingOptions
/// </summary>
public TimeSpan? Delay { get; set; }

public string DirectId { get; set; }

/// <summary>
/// Дополнительные заголовки
/// </summary>
Expand Down
7 changes: 7 additions & 0 deletions Sources/Contour/Sending/SenderConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ public ISenderConfiguration Delayed()
return this;
}

public ISenderConfigurator Direct()
{
this.Options.Direct = true;

return this;
}

/// <summary>
/// Проверяет корректность конфигурации.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions Sources/Contour/Sending/SenderOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public SenderOptions(BusOptions parent)
/// </summary>
public bool Delayed { get; set; }

public bool Direct { get; set; }

/// <summary>
/// Время ожидания подтверждения получения сообщения, null если требуется бесконечное ожидание.
/// </summary>
Expand Down
4 changes: 3 additions & 1 deletion Sources/Contour/Transport/RabbitMQ/BusConfigurationEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public static IBusConfigurator UseRabbitMq(this IBusConfigurator busConfigurator
Headers.Ttl,
Headers.QueueMaxLength,
Headers.QueueMaxLengthBytes,
Headers.Delay
Headers.Delay,
Headers.DirectId,
Headers.MatchHeaders
};
var messageHeaderStorage = new MessageHeaderStorage(blockedHeaders);
c.UseIncomingMessageHeaderStorage(messageHeaderStorage);
Expand Down
65 changes: 55 additions & 10 deletions Sources/Contour/Transport/RabbitMQ/Internal/RabbitBusDefaults.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using Contour.Receiving;
using Contour.Sending;
using Contour.Transport.RabbitMQ.Topology;
using System.Collections.Generic;
using Contour.Helpers;

namespace Contour.Transport.RabbitMQ.Internal
{
Expand Down Expand Up @@ -35,9 +37,19 @@ private static IRouteResolver RouteResolverBuilderImpl(IRouteResolverBuilder bui
{
string label = builder.Sender.Label.Name;
ExchangeBuilder exchangeBuilder = Exchange.Named(label).Durable;
exchangeBuilder = builder.Sender.Options.Delayed
? exchangeBuilder.DelayedFanout
: exchangeBuilder.Fanout;

if (builder.Sender.Options.Delayed)
{
exchangeBuilder = builder.Sender.Options.Direct
? exchangeBuilder.DelayedHeaders
: exchangeBuilder.DelayedFanout;
}
else
{
exchangeBuilder = builder.Sender.Options.Direct
? exchangeBuilder.Headers
: exchangeBuilder.Fanout;
}

Exchange exchange = builder.Topology.Declare(exchangeBuilder);

Expand All @@ -59,9 +71,22 @@ private static ISubscriptionEndpoint SubscriptionEndpointBuilderImpl(ISubscripti

string queueName = builder.Endpoint.Address + "." + label;

var queueBuilder = Queue
.Named(queueName)
.Durable;
QueueBuilder queueBuilder;

if (builder.Receiver.Options.Direct)
{
queueName += "." + builder.Receiver.Options.DirectId;
queueBuilder = Queue
.Named(queueName)
.AutoDelete
.Exclusive;
}
else
{
queueBuilder = Queue
.Named(queueName)
.Durable;
}

var options = builder.Receiver.Options;

Expand All @@ -77,13 +102,33 @@ private static ISubscriptionEndpoint SubscriptionEndpointBuilderImpl(ISubscripti

Queue queue = builder.Topology.Declare(queueBuilder);
ExchangeBuilder exchangeBuilder = Exchange.Named(label).Durable;
exchangeBuilder = builder.Receiver.Options.Delayed
? exchangeBuilder.DelayedFanout
: exchangeBuilder.Fanout;

if (builder.Receiver.Options.Delayed)
{
exchangeBuilder = builder.Receiver.Options.Direct
? exchangeBuilder.DelayedHeaders
: exchangeBuilder.DelayedFanout;
}
else
{
exchangeBuilder = builder.Receiver.Options.Direct
? exchangeBuilder.Headers
: exchangeBuilder.Fanout;
}

Exchange exchange = builder.Topology.Declare(exchangeBuilder);

builder.Topology.Bind(exchange, queue);
Dictionary<string, object> arguments = null;
if (builder.Receiver.Options.Direct)
{
arguments = new Dictionary<string, object>()
{
{ Headers.DirectId, builder.Receiver.Options.DirectId },
{ Headers.MatchHeaders, "all" },
};
}

builder.Topology.Bind(exchange, queue, "", arguments);

return builder.ListenTo(queue, exchange);
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/Contour/Transport/RabbitMQ/Internal/RabbitChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ public void Accept(RabbitDelivery delivery)
/// <param name="routingKey">
/// The routing key.
/// </param>
public void Bind(Queue queue, Exchange exchange, string routingKey)
public void Bind(Queue queue, Exchange exchange, string routingKey, IDictionary<string, object> arguments = null)
{
try
{
this.SafeNativeInvoke(n => n.QueueBind(queue.Name, exchange.Name, routingKey));
this.SafeNativeInvoke(n => n.QueueBind(queue.Name, exchange.Name, routingKey, arguments));
}
catch (Exception e)
{
Expand Down
19 changes: 19 additions & 0 deletions Sources/Contour/Transport/RabbitMQ/Topology/ExchangeBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ public ExchangeBuilder Topic
}
}

public ExchangeBuilder Headers
{
get
{
this.Instance.Type = ExchangeType.Headers;
return this;
}
}

public ExchangeBuilder DelayedDirect
{
get
Expand Down Expand Up @@ -125,6 +134,16 @@ public ExchangeBuilder DelayedTopic
return this;
}
}

public ExchangeBuilder DelayedHeaders
{
get
{
this.Instance.Type = DelayedExchangeType;
this.Instance.Arguments[DelayedExchangeSubtypeArgumentName] = ExchangeType.Headers;
return this;
}
}

#endregion

Expand Down
8 changes: 4 additions & 4 deletions Sources/Contour/Transport/RabbitMQ/Topology/Queue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ internal Queue(string name)
/// </summary>
public TimeSpan? Ttl { get; internal set; }

/// <summary>
/// Максимальное количество сообщений в очереди.
/// <summary>
/// Максимальное количество сообщений в очереди.
/// </summary>
public int? Limit { get; internal set; }

/// <summary>
/// Максимальное количество байт, которое занимают сообщения в очереди.
/// <summary>
/// Максимальное количество байт, которое занимают сообщения в очереди.
/// </summary>
public int? MaxLengthBytes { get; internal set; }

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;

namespace Contour.Transport.RabbitMQ.Topology
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Contour.Receiving;
using Contour.Topology;
using Contour.Transport.RabbitMQ.Internal;
using System.Collections.Generic;

namespace Contour.Transport.RabbitMQ.Topology
{
Expand Down Expand Up @@ -49,11 +50,11 @@ public TopologyBuilder(IChannelProvider<IChannel> channelProvider)
/// <param name="routingKey">
/// Ключ маршрутизации, используется для определения очереди, в которую должно быть отправлено сообщение.
/// </param>
public void Bind(Exchange exchange, Queue queue, string routingKey = "")
public void Bind(Exchange exchange, Queue queue, string routingKey = "", IDictionary<string, object> arguments = null)
{
using (var channel = (RabbitChannel)this.channelProvider.OpenChannel(CancellationToken.None))
{
channel.Bind(queue, exchange, routingKey);
channel.Bind(queue, exchange, routingKey, arguments);
}
}

Expand Down
Loading