From 4695dfb8011a632840eb7292fb2bc9c0247e7ace Mon Sep 17 00:00:00 2001 From: Timo Notheisen <65653426+tnotheis@users.noreply.github.com> Date: Sat, 21 Dec 2024 06:04:18 +0100 Subject: [PATCH] Upgrade RabbitMQ client library to v7 (#1001) * chore: set query splitting behavior to avoid exception * chore: upgrade package * chore: adapt to new API * refactor: make all event bus methods async * chore: remove redundant ExchangeDeclare call * chore: remove redundant log statement * feat: set ConsumerDispatchConcurrency to 5 * refactor: await eventBus.Publish methods * feat: remove concurrent processing of events * test: use existing tier in ActualDeletionWorkerTests * test: correctly setup test data * fix: don't try to send a push notification to a non existing identity * chore: move log statement to correct place * feat: introduce and use EnsureExchangeExists method --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- .../SyncRuns/{id}/ExternalEvents/GET.feature | 1 + .../src/DatabaseMigrator/DummyEventBus.cs | 9 +- .../EventHandlerService.cs | 16 +- .../ActualDeletionWorkerTests.cs | 53 ++++--- .../src/BuildingBlocks.API/AbstractModule.cs | 6 +- .../Infrastructure/EventBus/IEventBus.cs | 10 +- .../BuildingBlocks.Infrastructure.csproj | 2 +- .../EventBusAzureServiceBus.cs | 12 +- .../EventBusGoogleCloudPubSub.cs | 10 +- .../InMemoryEventBusSubscriptionsManager.cs | 12 +- .../DefaultRabbitMQPersisterConnection.cs | 118 +++++++++------ .../EventBus/RabbitMQ/EventBusRabbitMQ.cs | 139 +++++++++++------- .../RabbitMQ/IRabbitMQPersisterConnection.cs | 4 +- .../Database/AbstractDbContextBase.cs | 20 +-- .../GoogleCloudPubSubTests.cs | 28 ++-- .../AnnouncementsModule.cs | 3 +- .../ChallengesModule.cs | 4 +- .../ExternalEventCreatedDomainEventHandler.cs | 11 +- .../Extensions/IEventBusExtensions.cs | 24 +-- .../Tiers/Commands/DeleteTier/Handler.cs | 2 +- .../src/Devices.ConsumerApi/DevicesModule.cs | 4 +- .../Persistence/Database/DevicesDbContext.cs | 20 +-- .../src/Files.ConsumerApi/FilesModule.cs | 3 +- .../Extensions/IEventBusExtensions.cs | 14 +- .../Messages.ConsumerApi/MessagesModule.cs | 4 +- .../Extensions/IEventBusExtensions.cs | 28 ++-- .../src/Quotas.ConsumerApi/QuotasModule.cs | 4 +- .../Repository/IdentitiesRepository.cs | 1 + .../Extensions/IEventBusExtensions.cs | 12 +- .../RelationshipsModule.cs | 4 +- .../Aggregates/Relationships/Relationship.cs | 9 +- .../Extensions/IEventBusExtensions.cs | 28 ++-- .../SynchronizationModule.cs | 4 +- .../Tags/src/Tags.ConsumerApi/TagsModule.cs | 3 +- .../src/Tokens.ConsumerApi/TokensModule.cs | 3 +- 35 files changed, 354 insertions(+), 271 deletions(-) diff --git a/Applications/ConsumerApi/test/ConsumerApi.Tests.Integration/Features/SyncRuns/{id}/ExternalEvents/GET.feature b/Applications/ConsumerApi/test/ConsumerApi.Tests.Integration/Features/SyncRuns/{id}/ExternalEvents/GET.feature index ee650e669b..1ff63207bb 100644 --- a/Applications/ConsumerApi/test/ConsumerApi.Tests.Integration/Features/SyncRuns/{id}/ExternalEvents/GET.feature +++ b/Applications/ConsumerApi/test/ConsumerApi.Tests.Integration/Features/SyncRuns/{id}/ExternalEvents/GET.feature @@ -26,6 +26,7 @@ Feature: GET /SyncRuns/{id}/ExternalEvents And an active Relationship r between i1 and i2 And i1 has sent a Message m to i2 And i1 has terminated r + And 2 second(s) have passed And a sync run sr started by i2 When i2 sends a GET request to the /SyncRuns/sr.id/ExternalEvents endpoint Then the response status code is 200 (OK) diff --git a/Applications/DatabaseMigrator/src/DatabaseMigrator/DummyEventBus.cs b/Applications/DatabaseMigrator/src/DatabaseMigrator/DummyEventBus.cs index cc9e479793..f64bd5a778 100644 --- a/Applications/DatabaseMigrator/src/DatabaseMigrator/DummyEventBus.cs +++ b/Applications/DatabaseMigrator/src/DatabaseMigrator/DummyEventBus.cs @@ -5,15 +5,18 @@ namespace Backbone.DatabaseMigrator; public class DummyEventBus : IEventBus { - public void Publish(DomainEvent @event) + public Task Publish(DomainEvent @event) { + return Task.CompletedTask; } - public void StartConsuming() + public Task StartConsuming() { + return Task.CompletedTask; } - public void Subscribe() where T : DomainEvent where TH : IDomainEventHandler + public Task Subscribe() where T : DomainEvent where TH : IDomainEventHandler { + return Task.CompletedTask; } } diff --git a/Applications/EventHandlerService/src/EventHandlerService/EventHandlerService.cs b/Applications/EventHandlerService/src/EventHandlerService/EventHandlerService.cs index f0574be97a..1cd8f44fce 100644 --- a/Applications/EventHandlerService/src/EventHandlerService/EventHandlerService.cs +++ b/Applications/EventHandlerService/src/EventHandlerService/EventHandlerService.cs @@ -16,12 +16,10 @@ public EventHandlerService(IEventBus eventBus, IEnumerable modul _logger = logger; } - public Task StartAsync(CancellationToken cancellationToken) + public async Task StartAsync(CancellationToken cancellationToken) { - SubscribeToEvents(); - StartConsuming(); - - return Task.CompletedTask; + await SubscribeToEvents(); + await StartConsuming(); } public Task StopAsync(CancellationToken cancellationToken) @@ -29,19 +27,19 @@ public Task StopAsync(CancellationToken cancellationToken) return Task.CompletedTask; } - private void SubscribeToEvents() + private async Task SubscribeToEvents() { _logger.LogInformation("Subscribing to events..."); foreach (var module in _modules) { - module.ConfigureEventBus(_eventBus); + await module.ConfigureEventBus(_eventBus); } _logger.LogInformation("Successfully subscribed to events."); } - private void StartConsuming() + private async Task StartConsuming() { - _eventBus.StartConsuming(); + await _eventBus.StartConsuming(); } } diff --git a/Applications/IdentityDeletionJobs/test/Job.IdentityDeletion.Tests.Integration/ActualDeletionWorkerTests.cs b/Applications/IdentityDeletionJobs/test/Job.IdentityDeletion.Tests.Integration/ActualDeletionWorkerTests.cs index 6794fcf2c1..8832d65643 100644 --- a/Applications/IdentityDeletionJobs/test/Job.IdentityDeletion.Tests.Integration/ActualDeletionWorkerTests.cs +++ b/Applications/IdentityDeletionJobs/test/Job.IdentityDeletion.Tests.Integration/ActualDeletionWorkerTests.cs @@ -88,12 +88,11 @@ public async Task Anonymizes_the_identity_in_all_messages_instead_of_deleting_th { // Arrange var identityToBeDeleted = await SeedDatabaseWithIdentityWithRipeDeletionProcess(); - var peer = CreateRandomIdentityAddress(); + var peer = await SeedDatabaseWithIdentity(); + var relationship = await SeedDatabaseWithActiveRelationshipBetween(identityToBeDeleted, peer); - GetService(); - - var sentMessage = await SeedDatabaseWithMessage(from: identityToBeDeleted.Address, to: peer); - var receivedMessage = await SeedDatabaseWithMessage(from: peer, to: identityToBeDeleted.Address); + var sentMessage = await SeedDatabaseWithMessage(relationship, from: identityToBeDeleted, to: peer); + var receivedMessage = await SeedDatabaseWithMessage(relationship, from: peer, to: identityToBeDeleted); // Act await _host.StartAsync(); @@ -116,11 +115,9 @@ public async Task Deletes_relationships() { // Arrange var identityToBeDeleted = await SeedDatabaseWithIdentityWithRipeDeletionProcess(); - var peer = CreateRandomIdentityAddress(); - - GetService(); + var peerOfIdentityToBeDeleted = await SeedDatabaseWithIdentity(); - await SeedDatabaseWithActiveRelationshipBetween(peer, identityToBeDeleted.Address); + await SeedDatabaseWithActiveRelationshipBetween(identityToBeDeleted, peerOfIdentityToBeDeleted); // Act await _host.StartAsync(); @@ -138,8 +135,6 @@ public async Task Deletes_relationship_templates() // Arrange var identityToBeDeleted = await SeedDatabaseWithIdentityWithRipeDeletionProcess(); - GetService(); - await SeedDatabaseWithRelationshipTemplateOf(identityToBeDeleted.Address); // Act @@ -154,27 +149,29 @@ public async Task Deletes_relationship_templates() #region Seeders - private async Task SeedDatabaseWithMessage(IdentityAddress from, IdentityAddress to) + private async Task SeedDatabaseWithMessage(Relationship relationship, Identity from, Identity to) { var dbContext = GetService(); - var recipient = new RecipientInformation(to, RelationshipId.New(), []); - var message = new Message(from, DeviceId.New(), [], [], [recipient]); + var recipient = new RecipientInformation(to.Address, RelationshipId.Parse(relationship.Id.Value), []); + var message = new Message(from.Address, from.Devices.First().Id, [], [], [recipient]); await dbContext.SaveEntity(message); return message; } - private async Task SeedDatabaseWithActiveRelationshipBetween(IdentityAddress from, IdentityAddress to) + private async Task SeedDatabaseWithActiveRelationshipBetween(Identity from, Identity to) { var dbContext = GetService(); - var template = new RelationshipTemplate(from, DeviceId.New(), null, null, []); - var relationship = new Relationship(template, to, DeviceId.New(), [], []); - relationship.Accept(from, DeviceId.New(), []); + var template = new RelationshipTemplate(to.Address, to.Devices.First().Id, null, null, []); + var relationship = new Relationship(template, from.Address, from.Devices.First().Id, [], []); + relationship.Accept(to.Address, to.Devices.First().Id, []); await dbContext.SaveEntity(relationship); + + return relationship; } private async Task SeedDatabaseWithRelationshipTemplateOf(IdentityAddress owner) @@ -190,10 +187,11 @@ private async Task SeedDatabaseWithIdentityWithRipeDeletionProcess() { var dbContext = GetService(); - var identity = new Identity("test", CreateRandomIdentityAddress(), [], TierId.Generate(), 1, CommunicationLanguage.DEFAULT_LANGUAGE); + var tier = await dbContext.Tiers.FirstAsync(t => t.Id != Tier.QUEUED_FOR_DELETION.Id); - var device = new Device(identity, CommunicationLanguage.DEFAULT_LANGUAGE); - identity.Devices.Add(device); + var identity = new Identity("test", CreateRandomIdentityAddress(), [], tier.Id, 1, CommunicationLanguage.DEFAULT_LANGUAGE); + + var device = identity.Devices.First(); SystemTime.Set(SystemTime.UtcNow.AddMonths(-1)); identity.StartDeletionProcessAsOwner(device.Id); @@ -204,6 +202,19 @@ private async Task SeedDatabaseWithIdentityWithRipeDeletionProcess() return identity; } + private async Task SeedDatabaseWithIdentity() + { + var dbContext = GetService(); + + var tier = await dbContext.Tiers.FirstAsync(t => t.Id != Tier.QUEUED_FOR_DELETION.Id); + + var identity = new Identity("test", CreateRandomIdentityAddress(), [], tier.Id, 1, CommunicationLanguage.DEFAULT_LANGUAGE); + + await dbContext.SaveEntity(identity); + + return identity; + } + #endregion private T GetService() where T : notnull diff --git a/BuildingBlocks/src/BuildingBlocks.API/AbstractModule.cs b/BuildingBlocks/src/BuildingBlocks.API/AbstractModule.cs index c261f58d14..9212e235ea 100644 --- a/BuildingBlocks/src/BuildingBlocks.API/AbstractModule.cs +++ b/BuildingBlocks/src/BuildingBlocks.API/AbstractModule.cs @@ -10,7 +10,9 @@ public abstract class AbstractModule public abstract void ConfigureServices(IServiceCollection services, IConfigurationSection configuration); - public abstract void ConfigureEventBus(IEventBus eventBus); + public abstract Task ConfigureEventBus(IEventBus eventBus); - public virtual void PostStartupValidation(IServiceProvider serviceProvider) { } + public virtual void PostStartupValidation(IServiceProvider serviceProvider) + { + } } diff --git a/BuildingBlocks/src/BuildingBlocks.Application.Abstractions/Infrastructure/EventBus/IEventBus.cs b/BuildingBlocks/src/BuildingBlocks.Application.Abstractions/Infrastructure/EventBus/IEventBus.cs index de3fb18467..94831b71a4 100644 --- a/BuildingBlocks/src/BuildingBlocks.Application.Abstractions/Infrastructure/EventBus/IEventBus.cs +++ b/BuildingBlocks/src/BuildingBlocks.Application.Abstractions/Infrastructure/EventBus/IEventBus.cs @@ -4,18 +4,18 @@ namespace Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.EventB public interface IEventBus { - void Publish(IEnumerable events) + async Task Publish(IEnumerable events) { foreach (var domainEvent in events) { - Publish(domainEvent); + await Publish(domainEvent); } } - void Publish(DomainEvent @event); - void StartConsuming(); + Task Publish(DomainEvent @event); + Task StartConsuming(); - void Subscribe() + Task Subscribe() where T : DomainEvent where TH : IDomainEventHandler; } diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/BuildingBlocks.Infrastructure.csproj b/BuildingBlocks/src/BuildingBlocks.Infrastructure/BuildingBlocks.Infrastructure.csproj index 14b7220464..e6e848553d 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/BuildingBlocks.Infrastructure.csproj +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/BuildingBlocks.Infrastructure.csproj @@ -18,7 +18,7 @@ - + diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/AzureServiceBus/EventBusAzureServiceBus.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/AzureServiceBus/EventBusAzureServiceBus.cs index fb0e4dcaad..ff59554cb2 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/AzureServiceBus/EventBusAzureServiceBus.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/AzureServiceBus/EventBusAzureServiceBus.cs @@ -53,7 +53,7 @@ public async ValueTask DisposeAsync() await _processor.CloseAsync(); } - public async void Publish(DomainEvent @event) + public async Task Publish(DomainEvent @event) { var eventName = @event.GetType().Name.Replace(DOMAIN_EVENT_SUFFIX, ""); var jsonMessage = JsonConvert.SerializeObject(@event, new JsonSerializerSettings @@ -78,7 +78,7 @@ await _logger.TraceTime(async () => _logger.LogDebug("Successfully sent domain event with id '{MessageId}'.", message.MessageId); } - public void Subscribe() + public async Task Subscribe() where T : DomainEvent where TH : IDomainEventHandler { @@ -90,12 +90,12 @@ public void Subscribe() { _logger.LogInformation("Trying to create subscription on Service Bus..."); - _serviceBusPersisterConnection.AdministrationClient.CreateRuleAsync(TOPIC_NAME, _subscriptionName, + await _serviceBusPersisterConnection.AdministrationClient.CreateRuleAsync(TOPIC_NAME, _subscriptionName, new CreateRuleOptions { Filter = new CorrelationRuleFilter { Subject = eventName }, Name = eventName - }).GetAwaiter().GetResult(); + }); _logger.LogInformation("Successfully created subscription on Service Bus."); } @@ -109,9 +109,9 @@ public void Subscribe() _subscriptionManager.AddSubscription(); } - public void StartConsuming() + public async Task StartConsuming() { - RegisterSubscriptionClientMessageHandlerAsync().GetAwaiter().GetResult(); + await RegisterSubscriptionClientMessageHandlerAsync(); } private async Task RegisterSubscriptionClientMessageHandlerAsync() diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/GoogleCloudPubSub/EventBusGoogleCloudPubSub.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/GoogleCloudPubSub/EventBusGoogleCloudPubSub.cs index f25bf037a5..16c1f54246 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/GoogleCloudPubSub/EventBusGoogleCloudPubSub.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/GoogleCloudPubSub/EventBusGoogleCloudPubSub.cs @@ -52,7 +52,7 @@ public async ValueTask DisposeAsync() await _connection.DisposeAsync(); } - public async void Publish(DomainEvent @event) + public async Task Publish(DomainEvent @event) { var eventName = @event.GetType().Name.Replace(DOMAIN_EVENT_SUFFIX, ""); @@ -77,7 +77,7 @@ public async void Publish(DomainEvent @event) _logger.EventWasNotProcessed(messageId); } - public void Subscribe() + public Task Subscribe() where T : DomainEvent where TH : IDomainEventHandler { @@ -86,11 +86,13 @@ public void Subscribe() _logger.LogInformation("Subscribing to event '{EventName}' with {EventHandler}", eventName, typeof(TH).Name); _subscriptionManager.AddSubscription(); + + return Task.CompletedTask; } - public void StartConsuming() + public async Task StartConsuming() { - _connection.SubscriberClient.StartAsync(OnIncomingEvent); + await _connection.SubscriberClient.StartAsync(OnIncomingEvent); } private static string RemoveDomainEventSuffix(string typeName) diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/InMemoryEventBusSubscriptionsManager.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/InMemoryEventBusSubscriptionsManager.cs index fd424ed882..1c0963bbd5 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/InMemoryEventBusSubscriptionsManager.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/InMemoryEventBusSubscriptionsManager.cs @@ -1,3 +1,4 @@ +using System.Collections.Concurrent; using Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.EventBus; using Backbone.BuildingBlocks.Domain.Events; @@ -5,12 +6,7 @@ namespace Backbone.BuildingBlocks.Infrastructure.EventBus; public class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager { - private readonly Dictionary> _handlers; - - public InMemoryEventBusSubscriptionsManager() - { - _handlers = new Dictionary>(); - } + private readonly ConcurrentDictionary> _handlers = []; public void Clear() { @@ -45,7 +41,7 @@ public string GetEventKey() return GetEventKey(typeof(T)); } - public static string GetEventKey(Type eventType) + private static string GetEventKey(Type eventType) { return eventType.Name; } @@ -54,7 +50,7 @@ private void DoAddSubscription(Type handlerType, Type eventType) { var eventName = GetEventKey(eventType); - if (!HasSubscriptionsForEvent(eventName)) _handlers.Add(eventName, []); + _handlers.TryAdd(eventName, []); if (_handlers[eventName].Any(s => s.HandlerType == handlerType)) throw new ArgumentException( diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/DefaultRabbitMQPersisterConnection.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/DefaultRabbitMQPersisterConnection.cs index 8100dc5038..4552aa3aaf 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/DefaultRabbitMQPersisterConnection.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/DefaultRabbitMQPersisterConnection.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using System.Net.Sockets; using Microsoft.Extensions.Logging; using Polly; @@ -10,11 +11,11 @@ namespace Backbone.BuildingBlocks.Infrastructure.EventBus.RabbitMQ; public class DefaultRabbitMqPersistentConnection : IRabbitMqPersistentConnection { + private readonly SemaphoreSlim _semaphore = new(1); private readonly IConnectionFactory _connectionFactory; private readonly ILogger _logger; private readonly int _retryCount; - private readonly object _syncRoot = new(); private IConnection? _connection; private bool _disposed; @@ -28,81 +29,103 @@ public DefaultRabbitMqPersistentConnection(IConnectionFactory connectionFactory, public bool IsConnected => _connection is { IsOpen: true } && !_disposed; - public IModel CreateModel() + public async Task Connect() { - if (!IsConnected) - throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); - - return _connection!.CreateModel(); - } + if (IsConnected) + return; - public void Dispose() - { - if (_disposed) return; - - _disposed = true; + await _semaphore.WaitAsync(); try { - _connection?.Dispose(); + if (IsConnected) + return; + + await ConnectInternal(); } - catch (IOException ex) + finally { - _logger.LogCritical(ex, "There was an error while disposing the connection."); + _semaphore.Release(); } } - public bool TryConnect() + private async Task ConnectInternal() { _logger.LogInformation("RabbitMQ Client is trying to connect"); - lock (_syncRoot) + var policy = Policy.Handle() + .Or() + .WaitAndRetryAsync(_retryCount, + retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), + (ex, _) => _logger.ConnectionError(ex)); + + await policy.ExecuteAsync(async () => _connection = await _connectionFactory.CreateConnectionAsync()); + + if (!IsConnected) { - var policy = Policy.Handle() - .Or() - .WaitAndRetry(_retryCount, - retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), - (ex, _) => _logger.ConnectionError(ex)); + _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); + throw new Exception("RabbitMQ connections could not be created and opened"); + } - policy.Execute(() => _connection = _connectionFactory - .CreateConnection()); + _connection!.ConnectionShutdownAsync += OnConnectionShutdown; + _connection.CallbackExceptionAsync += OnCallbackException; + _connection.ConnectionBlockedAsync += OnConnectionBlocked; - if (!IsConnected) - { - _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); - return false; - } + _logger.LogInformation("RabbitMQ persistent connection acquired a connection to '{hostName}' and is subscribed to failure events", _connection.Endpoint.HostName); + } - _connection!.ConnectionShutdown += OnConnectionShutdown; - _connection!.CallbackException += OnCallbackException; - _connection!.ConnectionBlocked += OnConnectionBlocked; + public async Task CreateChannel() + { + Debug.Assert(IsConnected, "RabbitMQ connection is not established"); - _logger.LogInformation( - "RabbitMQ persistent connection acquired a connection '{hostName}' and is subscribed to failure events", _connection.Endpoint.HostName); + var channel = await _connection!.CreateChannelAsync(); - return true; - } + _logger.CreatedChannel(); + + return channel; } - private void OnConnectionBlocked(object? sender, ConnectionBlockedEventArgs e) + public void Dispose() { if (_disposed) return; + + _disposed = true; + + try + { + _connection?.Dispose(); + } + catch (IOException ex) + { + _logger.LogCritical(ex, "There was an error while disposing the connection."); + } + } + + private async Task OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) + { + if (_disposed) + return; + _logger.ConnectionIsBlocked(); - TryConnect(); + await Connect(); } - private void OnCallbackException(object? sender, CallbackExceptionEventArgs e) + private async Task OnCallbackException(object sender, CallbackExceptionEventArgs e) { - if (_disposed) return; + if (_disposed) + return; + _logger.ConnectionThrewAnException(); - TryConnect(); + await Connect(); } - private void OnConnectionShutdown(object? sender, ShutdownEventArgs reason) + private async Task OnConnectionShutdown(object sender, ShutdownEventArgs reason) { - if (_disposed) return; + if (_disposed) + return; + _logger.ConnectionIsShutdown(); - TryConnect(); + await Connect(); } } @@ -115,6 +138,13 @@ internal static partial class DefaultRabbitMqPersistentConnectionLogs Message = "There was an error while trying to connect to RabbitMQ. Attempting to retry...")] public static partial void ConnectionError(this ILogger logger, Exception exception); + [LoggerMessage( + EventId = 953485, + EventName = "DefaultRabbitMqPersistentConnection.CreatedChannel", + Level = LogLevel.Debug, + Message = "Successfully created a new channel.")] + public static partial void CreatedChannel(this ILogger logger); + [LoggerMessage( EventId = 119836, EventName = "DefaultRabbitMqPersistentConnection.ConnectionIsShutdown", diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/EventBusRabbitMQ.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/EventBusRabbitMQ.cs index e8c4e323a6..b6e3c7f085 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/EventBusRabbitMQ.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/EventBusRabbitMQ.cs @@ -17,7 +17,7 @@ namespace Backbone.BuildingBlocks.Infrastructure.EventBus.RabbitMQ; public class EventBusRabbitMq : IEventBus, IDisposable { - private const string BROKER_NAME = "event_bus"; + private const string EXCHANGE_NAME = "event_bus"; private const string AUTOFAC_SCOPE_NAME = "event_bus"; private readonly ILifetimeScope _autofac; @@ -28,12 +28,13 @@ public class EventBusRabbitMq : IEventBus, IDisposable private readonly HandlerRetryBehavior _handlerRetryBehavior; private readonly IEventBusSubscriptionsManager _subsManager; - private IModel _consumerChannel; - private readonly string? _queueName; - private EventingBasicConsumer? _consumer; + private IChannel? _consumerChannel; + private readonly string _queueName; + private AsyncEventingBasicConsumer? _consumer; + private bool _exchangeExistenceEnsured; public EventBusRabbitMq(IRabbitMqPersistentConnection persistentConnection, ILogger logger, - ILifetimeScope autofac, IEventBusSubscriptionsManager? subsManager, HandlerRetryBehavior handlerRetryBehavior, string? queueName = null, + ILifetimeScope autofac, IEventBusSubscriptionsManager? subsManager, HandlerRetryBehavior handlerRetryBehavior, string queueName, int connectionRetryCount = 5) { _persistentConnection = @@ -41,7 +42,6 @@ public EventBusRabbitMq(IRabbitMqPersistentConnection persistentConnection, ILog _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); _queueName = queueName; - _consumerChannel = CreateConsumerChannel(); _autofac = autofac; _connectionRetryCount = connectionRetryCount; _handlerRetryBehavior = handlerRetryBehavior; @@ -49,30 +49,28 @@ public EventBusRabbitMq(IRabbitMqPersistentConnection persistentConnection, ILog public void Dispose() { - _consumerChannel.Dispose(); + _consumerChannel?.Dispose(); _subsManager.Clear(); } - public void StartConsuming() + public async Task StartConsuming() { - using var channel = _persistentConnection.CreateModel(); - channel.ExchangeDeclare(BROKER_NAME, "direct"); - if (_consumer is null) { throw new Exception("Cannot start consuming without a consumer set."); } - _consumerChannel.BasicConsume(_queueName, false, _consumer); + await _consumerChannel!.BasicConsumeAsync(_queueName, false, _consumer); } - public void Publish(DomainEvent @event) + public async Task Publish(DomainEvent @event) { - if (!_persistentConnection.IsConnected) _persistentConnection.TryConnect(); + if (!_persistentConnection.IsConnected) + await _persistentConnection.Connect(); var policy = Policy.Handle() .Or() - .WaitAndRetry(_connectionRetryCount, + .WaitAndRetryAsync(_connectionRetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, _) => _logger.ErrorOnPublish(ex)); @@ -80,8 +78,6 @@ public void Publish(DomainEvent @event) _logger.LogInformation("Creating RabbitMQ channel to publish a '{EventName}'.", eventName); - _logger.LogInformation("Declaring RabbitMQ exchange to publish a '{EventName}'.", eventName); - var message = JsonConvert.SerializeObject(@event, new JsonSerializerSettings { ContractResolver = new ContractResolverWithPrivates() @@ -89,18 +85,19 @@ public void Publish(DomainEvent @event) var body = Encoding.UTF8.GetBytes(message); - policy.Execute(() => + await policy.ExecuteAsync(async () => { _logger.LogDebug("Publishing a {EventName} to RabbitMQ.", eventName); - using var channel = _persistentConnection.CreateModel(); - var properties = channel.CreateBasicProperties(); - properties.DeliveryMode = 2; // persistent - properties.MessageId = @event.DomainEventId; - - properties.CorrelationId = CustomLogContext.GetCorrelationId(); + await using var channel = await _persistentConnection.CreateChannel(); + var properties = new BasicProperties + { + DeliveryMode = DeliveryModes.Persistent, + MessageId = @event.DomainEventId, + CorrelationId = CustomLogContext.GetCorrelationId() + }; - channel.BasicPublish(BROKER_NAME, + await channel.BasicPublishAsync(EXCHANGE_NAME, eventName, true, properties, @@ -110,20 +107,52 @@ public void Publish(DomainEvent @event) }); } - public void Subscribe() + private async Task EnsureExchangeExists() + { + if (_exchangeExistenceEnsured) + return; + + try + { + await using var channel = await _persistentConnection.CreateChannel(); + await channel.ExchangeDeclarePassiveAsync(EXCHANGE_NAME); + _exchangeExistenceEnsured = true; + } + catch (OperationInterruptedException ex) + { + if (ex.ShutdownReason?.ReplyCode == 404) + { + try + { + await using var channel = await _persistentConnection.CreateChannel(); + await channel.ExchangeDeclareAsync(EXCHANGE_NAME, "direct"); + _exchangeExistenceEnsured = true; + } + catch (Exception) + { + _logger.LogCritical("The exchange '{ExchangeName}' does not exist and could not be created.", EXCHANGE_NAME); + throw new Exception($"The exchange '{EXCHANGE_NAME}' does not exist and could not be created."); + } + } + } + } + + public async Task Subscribe() where T : DomainEvent where TH : IDomainEventHandler { var eventName = _subsManager.GetEventKey(); - DoInternalSubscription(eventName); + await DoInternalSubscription(eventName); _logger.LogInformation("Subscribing to event '{EventName}' with {EventHandler}", eventName, typeof(TH).Name); _subsManager.AddSubscription(); } - private void DoInternalSubscription(string eventName) + private async Task DoInternalSubscription(string eventName) { + await EnsureConsumerChannelExists(); + var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); if (containsKey) { @@ -131,37 +160,47 @@ private void DoInternalSubscription(string eventName) return; } - if (!_persistentConnection.IsConnected) _persistentConnection.TryConnect(); + if (!_persistentConnection.IsConnected) + await _persistentConnection.Connect(); _logger.LogTrace("Trying to bind queue '{QueueName}' on RabbitMQ ...", _queueName); - using var channel = _persistentConnection.CreateModel(); - channel.QueueBind(_queueName, - BROKER_NAME, + await EnsureExchangeExists(); + + await _consumerChannel!.QueueBindAsync(_queueName, + EXCHANGE_NAME, eventName); _logger.LogTrace("Successfully bound queue '{QueueName}' on RabbitMQ.", _queueName); } - private IModel CreateConsumerChannel() + private async Task EnsureConsumerChannelExists() { - if (!_persistentConnection.IsConnected) _persistentConnection.TryConnect(); + if (_consumerChannel is null) + { + await CreateConsumerChannel(); + } + } + + private async Task CreateConsumerChannel() + { + if (!_persistentConnection.IsConnected) + await _persistentConnection.Connect(); _logger.LogInformation("Creating RabbitMQ consumer channel"); - var channel = _persistentConnection.CreateModel(); + _consumerChannel = await _persistentConnection.CreateChannel(); - channel.ExchangeDeclare(BROKER_NAME, - "direct"); + await EnsureExchangeExists(); - channel.QueueDeclare(_queueName, - true, - false, - false, - null); + await _consumerChannel.QueueDeclareAsync(_queueName, + durable: true, + exclusive: false, + autoDelete: false + ); - _consumer = new EventingBasicConsumer(channel); - _consumer.Received += async (_, eventArgs) => + _consumer = new AsyncEventingBasicConsumer(_consumerChannel); + _consumer.ReceivedAsync += async (_, eventArgs) => { var eventName = eventArgs.RoutingKey; var message = Encoding.UTF8.GetString(eventArgs.Body.ToArray()); @@ -175,26 +214,24 @@ private IModel CreateConsumerChannel() { await ProcessEvent(eventName, message); - channel.BasicAck(eventArgs.DeliveryTag, false); + await _consumerChannel.BasicAckAsync(eventArgs.DeliveryTag, false); } } catch (Exception ex) { - channel.BasicReject(eventArgs.DeliveryTag, false); + await _consumerChannel.BasicRejectAsync(eventArgs.DeliveryTag, false); _logger.ErrorWhileProcessingDomainEvent(eventName, ex); } }; - channel.CallbackException += (_, ea) => + _consumerChannel.CallbackExceptionAsync += async (_, ea) => { _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel"); - _consumerChannel.Dispose(); - _consumerChannel = CreateConsumerChannel(); + _consumerChannel?.Dispose(); + await CreateConsumerChannel(); }; - - return channel; } private async Task ProcessEvent(string eventName, string message) diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/IRabbitMQPersisterConnection.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/IRabbitMQPersisterConnection.cs index 6d4a65a71d..abca43b063 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/IRabbitMQPersisterConnection.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/IRabbitMQPersisterConnection.cs @@ -7,7 +7,7 @@ public interface IRabbitMqPersistentConnection { bool IsConnected { get; } - bool TryConnect(); + Task Connect(); - IModel CreateModel(); + Task CreateChannel(); } diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/Persistence/Database/AbstractDbContextBase.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/Persistence/Database/AbstractDbContextBase.cs index fe83439b6b..df80e27575 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/Persistence/Database/AbstractDbContextBase.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/Persistence/Database/AbstractDbContextBase.cs @@ -84,11 +84,11 @@ public async Task RunInTransaction(Func> func, IsolationLevel isol return await RunInTransaction(func, null, isolationLevel); } - public override Task SaveChangesAsync(CancellationToken cancellationToken = new()) + public override async Task SaveChangesAsync(CancellationToken cancellationToken = new()) { var entities = GetChangedEntities(); - var result = base.SaveChangesAsync(cancellationToken); - PublishDomainEvents(entities); + var result = await base.SaveChangesAsync(cancellationToken); + await PublishDomainEvents(entities); return result; } @@ -124,7 +124,7 @@ public override int SaveChanges() { var entities = GetChangedEntities(); var result = base.SaveChanges(); - PublishDomainEvents(entities); + PublishDomainEvents(entities).GetAwaiter().GetResult(); return result; } @@ -133,16 +133,16 @@ public override int SaveChanges(bool acceptAllChangesOnSuccess) { var entities = GetChangedEntities(); var result = base.SaveChanges(acceptAllChangesOnSuccess); - PublishDomainEvents(entities); + PublishDomainEvents(entities).GetAwaiter().GetResult(); return result; } - public override Task SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new()) + public override async Task SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new()) { var entities = GetChangedEntities(); - var result = base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken); - PublishDomainEvents(entities); + var result = await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken); + await PublishDomainEvents(entities); return result; } @@ -153,11 +153,11 @@ private List GetChangedEntities() => ChangeTracker .Select(x => (Entity)x.Entity) .ToList(); - private void PublishDomainEvents(List entities) + private async Task PublishDomainEvents(List entities) { foreach (var e in entities) { - _eventBus.Publish(e.DomainEvents); + await _eventBus.Publish(e.DomainEvents); e.ClearDomainEvents(); } } diff --git a/BuildingBlocks/test/BuildingBlocks.Infrastructure.Tests/EventBus/GoogleCloudPubSub/GoogleCloudPubSubTests.cs b/BuildingBlocks/test/BuildingBlocks.Infrastructure.Tests/EventBus/GoogleCloudPubSub/GoogleCloudPubSubTests.cs index cff720eb89..4c5695eb92 100644 --- a/BuildingBlocks/test/BuildingBlocks.Infrastructure.Tests/EventBus/GoogleCloudPubSub/GoogleCloudPubSubTests.cs +++ b/BuildingBlocks/test/BuildingBlocks.Infrastructure.Tests/EventBus/GoogleCloudPubSub/GoogleCloudPubSubTests.cs @@ -32,9 +32,9 @@ public async Task One_subscriber_for_one_event() var subscriber = _factory.CreateEventBus(); var publisher = _factory.CreateEventBus(); - subscriber.Subscribe(); + await subscriber.Subscribe(); - publisher.Publish(new TestEvent1DomainEvent()); + await publisher.Publish(new TestEvent1DomainEvent()); TestEvent1DomainEventHandler1.ShouldEventuallyHaveOneTriggeredInstance(); } @@ -47,10 +47,10 @@ public async Task Subscribe_to_the_same_event_twice_with_the_same_subscriber() var subscriber = _factory.CreateEventBus(); var publisher = _factory.CreateEventBus(); - subscriber.Subscribe(); - subscriber.Subscribe(); + await subscriber.Subscribe(); + await subscriber.Subscribe(); - publisher.Publish(new TestEvent1DomainEvent()); + await publisher.Publish(new TestEvent1DomainEvent()); TestEvent1DomainEventHandler1.ShouldEventuallyHaveOneTriggeredInstance(); TestEvent1DomainEventHandler2.ShouldEventuallyHaveOneTriggeredInstance(); @@ -65,10 +65,10 @@ public async Task Two_subscribers_for_the_same_event_both_receive_the_event() var subscriber2 = _factory.CreateEventBus("subscription2"); var publisher = _factory.CreateEventBus(); - subscriber1.Subscribe(); - subscriber2.Subscribe(); + await subscriber1.Subscribe(); + await subscriber2.Subscribe(); - publisher.Publish(new TestEvent1DomainEvent()); + await publisher.Publish(new TestEvent1DomainEvent()); TestEvent1DomainEventHandler1.ShouldEventuallyHaveOneTriggeredInstance(); TestEvent1DomainEventHandler2.ShouldEventuallyHaveOneTriggeredInstance(); @@ -83,10 +83,10 @@ public async Task Only_one_instance_of_a_subscriber_receives_the_event() var subscriber1B = _factory.CreateEventBus("subscription1"); var publisher = _factory.CreateEventBus(); - subscriber1A.Subscribe(); - subscriber1B.Subscribe(); + await subscriber1A.Subscribe(); + await subscriber1B.Subscribe(); - publisher.Publish(new TestEvent1DomainEvent()); + await publisher.Publish(new TestEvent1DomainEvent()); await Task.Delay(5.Seconds()); // wait some time to make sure all subscribers were notified @@ -109,10 +109,10 @@ public async Task The_correct_event_handler_is_called_when_multiple_subscription var subscriber1 = _factory.CreateEventBus(); var publisher = _factory.CreateEventBus(); - subscriber1.Subscribe(); - subscriber1.Subscribe(); + await subscriber1.Subscribe(); + await subscriber1.Subscribe(); - publisher.Publish(new TestEvent1DomainEvent()); + await publisher.Publish(new TestEvent1DomainEvent()); TestEvent1DomainEventHandler1.ShouldEventuallyHaveOneTriggeredInstance(); TestEvent1DomainEventHandler2.ShouldNotHaveAnyTriggeredInstance(); diff --git a/Modules/Announcements/src/Announcements.ConsumerApi/AnnouncementsModule.cs b/Modules/Announcements/src/Announcements.ConsumerApi/AnnouncementsModule.cs index d0a46778f2..1b2eb363f2 100644 --- a/Modules/Announcements/src/Announcements.ConsumerApi/AnnouncementsModule.cs +++ b/Modules/Announcements/src/Announcements.ConsumerApi/AnnouncementsModule.cs @@ -35,7 +35,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override Task ConfigureEventBus(IEventBus eventBus) { + return Task.CompletedTask; } } diff --git a/Modules/Challenges/src/Challenges.ConsumerApi/ChallengesModule.cs b/Modules/Challenges/src/Challenges.ConsumerApi/ChallengesModule.cs index cf8a938181..d2a10964d6 100644 --- a/Modules/Challenges/src/Challenges.ConsumerApi/ChallengesModule.cs +++ b/Modules/Challenges/src/Challenges.ConsumerApi/ChallengesModule.cs @@ -35,8 +35,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override Task ConfigureEventBus(IEventBus eventBus) { - + return Task.CompletedTask; } } diff --git a/Modules/Devices/src/Devices.Application/DomainEvents/Incoming/ExternalEventCreated/ExternalEventCreatedDomainEventHandler.cs b/Modules/Devices/src/Devices.Application/DomainEvents/Incoming/ExternalEventCreated/ExternalEventCreatedDomainEventHandler.cs index 37c7255488..1da0c66b43 100644 --- a/Modules/Devices/src/Devices.Application/DomainEvents/Incoming/ExternalEventCreated/ExternalEventCreatedDomainEventHandler.cs +++ b/Modules/Devices/src/Devices.Application/DomainEvents/Incoming/ExternalEventCreated/ExternalEventCreatedDomainEventHandler.cs @@ -1,4 +1,3 @@ -using Backbone.BuildingBlocks.Application.Abstractions.Exceptions; using Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.EventBus; using Backbone.BuildingBlocks.Application.PushNotifications; using Backbone.Modules.Devices.Application.Infrastructure.Persistence.Repository; @@ -24,13 +23,9 @@ public async Task Handle(ExternalEventCreatedDomainEvent @event) if (@event.IsDeliveryBlocked) return; - var identity = await _identitiesRepository.FindByAddress(@event.Owner, CancellationToken.None) ?? throw new NotFoundException(nameof(Identity)); + var identity = await _identitiesRepository.FindByAddress(@event.Owner, CancellationToken.None); - if (identity.Status != IdentityStatus.ToBeDeleted) - await _pushSenderService.SendNotification( - new ExternalEventCreatedPushNotification(), - SendPushNotificationFilter.AllDevicesOf(@event.Owner), - CancellationToken.None - ); + if (identity != null && identity.Status != IdentityStatus.ToBeDeleted) + await _pushSenderService.SendNotification(new ExternalEventCreatedPushNotification(), SendPushNotificationFilter.AllDevicesOf(@event.Owner), CancellationToken.None); } } diff --git a/Modules/Devices/src/Devices.Application/Extensions/IEventBusExtensions.cs b/Modules/Devices/src/Devices.Application/Extensions/IEventBusExtensions.cs index 271dbdb2de..38626e03e6 100644 --- a/Modules/Devices/src/Devices.Application/Extensions/IEventBusExtensions.cs +++ b/Modules/Devices/src/Devices.Application/Extensions/IEventBusExtensions.cs @@ -13,27 +13,27 @@ namespace Backbone.Modules.Devices.Application.Extensions; public static class IEventBusExtensions { - public static void AddDevicesDomainEventSubscriptions(this IEventBus eventBus) + public static async Task AddDevicesDomainEventSubscriptions(this IEventBus eventBus) { - eventBus.SubscribeToAnnouncementsEvents(); - eventBus.SubscribeToDevicesEvents(); - eventBus.SubscribeToSynchronizationEvents(); + await eventBus.SubscribeToAnnouncementsEvents(); + await eventBus.SubscribeToDevicesEvents(); + await eventBus.SubscribeToSynchronizationEvents(); } - private static void SubscribeToAnnouncementsEvents(this IEventBus eventBus) + private static async Task SubscribeToAnnouncementsEvents(this IEventBus eventBus) { - eventBus.Subscribe(); + await eventBus.Subscribe(); } - private static void SubscribeToDevicesEvents(this IEventBus eventBus) + private static async Task SubscribeToDevicesEvents(this IEventBus eventBus) { - eventBus.Subscribe(); + await eventBus.Subscribe(); } - private static void SubscribeToSynchronizationEvents(this IEventBus eventBus) + private static async Task SubscribeToSynchronizationEvents(this IEventBus eventBus) { - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); } } diff --git a/Modules/Devices/src/Devices.Application/Tiers/Commands/DeleteTier/Handler.cs b/Modules/Devices/src/Devices.Application/Tiers/Commands/DeleteTier/Handler.cs index 33a1549323..3ee73ea72c 100644 --- a/Modules/Devices/src/Devices.Application/Tiers/Commands/DeleteTier/Handler.cs +++ b/Modules/Devices/src/Devices.Application/Tiers/Commands/DeleteTier/Handler.cs @@ -41,6 +41,6 @@ public async Task Handle(DeleteTierCommand request, CancellationToken cancellati await _tiersRepository.Remove(tier); - _eventBus.Publish(new TierDeletedDomainEvent(tier)); + _ = _eventBus.Publish(new TierDeletedDomainEvent(tier)); } } diff --git a/Modules/Devices/src/Devices.ConsumerApi/DevicesModule.cs b/Modules/Devices/src/Devices.ConsumerApi/DevicesModule.cs index a4a31a573a..a3dacdb5c7 100644 --- a/Modules/Devices/src/Devices.ConsumerApi/DevicesModule.cs +++ b/Modules/Devices/src/Devices.ConsumerApi/DevicesModule.cs @@ -40,9 +40,9 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override async Task ConfigureEventBus(IEventBus eventBus) { - eventBus.AddDevicesDomainEventSubscriptions(); + await eventBus.AddDevicesDomainEventSubscriptions(); } public override void PostStartupValidation(IServiceProvider serviceProvider) diff --git a/Modules/Devices/src/Devices.Infrastructure/Persistence/Database/DevicesDbContext.cs b/Modules/Devices/src/Devices.Infrastructure/Persistence/Database/DevicesDbContext.cs index 07868d07db..be4b996d77 100644 --- a/Modules/Devices/src/Devices.Infrastructure/Persistence/Database/DevicesDbContext.cs +++ b/Modules/Devices/src/Devices.Infrastructure/Persistence/Database/DevicesDbContext.cs @@ -112,11 +112,11 @@ public async Task RunInTransaction(Func> func, IsolationLevel isol return await RunInTransaction(func, null, isolationLevel); } - public override Task SaveChangesAsync(CancellationToken cancellationToken = new()) + public override async Task SaveChangesAsync(CancellationToken cancellationToken = new()) { var entities = GetChangedEntities(); - var result = base.SaveChangesAsync(cancellationToken); - PublishDomainEvents(entities); + var result = await base.SaveChangesAsync(cancellationToken); + await PublishDomainEvents(entities); return result; } @@ -151,7 +151,7 @@ public override int SaveChanges() { var entities = GetChangedEntities(); var result = base.SaveChanges(); - PublishDomainEvents(entities); + PublishDomainEvents(entities).GetAwaiter().GetResult(); return result; } @@ -160,16 +160,16 @@ public override int SaveChanges(bool acceptAllChangesOnSuccess) { var entities = GetChangedEntities(); var result = base.SaveChanges(acceptAllChangesOnSuccess); - PublishDomainEvents(entities); + PublishDomainEvents(entities).GetAwaiter().GetResult(); return result; } - public override Task SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new()) + public override async Task SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new()) { var entities = GetChangedEntities(); - var result = base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken); - PublishDomainEvents(entities); + var result = await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken); + await PublishDomainEvents(entities); return result; } @@ -240,11 +240,11 @@ private List GetChangedEntities() => ChangeTracker .Select(x => (Entity)x.Entity) .ToList(); - private void PublishDomainEvents(List entities) + private async Task PublishDomainEvents(List entities) { foreach (var e in entities) { - _eventBus.Publish(e.DomainEvents); + await _eventBus.Publish(e.DomainEvents); e.ClearDomainEvents(); } } diff --git a/Modules/Files/src/Files.ConsumerApi/FilesModule.cs b/Modules/Files/src/Files.ConsumerApi/FilesModule.cs index 9b3e0375d6..97796f62ac 100644 --- a/Modules/Files/src/Files.ConsumerApi/FilesModule.cs +++ b/Modules/Files/src/Files.ConsumerApi/FilesModule.cs @@ -37,7 +37,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override Task ConfigureEventBus(IEventBus eventBus) { + return Task.CompletedTask; } } diff --git a/Modules/Messages/src/Messages.Application/Extensions/IEventBusExtensions.cs b/Modules/Messages/src/Messages.Application/Extensions/IEventBusExtensions.cs index 5dcd2ed112..3905da63cb 100644 --- a/Modules/Messages/src/Messages.Application/Extensions/IEventBusExtensions.cs +++ b/Modules/Messages/src/Messages.Application/Extensions/IEventBusExtensions.cs @@ -8,19 +8,19 @@ namespace Backbone.Modules.Messages.Application.Extensions; public static class IEventBusExtensions { - public static void AddMessagesDomainEventSubscriptions(this IEventBus eventBus) + public static async Task AddMessagesDomainEventSubscriptions(this IEventBus eventBus) { - SubscribeToMessagesEvents(eventBus); - SubscribeToRelationshipsEvents(eventBus); + await SubscribeToMessagesEvents(eventBus); + await SubscribeToRelationshipsEvents(eventBus); } - private static void SubscribeToMessagesEvents(IEventBus eventBus) + private static async Task SubscribeToMessagesEvents(IEventBus eventBus) { - eventBus.Subscribe(); + await eventBus.Subscribe(); } - private static void SubscribeToRelationshipsEvents(IEventBus eventBus) + private static async Task SubscribeToRelationshipsEvents(IEventBus eventBus) { - eventBus.Subscribe(); + await eventBus.Subscribe(); } } diff --git a/Modules/Messages/src/Messages.ConsumerApi/MessagesModule.cs b/Modules/Messages/src/Messages.ConsumerApi/MessagesModule.cs index e8359c2e4a..2cf9c8e8ea 100644 --- a/Modules/Messages/src/Messages.ConsumerApi/MessagesModule.cs +++ b/Modules/Messages/src/Messages.ConsumerApi/MessagesModule.cs @@ -35,8 +35,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override async Task ConfigureEventBus(IEventBus eventBus) { - eventBus.AddMessagesDomainEventSubscriptions(); + await eventBus.AddMessagesDomainEventSubscriptions(); } } diff --git a/Modules/Quotas/src/Quotas.Application/Extensions/IEventBusExtensions.cs b/Modules/Quotas/src/Quotas.Application/Extensions/IEventBusExtensions.cs index c8075e250d..3af336534a 100644 --- a/Modules/Quotas/src/Quotas.Application/Extensions/IEventBusExtensions.cs +++ b/Modules/Quotas/src/Quotas.Application/Extensions/IEventBusExtensions.cs @@ -25,23 +25,23 @@ namespace Backbone.Modules.Quotas.Application.Extensions; public static class IEventBusExtensions { - public static void AddQuotasDomainEventSubscriptions(this IEventBus eventBus) + public static async Task AddQuotasDomainEventSubscriptions(this IEventBus eventBus) { - SubscribeToSynchronizationEvents(eventBus); + await SubscribeToSynchronizationEvents(eventBus); } - private static void SubscribeToSynchronizationEvents(IEventBus eventBus) + private static async Task SubscribeToSynchronizationEvents(IEventBus eventBus) { - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); } } diff --git a/Modules/Quotas/src/Quotas.ConsumerApi/QuotasModule.cs b/Modules/Quotas/src/Quotas.ConsumerApi/QuotasModule.cs index 01b3073006..c39bfd1174 100644 --- a/Modules/Quotas/src/Quotas.ConsumerApi/QuotasModule.cs +++ b/Modules/Quotas/src/Quotas.ConsumerApi/QuotasModule.cs @@ -37,8 +37,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati services.AddResponseCaching(); } - public override void ConfigureEventBus(IEventBus eventBus) + public override async Task ConfigureEventBus(IEventBus eventBus) { - eventBus.AddQuotasDomainEventSubscriptions(); + await eventBus.AddQuotasDomainEventSubscriptions(); } } diff --git a/Modules/Quotas/src/Quotas.Infrastructure/Persistence/Repository/IdentitiesRepository.cs b/Modules/Quotas/src/Quotas.Infrastructure/Persistence/Repository/IdentitiesRepository.cs index 27acb128d3..270518d4a1 100644 --- a/Modules/Quotas/src/Quotas.Infrastructure/Persistence/Repository/IdentitiesRepository.cs +++ b/Modules/Quotas/src/Quotas.Infrastructure/Persistence/Repository/IdentitiesRepository.cs @@ -47,6 +47,7 @@ public async Task> FindByAddresses(IReadOnlyCollection identityAddresses.Contains(i.Address)) .IncludeAll(_dbContext) + .AsSplitQuery() .ToListAsync(cancellationToken); } diff --git a/Modules/Relationships/src/Relationships.Application/Extensions/IEventBusExtensions.cs b/Modules/Relationships/src/Relationships.Application/Extensions/IEventBusExtensions.cs index 1949320c73..4f95b76447 100644 --- a/Modules/Relationships/src/Relationships.Application/Extensions/IEventBusExtensions.cs +++ b/Modules/Relationships/src/Relationships.Application/Extensions/IEventBusExtensions.cs @@ -8,15 +8,15 @@ namespace Backbone.Modules.Relationships.Application.Extensions; public static class IEventBusExtensions { - public static void AddRelationshipsDomainEventSubscriptions(this IEventBus eventBus) + public static async Task AddRelationshipsDomainEventSubscriptions(this IEventBus eventBus) { - SubscribeToIdentitiesEvents(eventBus); + await SubscribeToIdentitiesEvents(eventBus); } - private static void SubscribeToIdentitiesEvents(IEventBus eventBus) + private static async Task SubscribeToIdentitiesEvents(IEventBus eventBus) { - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); } } diff --git a/Modules/Relationships/src/Relationships.ConsumerApi/RelationshipsModule.cs b/Modules/Relationships/src/Relationships.ConsumerApi/RelationshipsModule.cs index 0b5631ed0a..f259727a42 100644 --- a/Modules/Relationships/src/Relationships.ConsumerApi/RelationshipsModule.cs +++ b/Modules/Relationships/src/Relationships.ConsumerApi/RelationshipsModule.cs @@ -35,8 +35,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override async Task ConfigureEventBus(IEventBus eventBus) { - eventBus.AddRelationshipsDomainEventSubscriptions(); + await eventBus.AddRelationshipsDomainEventSubscriptions(); } } diff --git a/Modules/Relationships/src/Relationships.Domain/Aggregates/Relationships/Relationship.cs b/Modules/Relationships/src/Relationships.Domain/Aggregates/Relationships/Relationship.cs index 8f60d65976..9f6c16f5d8 100644 --- a/Modules/Relationships/src/Relationships.Domain/Aggregates/Relationships/Relationship.cs +++ b/Modules/Relationships/src/Relationships.Domain/Aggregates/Relationships/Relationship.cs @@ -307,7 +307,8 @@ public void DecomposeDueToIdentityDeletion(IdentityAddress identityToBeDeleted, { EnsureHasParticipant(identityToBeDeleted); - if (From == identityToBeDeleted && FromHasDecomposed || To == identityToBeDeleted && ToHasDecomposed) return; + if (From == identityToBeDeleted && FromHasDecomposed || To == identityToBeDeleted && ToHasDecomposed) + return; if (Status is RelationshipStatus.DeletionProposed) DecomposeAsSecondParticipant(identityToBeDeleted, null, RelationshipAuditLogEntryReason.DecompositionDueToIdentityDeletion); @@ -354,8 +355,10 @@ private void DecomposeAsSecondParticipant(IdentityAddress activeIdentity, Device ); AuditLog.Add(auditLogEntry); - FromHasDecomposed = true; - ToHasDecomposed = true; + if (From == activeIdentity) + FromHasDecomposed = true; + else + ToHasDecomposed = true; } private void EnsureRelationshipNotDecomposedBy(IdentityAddress activeIdentity) diff --git a/Modules/Synchronization/src/Synchronization.Application/Extensions/IEventBusExtensions.cs b/Modules/Synchronization/src/Synchronization.Application/Extensions/IEventBusExtensions.cs index 0ce6264bca..b9da878d52 100644 --- a/Modules/Synchronization/src/Synchronization.Application/Extensions/IEventBusExtensions.cs +++ b/Modules/Synchronization/src/Synchronization.Application/Extensions/IEventBusExtensions.cs @@ -22,26 +22,26 @@ namespace Backbone.Modules.Synchronization.Application.Extensions; public static class IEventBusExtensions { - public static void AddSynchronizationDomainEventSubscriptions(this IEventBus eventBus) + public static async Task AddSynchronizationDomainEventSubscriptions(this IEventBus eventBus) { - SubscribeToMessagesEvents(eventBus); - SubscribeToRelationshipsEvents(eventBus); + await SubscribeToMessagesEvents(eventBus); + await SubscribeToRelationshipsEvents(eventBus); } - private static void SubscribeToMessagesEvents(IEventBus eventBus) + private static async Task SubscribeToMessagesEvents(IEventBus eventBus) { - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); } - private static void SubscribeToRelationshipsEvents(IEventBus eventBus) + private static async Task SubscribeToRelationshipsEvents(IEventBus eventBus) { - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); } } diff --git a/Modules/Synchronization/src/Synchronization.ConsumerApi/SynchronizationModule.cs b/Modules/Synchronization/src/Synchronization.ConsumerApi/SynchronizationModule.cs index 4c73a84e25..96f20fdc59 100644 --- a/Modules/Synchronization/src/Synchronization.ConsumerApi/SynchronizationModule.cs +++ b/Modules/Synchronization/src/Synchronization.ConsumerApi/SynchronizationModule.cs @@ -35,8 +35,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override async Task ConfigureEventBus(IEventBus eventBus) { - eventBus.AddSynchronizationDomainEventSubscriptions(); + await eventBus.AddSynchronizationDomainEventSubscriptions(); } } diff --git a/Modules/Tags/src/Tags.ConsumerApi/TagsModule.cs b/Modules/Tags/src/Tags.ConsumerApi/TagsModule.cs index 9dacc76eb4..dfd60f1e93 100644 --- a/Modules/Tags/src/Tags.ConsumerApi/TagsModule.cs +++ b/Modules/Tags/src/Tags.ConsumerApi/TagsModule.cs @@ -20,8 +20,9 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati services.AddPersistence(); } - public override void ConfigureEventBus(IEventBus eventBus) + public override Task ConfigureEventBus(IEventBus eventBus) { // No Event bus needed here + return Task.CompletedTask; } } diff --git a/Modules/Tokens/src/Tokens.ConsumerApi/TokensModule.cs b/Modules/Tokens/src/Tokens.ConsumerApi/TokensModule.cs index aac517056f..a61507d541 100644 --- a/Modules/Tokens/src/Tokens.ConsumerApi/TokensModule.cs +++ b/Modules/Tokens/src/Tokens.ConsumerApi/TokensModule.cs @@ -35,7 +35,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override Task ConfigureEventBus(IEventBus eventBus) { + return Task.CompletedTask; } }