Skip to content

Commit fbad959

Browse files
Merge pull request #72 from wemogy/fix/renew-session-lock
Fix/renew session lock
2 parents 39d0b58 + a66ffe7 commit fbad959

File tree

3 files changed

+8
-47
lines changed

3 files changed

+8
-47
lines changed

src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Processors/AzureServiceBusCommandSessionProcessor`1.cs

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,10 @@ public class AzureServiceBusCommandSessionProcessor<TCommand> : IAzureServiceBus
2727
public bool IsAlive => _isStarted && !_serviceBusSessionProcessor.IsClosed;
2828

2929
private readonly string _handleMessageActivityName;
30-
private readonly TimeSpan _renewSessionLockInterval;
3130

3231
public AzureServiceBusCommandSessionProcessor(
3332
ServiceBusSessionProcessor serviceBusSessionProcessor,
34-
IServiceCollection serviceCollection,
35-
TimeSpan renewSessionLockInterval)
33+
IServiceCollection serviceCollection)
3634
{
3735
_serviceBusSessionProcessor = serviceBusSessionProcessor;
3836
_serviceCollection = serviceCollection;
@@ -46,7 +44,6 @@ public AzureServiceBusCommandSessionProcessor(
4644
.BuildServiceProvider()
4745
.GetRequiredService<ScheduledCommandDependencies>();
4846
_handleMessageActivityName = $"HandleMessageOf{typeof(TCommand).Name}";
49-
_renewSessionLockInterval = renewSessionLockInterval;
5047
}
5148

5249
public async Task HandleMessageAsync(ProcessSessionMessageEventArgs arg)
@@ -87,53 +84,18 @@ public async Task HandleMessageAsync(ProcessSessionMessageEventArgs arg)
8784
var scopeFactory = services.BuildServiceProvider().GetRequiredService<IServiceScopeFactory>();
8885
var scope = scopeFactory.CreateScope();
8986

90-
var renewSessionLockCancellationTokenSource = new CancellationTokenSource();
91-
var renewSessionLockCancellationToken = renewSessionLockCancellationTokenSource.Token;
92-
Task? renewSessionLockTask = null;
9387
try
9488
{
9589
var scheduledCommandRunner =
9690
scope.ServiceProvider.GetRequiredService<IScheduledCommandRunner<TCommand>>();
97-
var scheduledCommandRunnerTask = scheduledCommandRunner.RunAsync(scheduledCommand);
98-
99-
// renew session lock every 30 seconds
100-
renewSessionLockTask = Task.Run(
101-
async () =>
102-
{
103-
while (!renewSessionLockCancellationToken.IsCancellationRequested &&
104-
!scheduledCommandRunnerTask.IsCompleted)
105-
{
106-
await Task.Delay(_renewSessionLockInterval, renewSessionLockCancellationToken);
107-
Console.WriteLine($"Renewing session lock for session{arg.SessionId}...");
108-
try
109-
{
110-
await arg.RenewSessionLockAsync(renewSessionLockCancellationToken);
111-
Console.WriteLine($"Renewed session lock for session {arg.SessionId}");
112-
}
113-
catch (Exception e)
114-
{
115-
Console.WriteLine($"Failed to renew session lock for session {arg.SessionId}");
116-
Console.WriteLine(e);
117-
throw;
118-
}
119-
}
120-
},
121-
renewSessionLockCancellationTokenSource.Token);
122-
123-
await scheduledCommandRunnerTask;
124-
renewSessionLockCancellationTokenSource.Cancel();
91+
await scheduledCommandRunner.RunAsync(scheduledCommand);
12592
}
12693
catch (Exception e)
12794
{
12895
// ToDo: Dead letter message ==> Maybe remove try/catch let AutoComplete manage this
12996
Console.WriteLine(e);
13097
throw;
13198
}
132-
finally
133-
{
134-
renewSessionLockCancellationTokenSource.Dispose();
135-
renewSessionLockTask?.Dispose();
136-
}
13799
}
138100

139101
public async Task StartAsync(CancellationToken cancellationToken)

src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Setup/AzureServiceBusSetupEnvironment.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading;
34
using Azure.Messaging.ServiceBus;
45
using Microsoft.Extensions.DependencyInjection;
56
using Wemogy.Core.Errors;
@@ -56,12 +57,10 @@ public AzureServiceBusSetupEnvironment AddDelayedProcessor<TCommand>(
5657
/// <param name="maxConcurrentSessions">The maximum number of concurrent sessions (default 1)</param>
5758
/// <param name="maxConcurrentCallsPerSession">The maximum number of concurrent calls per session (default 1)</param>
5859
/// <param name="configureSessionProcessorOptions">Optional custom configuration of the ServiceBusSessionProcessorOptions</param>
59-
/// <param name="renewSessionLockInterval">The interval to renew the session lock (default 1 minute)</param>
6060
public AzureServiceBusSetupEnvironment AddDelayedSessionProcessor<TCommand>(
6161
int maxConcurrentSessions = 1,
6262
int maxConcurrentCallsPerSession = 1,
63-
Action<ServiceBusSessionProcessorOptions>? configureSessionProcessorOptions = null,
64-
TimeSpan? renewSessionLockInterval = null)
63+
Action<ServiceBusSessionProcessorOptions>? configureSessionProcessorOptions = null)
6564
where TCommand : ICommandBase
6665
{
6766
var queueName = GetQueueName<TCommand>();
@@ -79,7 +78,8 @@ public AzureServiceBusSetupEnvironment AddDelayedSessionProcessor<TCommand>(
7978
{
8079
MaxConcurrentSessions = maxConcurrentSessions,
8180
MaxConcurrentCallsPerSession = maxConcurrentCallsPerSession,
82-
SessionIdleTimeout = TimeSpan.FromSeconds(2)
81+
SessionIdleTimeout = TimeSpan.FromSeconds(2),
82+
MaxAutoLockRenewalDuration = Timeout.InfiniteTimeSpan
8383
};
8484

8585
configureSessionProcessorOptions?.Invoke(serviceBusSessionProcessorOptions);
@@ -89,8 +89,7 @@ public AzureServiceBusSetupEnvironment AddDelayedSessionProcessor<TCommand>(
8989
serviceBusSessionProcessorOptions);
9090
var processor = new AzureServiceBusCommandSessionProcessor<TCommand>(
9191
serviceBusSessionProcessor,
92-
_serviceCollection,
93-
renewSessionLockInterval ?? TimeSpan.FromMinutes(1));
92+
_serviceCollection);
9493

9594
return processor;
9695
});

src/extensions/azureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus/Wemogy.CQRS.Extensions.AzureServiceBus.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
</PropertyGroup>
1515

1616
<ItemGroup>
17-
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.16.2" />
17+
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.1" />
1818
<PackageReference Include="NuGetizer" Version="0.7.1">
1919
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
2020
<PrivateAssets>all</PrivateAssets>

0 commit comments

Comments
 (0)