From e740c8487211cbb9f2fc6e2c910715ac74dc3b50 Mon Sep 17 00:00:00 2001 From: rngcntr <7890887+rngcntr@users.noreply.github.com> Date: Fri, 6 Aug 2021 07:38:29 +0200 Subject: [PATCH] Allow multiple RabbitMQ connections with different configs (#285) * Allow multiple RabbitMQ connections with different configs * Add tests * Apply nitpicks Co-authored-by: Phillip Kemkes --- shared.csproj | 2 +- .../Motor.Extensions.Hosting.RabbitMQ.csproj | 6 + .../RabbitMQMessageConsumer.cs | 9 +- .../RabbitMQMessageHostBuilderExtensions.cs | 33 +++-- .../RabbitMQMessagePublisher.cs | 7 +- ...bbitMQMessageHostBuilderExtensionsTests.cs | 136 ++++++++++++++++++ 6 files changed, 175 insertions(+), 18 deletions(-) create mode 100644 test/Motor.Extensions.Hosting.RabbitMQ_UnitTest/RabbitMQMessageHostBuilderExtensionsTests.cs diff --git a/shared.csproj b/shared.csproj index 2e66e880..ed6915d3 100644 --- a/shared.csproj +++ b/shared.csproj @@ -1,7 +1,7 @@ - 0.7.2 + 0.7.3 9 enable CS8600;CS8602;CS8625;CS8618;CS8604;CS8601 diff --git a/src/Motor.Extensions.Hosting.RabbitMQ/Motor.Extensions.Hosting.RabbitMQ.csproj b/src/Motor.Extensions.Hosting.RabbitMQ/Motor.Extensions.Hosting.RabbitMQ.csproj index 0e29ec3e..559843c2 100644 --- a/src/Motor.Extensions.Hosting.RabbitMQ/Motor.Extensions.Hosting.RabbitMQ.csproj +++ b/src/Motor.Extensions.Hosting.RabbitMQ/Motor.Extensions.Hosting.RabbitMQ.csproj @@ -18,6 +18,12 @@ + + + <_Parameter1>$(AssemblyName)_UnitTest + + + diff --git a/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageConsumer.cs b/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageConsumer.cs index 94fb526f..60786974 100644 --- a/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageConsumer.cs +++ b/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageConsumer.cs @@ -18,12 +18,13 @@ public class RabbitMQMessageConsumer : IMessageConsumer where T : notnull private readonly IHostApplicationLifetime _applicationLifetime; private readonly IApplicationNameService _applicationNameService; private readonly RabbitMQConsumerOptions _options; - private readonly IRabbitMQConnectionFactory _connectionFactory; private readonly ILogger> _logger; private bool _started; private IModel? _channel; private CancellationToken _stoppingToken; + internal IRabbitMQConnectionFactory ConnectionFactory { get; } + public RabbitMQMessageConsumer(ILogger> logger, IRabbitMQConnectionFactory connectionFactory, IOptions> config, @@ -31,7 +32,7 @@ public RabbitMQMessageConsumer(ILogger> logger, IApplicationNameService applicationNameService) { _logger = logger; - _connectionFactory = connectionFactory; + ConnectionFactory = connectionFactory; _options = config.Value; _applicationLifetime = applicationLifetime; _applicationNameService = applicationNameService; @@ -53,7 +54,7 @@ public Task StartAsync(CancellationToken token = default) { ThrowIfNoCallbackConfigured(); ThrowIfConsumerAlreadyStarted(); - _channel = _connectionFactory.CurrentChannel; + _channel = ConnectionFactory.CurrentChannel; ConfigureChannel(); DeclareQueue(); StartConsumerOnChannel(); @@ -63,7 +64,7 @@ public Task StartAsync(CancellationToken token = default) public Task StopAsync(CancellationToken token = default) { _started = false; - _connectionFactory.Dispose(); + ConnectionFactory.Dispose(); return Task.CompletedTask; } diff --git a/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageHostBuilderExtensions.cs b/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageHostBuilderExtensions.cs index 4dcf0b8a..6e595f29 100644 --- a/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageHostBuilderExtensions.cs +++ b/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageHostBuilderExtensions.cs @@ -2,10 +2,13 @@ using CloudNative.CloudEvents.SystemTextJson; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Motor.Extensions.Diagnostics.Queue.Abstractions; using Motor.Extensions.Hosting.Abstractions; +using Motor.Extensions.Hosting.CloudEvents; using Motor.Extensions.Hosting.RabbitMQ.Options; +using MSOptions = Microsoft.Extensions.Options.Options; namespace Motor.Extensions.Hosting.RabbitMQ { @@ -14,11 +17,18 @@ public static class RabbitMQMessageHostBuilderExtensions public static void AddRabbitMQWithConfig(this IConsumerBuilder builder, IConfiguration config) where T : notnull { builder.AddTransient(); - builder.Configure>(config); - builder.AddSingleton(sp => - RabbitMQConnectionFactory.From(sp.GetRequiredService>>().Value)); - builder.AddConsumer>(); - builder.AddSingleton>(); + var consumerOptions = new RabbitMQConsumerOptions(); + config.Bind(consumerOptions); + var connectionFactory = RabbitMQConnectionFactory.From(consumerOptions); + builder.AddConsumer(sp => new RabbitMQMessageConsumer( + sp.GetRequiredService>>(), + connectionFactory, + MSOptions.Create(consumerOptions), + sp.GetRequiredService(), + sp.GetRequiredService())); + builder.AddSingleton(sp => + new RabbitMQQueueMonitor(sp.GetRequiredService>>(), + MSOptions.Create(consumerOptions), connectionFactory)); } public static void AddRabbitMQ(this IConsumerBuilder builder, string configSection = "RabbitMQConsumer") where T : notnull @@ -30,10 +40,13 @@ public static void AddRabbitMQWithConfig(this IPublisherBuilder builder, I { builder.AddTransient(); builder.Configure>(config); - builder.AddSingleton(sp => - RabbitMQConnectionFactory.From(sp.GetRequiredService>>() - .Value)); - builder.AddPublisher>(); + var publisherOptions = new RabbitMQPublisherOptions(); + config.Bind(publisherOptions); + var connectionFactory = RabbitMQConnectionFactory.From(publisherOptions); + builder.AddPublisher(sp + => new RabbitMQMessagePublisher(connectionFactory, + MSOptions.Create(publisherOptions), + sp.GetRequiredService())); } public static void AddRabbitMQ(this IPublisherBuilder builder, string configSection = "RabbitMQPublisher") where T : notnull diff --git a/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessagePublisher.cs b/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessagePublisher.cs index 9d135429..962dc575 100644 --- a/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessagePublisher.cs +++ b/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessagePublisher.cs @@ -15,17 +15,18 @@ public class RabbitMQMessagePublisher : ITypedMessagePublisher { private readonly CloudEventFormatter _cloudEventFormatter; private readonly RabbitMQPublisherOptions _options; - private readonly IRabbitMQConnectionFactory _connectionFactory; private IModel? _channel; private bool _connected; + internal IRabbitMQConnectionFactory ConnectionFactory { get; } + public RabbitMQMessagePublisher( IRabbitMQConnectionFactory connectionFactory, IOptions> config, CloudEventFormatter cloudEventFormatter ) { - _connectionFactory = connectionFactory; + ConnectionFactory = connectionFactory; _cloudEventFormatter = cloudEventFormatter; _options = config.Value; } @@ -60,7 +61,7 @@ public async Task PublishMessageAsync(MotorCloudEvent motorCloudEvent, C private Task StartAsync() { - _channel = _connectionFactory.CurrentChannel; + _channel = ConnectionFactory.CurrentChannel; _connected = true; return Task.CompletedTask; } diff --git a/test/Motor.Extensions.Hosting.RabbitMQ_UnitTest/RabbitMQMessageHostBuilderExtensionsTests.cs b/test/Motor.Extensions.Hosting.RabbitMQ_UnitTest/RabbitMQMessageHostBuilderExtensionsTests.cs new file mode 100644 index 00000000..d75df93b --- /dev/null +++ b/test/Motor.Extensions.Hosting.RabbitMQ_UnitTest/RabbitMQMessageHostBuilderExtensionsTests.cs @@ -0,0 +1,136 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Moq; +using Motor.Extensions.Conversion.Abstractions; +using Motor.Extensions.Hosting.Abstractions; +using Motor.Extensions.Hosting.CloudEvents; +using Motor.Extensions.Hosting.Consumer; +using Motor.Extensions.Hosting.Publisher; +using Motor.Extensions.Hosting.RabbitMQ; +using Motor.Extensions.Utilities; +using Motor.Extensions.Utilities.Abstractions; +using Xunit; + +namespace Motor.Extensions.Hosting.RabbitMQ_UnitTest +{ + public class RabbitMQMessageHostBuilderExtensionsTests + { + private static IMotorHostBuilder GetHostBuilder() + { + return MotorHost.CreateDefaultBuilder() + .ConfigureSingleOutputService() + .ConfigureServices((_, services) => + { + services.AddTransient(_ => + { + var mock = new Mock(); + mock.Setup(t => t.GetVersion()).Returns("test"); + mock.Setup(t => t.GetLibVersion()).Returns("test"); + mock.Setup(t => t.GetSource()).Returns(new Uri("motor://test")); + return mock.Object; + }); + }); + } + + [Fact] + public void AddRabbitMQWithConfig_ConsumersWithDifferentConfigAdded_ConsumersWithDifferentConfigPresent() + { + var consumers = GetHostBuilder() + .ConfigureConsumer((_, builder) => + { + builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host1")); + builder.AddDeserializer(); + }) + .ConfigureConsumer((_, builder) => + { + builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host2")); + builder.AddDeserializer(); + }) + .Build().Services + .GetServices>().ToList(); + + Assert.Contains(consumers, c => ((RabbitMQMessageConsumer)c).ConnectionFactory.VirtualHost == "Host1"); + Assert.Contains(consumers, c => ((RabbitMQMessageConsumer)c).ConnectionFactory.VirtualHost == "Host2"); + } + + [Fact] + public void AddRabbitMQWithConfig_PublishersWithDifferentConfigAdded_PublishersWithDifferentConfigPresent() + { + var publishers = GetHostBuilder() + .ConfigurePublisher((_, builder) => + { + builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host1")); + builder.AddSerializer(); + }) + .ConfigurePublisher((_, builder) => + { + builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host2")); + builder.AddSerializer(); + }) + .Build().Services + .GetServices>().ToList(); + + Assert.Contains(publishers, c => c.ConnectionFactory.VirtualHost == "Host1"); + Assert.Contains(publishers, c => c.ConnectionFactory.VirtualHost == "Host2"); + } + + [Fact] + public void AddRabbitMQWithConfig_ConsumerAndPublisherWithDifferentConfigAdded_ConsumerAndPublisherWithDifferentConfigPresent() + { + var host = GetHostBuilder() + .ConfigureConsumer((_, builder) => + { + builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host1")); + builder.AddDeserializer(); + }) + .ConfigurePublisher((_, builder) => + { + builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host2")); + builder.AddSerializer(); + }) + .Build(); + + var consumers = host.Services.GetServices>().ToList(); + var publishers = host.Services.GetServices>().ToList(); + + Assert.Contains(consumers, c => ((RabbitMQMessageConsumer)c).ConnectionFactory.VirtualHost == "Host1"); + Assert.Contains(publishers, c => c.ConnectionFactory.VirtualHost == "Host2"); + } + + private static IConfiguration GetConfigWithVHost(string vHost) + { + var configDict = new Dictionary + { + { "Host", "localhost" }, + { "User", "guest" }, + { "Password", "guest" }, + { "Queue:Name", "Test" }, + { "VirtualHost", vHost }, + { "PublishingTarget:RoutingKey", "routing.key" }, + { "PublishingTarget:Exchange", "amq.topic" } + }; + return new ConfigurationBuilder().AddInMemoryCollection(configDict).Build(); + } + + private class StringSerializer : IMessageSerializer + { + public byte[] Serialize(string message) + { + return Encoding.UTF8.GetBytes(message); + } + } + + private class StringDeserializer : IMessageDeserializer + { + public string Deserialize(byte[] message) + { + return Encoding.UTF8.GetString(message); + } + + } + } +}