From c3f37570d4b26ec189fa8761d90db2624e9b6c1d Mon Sep 17 00:00:00 2001 From: "d.v.tsvettsikh" Date: Sun, 18 May 2025 17:29:12 +0700 Subject: [PATCH 1/2] 2 lock messages for update --- .../OutboxBackgroundService.cs | 23 ++++++++----------- src/Outbox.WebApi/Outbox.WebApi.csproj | 1 - src/Outbox.WebApi/Program.cs | 10 +------- 3 files changed, 11 insertions(+), 23 deletions(-) diff --git a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs index e7ce521..21af1e5 100644 --- a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs +++ b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs @@ -1,10 +1,10 @@ using System.Text; using Confluent.Kafka; -using Medallion.Threading; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; using Outbox.Configurations; using Outbox.Entities; +using Outbox.Extensions; namespace Outbox.WebApi.BackgroundServices; @@ -13,20 +13,17 @@ public class OutboxBackgroundService : BackgroundService, IOutboxMessagesProcess private readonly IServiceProvider _serviceProvider; private readonly IOptions _outboxOptions; private readonly ILogger _logger; - private readonly IDistributedLockProvider _distributedLockProvider; private readonly AutoResetEvent _autoResetEvent = new(false); public OutboxBackgroundService( IServiceProvider serviceProvider, IOptions outboxOptions, - ILogger logger, - IDistributedLockProvider distributedLockProvider) + ILogger logger) { _serviceProvider = serviceProvider; _outboxOptions = outboxOptions; _logger = logger; - _distributedLockProvider = distributedLockProvider; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -52,25 +49,25 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) private async Task ProcessMessagesAsync(AppDbContext dbContext, CancellationToken cancellationToken) { - //pg_bouncer must be in session mode - await using var advisoryLock = await _distributedLockProvider.AcquireLockAsync("Outbox", cancellationToken: cancellationToken); - - var offset = await dbContext.OutboxOffsets - .FirstAsync(cancellationToken); + var transaction = await dbContext.Database.BeginTransactionAsync(cancellationToken); var outboxMessages = await dbContext.OutboxMessages .AsNoTracking() - .Where(x => x.Id > offset.LastProcessedId) .OrderBy(x => x.Id) .Take(_outboxOptions.Value.BatchSize) + .ForUpdateSkipLocked() .ToArrayAsync(cancellationToken); if (!outboxMessages.Any()) return 0; await ProcessOutboxMessagesAsync(outboxMessages, cancellationToken); - offset.LastProcessedId = outboxMessages.Last().Id; - await dbContext.SaveChangesAsync(cancellationToken); + var messageIds = outboxMessages.Select(x => x.Id).ToArray(); + await dbContext.OutboxMessages + .Where(x => messageIds.Contains(x.Id)) + .ExecuteDeleteAsync(cancellationToken); + + await transaction.CommitAsync(cancellationToken); return outboxMessages.Length; } diff --git a/src/Outbox.WebApi/Outbox.WebApi.csproj b/src/Outbox.WebApi/Outbox.WebApi.csproj index 1e5e1e0..f6c6b7d 100644 --- a/src/Outbox.WebApi/Outbox.WebApi.csproj +++ b/src/Outbox.WebApi/Outbox.WebApi.csproj @@ -9,7 +9,6 @@ - diff --git a/src/Outbox.WebApi/Program.cs b/src/Outbox.WebApi/Program.cs index b005ca4..578ae97 100644 --- a/src/Outbox.WebApi/Program.cs +++ b/src/Outbox.WebApi/Program.cs @@ -1,8 +1,6 @@ using System.Diagnostics; using Confluent.Kafka; using EFCore.MigrationExtensions.PostgreSQL; -using Medallion.Threading; -using Medallion.Threading.Postgres; using Microsoft.EntityFrameworkCore; using Npgsql; using Outbox; @@ -30,9 +28,6 @@ optionsBuilder.UseSqlObjects(); }); -builder.Services.AddSingleton(_ => - new PostgresDistributedSynchronizationProvider(builder.Configuration.GetConnectionString("Outbox")!)); - builder.Services.AddKafkaClient(); @@ -44,10 +39,7 @@ builder.Services.AddSingleton(sp => sp.GetRequiredService()); builder.Services.AddOpenTelemetry() - .WithTracing(bld => - { - //bld.AddSource(ActivitySources.OutboxSource); - }); + .WithTracing(); var app = builder.Build(); From 8b2c1b4bd48d54e152fc429a1a53c660dbe3e9a1 Mon Sep 17 00:00:00 2001 From: "d.v.tsvettsikh" Date: Fri, 20 Jun 2025 10:39:30 +0700 Subject: [PATCH 2/2] Generate data for tests --- generate_data.sql | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 generate_data.sql diff --git a/generate_data.sql b/generate_data.sql new file mode 100644 index 0000000..3ffb674 --- /dev/null +++ b/generate_data.sql @@ -0,0 +1,9 @@ +INSERT INTO outbox_messages +(topic, "key", "type", payload, headers) +SELECT + 'topic-' || n.v%2, + n.v, + 'type', + '{"byte250":"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"}', + '{"traceparent": "00-9588ce39e007adce6d63a55779ea22e1-500119597384256a-00"}' +FROM generate_series(1,1000000) AS n(v); \ No newline at end of file