From fc85236f29d6d0c62617eabd2833971208e1a4de Mon Sep 17 00:00:00 2001 From: "d.v.tsvettsikh" Date: Sun, 18 May 2025 16:07:21 +0700 Subject: [PATCH 1/2] Advisory lock and AutoResetEvent --- .../OutboxBackgroundService.cs | 38 ++++++------------- src/Outbox.WebApi/Outbox.WebApi.csproj | 1 + src/Outbox.WebApi/Program.cs | 5 +++ .../Telemetry/ActivitySources.cs | 9 ----- 4 files changed, 18 insertions(+), 35 deletions(-) delete mode 100644 src/Outbox.WebApi/Telemetry/ActivitySources.cs diff --git a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs index eb614c1..f47c104 100644 --- a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs +++ b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs @@ -1,11 +1,10 @@ using System.Text; -using System.Threading.Channels; using Confluent.Kafka; +using Medallion.Threading; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; using Outbox.Configurations; using Outbox.Entities; -using Outbox.Extensions; namespace Outbox.WebApi.BackgroundServices; @@ -14,21 +13,21 @@ public class OutboxBackgroundService : BackgroundService, IOutboxMessagesProcess private readonly IServiceProvider _serviceProvider; private readonly IOptions _outboxOptions; private readonly ILogger _logger; - + private readonly IDistributedLockProvider _distributedLockProvider; + //channel is faster then AutoResetEvent - private readonly Channel _channel = Channel.CreateBounded(new BoundedChannelOptions(capacity: 1) - { - SingleReader = true, SingleWriter = false - }); + private readonly AutoResetEvent _autoResetEvent = new(false); public OutboxBackgroundService( IServiceProvider serviceProvider, IOptions outboxOptions, - ILogger logger) + ILogger logger, + IDistributedLockProvider distributedLockProvider) { _serviceProvider = serviceProvider; _outboxOptions = outboxOptions; _logger = logger; + _distributedLockProvider = distributedLockProvider; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -37,8 +36,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { - _channel.Reader.TryRead(out _); - using var scope = _serviceProvider.CreateScope(); await using var dbContext = scope.ServiceProvider.GetRequiredService(); @@ -56,10 +53,10 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) private async Task ProcessMessagesAsync(AppDbContext dbContext, CancellationToken cancellationToken) { - await using var transaction = await dbContext.Database.BeginTransactionAsync(cancellationToken); - + //pg_bouncer must be in session mode + await using var advisoryLock = await _distributedLockProvider.AcquireLockAsync("Outbox", cancellationToken: cancellationToken); + var offset = await dbContext.OutboxOffsets - .ForUpdate() .FirstAsync(cancellationToken); var outboxMessages = await dbContext.OutboxMessages @@ -76,8 +73,6 @@ private async Task ProcessMessagesAsync(AppDbContext dbContext, Cancellatio offset.LastProcessedId = outboxMessages.Last().Id; await dbContext.SaveChangesAsync(cancellationToken); - await transaction.CommitAsync(cancellationToken); - return outboxMessages.Length; } @@ -107,19 +102,10 @@ private Task ProcessMessageAsync(TKey key, OutboxMessage message, Cancella } - public void NewMessagesPersisted() => _channel.Writer.TryWrite(false); + public void NewMessagesPersisted() => _autoResetEvent.Set(); private async ValueTask WaitForOutboxMessage(CancellationToken stoppingToken) { - try - { - using var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); - cts.CancelAfter(_outboxOptions.Value.NoMessagesDelay); - await _channel.Reader.ReadAsync(cts.Token); - } - catch (OperationCanceledException) when (!stoppingToken.IsCancellationRequested) - { - // ignored - } + _autoResetEvent.WaitOne(_outboxOptions.Value.NoMessagesDelay); } } \ No newline at end of file diff --git a/src/Outbox.WebApi/Outbox.WebApi.csproj b/src/Outbox.WebApi/Outbox.WebApi.csproj index f6c6b7d..1e5e1e0 100644 --- a/src/Outbox.WebApi/Outbox.WebApi.csproj +++ b/src/Outbox.WebApi/Outbox.WebApi.csproj @@ -9,6 +9,7 @@ + diff --git a/src/Outbox.WebApi/Program.cs b/src/Outbox.WebApi/Program.cs index a66c5ec..b005ca4 100644 --- a/src/Outbox.WebApi/Program.cs +++ b/src/Outbox.WebApi/Program.cs @@ -1,6 +1,8 @@ using System.Diagnostics; using Confluent.Kafka; using EFCore.MigrationExtensions.PostgreSQL; +using Medallion.Threading; +using Medallion.Threading.Postgres; using Microsoft.EntityFrameworkCore; using Npgsql; using Outbox; @@ -28,6 +30,9 @@ optionsBuilder.UseSqlObjects(); }); +builder.Services.AddSingleton(_ => + new PostgresDistributedSynchronizationProvider(builder.Configuration.GetConnectionString("Outbox")!)); + builder.Services.AddKafkaClient(); diff --git a/src/Outbox.WebApi/Telemetry/ActivitySources.cs b/src/Outbox.WebApi/Telemetry/ActivitySources.cs deleted file mode 100644 index aa87011..0000000 --- a/src/Outbox.WebApi/Telemetry/ActivitySources.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System.Diagnostics; - -namespace Outbox.WebApi.Telemetry; - -public static class ActivitySources -{ - public const string OutboxSource = "Outbox"; - public static readonly ActivitySource Tracing = new(OutboxSource); -} \ No newline at end of file From ecd907353486aeaef6820d80fd1a99c2ae753990 Mon Sep 17 00:00:00 2001 From: "d.v.tsvettsikh" Date: Sun, 18 May 2025 16:08:12 +0700 Subject: [PATCH 2/2] fix --- src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs index f47c104..e7ce521 100644 --- a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs +++ b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs @@ -15,7 +15,6 @@ public class OutboxBackgroundService : BackgroundService, IOutboxMessagesProcess private readonly ILogger _logger; private readonly IDistributedLockProvider _distributedLockProvider; - //channel is faster then AutoResetEvent private readonly AutoResetEvent _autoResetEvent = new(false); public OutboxBackgroundService(