diff --git a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs index eb614c1..e7ce521 100644 --- a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs +++ b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs @@ -1,11 +1,10 @@ using System.Text; -using System.Threading.Channels; using Confluent.Kafka; +using Medallion.Threading; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; using Outbox.Configurations; using Outbox.Entities; -using Outbox.Extensions; namespace Outbox.WebApi.BackgroundServices; @@ -14,21 +13,20 @@ public class OutboxBackgroundService : BackgroundService, IOutboxMessagesProcess private readonly IServiceProvider _serviceProvider; private readonly IOptions _outboxOptions; private readonly ILogger _logger; - - //channel is faster then AutoResetEvent - private readonly Channel _channel = Channel.CreateBounded(new BoundedChannelOptions(capacity: 1) - { - SingleReader = true, SingleWriter = false - }); + private readonly IDistributedLockProvider _distributedLockProvider; + + private readonly AutoResetEvent _autoResetEvent = new(false); public OutboxBackgroundService( IServiceProvider serviceProvider, IOptions outboxOptions, - ILogger logger) + ILogger logger, + IDistributedLockProvider distributedLockProvider) { _serviceProvider = serviceProvider; _outboxOptions = outboxOptions; _logger = logger; + _distributedLockProvider = distributedLockProvider; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -37,8 +35,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { - _channel.Reader.TryRead(out _); - using var scope = _serviceProvider.CreateScope(); await using var dbContext = scope.ServiceProvider.GetRequiredService(); @@ -56,10 +52,10 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) private async Task ProcessMessagesAsync(AppDbContext dbContext, CancellationToken cancellationToken) { - await using var transaction = await dbContext.Database.BeginTransactionAsync(cancellationToken); - + //pg_bouncer must be in session mode + await using var advisoryLock = await _distributedLockProvider.AcquireLockAsync("Outbox", cancellationToken: cancellationToken); + var offset = await dbContext.OutboxOffsets - .ForUpdate() .FirstAsync(cancellationToken); var outboxMessages = await dbContext.OutboxMessages @@ -76,8 +72,6 @@ private async Task ProcessMessagesAsync(AppDbContext dbContext, Cancellatio offset.LastProcessedId = outboxMessages.Last().Id; await dbContext.SaveChangesAsync(cancellationToken); - await transaction.CommitAsync(cancellationToken); - return outboxMessages.Length; } @@ -107,19 +101,10 @@ private Task ProcessMessageAsync(TKey key, OutboxMessage message, Cancella } - public void NewMessagesPersisted() => _channel.Writer.TryWrite(false); + public void NewMessagesPersisted() => _autoResetEvent.Set(); private async ValueTask WaitForOutboxMessage(CancellationToken stoppingToken) { - try - { - using var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); - cts.CancelAfter(_outboxOptions.Value.NoMessagesDelay); - await _channel.Reader.ReadAsync(cts.Token); - } - catch (OperationCanceledException) when (!stoppingToken.IsCancellationRequested) - { - // ignored - } + _autoResetEvent.WaitOne(_outboxOptions.Value.NoMessagesDelay); } } \ No newline at end of file diff --git a/src/Outbox.WebApi/Outbox.WebApi.csproj b/src/Outbox.WebApi/Outbox.WebApi.csproj index f6c6b7d..1e5e1e0 100644 --- a/src/Outbox.WebApi/Outbox.WebApi.csproj +++ b/src/Outbox.WebApi/Outbox.WebApi.csproj @@ -9,6 +9,7 @@ + diff --git a/src/Outbox.WebApi/Program.cs b/src/Outbox.WebApi/Program.cs index a66c5ec..b005ca4 100644 --- a/src/Outbox.WebApi/Program.cs +++ b/src/Outbox.WebApi/Program.cs @@ -1,6 +1,8 @@ using System.Diagnostics; using Confluent.Kafka; using EFCore.MigrationExtensions.PostgreSQL; +using Medallion.Threading; +using Medallion.Threading.Postgres; using Microsoft.EntityFrameworkCore; using Npgsql; using Outbox; @@ -28,6 +30,9 @@ optionsBuilder.UseSqlObjects(); }); +builder.Services.AddSingleton(_ => + new PostgresDistributedSynchronizationProvider(builder.Configuration.GetConnectionString("Outbox")!)); + builder.Services.AddKafkaClient();