Skip to content

Commit

Permalink
Merge pull request #79 from wemogy/fix/registry-base
Browse files Browse the repository at this point in the history
Make Registries singleton isolate the cache
  • Loading branch information
SebastianKuesters authored Feb 1, 2024
2 parents 0e2facd + 2e2d3d8 commit c11cb05
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public void ConcurrencyShouldWork()
var serviceCollection = new ServiceCollection();
serviceCollection.AddTestApplication();
var serviceProvider = serviceCollection.BuildServiceProvider();
var commandRunnerRegistry = new CommandRunnerRegistry(serviceProvider);
var commandRunnerRegistry = new CommandRunnerRegistry();
var createUserCommand = new CreateUserCommand
{
Firstname = "John"
Expand All @@ -31,7 +31,7 @@ public void ConcurrencyShouldWork()
},
_ =>
{
commandRunnerRegistry.ExecuteCommandRunnerAsync(createUserCommand);
commandRunnerRegistry.ExecuteCommandRunnerAsync(serviceProvider, createUserCommand);
});
}
}
10 changes: 8 additions & 2 deletions src/core/Wemogy.CQRS/Commands/Mediators/CommandsMediator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,32 @@ public class CommandsMediator : ICommands
private readonly RecurringCommandRunnerRegistry _recurringCommandRunnerRegistry;
private readonly IScheduledCommandService? _scheduledCommandService;
private readonly IRecurringCommandService? _recurringCommandService;
private readonly IServiceProvider _serviceProvider;

public CommandsMediator(
CommandRunnerRegistry commandRunnerRegistry,
ScheduledCommandRunnerRegistry scheduledCommandRunnerRegistry,
RecurringCommandRunnerRegistry recurringCommandRunnerRegistry,
IServiceProvider serviceProvider,
IScheduledCommandService? scheduledCommandService = null,
IRecurringCommandService? recurringCommandService = null)
{
_commandRunnerRegistry = commandRunnerRegistry;
_scheduledCommandRunnerRegistry = scheduledCommandRunnerRegistry;
_recurringCommandRunnerRegistry = recurringCommandRunnerRegistry;
_serviceProvider = serviceProvider;
_scheduledCommandService = scheduledCommandService;
_recurringCommandService = recurringCommandService;
}

public Task<TResult> RunAsync<TResult>(ICommand<TResult> command)
{
return _commandRunnerRegistry.ExecuteCommandRunnerAsync(command);
return _commandRunnerRegistry.ExecuteCommandRunnerAsync(_serviceProvider, command);
}

public Task RunAsync(ICommand command)
{
return _commandRunnerRegistry.ExecuteCommandRunnerAsync(command);
return _commandRunnerRegistry.ExecuteCommandRunnerAsync(_serviceProvider, command);
}

