Skip to content

Commit

Permalink
Allow multiple RabbitMQ connections with different configs (#285)
Browse files Browse the repository at this point in the history
* Allow multiple RabbitMQ connections with different configs

* Add tests

* Apply nitpicks

Co-authored-by: Phillip Kemkes <phillip.kemkes@gdata.de>
  • Loading branch information
rngcntr and pkemkes authored Aug 6, 2021
1 parent a4e1fa9 commit e740c84
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 18 deletions.
2 changes: 1 addition & 1 deletion shared.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>

<PropertyGroup>
<Version>0.7.2</Version>
<Version>0.7.3</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
<WarningsAsErrors>CS8600;CS8602;CS8625;CS8618;CS8604;CS8601</WarningsAsErrors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
<ProjectReference Include="..\Motor.Extensions.Utilities.Abstractions\Motor.Extensions.Utilities.Abstractions.csproj" />
</ItemGroup>

<ItemGroup>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleTo">
<_Parameter1>$(AssemblyName)_UnitTest</_Parameter1>
</AssemblyAttribute>
</ItemGroup>

<Import Project="$(MSBuildThisFileDirectory)../../shared.csproj" />

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@ public class RabbitMQMessageConsumer<T> : IMessageConsumer<T> where T : notnull
private readonly IHostApplicationLifetime _applicationLifetime;
private readonly IApplicationNameService _applicationNameService;
private readonly RabbitMQConsumerOptions<T> _options;
private readonly IRabbitMQConnectionFactory<T> _connectionFactory;
private readonly ILogger<RabbitMQMessageConsumer<T>> _logger;
private bool _started;
private IModel? _channel;
private CancellationToken _stoppingToken;

internal IRabbitMQConnectionFactory<T> ConnectionFactory { get; }

public RabbitMQMessageConsumer(ILogger<RabbitMQMessageConsumer<T>> logger,
IRabbitMQConnectionFactory<T> connectionFactory,
IOptions<RabbitMQConsumerOptions<T>> config,
IHostApplicationLifetime applicationLifetime,
IApplicationNameService applicationNameService)
{
_logger = logger;
_connectionFactory = connectionFactory;
ConnectionFactory = connectionFactory;
_options = config.Value;
_applicationLifetime = applicationLifetime;
_applicationNameService = applicationNameService;
Expand All @@ -53,7 +54,7 @@ public Task StartAsync(CancellationToken token = default)
{
ThrowIfNoCallbackConfigured();
ThrowIfConsumerAlreadyStarted();
_channel = _connectionFactory.CurrentChannel;
_channel = ConnectionFactory.CurrentChannel;
ConfigureChannel();
DeclareQueue();
StartConsumerOnChannel();
Expand All @@ -63,7 +64,7 @@ public Task StartAsync(CancellationToken token = default)
public Task StopAsync(CancellationToken token = default)
{
_started = false;
_connectionFactory.Dispose();
ConnectionFactory.Dispose();
return Task.CompletedTask;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
using CloudNative.CloudEvents.SystemTextJson;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Motor.Extensions.Diagnostics.Queue.Abstractions;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;
using Motor.Extensions.Hosting.RabbitMQ.Options;
using MSOptions = Microsoft.Extensions.Options.Options;

namespace Motor.Extensions.Hosting.RabbitMQ
{
Expand All @@ -14,11 +17,18 @@ public static class RabbitMQMessageHostBuilderExtensions
public static void AddRabbitMQWithConfig<T>(this IConsumerBuilder<T> builder, IConfiguration config) where T : notnull
{
builder.AddTransient<CloudEventFormatter, JsonEventFormatter>();
builder.Configure<RabbitMQConsumerOptions<T>>(config);
builder.AddSingleton(sp =>
RabbitMQConnectionFactory<T>.From(sp.GetRequiredService<IOptions<RabbitMQConsumerOptions<T>>>().Value));
builder.AddConsumer<RabbitMQMessageConsumer<T>>();
builder.AddSingleton<IQueueMonitor, RabbitMQQueueMonitor<T>>();
var consumerOptions = new RabbitMQConsumerOptions<T>();
config.Bind(consumerOptions);
var connectionFactory = RabbitMQConnectionFactory<T>.From(consumerOptions);
builder.AddConsumer(sp => new RabbitMQMessageConsumer<T>(
sp.GetRequiredService<ILogger<RabbitMQMessageConsumer<T>>>(),
connectionFactory,
MSOptions.Create(consumerOptions),
sp.GetRequiredService<IHostApplicationLifetime>(),
sp.GetRequiredService<IApplicationNameService>()));
builder.AddSingleton<IQueueMonitor>(sp =>
new RabbitMQQueueMonitor<T>(sp.GetRequiredService<ILogger<RabbitMQQueueMonitor<T>>>(),
MSOptions.Create(consumerOptions), connectionFactory));
}

public static void AddRabbitMQ<T>(this IConsumerBuilder<T> builder, string configSection = "RabbitMQConsumer") where T : notnull
Expand All @@ -30,10 +40,13 @@ public static void AddRabbitMQWithConfig<T>(this IPublisherBuilder<T> builder, I
{
builder.AddTransient<CloudEventFormatter, JsonEventFormatter>();
builder.Configure<RabbitMQPublisherOptions<T>>(config);
builder.AddSingleton(sp =>
RabbitMQConnectionFactory<T>.From(sp.GetRequiredService<IOptions<RabbitMQPublisherOptions<T>>>()
.Value));
builder.AddPublisher<RabbitMQMessagePublisher<T>>();
var publisherOptions = new RabbitMQPublisherOptions<T>();
config.Bind(publisherOptions);
var connectionFactory = RabbitMQConnectionFactory<T>.From(publisherOptions);
builder.AddPublisher(sp
=> new RabbitMQMessagePublisher<T>(connectionFactory,
MSOptions.Create(publisherOptions),
sp.GetRequiredService<CloudEventFormatter>()));
}

public static void AddRabbitMQ<T>(this IPublisherBuilder<T> builder, string configSection = "RabbitMQPublisher") where T : notnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@ public class RabbitMQMessagePublisher<T> : ITypedMessagePublisher<byte[]>
{
private readonly CloudEventFormatter _cloudEventFormatter;
private readonly RabbitMQPublisherOptions<T> _options;
private readonly IRabbitMQConnectionFactory<T> _connectionFactory;
private IModel? _channel;
private bool _connected;

internal IRabbitMQConnectionFactory<T> ConnectionFactory { get; }

public RabbitMQMessagePublisher(
IRabbitMQConnectionFactory<T> connectionFactory,
IOptions<RabbitMQPublisherOptions<T>> config,
CloudEventFormatter cloudEventFormatter
)
{
_connectionFactory = connectionFactory;
ConnectionFactory = connectionFactory;
_cloudEventFormatter = cloudEventFormatter;
_options = config.Value;
}
Expand Down Expand Up @@ -60,7 +61,7 @@ public async Task PublishMessageAsync(MotorCloudEvent<byte[]> motorCloudEvent, C

private Task StartAsync()
{
_channel = _connectionFactory.CurrentChannel;
_channel = ConnectionFactory.CurrentChannel;
_connected = true;
return Task.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Moq;
using Motor.Extensions.Conversion.Abstractions;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;
using Motor.Extensions.Hosting.Consumer;
using Motor.Extensions.Hosting.Publisher;
using Motor.Extensions.Hosting.RabbitMQ;
using Motor.Extensions.Utilities;
using Motor.Extensions.Utilities.Abstractions;
using Xunit;

namespace Motor.Extensions.Hosting.RabbitMQ_UnitTest
{
public class RabbitMQMessageHostBuilderExtensionsTests
{
private static IMotorHostBuilder GetHostBuilder()
{
return MotorHost.CreateDefaultBuilder()
.ConfigureSingleOutputService<string, string>()
.ConfigureServices((_, services) =>
{
services.AddTransient(_ =>
{
var mock = new Mock<IApplicationNameService>();
mock.Setup(t => t.GetVersion()).Returns("test");
mock.Setup(t => t.GetLibVersion()).Returns("test");
mock.Setup(t => t.GetSource()).Returns(new Uri("motor://test"));
return mock.Object;
});
});
}

[Fact]
public void AddRabbitMQWithConfig_ConsumersWithDifferentConfigAdded_ConsumersWithDifferentConfigPresent()
{
var consumers = GetHostBuilder()
.ConfigureConsumer<string>((_, builder) =>
{
builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host1"));
builder.AddDeserializer<StringDeserializer>();
})
.ConfigureConsumer<string>((_, builder) =>
{
builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host2"));
builder.AddDeserializer<StringDeserializer>();
})
.Build().Services
.GetServices<IMessageConsumer<string>>().ToList();

Assert.Contains(consumers, c => ((RabbitMQMessageConsumer<string>)c).ConnectionFactory.VirtualHost == "Host1");
Assert.Contains(consumers, c => ((RabbitMQMessageConsumer<string>)c).ConnectionFactory.VirtualHost == "Host2");
}

[Fact]
public void AddRabbitMQWithConfig_PublishersWithDifferentConfigAdded_PublishersWithDifferentConfigPresent()
{
var publishers = GetHostBuilder()
.ConfigurePublisher<string>((_, builder) =>
{
builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host1"));
builder.AddSerializer<StringSerializer>();
})
.ConfigurePublisher<string>((_, builder) =>
{
builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host2"));
builder.AddSerializer<StringSerializer>();
})
.Build().Services
.GetServices<RabbitMQMessagePublisher<string>>().ToList();

Assert.Contains(publishers, c => c.ConnectionFactory.VirtualHost == "Host1");
Assert.Contains(publishers, c => c.ConnectionFactory.VirtualHost == "Host2");
}

[Fact]
public void AddRabbitMQWithConfig_ConsumerAndPublisherWithDifferentConfigAdded_ConsumerAndPublisherWithDifferentConfigPresent()
{
var host = GetHostBuilder()
.ConfigureConsumer<string>((_, builder) =>
{
builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host1"));
builder.AddDeserializer<StringDeserializer>();
})
.ConfigurePublisher<string>((_, builder) =>
{
builder.AddRabbitMQWithConfig(GetConfigWithVHost("Host2"));
builder.AddSerializer<StringSerializer>();
})
.Build();

var consumers = host.Services.GetServices<IMessageConsumer<string>>().ToList();
var publishers = host.Services.GetServices<RabbitMQMessagePublisher<string>>().ToList();

Assert.Contains(consumers, c => ((RabbitMQMessageConsumer<string>)c).ConnectionFactory.VirtualHost == "Host1");
Assert.Contains(publishers, c => c.ConnectionFactory.VirtualHost == "Host2");
}

private static IConfiguration GetConfigWithVHost(string vHost)
{
var configDict = new Dictionary<string, string>
{
{ "Host", "localhost" },
{ "User", "guest" },
{ "Password", "guest" },
{ "Queue:Name", "Test" },
{ "VirtualHost", vHost },
{ "PublishingTarget:RoutingKey", "routing.key" },
{ "PublishingTarget:Exchange", "amq.topic" }
};
return new ConfigurationBuilder().AddInMemoryCollection(configDict).Build();
}

private class StringSerializer : IMessageSerializer<string>
{
public byte[] Serialize(string message)
{
return Encoding.UTF8.GetBytes(message);
}
}

private class StringDeserializer : IMessageDeserializer<string>
{
public string Deserialize(byte[] message)
{
return Encoding.UTF8.GetString(message);
}

}
}
}

0 comments on commit e740c84

Please sign in to comment.