From 1bfc4d739da748723f516fa8a0f25e5a40f803f5 Mon Sep 17 00:00:00 2001 From: "d.v.tsvettsikh" Date: Mon, 19 May 2025 08:16:21 +0700 Subject: [PATCH 1/3] 4 virtual partitions with transaction id --- .../OutboxBackgroundService.cs | 78 ++++++-- src/Outbox.WebApi/EFCore/OutboxInterceptor.cs | 8 +- .../Linq2db/OutboxLinqToDBForEFToolsImpl.cs | 4 + .../Linq2db/PostgreSqlExtensions.cs | 9 + src/Outbox.WebApi/Program.cs | 1 + src/Outbox/Entities/OutboxMessage.cs | 3 +- src/Outbox/Entities/OutboxOffset.cs | 5 + .../OutboxMessageEntityTypeConfiguration.cs | 4 +- .../OutboxOffsetEntityTypeConfiguration.cs | 24 ++- src/Outbox/IOutboxMessagesProcessor.cs | 2 +- ...0250519000818_AddTransactionId.Designer.cs | 158 +++++++++++++++ .../20250519000818_AddTransactionId.cs | 181 ++++++++++++++++++ .../Migrations/AppDbContextModelSnapshot.cs | 63 +++++- 13 files changed, 506 insertions(+), 34 deletions(-) create mode 100644 src/Outbox.WebApi/Linq2db/PostgreSqlExtensions.cs create mode 100644 src/Outbox/Migrations/20250519000818_AddTransactionId.Designer.cs create mode 100644 src/Outbox/Migrations/20250519000818_AddTransactionId.cs diff --git a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs index d6f8dc9..120adde 100644 --- a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs +++ b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs @@ -3,10 +3,10 @@ using LinqToDB; using LinqToDB.DataProvider.PostgreSQL; using LinqToDB.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; using Outbox.Configurations; using Outbox.Entities; +using Outbox.WebApi.Linq2db; namespace Outbox.WebApi.BackgroundServices; @@ -17,6 +17,8 @@ public class OutboxBackgroundService : BackgroundService, IOutboxMessagesProcess private readonly ILogger _logger; private readonly AutoResetEvent _autoResetEvent = new(false); + + private (string Topic, int Partition)? _offsetWithMessages; public OutboxBackgroundService( IServiceProvider serviceProvider, @@ -39,7 +41,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) var processedMessages = await ProcessMessagesAsync(dbContext, stoppingToken); - if (processedMessages != _outboxOptions.Value.BatchSize) + if (processedMessages == -1) //no partition await WaitForOutboxMessage(stoppingToken); } catch (Exception e) @@ -51,32 +53,74 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) private async Task ProcessMessagesAsync(AppDbContext dbContext, CancellationToken cancellationToken) { - var outboxMessages = await dbContext.OutboxMessages - .Where(x => DateTimeOffset.UtcNow > x.AvailableAfter) - .AsNoTracking() - .ToLinqToDB() - .OrderBy(x => x.Id) - .Take(_outboxOptions.Value.BatchSize) + //change tracking can be disabled + //dbContext.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; + + var dataConnection = dbContext.CreateLinqToDBConnection(); + + IQueryable offsetsQuery = dataConnection.GetTable(); + if (_offsetWithMessages != null) + { + offsetsQuery = offsetsQuery.Where(x => x.Topic == _offsetWithMessages.Value.Topic && + x.Partition == _offsetWithMessages.Value.Partition); + + _offsetWithMessages = null; + } + else + { + offsetsQuery = offsetsQuery.Where(x => DateTimeOffset.UtcNow > x.AvailableAfter) + .OrderBy(x => x.AvailableAfter); //earliest available + } + + var offsets = await offsetsQuery + .Take(1) .SubQueryHint(PostgreSQLHints.ForUpdate) .SubQueryHint(PostgreSQLHints.SkipLocked) .AsSubQuery() .UpdateWithOutput(x => x, - x => new OutboxMessage + x => new OutboxOffset { AvailableAfter = DateTimeOffset.UtcNow + _outboxOptions.Value.LockedDelay }, (_, _, inserted) => inserted) .AsQueryable() .ToArrayAsyncLinqToDB(cancellationToken); + + if (!offsets.Any()) return -1; //no partition + var offset = offsets.Single(); + + var outboxMessages = await dataConnection.GetTable() + .Where(x => x.Topic == offset.Topic && + x.Partition == offset.Partition && + ( + x.TransactionId > offset.LastProcessedTransactionId || + x.TransactionId == offset.LastProcessedTransactionId && x.Id > offset.LastProcessedId + ) && + x.TransactionId < PostgreSqlExtensions.MinCurrentTransactionId + ) + .OrderBy(x => x.TransactionId).ThenBy(x => x.Id) + .Take(_outboxOptions.Value.BatchSize) + .ToArrayAsyncLinqToDB(cancellationToken); - if (!outboxMessages.Any()) return 0; + if (!outboxMessages.Any()) + { + await dataConnection.GetTable() + .Where(x => x.Id == offset.Id) + .Set(x => x.AvailableAfter, DateTimeOffset.UtcNow + _outboxOptions.Value.NoMessagesDelay) + .UpdateAsync(cancellationToken); + + return 0; + } await ProcessOutboxMessagesAsync(outboxMessages, cancellationToken); - var messageIds = outboxMessages.Select(x => x.Id).ToArray(); - await dbContext.OutboxMessages - .Where(x => messageIds.Contains(x.Id)) - .ExecuteDeleteAsync(cancellationToken); + var lastMessage = outboxMessages.Last(); + await dataConnection.GetTable() + .Where(x => x.Id == offset.Id) + .Set(x => x.LastProcessedId, lastMessage.Id) + .Set(x => x.LastProcessedTransactionId, lastMessage.TransactionId) + .Set(x => x.AvailableAfter, DateTimeOffset.UtcNow) + .UpdateAsync(cancellationToken); return outboxMessages.Length; } @@ -107,7 +151,11 @@ private Task ProcessMessageAsync(TKey key, OutboxMessage message, Cancella } - public void NewMessagesPersisted() => _autoResetEvent.Set(); + public void NewMessagesPersisted(string topic, int partition) + { + _offsetWithMessages = (topic, partition); + _autoResetEvent.Set(); + } private async ValueTask WaitForOutboxMessage(CancellationToken stoppingToken) { diff --git a/src/Outbox.WebApi/EFCore/OutboxInterceptor.cs b/src/Outbox.WebApi/EFCore/OutboxInterceptor.cs index f214ba4..08b3cbb 100644 --- a/src/Outbox.WebApi/EFCore/OutboxInterceptor.cs +++ b/src/Outbox.WebApi/EFCore/OutboxInterceptor.cs @@ -12,22 +12,22 @@ public class OutboxInterceptor : ISaveChangesInterceptor public OutboxInterceptor(IOutboxMessagesProcessor outboxMessagesProcessor) => _outboxMessagesProcessor = outboxMessagesProcessor; - private readonly ConcurrentDictionary _contextsWithOutboxMessages = new(); + private readonly ConcurrentDictionary _contextsWithOutboxMessages = new(); private void OnSavingChanges(DbContext context) { var message = context.ChangeTracker.Entries() .FirstOrDefault(x => x.State == EntityState.Added); - if (message != null) _contextsWithOutboxMessages.TryAdd(context, message.Entity.Topic); + if (message != null) _contextsWithOutboxMessages.TryAdd(context, (message.Entity.Topic, message.Entity.Partition)); } private void OnSaveChanges(DbContext context) { if (_contextsWithOutboxMessages.ContainsKey(context)) { - if (_contextsWithOutboxMessages.TryRemove(context, out _)) + if (_contextsWithOutboxMessages.TryRemove(context, out var item)) { - _outboxMessagesProcessor.NewMessagesPersisted(); + _outboxMessagesProcessor.NewMessagesPersisted(item.Topic, item.Partition); } } } diff --git a/src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs b/src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs index 0af401d..432a854 100644 --- a/src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs +++ b/src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs @@ -1,5 +1,6 @@ using System.Text.Json; using LinqToDB; +using LinqToDB.Data; using LinqToDB.EntityFrameworkCore; using LinqToDB.Mapping; using LinqToDB.Metadata; @@ -25,6 +26,9 @@ public override MappingSchema CreateMappingSchema(IModel model, IMetadataReader? { var result = base.CreateMappingSchema(model, metadataReader, convertorSelector, dataOptions); + //default parameter type for ulong is decimal, it produces an error on mapping to xid8 + result.SetConvertExpression(value => new DataParameter(null, value, DataType.UInt64)); + result.SetConverter>(str => JsonSerializer.Deserialize>(str) ?? new Dictionary()); result.SetConverter, string>(dict => JsonSerializer.Serialize(dict)); diff --git a/src/Outbox.WebApi/Linq2db/PostgreSqlExtensions.cs b/src/Outbox.WebApi/Linq2db/PostgreSqlExtensions.cs new file mode 100644 index 0000000..a95c7b3 --- /dev/null +++ b/src/Outbox.WebApi/Linq2db/PostgreSqlExtensions.cs @@ -0,0 +1,9 @@ +using LinqToDB; + +namespace Outbox.WebApi.Linq2db; + +public static class PostgreSqlExtensions +{ + [Sql.Expression(ProviderName.PostgreSQL, "pg_snapshot_xmin(pg_current_snapshot())", ServerSideOnly = true)] + public static ulong MinCurrentTransactionId => throw new NotImplementedException(); +} \ No newline at end of file diff --git a/src/Outbox.WebApi/Program.cs b/src/Outbox.WebApi/Program.cs index 41c46f7..12185e1 100644 --- a/src/Outbox.WebApi/Program.cs +++ b/src/Outbox.WebApi/Program.cs @@ -63,6 +63,7 @@ var message = new OutboxMessage { Topic = dto.Topic, + Partition = 0, //hash key % partitions count, ... Type = dto.Type, Key = dto.Key, Payload = dto.Payload, diff --git a/src/Outbox/Entities/OutboxMessage.cs b/src/Outbox/Entities/OutboxMessage.cs index 05f290e..5920293 100644 --- a/src/Outbox/Entities/OutboxMessage.cs +++ b/src/Outbox/Entities/OutboxMessage.cs @@ -3,11 +3,12 @@ public class OutboxMessage { public int Id { get; set; } + public ulong TransactionId { get; set; } public string Topic { get; set; } = null!; + public int Partition { get; set; } public string? Key { get; set; } public string Type { get; set; } = null!; //metadata public string Payload { get; set; } = null!; public Dictionary Headers { get; set; } = null!; public DateTimeOffset CreatedAt { get; set; } - public DateTimeOffset AvailableAfter { get; set; } } \ No newline at end of file diff --git a/src/Outbox/Entities/OutboxOffset.cs b/src/Outbox/Entities/OutboxOffset.cs index 0a065bf..615251f 100644 --- a/src/Outbox/Entities/OutboxOffset.cs +++ b/src/Outbox/Entities/OutboxOffset.cs @@ -3,5 +3,10 @@ namespace Outbox.Entities; public class OutboxOffset { public int Id { get; set; } + public string Topic { get; set; } = null!; + public int Partition { get; set; } public int LastProcessedId { get; set; } + public ulong LastProcessedTransactionId { get; set; } + public DateTimeOffset AvailableAfter { get; set; } + } diff --git a/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs b/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs index 4595ff0..749273a 100644 --- a/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs +++ b/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs @@ -10,14 +10,14 @@ public void Configure(EntityTypeBuilder builder) { builder.HasKey(x => x.Id); + builder.Property(x => x.TransactionId).HasDefaultValueSql("pg_current_xact_id()").HasColumnType("xid8"); builder.Property(x => x.Payload).HasColumnType("jsonb"); builder.Property(x => x.Headers).HasColumnType("jsonb"); builder.Property(x => x.Key).HasMaxLength(128); builder.Property(x => x.Type).HasMaxLength(128); builder.Property(x => x.Topic).HasMaxLength(128); builder.Property(x => x.CreatedAt).HasDefaultValueSql("now()"); - builder.Property(x => x.AvailableAfter).HasDefaultValueSql("now()"); - builder.HasIndex(x => x.AvailableAfter); + builder.HasIndex(x => new {x.Topic, x.Partition, x.TransactionId, x.Id}); } } \ No newline at end of file diff --git a/src/Outbox/EntityTypeConfigurations/OutboxOffsetEntityTypeConfiguration.cs b/src/Outbox/EntityTypeConfigurations/OutboxOffsetEntityTypeConfiguration.cs index 4bd4b77..1a9f6a0 100644 --- a/src/Outbox/EntityTypeConfigurations/OutboxOffsetEntityTypeConfiguration.cs +++ b/src/Outbox/EntityTypeConfigurations/OutboxOffsetEntityTypeConfiguration.cs @@ -8,6 +8,28 @@ public class OutboxOffsetEntityTypeConfiguration : IEntityTypeConfiguration builder) { - builder.HasData(new OutboxOffset {Id = 1, LastProcessedId = 0}); + builder.Property(x => x.Topic).HasMaxLength(128); + builder.Property(e => e.LastProcessedTransactionId).HasColumnType("xid8").HasDefaultValueSql("'0'::xid8"); + builder.Property(x => x.AvailableAfter).HasDefaultValueSql("now()"); + + builder.HasIndex(x => new {x.Topic, x.Partition}).IsUnique(); + + builder.HasData(new OutboxOffset + { + Id = 2, + Topic = "topic-1", + Partition = 0, + LastProcessedId = 0, + LastProcessedTransactionId = 0, + AvailableAfter = DateTimeOffset.Parse("2025-05-18T12:00") + }, new OutboxOffset + { + Id = 3, + Topic = "topic-1", + Partition = 1, + LastProcessedId = 0, + LastProcessedTransactionId = 0, + AvailableAfter = DateTimeOffset.Parse("2025-05-18T12:00") + }); } } diff --git a/src/Outbox/IOutboxMessagesProcessor.cs b/src/Outbox/IOutboxMessagesProcessor.cs index a2cfc36..d0cd2ff 100644 --- a/src/Outbox/IOutboxMessagesProcessor.cs +++ b/src/Outbox/IOutboxMessagesProcessor.cs @@ -2,5 +2,5 @@ namespace Outbox; public interface IOutboxMessagesProcessor { - void NewMessagesPersisted(); + void NewMessagesPersisted(string topic, int partition); } \ No newline at end of file diff --git a/src/Outbox/Migrations/20250519000818_AddTransactionId.Designer.cs b/src/Outbox/Migrations/20250519000818_AddTransactionId.Designer.cs new file mode 100644 index 0000000..181d664 --- /dev/null +++ b/src/Outbox/Migrations/20250519000818_AddTransactionId.Designer.cs @@ -0,0 +1,158 @@ +// +using System; +using System.Collections.Generic; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; +using Outbox; + +#nullable disable + +namespace Outbox.Migrations +{ + [DbContext(typeof(AppDbContext))] + [Migration("20250519000818_AddTransactionId")] + partial class AddTransactionId + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("outbox") + .HasAnnotation("ProductVersion", "9.0.3") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("Outbox.Entities.OutboxMessage", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("CreatedAt") + .ValueGeneratedOnAdd() + .HasColumnType("timestamp with time zone") + .HasColumnName("created_at") + .HasDefaultValueSql("now()"); + + b.Property>("Headers") + .IsRequired() + .HasColumnType("jsonb") + .HasColumnName("headers"); + + b.Property("Key") + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("key"); + + b.Property("Partition") + .HasColumnType("integer") + .HasColumnName("partition"); + + b.Property("Payload") + .IsRequired() + .HasColumnType("jsonb") + .HasColumnName("payload"); + + b.Property("Topic") + .IsRequired() + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("topic"); + + b.Property("TransactionId") + .ValueGeneratedOnAdd() + .HasColumnType("xid8") + .HasColumnName("transaction_id") + .HasDefaultValueSql("pg_current_xact_id()"); + + b.Property("Type") + .IsRequired() + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("type"); + + b.HasKey("Id") + .HasName("pk_outbox_messages"); + + b.HasIndex("Topic", "Partition", "TransactionId", "Id") + .HasDatabaseName("ix_outbox_messages_topic_partition_transaction_id_id"); + + b.ToTable("outbox_messages", "outbox"); + }); + + modelBuilder.Entity("Outbox.Entities.OutboxOffset", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("AvailableAfter") + .ValueGeneratedOnAdd() + .HasColumnType("timestamp with time zone") + .HasColumnName("available_after") + .HasDefaultValueSql("now()"); + + b.Property("LastProcessedId") + .HasColumnType("integer") + .HasColumnName("last_processed_id"); + + b.Property("LastProcessedTransactionId") + .ValueGeneratedOnAdd() + .HasColumnType("xid8") + .HasColumnName("last_processed_transaction_id") + .HasDefaultValueSql("'0'::xid8"); + + b.Property("Partition") + .HasColumnType("integer") + .HasColumnName("partition"); + + b.Property("Topic") + .IsRequired() + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("topic"); + + b.HasKey("Id") + .HasName("pk_outbox_offsets"); + + b.HasIndex("Topic", "Partition") + .IsUnique() + .HasDatabaseName("ix_outbox_offsets_topic_partition"); + + b.ToTable("outbox_offsets", "outbox"); + + b.HasData( + new + { + Id = 2, + AvailableAfter = new DateTimeOffset(new DateTime(2025, 5, 18, 12, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 7, 0, 0, 0)), + LastProcessedId = 0, + LastProcessedTransactionId = 0ul, + Partition = 0, + Topic = "topic-1" + }, + new + { + Id = 3, + AvailableAfter = new DateTimeOffset(new DateTime(2025, 5, 18, 12, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 7, 0, 0, 0)), + LastProcessedId = 0, + LastProcessedTransactionId = 0ul, + Partition = 1, + Topic = "topic-1" + }); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/Outbox/Migrations/20250519000818_AddTransactionId.cs b/src/Outbox/Migrations/20250519000818_AddTransactionId.cs new file mode 100644 index 0000000..58d8a36 --- /dev/null +++ b/src/Outbox/Migrations/20250519000818_AddTransactionId.cs @@ -0,0 +1,181 @@ +using System; +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +#pragma warning disable CA1814 // Prefer jagged arrays over multidimensional + +namespace Outbox.Migrations +{ + /// + public partial class AddTransactionId : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropIndex( + name: "ix_outbox_messages_available_after", + schema: "outbox", + table: "outbox_messages"); + + migrationBuilder.DeleteData( + schema: "outbox", + table: "outbox_offsets", + keyColumn: "id", + keyValue: 1); + + migrationBuilder.DropColumn( + name: "available_after", + schema: "outbox", + table: "outbox_messages"); + + migrationBuilder.AddColumn( + name: "available_after", + schema: "outbox", + table: "outbox_offsets", + type: "timestamp with time zone", + nullable: false, + defaultValueSql: "now()"); + + migrationBuilder.AddColumn( + name: "last_processed_transaction_id", + schema: "outbox", + table: "outbox_offsets", + type: "xid8", + nullable: false, + defaultValueSql: "'0'::xid8"); + + migrationBuilder.AddColumn( + name: "partition", + schema: "outbox", + table: "outbox_offsets", + type: "integer", + nullable: false, + defaultValue: 0); + + migrationBuilder.AddColumn( + name: "topic", + schema: "outbox", + table: "outbox_offsets", + type: "character varying(128)", + maxLength: 128, + nullable: false, + defaultValue: ""); + + migrationBuilder.AddColumn( + name: "partition", + schema: "outbox", + table: "outbox_messages", + type: "integer", + nullable: false, + defaultValue: 0); + + migrationBuilder.AddColumn( + name: "transaction_id", + schema: "outbox", + table: "outbox_messages", + type: "xid8", + nullable: false, + defaultValueSql: "pg_current_xact_id()"); + + migrationBuilder.InsertData( + schema: "outbox", + table: "outbox_offsets", + columns: new[] { "id", "available_after", "last_processed_id", "partition", "topic" }, + values: new object[,] + { + { 2, new DateTimeOffset(new DateTime(2025, 5, 18, 12, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 7, 0, 0, 0)), 0, 0, "topic-1" }, + { 3, new DateTimeOffset(new DateTime(2025, 5, 18, 12, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 7, 0, 0, 0)), 0, 1, "topic-1" } + }); + + migrationBuilder.CreateIndex( + name: "ix_outbox_offsets_topic_partition", + schema: "outbox", + table: "outbox_offsets", + columns: new[] { "topic", "partition" }, + unique: true); + + migrationBuilder.CreateIndex( + name: "ix_outbox_messages_topic_partition_transaction_id_id", + schema: "outbox", + table: "outbox_messages", + columns: new[] { "topic", "partition", "transaction_id", "id" }); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropIndex( + name: "ix_outbox_offsets_topic_partition", + schema: "outbox", + table: "outbox_offsets"); + + migrationBuilder.DropIndex( + name: "ix_outbox_messages_topic_partition_transaction_id_id", + schema: "outbox", + table: "outbox_messages"); + + migrationBuilder.DeleteData( + schema: "outbox", + table: "outbox_offsets", + keyColumn: "id", + keyValue: 2); + + migrationBuilder.DeleteData( + schema: "outbox", + table: "outbox_offsets", + keyColumn: "id", + keyValue: 3); + + migrationBuilder.DropColumn( + name: "available_after", + schema: "outbox", + table: "outbox_offsets"); + + migrationBuilder.DropColumn( + name: "last_processed_transaction_id", + schema: "outbox", + table: "outbox_offsets"); + + migrationBuilder.DropColumn( + name: "partition", + schema: "outbox", + table: "outbox_offsets"); + + migrationBuilder.DropColumn( + name: "topic", + schema: "outbox", + table: "outbox_offsets"); + + migrationBuilder.DropColumn( + name: "partition", + schema: "outbox", + table: "outbox_messages"); + + migrationBuilder.DropColumn( + name: "transaction_id", + schema: "outbox", + table: "outbox_messages"); + + migrationBuilder.AddColumn( + name: "available_after", + schema: "outbox", + table: "outbox_messages", + type: "timestamp with time zone", + nullable: false, + defaultValueSql: "now()"); + + migrationBuilder.InsertData( + schema: "outbox", + table: "outbox_offsets", + columns: new[] { "id", "last_processed_id" }, + values: new object[] { 1, 0 }); + + migrationBuilder.CreateIndex( + name: "ix_outbox_messages_available_after", + schema: "outbox", + table: "outbox_messages", + column: "available_after"); + } + } +} diff --git a/src/Outbox/Migrations/AppDbContextModelSnapshot.cs b/src/Outbox/Migrations/AppDbContextModelSnapshot.cs index bfdaabd..f97703a 100644 --- a/src/Outbox/Migrations/AppDbContextModelSnapshot.cs +++ b/src/Outbox/Migrations/AppDbContextModelSnapshot.cs @@ -33,12 +33,6 @@ protected override void BuildModel(ModelBuilder modelBuilder) NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); - b.Property("AvailableAfter") - .ValueGeneratedOnAdd() - .HasColumnType("timestamp with time zone") - .HasColumnName("available_after") - .HasDefaultValueSql("now()"); - b.Property("CreatedAt") .ValueGeneratedOnAdd() .HasColumnType("timestamp with time zone") @@ -55,6 +49,10 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("character varying(128)") .HasColumnName("key"); + b.Property("Partition") + .HasColumnType("integer") + .HasColumnName("partition"); + b.Property("Payload") .IsRequired() .HasColumnType("jsonb") @@ -66,6 +64,12 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("character varying(128)") .HasColumnName("topic"); + b.Property("TransactionId") + .ValueGeneratedOnAdd() + .HasColumnType("xid8") + .HasColumnName("transaction_id") + .HasDefaultValueSql("pg_current_xact_id()"); + b.Property("Type") .IsRequired() .HasMaxLength(128) @@ -75,8 +79,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasKey("Id") .HasName("pk_outbox_messages"); - b.HasIndex("AvailableAfter") - .HasDatabaseName("ix_outbox_messages_available_after"); + b.HasIndex("Topic", "Partition", "TransactionId", "Id") + .HasDatabaseName("ix_outbox_messages_topic_partition_transaction_id_id"); b.ToTable("outbox_messages", "outbox"); }); @@ -90,20 +94,59 @@ protected override void BuildModel(ModelBuilder modelBuilder) NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + b.Property("AvailableAfter") + .ValueGeneratedOnAdd() + .HasColumnType("timestamp with time zone") + .HasColumnName("available_after") + .HasDefaultValueSql("now()"); + b.Property("LastProcessedId") .HasColumnType("integer") .HasColumnName("last_processed_id"); + b.Property("LastProcessedTransactionId") + .ValueGeneratedOnAdd() + .HasColumnType("xid8") + .HasColumnName("last_processed_transaction_id") + .HasDefaultValueSql("'0'::xid8"); + + b.Property("Partition") + .HasColumnType("integer") + .HasColumnName("partition"); + + b.Property("Topic") + .IsRequired() + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("topic"); + b.HasKey("Id") .HasName("pk_outbox_offsets"); + b.HasIndex("Topic", "Partition") + .IsUnique() + .HasDatabaseName("ix_outbox_offsets_topic_partition"); + b.ToTable("outbox_offsets", "outbox"); b.HasData( new { - Id = 1, - LastProcessedId = 0 + Id = 2, + AvailableAfter = new DateTimeOffset(new DateTime(2025, 5, 18, 12, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 7, 0, 0, 0)), + LastProcessedId = 0, + LastProcessedTransactionId = 0ul, + Partition = 0, + Topic = "topic-1" + }, + new + { + Id = 3, + AvailableAfter = new DateTimeOffset(new DateTime(2025, 5, 18, 12, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 7, 0, 0, 0)), + LastProcessedId = 0, + LastProcessedTransactionId = 0ul, + Partition = 1, + Topic = "topic-1" }); }); #pragma warning restore 612, 618 From 358105ea4a794d70e2788a30af11e2d009a11111 Mon Sep 17 00:00:00 2001 From: "d.v.tsvettsikh" Date: Fri, 20 Jun 2025 10:36:40 +0700 Subject: [PATCH 2/3] optimization --- .../BackgroundServices/OutboxBackgroundService.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs index 120adde..43c229b 100644 --- a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs +++ b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs @@ -3,6 +3,7 @@ using LinqToDB; using LinqToDB.DataProvider.PostgreSQL; using LinqToDB.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; using Outbox.Configurations; using Outbox.Entities; @@ -53,8 +54,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) private async Task ProcessMessagesAsync(AppDbContext dbContext, CancellationToken cancellationToken) { - //change tracking can be disabled - //dbContext.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; + dbContext.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; var dataConnection = dbContext.CreateLinqToDBConnection(); @@ -92,6 +92,7 @@ private async Task ProcessMessagesAsync(AppDbContext dbContext, Cancellatio var outboxMessages = await dataConnection.GetTable() .Where(x => x.Topic == offset.Topic && x.Partition == offset.Partition && + x.TransactionId >= offset.LastProcessedTransactionId && ( x.TransactionId > offset.LastProcessedTransactionId || x.TransactionId == offset.LastProcessedTransactionId && x.Id > offset.LastProcessedId From 369794b9a59fe9142570391b8d2057e687598e38 Mon Sep 17 00:00:00 2001 From: "d.v.tsvettsikh" Date: Fri, 20 Jun 2025 10:44:27 +0700 Subject: [PATCH 3/3] fix data generation --- generate_data.sql | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/generate_data.sql b/generate_data.sql index 3ffb674..db3c44b 100644 --- a/generate_data.sql +++ b/generate_data.sql @@ -1,7 +1,10 @@ +--SELECT pg_snapshot_xmin(pg_current_snapshot()); must return value > 1_000_000 INSERT INTO outbox_messages -(topic, "key", "type", payload, headers) +(topic, "partition", transaction_id, "key", "type", payload, headers) SELECT - 'topic-' || n.v%2, + 'topic-1', + n.v%2, + n.v::text::xid8, n.v, 'type', '{"byte250":"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"}',