Skip to content

Commit afe2467

Browse files
Merge pull request #68 from wemogy/feature/renew-sesssion-lock
Introduce renew session lock interval
2 parents 4f3aac0 + fa3469f commit afe2467

File tree

2 files changed

+44
-5
lines changed

2 files changed

+44
-5
lines changed

src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Processors/AzureServiceBusCommandSessionProcessor`1.cs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,13 @@ public class AzureServiceBusCommandSessionProcessor<TCommand> : IAzureServiceBus
2626
/// </summary>
2727
public bool IsAlive => _isStarted && !_serviceBusSessionProcessor.IsClosed;
2828

29+
private readonly string _handleMessageActivityName;
30+
private readonly TimeSpan _renewSessionLockInterval;
31+
2932
public AzureServiceBusCommandSessionProcessor(
3033
ServiceBusSessionProcessor serviceBusSessionProcessor,
31-
IServiceCollection serviceCollection)
34+
IServiceCollection serviceCollection,
35+
TimeSpan renewSessionLockInterval)
3236
{
3337
_serviceBusSessionProcessor = serviceBusSessionProcessor;
3438
_serviceCollection = serviceCollection;
@@ -41,10 +45,13 @@ public AzureServiceBusCommandSessionProcessor(
4145
_scheduledCommandDependencies = serviceCollection
4246
.BuildServiceProvider()
4347
.GetRequiredService<ScheduledCommandDependencies>();
48+
_handleMessageActivityName = $"HandleMessageOf{typeof(TCommand).Name}";
49+
_renewSessionLockInterval = renewSessionLockInterval;
4450
}
4551

4652
public async Task HandleMessageAsync(ProcessSessionMessageEventArgs arg)
4753
{
54+
using var activity = Observability.DefaultActivities.StartActivity(_handleMessageActivityName);
4855
var services = new ServiceCollection();
4956
foreach (var serviceDescriptor in _serviceCollection)
5057
{
@@ -80,18 +87,44 @@ public async Task HandleMessageAsync(ProcessSessionMessageEventArgs arg)
8087
var scopeFactory = services.BuildServiceProvider().GetRequiredService<IServiceScopeFactory>();
8188
var scope = scopeFactory.CreateScope();
8289

90+
var renewSessionLockCancellationTokenSource = new CancellationTokenSource();
91+
var renewSessionLockCancellationToken = renewSessionLockCancellationTokenSource.Token;
92+
Task? renewSessionLockTask = null;
8393
try
8494
{
85-
var scheduledCommandRunner = scope.ServiceProvider.GetRequiredService<IScheduledCommandRunner<TCommand>>();
95+
var scheduledCommandRunner =
96+
scope.ServiceProvider.GetRequiredService<IScheduledCommandRunner<TCommand>>();
97+
var scheduledCommandRunnerTask = scheduledCommandRunner.RunAsync(scheduledCommand);
8698

87-
await scheduledCommandRunner.RunAsync(scheduledCommand);
99+
// renew session lock every 30 seconds
100+
renewSessionLockTask = Task.Run(
101+
async () =>
102+
{
103+
while (!renewSessionLockCancellationToken.IsCancellationRequested &&
104+
!scheduledCommandRunnerTask.IsCompleted)
105+
{
106+
await Task.Delay(_renewSessionLockInterval, renewSessionLockCancellationToken);
107+
Console.WriteLine($"Renewing session lock for session{arg.SessionId}...");
108+
await arg.RenewSessionLockAsync(arg.CancellationToken);
109+
Console.WriteLine($"Renewed session lock for session {arg.SessionId}");
110+
}
111+
},
112+
renewSessionLockCancellationTokenSource.Token);
113+
114+
await scheduledCommandRunnerTask;
115+
renewSessionLockCancellationTokenSource.Cancel();
88116
}
89117
catch (Exception e)
90118
{
91119
// ToDo: Dead letter message ==> Maybe remove try/catch let AutoComplete manage this
92120
Console.WriteLine(e);
93121
throw;
94122
}
123+
finally
124+
{
125+
renewSessionLockCancellationTokenSource.Dispose();
126+
renewSessionLockTask?.Dispose();
127+
}
95128
}
96129

97130
public async Task StartAsync(CancellationToken cancellationToken)

src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Setup/AzureServiceBusSetupEnvironment.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,15 @@ public AzureServiceBusSetupEnvironment AddDelayedProcessor<TCommand>(
5353
/// <summary>
5454
/// Creates a ServiceBusProcessor and subscribes to messages of type <typeparamref name="TCommand"/>
5555
/// </summary>
56+
/// <param name="maxConcurrentSessions">The maximum number of concurrent sessions (default 1)</param>
57+
/// <param name="maxConcurrentCallsPerSession">The maximum number of concurrent calls per session (default 1)</param>
58+
/// <param name="configureSessionProcessorOptions">Optional custom configuration of the ServiceBusSessionProcessorOptions</param>
59+
/// <param name="renewSessionLockInterval">The interval to renew the session lock (default 1 minute)</param>
5660
public AzureServiceBusSetupEnvironment AddDelayedSessionProcessor<TCommand>(
5761
int maxConcurrentSessions = 1,
5862
int maxConcurrentCallsPerSession = 1,
59-
Action<ServiceBusSessionProcessorOptions>? configureSessionProcessorOptions = null)
63+
Action<ServiceBusSessionProcessorOptions>? configureSessionProcessorOptions = null,
64+
TimeSpan? renewSessionLockInterval = null)
6065
where TCommand : ICommandBase
6166
{
6267
var queueName = GetQueueName<TCommand>();
@@ -84,7 +89,8 @@ public AzureServiceBusSetupEnvironment AddDelayedSessionProcessor<TCommand>(
8489
serviceBusSessionProcessorOptions);
8590
var processor = new AzureServiceBusCommandSessionProcessor<TCommand>(
8691
serviceBusSessionProcessor,
87-
_serviceCollection);
92+
_serviceCollection,
93+
renewSessionLockInterval ?? TimeSpan.FromMinutes(1));
8894

8995
return processor;
9096
});

0 commit comments

Comments
 (0)