public Task<string> ScheduleAsync<TCommand>(TCommand command, TimeSpan delay = default)
Expand All @@ -55,6 +58,7 @@ public Task<string> ScheduleAsync<TCommand>(TCommand command, DelayOptions<TComm
where TCommand : ICommandBase
{
return _scheduledCommandRunnerRegistry.ExecuteScheduledCommandRunnerAsync(
_serviceProvider,
command,
new ScheduleOptions<TCommand>(delayOptions));
}
Expand All @@ -63,6 +67,7 @@ public Task<string> ScheduleAsync<TCommand>(TCommand command, ThrottleOptions<TC
where TCommand : ICommandBase
{
return _scheduledCommandRunnerRegistry.ExecuteScheduledCommandRunnerAsync(
_serviceProvider,
command,
new ScheduleOptions<TCommand>(throttleOptions));
}
Expand All @@ -86,6 +91,7 @@ public Task ScheduleRecurringAsync(
string cronExpression)
{
return _recurringCommandRunnerRegistry.ExecuteRecurringCommandRunnerAsync(
_serviceProvider,
name,
command,
cronExpression);
Expand Down
26 changes: 11 additions & 15 deletions src/core/Wemogy.CQRS/Commands/Registries/CommandRunnerRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,10 @@ namespace Wemogy.CQRS.Commands.Registries;

public class CommandRunnerRegistry : RegistryBase<Type, TypeMethodRegistryEntry>
{
private readonly IServiceProvider _serviceProvider;

public CommandRunnerRegistry(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}

public Task ExecuteCommandRunnerAsync(ICommand command)
public Task ExecuteCommandRunnerAsync(IServiceProvider serviceProvider, ICommand command)
{
var commandRunnerEntry = GetCommandRunnerEntry(command);
return ExecuteCommandRunnerAsync(commandRunnerEntry, command);
return ExecuteCommandRunnerAsync(serviceProvider, commandRunnerEntry, command);
}

private TypeMethodRegistryEntry GetCommandRunnerEntry(ICommand command)
Expand All @@ -32,17 +25,17 @@ private TypeMethodRegistryEntry GetCommandRunnerEntry(ICommand command)
return commandRunnerEntry;
}

private Task ExecuteCommandRunnerAsync(TypeMethodRegistryEntry commandRunnerEntry, ICommand command)
private Task ExecuteCommandRunnerAsync(IServiceProvider serviceProvider, TypeMethodRegistryEntry commandRunnerEntry, ICommand command)
{
object commandRunner = _serviceProvider.GetRequiredService(commandRunnerEntry.Type);
object commandRunner = serviceProvider.GetRequiredService(commandRunnerEntry.Type);
dynamic res = commandRunnerEntry.Method.Invoke(commandRunner, new object[] { command });
return res;
}

public Task<TResult> ExecuteCommandRunnerAsync<TResult>(ICommand<TResult> command)
public Task<TResult> ExecuteCommandRunnerAsync<TResult>(IServiceProvider serviceProvider, ICommand<TResult> command)
{
var commandRunnerEntry = GetCommandRunnerEntry(command);
return ExecuteCommandRunnerAsync(commandRunnerEntry, command);
return ExecuteCommandRunnerAsync(serviceProvider, commandRunnerEntry, command);
}

private TypeMethodRegistryEntry GetCommandRunnerEntry<TResult>(ICommand<TResult> command)
Expand All @@ -52,9 +45,12 @@ private TypeMethodRegistryEntry GetCommandRunnerEntry<TResult>(ICommand<TResult>
return commandRunnerEntry;
}

private Task<TResult> ExecuteCommandRunnerAsync<TResult>(TypeMethodRegistryEntry commandRunnerEntry, ICommand<TResult> command)
private Task<TResult> ExecuteCommandRunnerAsync<TResult>(
IServiceProvider serviceProvider,
TypeMethodRegistryEntry commandRunnerEntry,
ICommand<TResult> command)
{
object commandRunner = _serviceProvider.GetRequiredService(commandRunnerEntry.Type);
object commandRunner = serviceProvider.GetRequiredService(commandRunnerEntry.Type);
dynamic res = commandRunnerEntry.Method.Invoke(commandRunner, new object[] { command });
return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,16 @@ namespace Wemogy.CQRS.Commands.Registries;

public class RecurringCommandRunnerRegistry : RegistryBase<Type, TypeMethodRegistryEntry>
{
private readonly IServiceProvider _serviceProvider;

public RecurringCommandRunnerRegistry(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}

public Task ExecuteRecurringCommandRunnerAsync(
IServiceProvider serviceProvider,
string recurringCommandId,
ICommandBase command,
string cronExpression)
{
var recurringCommandRunnerEntry = GetRecurringCommandRunnerEntry(command);
return ExecuteRecurringCommandRunnerAsync(
recurringCommandRunnerEntry,
serviceProvider,
recurringCommandId,
command,
cronExpression);
Expand All @@ -41,11 +36,12 @@ private TypeMethodRegistryEntry GetRecurringCommandRunnerEntry(ICommandBase comm

private Task ExecuteRecurringCommandRunnerAsync(
TypeMethodRegistryEntry recurringCommandRunnerEntry,
IServiceProvider serviceProvider,
string recurringCommandId,
ICommandBase command,
string cronExpression)
{
object recurringCommandRunner = _serviceProvider.GetRequiredService(recurringCommandRunnerEntry.Type);
object recurringCommandRunner = serviceProvider.GetRequiredService(recurringCommandRunnerEntry.Type);
var parameters = new object[]
{
recurringCommandId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,16 @@ namespace Wemogy.CQRS.Commands.Registries;

public class ScheduledCommandRunnerRegistry : RegistryBase<Type, TypeMethodRegistryEntry>
{
private readonly IServiceProvider _serviceProvider;

public ScheduledCommandRunnerRegistry(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}

public Task<string> ExecuteScheduledCommandRunnerAsync<TCommand>(
IServiceProvider serviceProvider,
TCommand command,
ScheduleOptions<TCommand> scheduleOptions)
where TCommand : ICommandBase
{
var scheduledCommandRunnerEntry = GetScheduledCommandRunnerEntry(command);
return ExecuteScheduledCommandRunnerAsync(
scheduledCommandRunnerEntry,
serviceProvider,
command,
scheduleOptions);
}
Expand All @@ -41,11 +36,12 @@ private TypeMethodRegistryEntry GetScheduledCommandRunnerEntry(ICommandBase comm

private Task<string> ExecuteScheduledCommandRunnerAsync<TCommand>(
TypeMethodRegistryEntry scheduledCommandRunnerEntry,
IServiceProvider serviceProvider,
TCommand command,
ScheduleOptions<TCommand> scheduleOptions)
where TCommand : ICommandBase
{
object scheduledCommandRunner = _serviceProvider.GetRequiredService(scheduledCommandRunnerEntry.Type);
object scheduledCommandRunner = serviceProvider.GetRequiredService(scheduledCommandRunnerEntry.Type);
var parameters = new object[]
{
command,
Expand Down
4 changes: 2 additions & 2 deletions src/core/Wemogy.CQRS/Common/Abstractions/RegistryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ namespace Wemogy.CQRS.Common.Abstractions;

public abstract class RegistryBase<TKey, TValue>
{
private static readonly ConcurrentDictionary<TKey, TValue> EntriesCache = new ConcurrentDictionary<TKey, TValue>();
private readonly ConcurrentDictionary<TKey, TValue> _entriesCache = new ConcurrentDictionary<TKey, TValue>();

protected TValue GetRegistryEntry(TKey key)
{
return EntriesCache.GetOrAdd(key, InitializeEntry);
return _entriesCache.GetOrAdd(key, InitializeEntry);
}

protected abstract TValue InitializeEntry(TKey key);
Expand Down
8 changes: 4 additions & 4 deletions src/core/Wemogy.CQRS/DependencyInjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ private static void AddCommands(this IServiceCollection serviceCollection, List<
serviceCollection.AddScoped<ICommands, CommandsMediator>();

// Add Registries
serviceCollection.AddScoped<CommandRunnerRegistry>();
serviceCollection.AddScoped<ScheduledCommandRunnerRegistry>();
serviceCollection.AddScoped<RecurringCommandRunnerRegistry>();
serviceCollection.AddSingleton<CommandRunnerRegistry>();
serviceCollection.AddSingleton<ScheduledCommandRunnerRegistry>();
serviceCollection.AddSingleton<RecurringCommandRunnerRegistry>();
}

private static void AddQueries(this IServiceCollection serviceCollection, List<Assembly> assemblies)
Expand Down Expand Up @@ -161,7 +161,7 @@ private static void AddQueries(this IServiceCollection serviceCollection, List<A
serviceCollection.AddScoped<IQueries, QueriesMediator>();

// Add QueryRunnerRegistry
serviceCollection.AddScoped<QueryRunnerRegistry>();
serviceCollection.AddSingleton<QueryRunnerRegistry>();
}

private static void AddImplementation(
Expand Down
7 changes: 5 additions & 2 deletions src/core/Wemogy.CQRS/Queries/Mediators/QueriesMediator.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Wemogy.CQRS.Queries.Abstractions;
Expand All @@ -8,14 +9,16 @@ namespace Wemogy.CQRS.Queries.Mediators;
public class QueriesMediator : IQueries
{
private readonly QueryRunnerRegistry _queryRunnerRegistry;
private readonly IServiceProvider _serviceProvider;

public QueriesMediator(QueryRunnerRegistry queryRunnerRegistry)
public QueriesMediator(QueryRunnerRegistry queryRunnerRegistry, IServiceProvider serviceProvider)
{
_queryRunnerRegistry = queryRunnerRegistry;
_serviceProvider = serviceProvider;
}

public Task<TResult> QueryAsync<TResult>(IQuery<TResult> query, CancellationToken cancellationToken = default)
{
return _queryRunnerRegistry.ExecuteQueryRunnerAsync(query, cancellationToken);
return _queryRunnerRegistry.ExecuteQueryRunnerAsync(_serviceProvider, query, cancellationToken);
}
}
17 changes: 7 additions & 10 deletions src/core/Wemogy.CQRS/Queries/Registries/QueryRunnerRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,13 @@ namespace Wemogy.CQRS.Queries.Registries;

public class QueryRunnerRegistry : RegistryBase<Type, TypeMethodRegistryEntry>
{
private readonly IServiceProvider _serviceProvider;

public QueryRunnerRegistry(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}

public Task<TResult> ExecuteQueryRunnerAsync<TResult>(IQuery<TResult> query, CancellationToken cancellationToken)
public Task<TResult> ExecuteQueryRunnerAsync<TResult>(
IServiceProvider serviceProvider,
IQuery<TResult> query,
CancellationToken cancellationToken)
{
var queryRunnerEntry = GetQueryRunnerEntry(query);
return ExecuteQueryRunnerAsync(queryRunnerEntry, query, cancellationToken);
return ExecuteQueryRunnerAsync(queryRunnerEntry, serviceProvider, query, cancellationToken);
}

private TypeMethodRegistryEntry GetQueryRunnerEntry<TResult>(IQuery<TResult> query)
Expand All @@ -35,10 +31,11 @@ private TypeMethodRegistryEntry GetQueryRunnerEntry<TResult>(IQuery<TResult> que

private Task<TResult> ExecuteQueryRunnerAsync<TResult>(
TypeMethodRegistryEntry queryRunnerEntry,
IServiceProvider serviceProvider,
IQuery<TResult> query,
CancellationToken cancellationToken)
{
var queryRunner = _serviceProvider.GetRequiredService(queryRunnerEntry.Type);
var queryRunner = serviceProvider.GetRequiredService(queryRunnerEntry.Type);
dynamic res = queryRunnerEntry.Method.Invoke(queryRunner, new object[] { query, cancellationToken });
return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,23 @@ public async Task ScheduleAsync_ShouldWorkWithoutPassingDelay()
PrintContextCommandHandler.ExecutedCount[command.Id].Should().Be(1);
}

[Fact]
public async Task ScheduleAsync_ShouldWorkWhenCommandRunnerWasUsedBefore()
{
// Arrange
var command = new PrintContextCommand();
await StartHostedServiceAsync();
await _commands.RunAsync(command);
PrintContextCommandHandler.ExecutedCount.Clear();

// Act
await _commands.ScheduleAsync(command);

// Assert
await Task.Delay(TimeSpan.FromSeconds(1));
PrintContextCommandHandler.ExecutedCount[command.Id].Should().Be(1);
}

private async Task StartHostedServiceAsync()
{
var hostedService = _serviceProvider
Expand Down

0 comments on commit c11cb05

Please sign in to comment.