diff --git a/generate_data.sql b/generate_data.sql index db3c44b..29a274c 100644 --- a/generate_data.sql +++ b/generate_data.sql @@ -1,10 +1,8 @@ ---SELECT pg_snapshot_xmin(pg_current_snapshot()); must return value > 1_000_000 INSERT INTO outbox_messages -(topic, "partition", transaction_id, "key", "type", payload, headers) +(topic, "partition", "key", "type", payload, headers) SELECT 'topic-1', n.v%2, - n.v::text::xid8, n.v, 'type', '{"byte250":"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"}', diff --git a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs index 43c229b..80edef9 100644 --- a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs +++ b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs @@ -7,7 +7,6 @@ using Microsoft.Extensions.Options; using Outbox.Configurations; using Outbox.Entities; -using Outbox.WebApi.Linq2db; namespace Outbox.WebApi.BackgroundServices; @@ -92,14 +91,8 @@ private async Task ProcessMessagesAsync(AppDbContext dbContext, Cancellatio var outboxMessages = await dataConnection.GetTable() .Where(x => x.Topic == offset.Topic && x.Partition == offset.Partition && - x.TransactionId >= offset.LastProcessedTransactionId && - ( - x.TransactionId > offset.LastProcessedTransactionId || - x.TransactionId == offset.LastProcessedTransactionId && x.Id > offset.LastProcessedId - ) && - x.TransactionId < PostgreSqlExtensions.MinCurrentTransactionId - ) - .OrderBy(x => x.TransactionId).ThenBy(x => x.Id) + x.Id > offset.LastProcessedId) + .OrderBy(x => x.Id) .Take(_outboxOptions.Value.BatchSize) .ToArrayAsyncLinqToDB(cancellationToken); @@ -119,7 +112,6 @@ await dataConnection.GetTable() await dataConnection.GetTable() .Where(x => x.Id == offset.Id) .Set(x => x.LastProcessedId, lastMessage.Id) - .Set(x => x.LastProcessedTransactionId, lastMessage.TransactionId) .Set(x => x.AvailableAfter, DateTimeOffset.UtcNow) .UpdateAsync(cancellationToken); diff --git a/src/Outbox.WebApi/EFCore/IOutboxMessageContext.cs b/src/Outbox.WebApi/EFCore/IOutboxMessageContext.cs new file mode 100644 index 0000000..c5c029f --- /dev/null +++ b/src/Outbox.WebApi/EFCore/IOutboxMessageContext.cs @@ -0,0 +1,57 @@ +using System.Data.Common; +using System.Text.Json; +using Npgsql; +using NpgsqlTypes; +using Outbox.Entities; + +namespace Outbox.WebApi.EFCore; + +public interface IOutboxMessageContext +{ + void Add(OutboxMessage message); + Task SaveChangesAsync(DbTransaction transaction, CancellationToken ct); +} + +public class OutboxMessageContext : IOutboxMessageContext +{ + private readonly IOutboxMessagesProcessor _outboxMessagesProcessor; + private readonly List _messages = new(); + + public OutboxMessageContext(IOutboxMessagesProcessor outboxMessagesProcessor) => + _outboxMessagesProcessor = outboxMessagesProcessor; + + public void Add(OutboxMessage message) + { + _messages.Add(message); + } + + public async Task SaveChangesAsync(DbTransaction transaction, CancellationToken ct) + { + if (!_messages.Any()) return; + + await using var batch = new NpgsqlBatch(transaction.Connection as NpgsqlConnection, transaction as NpgsqlTransaction); + + var lockCommand = batch.CreateBatchCommand(); + //LOCK TABLE outbox.outbox_messages IN ROW EXCLUSIVE MODE; --alternative to advisory lock + lockCommand.CommandText = "SELECT pg_advisory_xact_lock(554738281823524)"; + batch.BatchCommands.Add(lockCommand); + + foreach (var message in _messages) + { + var command = batch.CreateBatchCommand(); + command.CommandText = "insert into outbox.outbox_messages(topic, partition, type, key, payload, headers) values (@topic, @partition, @type, @key, @payload, @headers)"; + command.Parameters.AddWithValue("@topic", message.Topic); + command.Parameters.AddWithValue("@partition", message.Partition); + command.Parameters.AddWithValue("@type", message.Type); + command.Parameters.AddWithValue("@key", message.Key); + command.Parameters.AddWithValue("@payload", NpgsqlDbType.Jsonb, message.Payload); + command.Parameters.AddWithValue("@headers", NpgsqlDbType.Jsonb, JsonSerializer.Serialize(message.Headers)); + + batch.BatchCommands.Add(command); + } + + await batch.ExecuteNonQueryAsync(ct); + + _outboxMessagesProcessor.NewMessagesPersisted(_messages.First().Topic, _messages.First().Partition); + } +} \ No newline at end of file diff --git a/src/Outbox.WebApi/EFCore/OutboxInterceptor.cs b/src/Outbox.WebApi/EFCore/OutboxInterceptor.cs deleted file mode 100644 index 08b3cbb..0000000 --- a/src/Outbox.WebApi/EFCore/OutboxInterceptor.cs +++ /dev/null @@ -1,82 +0,0 @@ -using System.Collections.Concurrent; -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Diagnostics; -using Outbox.Entities; - -namespace Outbox.WebApi.EFCore; - -public class OutboxInterceptor : ISaveChangesInterceptor -{ - private readonly IOutboxMessagesProcessor _outboxMessagesProcessor; - - public OutboxInterceptor(IOutboxMessagesProcessor outboxMessagesProcessor) => - _outboxMessagesProcessor = outboxMessagesProcessor; - - private readonly ConcurrentDictionary _contextsWithOutboxMessages = new(); - - private void OnSavingChanges(DbContext context) - { - var message = context.ChangeTracker.Entries() - .FirstOrDefault(x => x.State == EntityState.Added); - if (message != null) _contextsWithOutboxMessages.TryAdd(context, (message.Entity.Topic, message.Entity.Partition)); - } - - private void OnSaveChanges(DbContext context) - { - if (_contextsWithOutboxMessages.ContainsKey(context)) - { - if (_contextsWithOutboxMessages.TryRemove(context, out var item)) - { - _outboxMessagesProcessor.NewMessagesPersisted(item.Topic, item.Partition); - } - } - } - - public InterceptionResult SavingChanges(DbContextEventData eventData, InterceptionResult result) - { - if (eventData.Context != null) - OnSavingChanges(eventData.Context); - - return result; - } - - public int SavedChanges(SaveChangesCompletedEventData eventData, int result) - { - if (eventData.Context != null) - OnSaveChanges(eventData.Context); - - return result; - } - - public void SaveChangesFailed(DbContextErrorEventData eventData) - { - if (eventData.Context != null) - OnSaveChanges(eventData.Context); - } - - public ValueTask> SavingChangesAsync(DbContextEventData eventData, InterceptionResult result, - CancellationToken cancellationToken = new()) - { - if (eventData.Context != null) - OnSavingChanges(eventData.Context); - return ValueTask.FromResult(result); - } - - public ValueTask SavedChangesAsync(SaveChangesCompletedEventData eventData, int result, - CancellationToken cancellationToken = new()) - { - if (eventData.Context != null) - OnSaveChanges(eventData.Context); - - return ValueTask.FromResult(result); - } - - public Task SaveChangesFailedAsync(DbContextErrorEventData eventData, - CancellationToken cancellationToken = new()) - { - if (eventData.Context != null) - OnSaveChanges(eventData.Context); - - return Task.CompletedTask; - } -} \ No newline at end of file diff --git a/src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs b/src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs index 432a854..afc05b6 100644 --- a/src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs +++ b/src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs @@ -26,9 +26,6 @@ public override MappingSchema CreateMappingSchema(IModel model, IMetadataReader? { var result = base.CreateMappingSchema(model, metadataReader, convertorSelector, dataOptions); - //default parameter type for ulong is decimal, it produces an error on mapping to xid8 - result.SetConvertExpression(value => new DataParameter(null, value, DataType.UInt64)); - result.SetConverter>(str => JsonSerializer.Deserialize>(str) ?? new Dictionary()); result.SetConverter, string>(dict => JsonSerializer.Serialize(dict)); diff --git a/src/Outbox.WebApi/Linq2db/PostgreSqlExtensions.cs b/src/Outbox.WebApi/Linq2db/PostgreSqlExtensions.cs deleted file mode 100644 index a95c7b3..0000000 --- a/src/Outbox.WebApi/Linq2db/PostgreSqlExtensions.cs +++ /dev/null @@ -1,9 +0,0 @@ -using LinqToDB; - -namespace Outbox.WebApi.Linq2db; - -public static class PostgreSqlExtensions -{ - [Sql.Expression(ProviderName.PostgreSQL, "pg_snapshot_xmin(pg_current_snapshot())", ServerSideOnly = true)] - public static ulong MinCurrentTransactionId => throw new NotImplementedException(); -} \ No newline at end of file diff --git a/src/Outbox.WebApi/Program.cs b/src/Outbox.WebApi/Program.cs index 12185e1..d6cf090 100644 --- a/src/Outbox.WebApi/Program.cs +++ b/src/Outbox.WebApi/Program.cs @@ -3,6 +3,7 @@ using EFCore.MigrationExtensions.PostgreSQL; using LinqToDB.EntityFrameworkCore; using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; using Npgsql; using Outbox; using Outbox.Configurations; @@ -26,18 +27,19 @@ dataSource, options => options.MigrationsHistoryTable("_migrations", "outbox")) .UseSnakeCaseNamingConvention() - .AddInterceptors(new ForUpdateInterceptor(), serviceProvider.GetRequiredService()); + .AddInterceptors(new ForUpdateInterceptor()); optionsBuilder.UseSqlObjects(); }); +builder.Services.AddScoped(); + LinqToDBForEFTools.Implementation = new OutboxLinqToDBForEFToolsImpl(builder.Configuration.GetConnectionString("Outbox")!); LinqToDBForEFTools.Initialize(); builder.Services.AddKafkaClient(); builder.Services.Configure(builder.Configuration.GetSection("Outbox")); -builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddHostedService(sp => sp.GetRequiredService()); @@ -57,7 +59,7 @@ app.UseHttpsRedirection(); -app.MapPost("/messages", async (CreateMessageDto dto, AppDbContext dbContext, CancellationToken ct) => +app.MapPost("/messages", async (CreateMessageDto dto, AppDbContext dbContext, IOutboxMessageContext outboxMessageContext, CancellationToken ct) => { var activityContext = Activity.Current?.Context; var message = new OutboxMessage @@ -69,10 +71,15 @@ Payload = dto.Payload, Headers = activityContext.GetHeaders() }; - - dbContext.OutboxMessages.Add(message); + outboxMessageContext.Add(message); + + var transaction = await dbContext.Database.BeginTransactionAsync(ct); - await dbContext.SaveChangesAsync(ct); + await dbContext.SaveChangesAsync(ct); //save business entities + + await outboxMessageContext.SaveChangesAsync(transaction.GetDbTransaction(), ct); + + await transaction.CommitAsync(ct); }) .WithName("CreateMessage"); diff --git a/src/Outbox/Entities/OutboxMessage.cs b/src/Outbox/Entities/OutboxMessage.cs index 5920293..e24e548 100644 --- a/src/Outbox/Entities/OutboxMessage.cs +++ b/src/Outbox/Entities/OutboxMessage.cs @@ -3,7 +3,6 @@ public class OutboxMessage { public int Id { get; set; } - public ulong TransactionId { get; set; } public string Topic { get; set; } = null!; public int Partition { get; set; } public string? Key { get; set; } diff --git a/src/Outbox/Entities/OutboxOffset.cs b/src/Outbox/Entities/OutboxOffset.cs index 615251f..1b2d78f 100644 --- a/src/Outbox/Entities/OutboxOffset.cs +++ b/src/Outbox/Entities/OutboxOffset.cs @@ -6,7 +6,6 @@ public class OutboxOffset public string Topic { get; set; } = null!; public int Partition { get; set; } public int LastProcessedId { get; set; } - public ulong LastProcessedTransactionId { get; set; } public DateTimeOffset AvailableAfter { get; set; } } diff --git a/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs b/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs index 749273a..cf5b9df 100644 --- a/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs +++ b/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs @@ -9,8 +9,7 @@ public class OutboxMessageEntityTypeConfiguration : IEntityTypeConfiguration builder) { builder.HasKey(x => x.Id); - - builder.Property(x => x.TransactionId).HasDefaultValueSql("pg_current_xact_id()").HasColumnType("xid8"); + builder.Property(x => x.Payload).HasColumnType("jsonb"); builder.Property(x => x.Headers).HasColumnType("jsonb"); builder.Property(x => x.Key).HasMaxLength(128); @@ -18,6 +17,6 @@ public void Configure(EntityTypeBuilder builder) builder.Property(x => x.Topic).HasMaxLength(128); builder.Property(x => x.CreatedAt).HasDefaultValueSql("now()"); - builder.HasIndex(x => new {x.Topic, x.Partition, x.TransactionId, x.Id}); + builder.HasIndex(x => new {x.Topic, x.Partition, x.Id}); } } \ No newline at end of file diff --git a/src/Outbox/EntityTypeConfigurations/OutboxOffsetEntityTypeConfiguration.cs b/src/Outbox/EntityTypeConfigurations/OutboxOffsetEntityTypeConfiguration.cs index 1a9f6a0..bede6da 100644 --- a/src/Outbox/EntityTypeConfigurations/OutboxOffsetEntityTypeConfiguration.cs +++ b/src/Outbox/EntityTypeConfigurations/OutboxOffsetEntityTypeConfiguration.cs @@ -9,7 +9,6 @@ public class OutboxOffsetEntityTypeConfiguration : IEntityTypeConfiguration builder) { builder.Property(x => x.Topic).HasMaxLength(128); - builder.Property(e => e.LastProcessedTransactionId).HasColumnType("xid8").HasDefaultValueSql("'0'::xid8"); builder.Property(x => x.AvailableAfter).HasDefaultValueSql("now()"); builder.HasIndex(x => new {x.Topic, x.Partition}).IsUnique(); @@ -20,7 +19,6 @@ public void Configure(EntityTypeBuilder builder) Topic = "topic-1", Partition = 0, LastProcessedId = 0, - LastProcessedTransactionId = 0, AvailableAfter = DateTimeOffset.Parse("2025-05-18T12:00") }, new OutboxOffset { @@ -28,7 +26,6 @@ public void Configure(EntityTypeBuilder builder) Topic = "topic-1", Partition = 1, LastProcessedId = 0, - LastProcessedTransactionId = 0, AvailableAfter = DateTimeOffset.Parse("2025-05-18T12:00") }); } diff --git a/src/Outbox/Migrations/20250519025811_RemoveTransactionId.Designer.cs b/src/Outbox/Migrations/20250519025811_RemoveTransactionId.Designer.cs new file mode 100644 index 0000000..6d0e33c --- /dev/null +++ b/src/Outbox/Migrations/20250519025811_RemoveTransactionId.Designer.cs @@ -0,0 +1,144 @@ +// +using System; +using System.Collections.Generic; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; +using Outbox; + +#nullable disable + +namespace Outbox.Migrations +{ + [DbContext(typeof(AppDbContext))] + [Migration("20250519025811_RemoveTransactionId")] + partial class RemoveTransactionId + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("outbox") + .HasAnnotation("ProductVersion", "9.0.3") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("Outbox.Entities.OutboxMessage", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("CreatedAt") + .ValueGeneratedOnAdd() + .HasColumnType("timestamp with time zone") + .HasColumnName("created_at") + .HasDefaultValueSql("now()"); + + b.Property>("Headers") + .IsRequired() + .HasColumnType("jsonb") + .HasColumnName("headers"); + + b.Property("Key") + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("key"); + + b.Property("Partition") + .HasColumnType("integer") + .HasColumnName("partition"); + + b.Property("Payload") + .IsRequired() + .HasColumnType("jsonb") + .HasColumnName("payload"); + + b.Property("Topic") + .IsRequired() + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("topic"); + + b.Property("Type") + .IsRequired() + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("type"); + + b.HasKey("Id") + .HasName("pk_outbox_messages"); + + b.HasIndex("Topic", "Partition", "Id") + .HasDatabaseName("ix_outbox_messages_topic_partition_id"); + + b.ToTable("outbox_messages", "outbox"); + }); + + modelBuilder.Entity("Outbox.Entities.OutboxOffset", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("AvailableAfter") + .ValueGeneratedOnAdd() + .HasColumnType("timestamp with time zone") + .HasColumnName("available_after") + .HasDefaultValueSql("now()"); + + b.Property("LastProcessedId") + .HasColumnType("integer") + .HasColumnName("last_processed_id"); + + b.Property("Partition") + .HasColumnType("integer") + .HasColumnName("partition"); + + b.Property("Topic") + .IsRequired() + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("topic"); + + b.HasKey("Id") + .HasName("pk_outbox_offsets"); + + b.HasIndex("Topic", "Partition") + .IsUnique() + .HasDatabaseName("ix_outbox_offsets_topic_partition"); + + b.ToTable("outbox_offsets", "outbox"); + + b.HasData( + new + { + Id = 2, + AvailableAfter = new DateTimeOffset(new DateTime(2025, 5, 18, 12, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 7, 0, 0, 0)), + LastProcessedId = 0, + Partition = 0, + Topic = "topic-1" + }, + new + { + Id = 3, + AvailableAfter = new DateTimeOffset(new DateTime(2025, 5, 18, 12, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 7, 0, 0, 0)), + LastProcessedId = 0, + Partition = 1, + Topic = "topic-1" + }); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/Outbox/Migrations/20250519025811_RemoveTransactionId.cs b/src/Outbox/Migrations/20250519025811_RemoveTransactionId.cs new file mode 100644 index 0000000..533a524 --- /dev/null +++ b/src/Outbox/Migrations/20250519025811_RemoveTransactionId.cs @@ -0,0 +1,82 @@ +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace Outbox.Migrations +{ + /// + public partial class RemoveTransactionId : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropIndex( + name: "ix_outbox_messages_topic_partition_transaction_id_id", + schema: "outbox", + table: "outbox_messages"); + + migrationBuilder.DropColumn( + name: "last_processed_transaction_id", + schema: "outbox", + table: "outbox_offsets"); + + migrationBuilder.DropColumn( + name: "transaction_id", + schema: "outbox", + table: "outbox_messages"); + + migrationBuilder.CreateIndex( + name: "ix_outbox_messages_topic_partition_id", + schema: "outbox", + table: "outbox_messages", + columns: new[] { "topic", "partition", "id" }); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropIndex( + name: "ix_outbox_messages_topic_partition_id", + schema: "outbox", + table: "outbox_messages"); + + migrationBuilder.AddColumn( + name: "last_processed_transaction_id", + schema: "outbox", + table: "outbox_offsets", + type: "xid8", + nullable: false, + defaultValueSql: "'0'::xid8"); + + migrationBuilder.AddColumn( + name: "transaction_id", + schema: "outbox", + table: "outbox_messages", + type: "xid8", + nullable: false, + defaultValueSql: "pg_current_xact_id()"); + + migrationBuilder.UpdateData( + schema: "outbox", + table: "outbox_offsets", + keyColumn: "id", + keyValue: 2, + columns: new string[0], + values: new object[0]); + + migrationBuilder.UpdateData( + schema: "outbox", + table: "outbox_offsets", + keyColumn: "id", + keyValue: 3, + columns: new string[0], + values: new object[0]); + + migrationBuilder.CreateIndex( + name: "ix_outbox_messages_topic_partition_transaction_id_id", + schema: "outbox", + table: "outbox_messages", + columns: new[] { "topic", "partition", "transaction_id", "id" }); + } + } +} diff --git a/src/Outbox/Migrations/AppDbContextModelSnapshot.cs b/src/Outbox/Migrations/AppDbContextModelSnapshot.cs index f97703a..7338d08 100644 --- a/src/Outbox/Migrations/AppDbContextModelSnapshot.cs +++ b/src/Outbox/Migrations/AppDbContextModelSnapshot.cs @@ -64,12 +64,6 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("character varying(128)") .HasColumnName("topic"); - b.Property("TransactionId") - .ValueGeneratedOnAdd() - .HasColumnType("xid8") - .HasColumnName("transaction_id") - .HasDefaultValueSql("pg_current_xact_id()"); - b.Property("Type") .IsRequired() .HasMaxLength(128) @@ -79,8 +73,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasKey("Id") .HasName("pk_outbox_messages"); - b.HasIndex("Topic", "Partition", "TransactionId", "Id") - .HasDatabaseName("ix_outbox_messages_topic_partition_transaction_id_id"); + b.HasIndex("Topic", "Partition", "Id") + .HasDatabaseName("ix_outbox_messages_topic_partition_id"); b.ToTable("outbox_messages", "outbox"); }); @@ -104,12 +98,6 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("integer") .HasColumnName("last_processed_id"); - b.Property("LastProcessedTransactionId") - .ValueGeneratedOnAdd() - .HasColumnType("xid8") - .HasColumnName("last_processed_transaction_id") - .HasDefaultValueSql("'0'::xid8"); - b.Property("Partition") .HasColumnType("integer") .HasColumnName("partition"); @@ -135,7 +123,6 @@ protected override void BuildModel(ModelBuilder modelBuilder) Id = 2, AvailableAfter = new DateTimeOffset(new DateTime(2025, 5, 18, 12, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 7, 0, 0, 0)), LastProcessedId = 0, - LastProcessedTransactionId = 0ul, Partition = 0, Topic = "topic-1" }, @@ -144,7 +131,6 @@ protected override void BuildModel(ModelBuilder modelBuilder) Id = 3, AvailableAfter = new DateTimeOffset(new DateTime(2025, 5, 18, 12, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 7, 0, 0, 0)), LastProcessedId = 0, - LastProcessedTransactionId = 0ul, Partition = 1, Topic = "topic-1" });