Skip to content

Commit

Permalink
Merge pull request #73 from wemogy/main
Browse files Browse the repository at this point in the history
Release
  • Loading branch information
SebastianKuesters authored Dec 12, 2023
2 parents d45ab58 + fbad959 commit d11dc67
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@ public class AzureServiceBusCommandSessionProcessor<TCommand> : IAzureServiceBus
public bool IsAlive => _isStarted && !_serviceBusSessionProcessor.IsClosed;

private readonly string _handleMessageActivityName;
private readonly TimeSpan _renewSessionLockInterval;

public AzureServiceBusCommandSessionProcessor(
ServiceBusSessionProcessor serviceBusSessionProcessor,
IServiceCollection serviceCollection,
TimeSpan renewSessionLockInterval)
IServiceCollection serviceCollection)
{
_serviceBusSessionProcessor = serviceBusSessionProcessor;
_serviceCollection = serviceCollection;
Expand All @@ -46,7 +44,6 @@ public AzureServiceBusCommandSessionProcessor(
.BuildServiceProvider()
.GetRequiredService<ScheduledCommandDependencies>();
_handleMessageActivityName = $"HandleMessageOf{typeof(TCommand).Name}";
_renewSessionLockInterval = renewSessionLockInterval;
}

public async Task HandleMessageAsync(ProcessSessionMessageEventArgs arg)
Expand Down Expand Up @@ -87,53 +84,18 @@ public async Task HandleMessageAsync(ProcessSessionMessageEventArgs arg)
var scopeFactory = services.BuildServiceProvider().GetRequiredService<IServiceScopeFactory>();
var scope = scopeFactory.CreateScope();

var renewSessionLockCancellationTokenSource = new CancellationTokenSource();
var renewSessionLockCancellationToken = renewSessionLockCancellationTokenSource.Token;
Task? renewSessionLockTask = null;
try
{
var scheduledCommandRunner =
scope.ServiceProvider.GetRequiredService<IScheduledCommandRunner<TCommand>>();
var scheduledCommandRunnerTask = scheduledCommandRunner.RunAsync(scheduledCommand);

// renew session lock every 30 seconds
renewSessionLockTask = Task.Run(
async () =>
{
while (!renewSessionLockCancellationToken.IsCancellationRequested &&
!scheduledCommandRunnerTask.IsCompleted)
{
await Task.Delay(_renewSessionLockInterval, renewSessionLockCancellationToken);
Console.WriteLine($"Renewing session lock for session{arg.SessionId}...");
try
{
await arg.RenewSessionLockAsync(renewSessionLockCancellationToken);
Console.WriteLine($"Renewed session lock for session {arg.SessionId}");
}
catch (Exception e)
{
Console.WriteLine($"Failed to renew session lock for session {arg.SessionId}");
Console.WriteLine(e);
throw;
}
}
},
renewSessionLockCancellationTokenSource.Token);

await scheduledCommandRunnerTask;
renewSessionLockCancellationTokenSource.Cancel();
await scheduledCommandRunner.RunAsync(scheduledCommand);
}
catch (Exception e)
{
// ToDo: Dead letter message ==> Maybe remove try/catch let AutoComplete manage this
Console.WriteLine(e);
throw;
}
finally
{
renewSessionLockCancellationTokenSource.Dispose();
renewSessionLockTask?.Dispose();
}
}

public async Task StartAsync(CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.DependencyInjection;
using Wemogy.Core.Errors;
Expand Down Expand Up @@ -56,12 +57,10 @@ public AzureServiceBusSetupEnvironment AddDelayedProcessor<TCommand>(
/// <param name="maxConcurrentSessions">The maximum number of concurrent sessions (default 1)</param>
/// <param name="maxConcurrentCallsPerSession">The maximum number of concurrent calls per session (default 1)</param>
/// <param name="configureSessionProcessorOptions">Optional custom configuration of the ServiceBusSessionProcessorOptions</param>
/// <param name="renewSessionLockInterval">The interval to renew the session lock (default 1 minute)</param>
public AzureServiceBusSetupEnvironment AddDelayedSessionProcessor<TCommand>(
int maxConcurrentSessions = 1,
int maxConcurrentCallsPerSession = 1,
Action<ServiceBusSessionProcessorOptions>? configureSessionProcessorOptions = null,
TimeSpan? renewSessionLockInterval = null)
Action<ServiceBusSessionProcessorOptions>? configureSessionProcessorOptions = null)
where TCommand : ICommandBase
{
var queueName = GetQueueName<TCommand>();
Expand All @@ -79,7 +78,8 @@ public AzureServiceBusSetupEnvironment AddDelayedSessionProcessor<TCommand>(
{
MaxConcurrentSessions = maxConcurrentSessions,
MaxConcurrentCallsPerSession = maxConcurrentCallsPerSession,
SessionIdleTimeout = TimeSpan.FromSeconds(2)
SessionIdleTimeout = TimeSpan.FromSeconds(2),
MaxAutoLockRenewalDuration = Timeout.InfiniteTimeSpan
};

configureSessionProcessorOptions?.Invoke(serviceBusSessionProcessorOptions);
Expand All @@ -89,8 +89,7 @@ public AzureServiceBusSetupEnvironment AddDelayedSessionProcessor<TCommand>(
serviceBusSessionProcessorOptions);
var processor = new AzureServiceBusCommandSessionProcessor<TCommand>(
serviceBusSessionProcessor,
_serviceCollection,
renewSessionLockInterval ?? TimeSpan.FromMinutes(1));
_serviceCollection);

return processor;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.16.2" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.1" />
<PackageReference Include="NuGetizer" Version="0.7.1">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
Expand Down

0 comments on commit d11dc67

Please sign in to comment.