From 2e2d3d83669159d703888469933e527d9a2e1383 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20K=C3=BCsters?= Date: Thu, 1 Feb 2024 08:39:07 +0100 Subject: [PATCH] Make Registries singleton isolate the cache --- .../Registries/CommandRunnerRegistryTests.cs | 4 +-- .../Commands/Mediators/CommandsMediator.cs | 10 +++++-- .../Registries/CommandRunnerRegistry.cs | 26 ++++++++----------- .../RecurringCommandRunnerRegistry.cs | 12 +++------ .../ScheduledCommandRunnerRegistry.cs | 12 +++------ .../Common/Abstractions/RegistryBase.cs | 4 +-- src/core/Wemogy.CQRS/DependencyInjection.cs | 8 +++--- .../Queries/Mediators/QueriesMediator.cs | 7 +++-- .../Queries/Registries/QueryRunnerRegistry.cs | 17 +++++------- ...eServiceBusScheduledCommandServiceTests.cs | 17 ++++++++++++ 10 files changed, 64 insertions(+), 53 deletions(-) diff --git a/src/core/Wemogy.CQRS.UnitTests/Commands/Registries/CommandRunnerRegistryTests.cs b/src/core/Wemogy.CQRS.UnitTests/Commands/Registries/CommandRunnerRegistryTests.cs index 635f344..cf57888 100644 --- a/src/core/Wemogy.CQRS.UnitTests/Commands/Registries/CommandRunnerRegistryTests.cs +++ b/src/core/Wemogy.CQRS.UnitTests/Commands/Registries/CommandRunnerRegistryTests.cs @@ -16,7 +16,7 @@ public void ConcurrencyShouldWork() var serviceCollection = new ServiceCollection(); serviceCollection.AddTestApplication(); var serviceProvider = serviceCollection.BuildServiceProvider(); - var commandRunnerRegistry = new CommandRunnerRegistry(serviceProvider); + var commandRunnerRegistry = new CommandRunnerRegistry(); var createUserCommand = new CreateUserCommand { Firstname = "John" @@ -31,7 +31,7 @@ public void ConcurrencyShouldWork() }, _ => { - commandRunnerRegistry.ExecuteCommandRunnerAsync(createUserCommand); + commandRunnerRegistry.ExecuteCommandRunnerAsync(serviceProvider, createUserCommand); }); } } diff --git a/src/core/Wemogy.CQRS/Commands/Mediators/CommandsMediator.cs b/src/core/Wemogy.CQRS/Commands/Mediators/CommandsMediator.cs index 3bbb7ae..b5990a8 100644 --- a/src/core/Wemogy.CQRS/Commands/Mediators/CommandsMediator.cs +++ b/src/core/Wemogy.CQRS/Commands/Mediators/CommandsMediator.cs @@ -14,29 +14,32 @@ public class CommandsMediator : ICommands private readonly RecurringCommandRunnerRegistry _recurringCommandRunnerRegistry; private readonly IScheduledCommandService? _scheduledCommandService; private readonly IRecurringCommandService? _recurringCommandService; + private readonly IServiceProvider _serviceProvider; public CommandsMediator( CommandRunnerRegistry commandRunnerRegistry, ScheduledCommandRunnerRegistry scheduledCommandRunnerRegistry, RecurringCommandRunnerRegistry recurringCommandRunnerRegistry, + IServiceProvider serviceProvider, IScheduledCommandService? scheduledCommandService = null, IRecurringCommandService? recurringCommandService = null) { _commandRunnerRegistry = commandRunnerRegistry; _scheduledCommandRunnerRegistry = scheduledCommandRunnerRegistry; _recurringCommandRunnerRegistry = recurringCommandRunnerRegistry; + _serviceProvider = serviceProvider; _scheduledCommandService = scheduledCommandService; _recurringCommandService = recurringCommandService; } public Task RunAsync(ICommand command) { - return _commandRunnerRegistry.ExecuteCommandRunnerAsync(command); + return _commandRunnerRegistry.ExecuteCommandRunnerAsync(_serviceProvider, command); } public Task RunAsync(ICommand command) { - return _commandRunnerRegistry.ExecuteCommandRunnerAsync(command); + return _commandRunnerRegistry.ExecuteCommandRunnerAsync(_serviceProvider, command); } public Task ScheduleAsync(TCommand command, TimeSpan delay = default) @@ -55,6 +58,7 @@ public Task ScheduleAsync(TCommand command, DelayOptions(delayOptions)); } @@ -63,6 +67,7 @@ public Task ScheduleAsync(TCommand command, ThrottleOptions(throttleOptions)); } @@ -86,6 +91,7 @@ public Task ScheduleRecurringAsync( string cronExpression) { return _recurringCommandRunnerRegistry.ExecuteRecurringCommandRunnerAsync( + _serviceProvider, name, command, cronExpression); diff --git a/src/core/Wemogy.CQRS/Commands/Registries/CommandRunnerRegistry.cs b/src/core/Wemogy.CQRS/Commands/Registries/CommandRunnerRegistry.cs index 9c9d5a1..d4466fe 100644 --- a/src/core/Wemogy.CQRS/Commands/Registries/CommandRunnerRegistry.cs +++ b/src/core/Wemogy.CQRS/Commands/Registries/CommandRunnerRegistry.cs @@ -12,17 +12,10 @@ namespace Wemogy.CQRS.Commands.Registries; public class CommandRunnerRegistry : RegistryBase { - private readonly IServiceProvider _serviceProvider; - - public CommandRunnerRegistry(IServiceProvider serviceProvider) - { - _serviceProvider = serviceProvider; - } - - public Task ExecuteCommandRunnerAsync(ICommand command) + public Task ExecuteCommandRunnerAsync(IServiceProvider serviceProvider, ICommand command) { var commandRunnerEntry = GetCommandRunnerEntry(command); - return ExecuteCommandRunnerAsync(commandRunnerEntry, command); + return ExecuteCommandRunnerAsync(serviceProvider, commandRunnerEntry, command); } private TypeMethodRegistryEntry GetCommandRunnerEntry(ICommand command) @@ -32,17 +25,17 @@ private TypeMethodRegistryEntry GetCommandRunnerEntry(ICommand command) return commandRunnerEntry; } - private Task ExecuteCommandRunnerAsync(TypeMethodRegistryEntry commandRunnerEntry, ICommand command) + private Task ExecuteCommandRunnerAsync(IServiceProvider serviceProvider, TypeMethodRegistryEntry commandRunnerEntry, ICommand command) { - object commandRunner = _serviceProvider.GetRequiredService(commandRunnerEntry.Type); + object commandRunner = serviceProvider.GetRequiredService(commandRunnerEntry.Type); dynamic res = commandRunnerEntry.Method.Invoke(commandRunner, new object[] { command }); return res; } - public Task ExecuteCommandRunnerAsync(ICommand command) + public Task ExecuteCommandRunnerAsync(IServiceProvider serviceProvider, ICommand command) { var commandRunnerEntry = GetCommandRunnerEntry(command); - return ExecuteCommandRunnerAsync(commandRunnerEntry, command); + return ExecuteCommandRunnerAsync(serviceProvider, commandRunnerEntry, command); } private TypeMethodRegistryEntry GetCommandRunnerEntry(ICommand command) @@ -52,9 +45,12 @@ private TypeMethodRegistryEntry GetCommandRunnerEntry(ICommand return commandRunnerEntry; } - private Task ExecuteCommandRunnerAsync(TypeMethodRegistryEntry commandRunnerEntry, ICommand command) + private Task ExecuteCommandRunnerAsync( + IServiceProvider serviceProvider, + TypeMethodRegistryEntry commandRunnerEntry, + ICommand command) { - object commandRunner = _serviceProvider.GetRequiredService(commandRunnerEntry.Type); + object commandRunner = serviceProvider.GetRequiredService(commandRunnerEntry.Type); dynamic res = commandRunnerEntry.Method.Invoke(commandRunner, new object[] { command }); return res; } diff --git a/src/core/Wemogy.CQRS/Commands/Registries/RecurringCommandRunnerRegistry.cs b/src/core/Wemogy.CQRS/Commands/Registries/RecurringCommandRunnerRegistry.cs index 3afe1d8..ef38cea 100644 --- a/src/core/Wemogy.CQRS/Commands/Registries/RecurringCommandRunnerRegistry.cs +++ b/src/core/Wemogy.CQRS/Commands/Registries/RecurringCommandRunnerRegistry.cs @@ -12,14 +12,8 @@ namespace Wemogy.CQRS.Commands.Registries; public class RecurringCommandRunnerRegistry : RegistryBase { - private readonly IServiceProvider _serviceProvider; - - public RecurringCommandRunnerRegistry(IServiceProvider serviceProvider) - { - _serviceProvider = serviceProvider; - } - public Task ExecuteRecurringCommandRunnerAsync( + IServiceProvider serviceProvider, string recurringCommandId, ICommandBase command, string cronExpression) @@ -27,6 +21,7 @@ public Task ExecuteRecurringCommandRunnerAsync( var recurringCommandRunnerEntry = GetRecurringCommandRunnerEntry(command); return ExecuteRecurringCommandRunnerAsync( recurringCommandRunnerEntry, + serviceProvider, recurringCommandId, command, cronExpression); @@ -41,11 +36,12 @@ private TypeMethodRegistryEntry GetRecurringCommandRunnerEntry(ICommandBase comm private Task ExecuteRecurringCommandRunnerAsync( TypeMethodRegistryEntry recurringCommandRunnerEntry, + IServiceProvider serviceProvider, string recurringCommandId, ICommandBase command, string cronExpression) { - object recurringCommandRunner = _serviceProvider.GetRequiredService(recurringCommandRunnerEntry.Type); + object recurringCommandRunner = serviceProvider.GetRequiredService(recurringCommandRunnerEntry.Type); var parameters = new object[] { recurringCommandId, diff --git a/src/core/Wemogy.CQRS/Commands/Registries/ScheduledCommandRunnerRegistry.cs b/src/core/Wemogy.CQRS/Commands/Registries/ScheduledCommandRunnerRegistry.cs index 67e6de7..c07806c 100644 --- a/src/core/Wemogy.CQRS/Commands/Registries/ScheduledCommandRunnerRegistry.cs +++ b/src/core/Wemogy.CQRS/Commands/Registries/ScheduledCommandRunnerRegistry.cs @@ -13,14 +13,8 @@ namespace Wemogy.CQRS.Commands.Registries; public class ScheduledCommandRunnerRegistry : RegistryBase { - private readonly IServiceProvider _serviceProvider; - - public ScheduledCommandRunnerRegistry(IServiceProvider serviceProvider) - { - _serviceProvider = serviceProvider; - } - public Task ExecuteScheduledCommandRunnerAsync( + IServiceProvider serviceProvider, TCommand command, ScheduleOptions scheduleOptions) where TCommand : ICommandBase @@ -28,6 +22,7 @@ public Task ExecuteScheduledCommandRunnerAsync( var scheduledCommandRunnerEntry = GetScheduledCommandRunnerEntry(command); return ExecuteScheduledCommandRunnerAsync( scheduledCommandRunnerEntry, + serviceProvider, command, scheduleOptions); } @@ -41,11 +36,12 @@ private TypeMethodRegistryEntry GetScheduledCommandRunnerEntry(ICommandBase comm private Task ExecuteScheduledCommandRunnerAsync( TypeMethodRegistryEntry scheduledCommandRunnerEntry, + IServiceProvider serviceProvider, TCommand command, ScheduleOptions scheduleOptions) where TCommand : ICommandBase { - object scheduledCommandRunner = _serviceProvider.GetRequiredService(scheduledCommandRunnerEntry.Type); + object scheduledCommandRunner = serviceProvider.GetRequiredService(scheduledCommandRunnerEntry.Type); var parameters = new object[] { command, diff --git a/src/core/Wemogy.CQRS/Common/Abstractions/RegistryBase.cs b/src/core/Wemogy.CQRS/Common/Abstractions/RegistryBase.cs index 85f549b..b57ebd1 100644 --- a/src/core/Wemogy.CQRS/Common/Abstractions/RegistryBase.cs +++ b/src/core/Wemogy.CQRS/Common/Abstractions/RegistryBase.cs @@ -4,11 +4,11 @@ namespace Wemogy.CQRS.Common.Abstractions; public abstract class RegistryBase { - private static readonly ConcurrentDictionary EntriesCache = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _entriesCache = new ConcurrentDictionary(); protected TValue GetRegistryEntry(TKey key) { - return EntriesCache.GetOrAdd(key, InitializeEntry); + return _entriesCache.GetOrAdd(key, InitializeEntry); } protected abstract TValue InitializeEntry(TKey key); diff --git a/src/core/Wemogy.CQRS/DependencyInjection.cs b/src/core/Wemogy.CQRS/DependencyInjection.cs index 3b2ab6a..c11dfdd 100644 --- a/src/core/Wemogy.CQRS/DependencyInjection.cs +++ b/src/core/Wemogy.CQRS/DependencyInjection.cs @@ -115,9 +115,9 @@ private static void AddCommands(this IServiceCollection serviceCollection, List< serviceCollection.AddScoped(); // Add Registries - serviceCollection.AddScoped(); - serviceCollection.AddScoped(); - serviceCollection.AddScoped(); + serviceCollection.AddSingleton(); + serviceCollection.AddSingleton(); + serviceCollection.AddSingleton(); } private static void AddQueries(this IServiceCollection serviceCollection, List assemblies) @@ -161,7 +161,7 @@ private static void AddQueries(this IServiceCollection serviceCollection, List(); // Add QueryRunnerRegistry - serviceCollection.AddScoped(); + serviceCollection.AddSingleton(); } private static void AddImplementation( diff --git a/src/core/Wemogy.CQRS/Queries/Mediators/QueriesMediator.cs b/src/core/Wemogy.CQRS/Queries/Mediators/QueriesMediator.cs index 11ba0e7..f28e75f 100644 --- a/src/core/Wemogy.CQRS/Queries/Mediators/QueriesMediator.cs +++ b/src/core/Wemogy.CQRS/Queries/Mediators/QueriesMediator.cs @@ -1,3 +1,4 @@ +using System; using System.Threading; using System.Threading.Tasks; using Wemogy.CQRS.Queries.Abstractions; @@ -8,14 +9,16 @@ namespace Wemogy.CQRS.Queries.Mediators; public class QueriesMediator : IQueries { private readonly QueryRunnerRegistry _queryRunnerRegistry; + private readonly IServiceProvider _serviceProvider; - public QueriesMediator(QueryRunnerRegistry queryRunnerRegistry) + public QueriesMediator(QueryRunnerRegistry queryRunnerRegistry, IServiceProvider serviceProvider) { _queryRunnerRegistry = queryRunnerRegistry; + _serviceProvider = serviceProvider; } public Task QueryAsync(IQuery query, CancellationToken cancellationToken = default) { - return _queryRunnerRegistry.ExecuteQueryRunnerAsync(query, cancellationToken); + return _queryRunnerRegistry.ExecuteQueryRunnerAsync(_serviceProvider, query, cancellationToken); } } diff --git a/src/core/Wemogy.CQRS/Queries/Registries/QueryRunnerRegistry.cs b/src/core/Wemogy.CQRS/Queries/Registries/QueryRunnerRegistry.cs index 5f35a59..1c54777 100644 --- a/src/core/Wemogy.CQRS/Queries/Registries/QueryRunnerRegistry.cs +++ b/src/core/Wemogy.CQRS/Queries/Registries/QueryRunnerRegistry.cs @@ -13,17 +13,13 @@ namespace Wemogy.CQRS.Queries.Registries; public class QueryRunnerRegistry : RegistryBase { - private readonly IServiceProvider _serviceProvider; - - public QueryRunnerRegistry(IServiceProvider serviceProvider) - { - _serviceProvider = serviceProvider; - } - - public Task ExecuteQueryRunnerAsync(IQuery query, CancellationToken cancellationToken) + public Task ExecuteQueryRunnerAsync( + IServiceProvider serviceProvider, + IQuery query, + CancellationToken cancellationToken) { var queryRunnerEntry = GetQueryRunnerEntry(query); - return ExecuteQueryRunnerAsync(queryRunnerEntry, query, cancellationToken); + return ExecuteQueryRunnerAsync(queryRunnerEntry, serviceProvider, query, cancellationToken); } private TypeMethodRegistryEntry GetQueryRunnerEntry(IQuery query) @@ -35,10 +31,11 @@ private TypeMethodRegistryEntry GetQueryRunnerEntry(IQuery que private Task ExecuteQueryRunnerAsync( TypeMethodRegistryEntry queryRunnerEntry, + IServiceProvider serviceProvider, IQuery query, CancellationToken cancellationToken) { - var queryRunner = _serviceProvider.GetRequiredService(queryRunnerEntry.Type); + var queryRunner = serviceProvider.GetRequiredService(queryRunnerEntry.Type); dynamic res = queryRunnerEntry.Method.Invoke(queryRunner, new object[] { query, cancellationToken }); return res; } diff --git a/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus.UnitTests/Services/AzureServiceBusScheduledCommandServiceTests.cs b/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus.UnitTests/Services/AzureServiceBusScheduledCommandServiceTests.cs index 75512c9..9e5fd32 100644 --- a/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus.UnitTests/Services/AzureServiceBusScheduledCommandServiceTests.cs +++ b/src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus.UnitTests/Services/AzureServiceBusScheduledCommandServiceTests.cs @@ -75,6 +75,23 @@ public async Task ScheduleAsync_ShouldWorkWithoutPassingDelay() PrintContextCommandHandler.ExecutedCount[command.Id].Should().Be(1); } + [Fact] + public async Task ScheduleAsync_ShouldWorkWhenCommandRunnerWasUsedBefore() + { + // Arrange + var command = new PrintContextCommand(); + await StartHostedServiceAsync(); + await _commands.RunAsync(command); + PrintContextCommandHandler.ExecutedCount.Clear(); + + // Act + await _commands.ScheduleAsync(command); + + // Assert + await Task.Delay(TimeSpan.FromSeconds(1)); + PrintContextCommandHandler.ExecutedCount[command.Id].Should().Be(1); + } + private async Task StartHostedServiceAsync() { var hostedService = _serviceProvider