diff --git a/shared.csproj b/shared.csproj
index 2e66e880..ed6915d3 100644
--- a/shared.csproj
+++ b/shared.csproj
@@ -1,7 +1,7 @@
- 0.7.2
+ 0.7.3
9
enable
CS8600;CS8602;CS8625;CS8618;CS8604;CS8601
diff --git a/src/Motor.Extensions.Hosting.RabbitMQ/Motor.Extensions.Hosting.RabbitMQ.csproj b/src/Motor.Extensions.Hosting.RabbitMQ/Motor.Extensions.Hosting.RabbitMQ.csproj
index 0e29ec3e..559843c2 100644
--- a/src/Motor.Extensions.Hosting.RabbitMQ/Motor.Extensions.Hosting.RabbitMQ.csproj
+++ b/src/Motor.Extensions.Hosting.RabbitMQ/Motor.Extensions.Hosting.RabbitMQ.csproj
@@ -18,6 +18,12 @@
+
+
+ <_Parameter1>$(AssemblyName)_UnitTest
+
+
+
diff --git a/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageConsumer.cs b/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageConsumer.cs
index 94fb526f..60786974 100644
--- a/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageConsumer.cs
+++ b/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageConsumer.cs
@@ -18,12 +18,13 @@ public class RabbitMQMessageConsumer : IMessageConsumer where T : notnull
private readonly IHostApplicationLifetime _applicationLifetime;
private readonly IApplicationNameService _applicationNameService;
private readonly RabbitMQConsumerOptions _options;
- private readonly IRabbitMQConnectionFactory _connectionFactory;
private readonly ILogger> _logger;
private bool _started;
private IModel? _channel;
private CancellationToken _stoppingToken;
+ internal IRabbitMQConnectionFactory ConnectionFactory { get; }
+
public RabbitMQMessageConsumer(ILogger> logger,
IRabbitMQConnectionFactory connectionFactory,
IOptions> config,
@@ -31,7 +32,7 @@ public RabbitMQMessageConsumer(ILogger> logger,
IApplicationNameService applicationNameService)
{
_logger = logger;
- _connectionFactory = connectionFactory;
+ ConnectionFactory = connectionFactory;
_options = config.Value;
_applicationLifetime = applicationLifetime;
_applicationNameService = applicationNameService;
@@ -53,7 +54,7 @@ public Task StartAsync(CancellationToken token = default)
{
ThrowIfNoCallbackConfigured();
ThrowIfConsumerAlreadyStarted();
- _channel = _connectionFactory.CurrentChannel;
+ _channel = ConnectionFactory.CurrentChannel;
ConfigureChannel();
DeclareQueue();
StartConsumerOnChannel();
@@ -63,7 +64,7 @@ public Task StartAsync(CancellationToken token = default)
public Task StopAsync(CancellationToken token = default)
{
_started = false;
- _connectionFactory.Dispose();
+ ConnectionFactory.Dispose();
return Task.CompletedTask;
}
diff --git a/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageHostBuilderExtensions.cs b/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageHostBuilderExtensions.cs
index 4dcf0b8a..6e595f29 100644
--- a/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageHostBuilderExtensions.cs
+++ b/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageHostBuilderExtensions.cs
@@ -2,10 +2,13 @@
using CloudNative.CloudEvents.SystemTextJson;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Options;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
using Motor.Extensions.Diagnostics.Queue.Abstractions;
using Motor.Extensions.Hosting.Abstractions;
+using Motor.Extensions.Hosting.CloudEvents;
using Motor.Extensions.Hosting.RabbitMQ.Options;
+using MSOptions = Microsoft.Extensions.Options.Options;
namespace Motor.Extensions.Hosting.RabbitMQ
{
@@ -14,11 +17,18 @@ public static class RabbitMQMessageHostBuilderExtensions
public static void AddRabbitMQWithConfig(this IConsumerBuilder builder, IConfiguration config) where T : notnull
{
builder.AddTransient();
- builder.Configure>(config);
- builder.AddSingleton(sp =>
- RabbitMQConnectionFactory.From(sp.GetRequiredService>>().Value));
- builder.AddConsumer>();
- builder.AddSingleton>();
+ var consumerOptions = new RabbitMQConsumerOptions();
+ config.Bind(consumerOptions);
+ var connectionFactory = RabbitMQConnectionFactory.From(consumerOptions);
+ builder.AddConsumer(sp => new RabbitMQMessageConsumer(
+ sp.GetRequiredService>>(),
+ connectionFactory,
+ MSOptions.Create(consumerOptions),
+ sp.GetRequiredService(),
+ sp.GetRequiredService()));
+ builder.AddSingleton(sp =>
+ new RabbitMQQueueMonitor(sp.GetRequiredService>>(),
+ MSOptions.Create(consumerOptions), connectionFactory));
}
public static void AddRabbitMQ(this IConsumerBuilder builder, string configSection = "RabbitMQConsumer") where T : notnull
@@ -30,10 +40,13 @@ public static void AddRabbitMQWithConfig(this IPublisherBuilder builder, I
{
builder.AddTransient();
builder.Configure>(config);
- builder.AddSingleton(sp =>
- RabbitMQConnectionFactory.From(sp.GetRequiredService>>()
- .Value));
- builder.AddPublisher>();
+ var publisherOptions = new RabbitMQPublisherOptions();
+ config.Bind(publisherOptions);
+ var connectionFactory = RabbitMQConnectionFactory.From(publisherOptions);
+ builder.AddPublisher(sp
+ => new RabbitMQMessagePublisher(connectionFactory,
+ MSOptions.Create(publisherOptions),
+ sp.GetRequiredService()));
}
public static void AddRabbitMQ(this IPublisherBuilder builder, string configSection = "RabbitMQPublisher") where T : notnull
diff --git a/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessagePublisher.cs b/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessagePublisher.cs
index 9d135429..962dc575 100644
--- a/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessagePublisher.cs
+++ b/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessagePublisher.cs
@@ -15,17 +15,18 @@ public class RabbitMQMessagePublisher : ITypedMessagePublisher
{
private readonly CloudEventFormatter _cloudEventFormatter;
private readonly RabbitMQPublisherOptions _options;
- private readonly IRabbitMQConnectionFactory _connectionFactory;
private IModel? _channel;
private bool _connected;
+ internal IRabbitMQConnectionFactory ConnectionFactory { get; }
+
public RabbitMQMessagePublisher(
IRabbitMQConnectionFactory connectionFactory,
IOptions> config,
CloudEventFormatter cloudEventFormatter
)
{
- _connectionFactory = connectionFactory;
+ ConnectionFactory = connectionFactory;
_cloudEventFormatter = cloudEventFormatter;
_options = config.Value;
}
@@ -60,7 +61,7 @@ public async Task PublishMessageAsync(MotorCloudEvent motorCloudEvent, C
private Task StartAsync()
{
- _channel = _connectionFactory.CurrentChannel;
+ _channel = ConnectionFactory.CurrentChannel;
_connected = true;
return Task.CompletedTask;
}
diff --git a/test/Motor.Extensions.Hosting.RabbitMQ_UnitTest/RabbitMQMessageHostBuilderExtensionsTests.cs b/test/Motor.Extensions.Hosting.RabbitMQ_UnitTest/RabbitMQMessageHostBuilderExtensionsTests.cs
new file mode 100644
index 00000000..d75df93b
--- /dev/null
+++ b/test/Motor.Extensions.Hosting.RabbitMQ_UnitTest/RabbitMQMessageHostBuilderExtensionsTests.cs
@@ -0,0 +1,136 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Moq;
+using Motor.Extensions.Conversion.Abstractions;
+using Motor.Extensions.Hosting.Abstractions;
+using Motor.Extensions.Hosting.CloudEvents;
+using Motor.Extensions.Hosting.Consumer;
+using Motor.Extensions.Hosting.Publisher;
+using Motor.Extensions.Hosting.RabbitMQ;
+using Motor.Extensions.Utilities;
+using Motor.Extensions.Utilities.Abstractions;
+using Xunit;
+
+namespace Motor.Extensions.Hosting.RabbitMQ_UnitTest
+{
+ public class RabbitMQMessageHostBuilderExtensionsTests
+ {
+ private static IMotorHostBuilder GetHostBuilder()
+ {
+ return MotorHost.CreateDefaultBuilder()
+ .ConfigureSingleOutputService()
+ .ConfigureServices((_, services) =>
+ {
+ services.AddTransient(_ =>
+ {
+ var mock = new Mock();
+ mock.Setup(t => t.GetVersion()).Returns("test");
+ mock.Setup(t => t.GetLibVersion()).Returns("test");
+ mock.Setup(t => t.GetSource()).Returns(new Uri("motor://test"));
+ return mock.Object;
+ });
+ });
+ }
+
+ [Fact]
+ public void AddRabbitMQWithConfig_ConsumersWithDifferentConfigAdded_ConsumersWithDifferentConfigPresent()
+ {
+ var consumers = GetHostBuilder()
+ .ConfigureConsumer((_, builder) =>
+ {
+ builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host1"));
+ builder.AddDeserializer();
+ })
+ .ConfigureConsumer((_, builder) =>
+ {
+ builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host2"));
+ builder.AddDeserializer();
+ })
+ .Build().Services
+ .GetServices>().ToList();
+
+ Assert.Contains(consumers, c => ((RabbitMQMessageConsumer)c).ConnectionFactory.VirtualHost == "Host1");
+ Assert.Contains(consumers, c => ((RabbitMQMessageConsumer)c).ConnectionFactory.VirtualHost == "Host2");
+ }
+
+ [Fact]
+ public void AddRabbitMQWithConfig_PublishersWithDifferentConfigAdded_PublishersWithDifferentConfigPresent()
+ {
+ var publishers = GetHostBuilder()
+ .ConfigurePublisher((_, builder) =>
+ {
+ builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host1"));
+ builder.AddSerializer();
+ })
+ .ConfigurePublisher((_, builder) =>
+ {
+ builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host2"));
+ builder.AddSerializer();
+ })
+ .Build().Services
+ .GetServices>().ToList();
+
+ Assert.Contains(publishers, c => c.ConnectionFactory.VirtualHost == "Host1");
+ Assert.Contains(publishers, c => c.ConnectionFactory.VirtualHost == "Host2");
+ }
+
+ [Fact]
+ public void AddRabbitMQWithConfig_ConsumerAndPublisherWithDifferentConfigAdded_ConsumerAndPublisherWithDifferentConfigPresent()
+ {
+ var host = GetHostBuilder()
+ .ConfigureConsumer((_, builder) =>
+ {
+ builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host1"));
+ builder.AddDeserializer();
+ })
+ .ConfigurePublisher((_, builder) =>
+ {
+ builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host2"));
+ builder.AddSerializer();
+ })
+ .Build();
+
+ var consumers = host.Services.GetServices>().ToList();
+ var publishers = host.Services.GetServices>().ToList();
+
+ Assert.Contains(consumers, c => ((RabbitMQMessageConsumer)c).ConnectionFactory.VirtualHost == "Host1");
+ Assert.Contains(publishers, c => c.ConnectionFactory.VirtualHost == "Host2");
+ }
+
+ private static IConfiguration GetConfigWithVHost(string vHost)
+ {
+ var configDict = new Dictionary
+ {
+ { "Host", "localhost" },
+ { "User", "guest" },
+ { "Password", "guest" },
+ { "Queue:Name", "Test" },
+ { "VirtualHost", vHost },
+ { "PublishingTarget:RoutingKey", "routing.key" },
+ { "PublishingTarget:Exchange", "amq.topic" }
+ };
+ return new ConfigurationBuilder().AddInMemoryCollection(configDict).Build();
+ }
+
+ private class StringSerializer : IMessageSerializer
+ {
+ public byte[] Serialize(string message)
+ {
+ return Encoding.UTF8.GetBytes(message);
+ }
+ }
+
+ private class StringDeserializer : IMessageDeserializer
+ {
+ public string Deserialize(byte[] message)
+ {
+ return Encoding.UTF8.GetString(message);
+ }
+
+ }
+ }
+}