From 5948f3f80b81cd4bf7c77143869e1a7bf573fe03 Mon Sep 17 00:00:00 2001 From: stuartmcgillivray <59926160+stuartmcgillivray@users.noreply.github.com> Date: Thu, 9 Nov 2023 14:19:34 +0000 Subject: [PATCH] Adding amqps support to rabbit activities (#4612) * Adding ampqs support to rabbit activities * Correcting case of Ssl --- .../RabbitMqMessageReceived.cs | 22 +++++++++++++ .../SendRabbitMqMessage.cs | 27 ++++++++++++++- .../Bookmarks/MessageReceivedBookmark.cs | 21 ++++++++++-- .../Configuration/RabbitMqBusConfiguration.cs | 33 ++++++++++++++++++- .../Services/Client.cs | 10 ++++-- .../Services/RabbitMqQueueStarter.cs | 5 ++- .../Services/Worker.cs | 2 +- .../Testing/RabbitMqTestQueueManager.cs | 5 ++- 8 files changed, 116 insertions(+), 9 deletions(-) diff --git a/src/activities/Elsa.Activities.RabbitMq/Activities/RabbitMqMessageReceived/RabbitMqMessageReceived.cs b/src/activities/Elsa.Activities.RabbitMq/Activities/RabbitMqMessageReceived/RabbitMqMessageReceived.cs index 94195b237c..bcd83e43fd 100644 --- a/src/activities/Elsa.Activities.RabbitMq/Activities/RabbitMqMessageReceived/RabbitMqMessageReceived.cs +++ b/src/activities/Elsa.Activities.RabbitMq/Activities/RabbitMqMessageReceived/RabbitMqMessageReceived.cs @@ -47,6 +47,28 @@ public class RabbitMqMessageReceived : Activity Category = PropertyCategories.Configuration)] public string ConnectionString { get; set; } = default!; + + [ActivityInput( + Order = 3, + Category = PropertyCategories.Configuration, + Name = "Enable SSL")] + public bool EnableSsl { get; set; } + + [ActivityInput( + Order = 4, + Category = PropertyCategories.Configuration, + Name = "SSL Host")] + public string SslHost { get; set; } + + [ActivityInput( + Order = 5, + Category = PropertyCategories.Configuration, + UIHint = ActivityInputUIHints.CheckList, + DefaultSyntax = SyntaxNames.Json, + Options = new[] { "Ssl2", "Ssl3", "Tls", "Tls11", "Tls12", "Tls13" }, + Name = "SSL Protocols")] + public HashSet SslProtocols { get; set; } = new() { }; + public string ClientId => RabbitMqClientConfigurationHelper.GetClientId(Id); diff --git a/src/activities/Elsa.Activities.RabbitMq/Activities/SendRabbitMqMessage/SendRabbitMqMessage.cs b/src/activities/Elsa.Activities.RabbitMq/Activities/SendRabbitMqMessage/SendRabbitMqMessage.cs index 82995627d9..0e3adbdbc0 100644 --- a/src/activities/Elsa.Activities.RabbitMq/Activities/SendRabbitMqMessage/SendRabbitMqMessage.cs +++ b/src/activities/Elsa.Activities.RabbitMq/Activities/SendRabbitMqMessage/SendRabbitMqMessage.cs @@ -61,11 +61,36 @@ public SendRabbitMqMessage(IMessageSenderClientFactory messageSenderClientFactor Category = PropertyCategories.Configuration)] public string ConnectionString { get; set; } = default!; + [ActivityInput( + Order = 2, + SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid }, + Category = PropertyCategories.Configuration, + Name = "Enable SSL")] + public bool EnableSsl { get; set; } + + [ActivityInput( + Order = 3, + SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid }, + Category = PropertyCategories.Configuration, + Name = "SSL Host")] + public string SslHost { get; set; } + + [ActivityInput( + Order = 4, + SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid,SyntaxNames.Json }, + Category = PropertyCategories.Configuration, + UIHint = ActivityInputUIHints.CheckList, + DefaultSyntax = SyntaxNames.Json, + Options = new[] { "Ssl2", "Ssl3", "Tls", "Tls11", "Tls12", "Tls13" }, + Name = "SSL Protocols" + )] + public HashSet SslProtocols { get; set; } = new() { }; + public string ClientId => RabbitMqClientConfigurationHelper.GetClientId(Id); protected override async ValueTask OnExecuteAsync(ActivityExecutionContext context) { - var config = new RabbitMqBusConfiguration(ConnectionString, ExchangeName, RoutingKey, Headers, ClientId); + var config = new RabbitMqBusConfiguration(ConnectionString, ExchangeName, RoutingKey, Headers, ClientId, EnableSsl, SslHost, SslProtocols); var client = await _messageSenderClientFactory.GetSenderAsync(config); diff --git a/src/activities/Elsa.Activities.RabbitMq/Bookmarks/MessageReceivedBookmark.cs b/src/activities/Elsa.Activities.RabbitMq/Bookmarks/MessageReceivedBookmark.cs index 8848a303fb..2d4e28445a 100644 --- a/src/activities/Elsa.Activities.RabbitMq/Bookmarks/MessageReceivedBookmark.cs +++ b/src/activities/Elsa.Activities.RabbitMq/Bookmarks/MessageReceivedBookmark.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using Newtonsoft.Json; namespace Elsa.Activities.RabbitMq.Bookmarks { @@ -11,18 +12,31 @@ public MessageReceivedBookmark() { } - public MessageReceivedBookmark(string exchangeName, string routingKey, string connectionString, Dictionary headers) + public MessageReceivedBookmark(string exchangeName, string routingKey, string connectionString, Dictionary headers, bool sslEnabled, string sslHost, IEnumerable sslProtocols) { ExchangeName = exchangeName; RoutingKey = routingKey; ConnectionString = connectionString; + SslEnabled = sslEnabled; + SslHost = sslHost; + SslProtocols = sslProtocols ?? new List(); Headers = headers ?? new Dictionary(); } + [JsonProperty(Order = 1)] public string ExchangeName { get; set; } = default!; + [JsonProperty(Order = 2)] public string RoutingKey { get; set; } = default!; + [JsonProperty(Order = 3)] public string ConnectionString { get; set; } = default!; + [JsonProperty(Order = 4)] + public bool SslEnabled { get; set; } = default!; + [JsonProperty(Order = 5)] + public string SslHost { get; set; } = default!; + [JsonProperty(Order = 6)] public Dictionary Headers { get; set; } = default!; + [JsonProperty(Order = 7)] + public IEnumerable SslProtocols { get; set; } = default!; } public class QueueMessageReceivedBookmarkProvider : BookmarkProvider @@ -35,7 +49,10 @@ public override async ValueTask> GetBookmarksAsync(B exchangeName: (await context.ReadActivityPropertyAsync(x => x.ExchangeName, cancellationToken))!, routingKey: (await context.ReadActivityPropertyAsync(x => x.RoutingKey, cancellationToken))!, connectionString: (await context.ReadActivityPropertyAsync(x => x.ConnectionString, cancellationToken))!, - headers: (await context.ReadActivityPropertyAsync(x => x.Headers, cancellationToken))! + headers: (await context.ReadActivityPropertyAsync(x => x.Headers, cancellationToken))!, + sslEnabled:(await context.ReadActivityPropertyAsync(x => x.EnableSsl, cancellationToken))!, + sslHost: (await context.ReadActivityPropertyAsync(x => x.SslHost, cancellationToken))!, + sslProtocols: (await context.ReadActivityPropertyAsync(x => x.SslProtocols, cancellationToken))! )) }; } diff --git a/src/activities/Elsa.Activities.RabbitMq/Configuration/RabbitMqBusConfiguration.cs b/src/activities/Elsa.Activities.RabbitMq/Configuration/RabbitMqBusConfiguration.cs index f0998429ea..5f2f4e9fda 100644 --- a/src/activities/Elsa.Activities.RabbitMq/Configuration/RabbitMqBusConfiguration.cs +++ b/src/activities/Elsa.Activities.RabbitMq/Configuration/RabbitMqBusConfiguration.cs @@ -1,5 +1,7 @@ +using System; using System.Collections.Generic; using System.Linq; +using System.Security.Authentication; namespace Elsa.Activities.RabbitMq.Configuration { @@ -11,13 +13,19 @@ public class RabbitMqBusConfiguration public Dictionary Headers { get; } public string ClientId { get; } public bool AutoDeleteQueue { get; } + public bool EnableSsl { get; } + public string SslHost { get; } + public SslProtocols SslProtocols { get; } - public RabbitMqBusConfiguration(string connectionString, string exchangeName, string routingKey, Dictionary headers, string clientId, bool autoDeleteQueue = false) + public RabbitMqBusConfiguration(string connectionString, string exchangeName, string routingKey, Dictionary headers, string clientId, bool enableSsl, string sslHost, IEnumerable sslProtocols, bool autoDeleteQueue = false) { ConnectionString = connectionString; ExchangeName = exchangeName; RoutingKey = routingKey; Headers = headers ?? new Dictionary(); + EnableSsl = enableSsl; + SslHost = sslHost; + SslProtocols = ResolveSslProtocols(sslProtocols ?? Array.Empty()); ClientId = clientId; AutoDeleteQueue = autoDeleteQueue; } @@ -30,5 +38,28 @@ public override int GetHashCode() } public string TopicFullName => string.IsNullOrEmpty(ExchangeName) ? RoutingKey : $"{RoutingKey}@{ExchangeName}"; + + public IEnumerable SslProtocolsString => Enum.GetValues(typeof(SslProtocols)) + .Cast() + .Where(c => SslProtocols.HasFlag(c) && c != SslProtocols.None) + .Select(c => c.ToString()); + + private SslProtocols ResolveSslProtocols(IEnumerable sslProtocols) + { + var parsed = sslProtocols + .Select(s => + { + var val = (SslProtocols)Enum.Parse(typeof(System.Security.Authentication.SslProtocols), s); + return val; + }).ToList(); + + SslProtocols values = SslProtocols.None; + + foreach (var sslProtocol in parsed) + { + values |= sslProtocol; + } + return values; + } } } diff --git a/src/activities/Elsa.Activities.RabbitMq/Services/Client.cs b/src/activities/Elsa.Activities.RabbitMq/Services/Client.cs index 9d7804847f..5027eeb3dd 100644 --- a/src/activities/Elsa.Activities.RabbitMq/Services/Client.cs +++ b/src/activities/Elsa.Activities.RabbitMq/Services/Client.cs @@ -3,6 +3,7 @@ using Rebus.Bus; using Rebus.Config; using Rebus.Messages; +using Rebus.RabbitMq; using Rebus.Routing.TransportMessages; using System; using System.Collections.Generic; @@ -41,7 +42,9 @@ public void SubscribeWithHandler(Func })) .Transport(t => { - t.UseRabbitMq(Configuration.ConnectionString, Configuration.ClientId).InputQueueOptions(x => x.SetAutoDelete(Configuration.AutoDeleteQueue)); + t.UseRabbitMq(Configuration.ConnectionString, Configuration.ClientId) + .InputQueueOptions(x => x.SetAutoDelete(Configuration.AutoDeleteQueue)) + .Ssl(new SslSettings(Configuration.EnableSsl, Configuration.SslHost, version: Configuration.SslProtocols)); }) .Start(); @@ -64,7 +67,10 @@ private void ConfigureAsOneWayClient() { _bus = Configure .With(_activator) - .Transport(t => t.UseRabbitMqAsOneWayClient(Configuration.ConnectionString).InputQueueOptions(o => o.SetAutoDelete(autoDelete: true))) + .Transport(t => t.UseRabbitMqAsOneWayClient(Configuration.ConnectionString) + .InputQueueOptions(o => o.SetAutoDelete(autoDelete: true)) + .Ssl(new SslSettings(Configuration.EnableSsl, Configuration.SslHost, version: Configuration.SslProtocols)) + ) .Start(); } diff --git a/src/activities/Elsa.Activities.RabbitMq/Services/RabbitMqQueueStarter.cs b/src/activities/Elsa.Activities.RabbitMq/Services/RabbitMqQueueStarter.cs index 5696c3e558..7ad2636626 100644 --- a/src/activities/Elsa.Activities.RabbitMq/Services/RabbitMqQueueStarter.cs +++ b/src/activities/Elsa.Activities.RabbitMq/Services/RabbitMqQueueStarter.cs @@ -108,9 +108,12 @@ private RabbitMqBusConfiguration CreateConfigurationFromBookmark(MessageReceived var exchangeName = bookmark.ExchangeName; var routingKey = bookmark.RoutingKey; var headers = bookmark.Headers; + var enableSsl = bookmark.SslEnabled; + var sslHost = bookmark.SslHost; + var sslProtocols = bookmark.SslProtocols; var clientId = RabbitMqClientConfigurationHelper.GetClientId(activityId); - return new RabbitMqBusConfiguration(connectionString!, exchangeName!, routingKey!, headers, clientId); + return new RabbitMqBusConfiguration(connectionString!, exchangeName!, routingKey!, headers, clientId, enableSsl, sslHost, sslProtocols); } private async Task DisposeExistingWorkersAsync() diff --git a/src/activities/Elsa.Activities.RabbitMq/Services/Worker.cs b/src/activities/Elsa.Activities.RabbitMq/Services/Worker.cs index e5a8680f8c..2d189dead6 100644 --- a/src/activities/Elsa.Activities.RabbitMq/Services/Worker.cs +++ b/src/activities/Elsa.Activities.RabbitMq/Services/Worker.cs @@ -50,7 +50,7 @@ private async Task TriggerWorkflowsAsync(TransportMessage message, CancellationT var config = _client.Configuration; - var bookmark = new MessageReceivedBookmark(config.ExchangeName, config.RoutingKey, config.ConnectionString, config.Headers); + var bookmark = new MessageReceivedBookmark(config.ExchangeName, config.RoutingKey, config.ConnectionString, config.Headers, config.EnableSsl, config.SslHost, config.SslProtocolsString); var launchContext = new WorkflowsQuery(ActivityType, bookmark); using var scope = _scopeFactory.CreateScope(); diff --git a/src/activities/Elsa.Activities.RabbitMq/Testing/RabbitMqTestQueueManager.cs b/src/activities/Elsa.Activities.RabbitMq/Testing/RabbitMqTestQueueManager.cs index 14d2c7e4e4..99c99ec735 100644 --- a/src/activities/Elsa.Activities.RabbitMq/Testing/RabbitMqTestQueueManager.cs +++ b/src/activities/Elsa.Activities.RabbitMq/Testing/RabbitMqTestQueueManager.cs @@ -96,9 +96,12 @@ private async IAsyncEnumerable GetConfigurationsAsync( var routingKey = await activity.EvaluatePropertyValueAsync(x => x.RoutingKey, cancellationToken); var exchangeName = await activity.EvaluatePropertyValueAsync(x => x.ExchangeName, cancellationToken); var headers = await activity.EvaluatePropertyValueAsync(x => x.Headers, cancellationToken); + var enableSSL = await activity.EvaluatePropertyValueAsync(x => x.EnableSsl, cancellationToken); + var sslHost = await activity.EvaluatePropertyValueAsync(x => x.SslHost, cancellationToken); + var sslProtocols = await activity.EvaluatePropertyValueAsync(x => x.SslProtocols, cancellationToken); var clientId = RabbitMqClientConfigurationHelper.GetTestClientId(activity.ActivityBlueprint.Id); - var config = new RabbitMqBusConfiguration(connectionString!, exchangeName!, routingKey!, headers!, clientId, autoDeleteQueue: true); + var config = new RabbitMqBusConfiguration(connectionString!, exchangeName!, routingKey!, headers!, clientId, enableSSL, sslHost, sslProtocols, autoDeleteQueue: true); yield return config; }