From 8bfb60d0f0443da52bbf7395ce75dbb4898f2511 Mon Sep 17 00:00:00 2001 From: "d.v.tsvettsikh" Date: Sun, 18 May 2025 18:24:34 +0700 Subject: [PATCH] 3 pessimistic lock messages --- .../OutboxBackgroundService.cs | 23 ++-- .../Linq2db/OutboxLinqToDBForEFToolsImpl.cs | 33 +++++ src/Outbox.WebApi/Outbox.WebApi.csproj | 1 + src/Outbox.WebApi/Program.cs | 5 + src/Outbox.WebApi/appsettings.json | 1 + .../Configurations/OutboxConfiguration.cs | 1 + src/Outbox/Entities/OutboxMessage.cs | 1 + .../OutboxMessageEntityTypeConfiguration.cs | 3 + ...250518111014_AddAvailableAfter.Designer.cs | 115 ++++++++++++++++++ .../20250518111014_AddAvailableAfter.cs | 43 +++++++ .../Migrations/AppDbContextModelSnapshot.cs | 9 ++ 11 files changed, 228 insertions(+), 7 deletions(-) create mode 100644 src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs create mode 100644 src/Outbox/Migrations/20250518111014_AddAvailableAfter.Designer.cs create mode 100644 src/Outbox/Migrations/20250518111014_AddAvailableAfter.cs diff --git a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs index 21af1e5..d6f8dc9 100644 --- a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs +++ b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs @@ -1,10 +1,12 @@ using System.Text; using Confluent.Kafka; +using LinqToDB; +using LinqToDB.DataProvider.PostgreSQL; +using LinqToDB.EntityFrameworkCore; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; using Outbox.Configurations; using Outbox.Entities; -using Outbox.Extensions; namespace Outbox.WebApi.BackgroundServices; @@ -49,14 +51,23 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) private async Task ProcessMessagesAsync(AppDbContext dbContext, CancellationToken cancellationToken) { - var transaction = await dbContext.Database.BeginTransactionAsync(cancellationToken); - var outboxMessages = await dbContext.OutboxMessages + .Where(x => DateTimeOffset.UtcNow > x.AvailableAfter) .AsNoTracking() + .ToLinqToDB() .OrderBy(x => x.Id) .Take(_outboxOptions.Value.BatchSize) - .ForUpdateSkipLocked() - .ToArrayAsync(cancellationToken); + .SubQueryHint(PostgreSQLHints.ForUpdate) + .SubQueryHint(PostgreSQLHints.SkipLocked) + .AsSubQuery() + .UpdateWithOutput(x => x, + x => new OutboxMessage + { + AvailableAfter = DateTimeOffset.UtcNow + _outboxOptions.Value.LockedDelay + }, + (_, _, inserted) => inserted) + .AsQueryable() + .ToArrayAsyncLinqToDB(cancellationToken); if (!outboxMessages.Any()) return 0; @@ -67,8 +78,6 @@ await dbContext.OutboxMessages .Where(x => messageIds.Contains(x.Id)) .ExecuteDeleteAsync(cancellationToken); - await transaction.CommitAsync(cancellationToken); - return outboxMessages.Length; } diff --git a/src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs b/src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs new file mode 100644 index 0000000..0af401d --- /dev/null +++ b/src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs @@ -0,0 +1,33 @@ +using System.Text.Json; +using LinqToDB; +using LinqToDB.EntityFrameworkCore; +using LinqToDB.Mapping; +using LinqToDB.Metadata; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; + +namespace Outbox.WebApi.Linq2db; + +public class OutboxLinqToDBForEFToolsImpl : LinqToDBForEFToolsImplDefault +{ + private readonly string _connectionsString; + + public OutboxLinqToDBForEFToolsImpl(string connectionsString) => _connectionsString = connectionsString; + + public override EFConnectionInfo ExtractConnectionInfo(IDbContextOptions? options) => + new() + { + ConnectionString = _connectionsString, + }; + + public override MappingSchema CreateMappingSchema(IModel model, IMetadataReader? metadataReader, IValueConverterSelector? convertorSelector, DataOptions dataOptions) + { + var result = base.CreateMappingSchema(model, metadataReader, convertorSelector, dataOptions); + + result.SetConverter>(str => JsonSerializer.Deserialize>(str) ?? new Dictionary()); + result.SetConverter, string>(dict => JsonSerializer.Serialize(dict)); + + return result; + } +} \ No newline at end of file diff --git a/src/Outbox.WebApi/Outbox.WebApi.csproj b/src/Outbox.WebApi/Outbox.WebApi.csproj index f6c6b7d..9b12cc2 100644 --- a/src/Outbox.WebApi/Outbox.WebApi.csproj +++ b/src/Outbox.WebApi/Outbox.WebApi.csproj @@ -10,6 +10,7 @@ + all diff --git a/src/Outbox.WebApi/Program.cs b/src/Outbox.WebApi/Program.cs index 578ae97..41c46f7 100644 --- a/src/Outbox.WebApi/Program.cs +++ b/src/Outbox.WebApi/Program.cs @@ -1,6 +1,7 @@ using System.Diagnostics; using Confluent.Kafka; using EFCore.MigrationExtensions.PostgreSQL; +using LinqToDB.EntityFrameworkCore; using Microsoft.EntityFrameworkCore; using Npgsql; using Outbox; @@ -8,6 +9,7 @@ using Outbox.Entities; using Outbox.WebApi.BackgroundServices; using Outbox.WebApi.EFCore; +using Outbox.WebApi.Linq2db; using Outbox.WebApi.Telemetry; var builder = WebApplication.CreateBuilder(args); @@ -29,6 +31,9 @@ optionsBuilder.UseSqlObjects(); }); +LinqToDBForEFTools.Implementation = new OutboxLinqToDBForEFToolsImpl(builder.Configuration.GetConnectionString("Outbox")!); +LinqToDBForEFTools.Initialize(); + builder.Services.AddKafkaClient(); builder.Services.Configure(builder.Configuration.GetSection("Outbox")); diff --git a/src/Outbox.WebApi/appsettings.json b/src/Outbox.WebApi/appsettings.json index 40759b9..f4a7e29 100644 --- a/src/Outbox.WebApi/appsettings.json +++ b/src/Outbox.WebApi/appsettings.json @@ -10,6 +10,7 @@ "Outbox": "Host=localhost;Port=5432;Database=outbox;Username=postgres;Password=postgres;" }, "Outbox": { + "LockedDelay": "00:00:15", "NoMessagesDelay": "00:00:10", "BatchSize": 100 }, diff --git a/src/Outbox/Configurations/OutboxConfiguration.cs b/src/Outbox/Configurations/OutboxConfiguration.cs index 83bfce2..f1b1abd 100644 --- a/src/Outbox/Configurations/OutboxConfiguration.cs +++ b/src/Outbox/Configurations/OutboxConfiguration.cs @@ -2,6 +2,7 @@ namespace Outbox.Configurations; public class OutboxConfiguration { + public TimeSpan LockedDelay { get; set; } public TimeSpan NoMessagesDelay { get; set; } public int BatchSize { get; set; } } \ No newline at end of file diff --git a/src/Outbox/Entities/OutboxMessage.cs b/src/Outbox/Entities/OutboxMessage.cs index be517cc..05f290e 100644 --- a/src/Outbox/Entities/OutboxMessage.cs +++ b/src/Outbox/Entities/OutboxMessage.cs @@ -9,4 +9,5 @@ public class OutboxMessage public string Payload { get; set; } = null!; public Dictionary Headers { get; set; } = null!; public DateTimeOffset CreatedAt { get; set; } + public DateTimeOffset AvailableAfter { get; set; } } \ No newline at end of file diff --git a/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs b/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs index 4973dd1..4595ff0 100644 --- a/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs +++ b/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs @@ -16,5 +16,8 @@ public void Configure(EntityTypeBuilder builder) builder.Property(x => x.Type).HasMaxLength(128); builder.Property(x => x.Topic).HasMaxLength(128); builder.Property(x => x.CreatedAt).HasDefaultValueSql("now()"); + builder.Property(x => x.AvailableAfter).HasDefaultValueSql("now()"); + + builder.HasIndex(x => x.AvailableAfter); } } \ No newline at end of file diff --git a/src/Outbox/Migrations/20250518111014_AddAvailableAfter.Designer.cs b/src/Outbox/Migrations/20250518111014_AddAvailableAfter.Designer.cs new file mode 100644 index 0000000..690a51b --- /dev/null +++ b/src/Outbox/Migrations/20250518111014_AddAvailableAfter.Designer.cs @@ -0,0 +1,115 @@ +// +using System; +using System.Collections.Generic; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; +using Outbox; + +#nullable disable + +namespace Outbox.Migrations +{ + [DbContext(typeof(AppDbContext))] + [Migration("20250518111014_AddAvailableAfter")] + partial class AddAvailableAfter + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("outbox") + .HasAnnotation("ProductVersion", "9.0.3") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("Outbox.Entities.OutboxMessage", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("AvailableAfter") + .ValueGeneratedOnAdd() + .HasColumnType("timestamp with time zone") + .HasColumnName("available_after") + .HasDefaultValueSql("now()"); + + b.Property("CreatedAt") + .ValueGeneratedOnAdd() + .HasColumnType("timestamp with time zone") + .HasColumnName("created_at") + .HasDefaultValueSql("now()"); + + b.Property>("Headers") + .IsRequired() + .HasColumnType("jsonb") + .HasColumnName("headers"); + + b.Property("Key") + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("key"); + + b.Property("Payload") + .IsRequired() + .HasColumnType("jsonb") + .HasColumnName("payload"); + + b.Property("Topic") + .IsRequired() + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("topic"); + + b.Property("Type") + .IsRequired() + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("type"); + + b.HasKey("Id") + .HasName("pk_outbox_messages"); + + b.HasIndex("AvailableAfter") + .HasDatabaseName("ix_outbox_messages_available_after"); + + b.ToTable("outbox_messages", "outbox"); + }); + + modelBuilder.Entity("Outbox.Entities.OutboxOffset", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("LastProcessedId") + .HasColumnType("integer") + .HasColumnName("last_processed_id"); + + b.HasKey("Id") + .HasName("pk_outbox_offsets"); + + b.ToTable("outbox_offsets", "outbox"); + + b.HasData( + new + { + Id = 1, + LastProcessedId = 0 + }); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/Outbox/Migrations/20250518111014_AddAvailableAfter.cs b/src/Outbox/Migrations/20250518111014_AddAvailableAfter.cs new file mode 100644 index 0000000..b529e9b --- /dev/null +++ b/src/Outbox/Migrations/20250518111014_AddAvailableAfter.cs @@ -0,0 +1,43 @@ +using System; +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace Outbox.Migrations +{ + /// + public partial class AddAvailableAfter : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.AddColumn( + name: "available_after", + schema: "outbox", + table: "outbox_messages", + type: "timestamp with time zone", + nullable: false, + defaultValueSql: "now()"); + + migrationBuilder.CreateIndex( + name: "ix_outbox_messages_available_after", + schema: "outbox", + table: "outbox_messages", + column: "available_after"); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropIndex( + name: "ix_outbox_messages_available_after", + schema: "outbox", + table: "outbox_messages"); + + migrationBuilder.DropColumn( + name: "available_after", + schema: "outbox", + table: "outbox_messages"); + } + } +} diff --git a/src/Outbox/Migrations/AppDbContextModelSnapshot.cs b/src/Outbox/Migrations/AppDbContextModelSnapshot.cs index 10f1d1c..bfdaabd 100644 --- a/src/Outbox/Migrations/AppDbContextModelSnapshot.cs +++ b/src/Outbox/Migrations/AppDbContextModelSnapshot.cs @@ -33,6 +33,12 @@ protected override void BuildModel(ModelBuilder modelBuilder) NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + b.Property("AvailableAfter") + .ValueGeneratedOnAdd() + .HasColumnType("timestamp with time zone") + .HasColumnName("available_after") + .HasDefaultValueSql("now()"); + b.Property("CreatedAt") .ValueGeneratedOnAdd() .HasColumnType("timestamp with time zone") @@ -69,6 +75,9 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasKey("Id") .HasName("pk_outbox_messages"); + b.HasIndex("AvailableAfter") + .HasDatabaseName("ix_outbox_messages_available_after"); + b.ToTable("outbox_messages", "outbox"); });