From 1a489636a5bb69a2808c32b77336d9b910354577 Mon Sep 17 00:00:00 2001 From: "d.v.tsvettsikh" Date: Mon, 19 May 2025 13:25:49 +0700 Subject: [PATCH] 6 no missing values in partition numbers --- .../OutboxBackgroundService.cs | 4 +- .../EFCore/DbDesignTimeServices.cs | 7 + .../EFCore/DesignTimeDbContextFactory.cs | 23 ++ .../EFCore/IOutboxMessageContext.cs | 6 +- src/Outbox/AppDbContext.cs | 4 + src/Outbox/Entities/OutboxMessage.cs | 1 + src/Outbox/Entities/OutboxOffset.cs | 2 +- src/Outbox/Entities/OutboxOffsetSequence.cs | 9 + .../OutboxMessageEntityTypeConfiguration.cs | 2 +- .../OutboxOffsetEntityTypeConfiguration.cs | 4 +- ...oxOffsetSequenceEntityTypeConfiguration.cs | 25 +++ ...50519062044_AddPartitionNumber.Designer.cs | 202 ++++++++++++++++++ .../20250519062044_AddPartitionNumber.cs | 109 ++++++++++ .../Migrations/AppDbContextModelSnapshot.cs | 70 +++++- src/Outbox/Outbox.csproj | 5 + src/Outbox/Sql/insert_outbox_message.sql | 15 ++ 16 files changed, 473 insertions(+), 15 deletions(-) create mode 100644 src/Outbox.WebApi/EFCore/DbDesignTimeServices.cs create mode 100644 src/Outbox.WebApi/EFCore/DesignTimeDbContextFactory.cs create mode 100644 src/Outbox/Entities/OutboxOffsetSequence.cs create mode 100644 src/Outbox/EntityTypeConfigurations/OutboxOffsetSequenceEntityTypeConfiguration.cs create mode 100644 src/Outbox/Migrations/20250519062044_AddPartitionNumber.Designer.cs create mode 100644 src/Outbox/Migrations/20250519062044_AddPartitionNumber.cs create mode 100644 src/Outbox/Sql/insert_outbox_message.sql diff --git a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs index 968b8ef..8dad11a 100644 --- a/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs +++ b/src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs @@ -91,7 +91,7 @@ private async Task ProcessMessagesAsync(AppDbContext dbContext, Cancellatio var outboxMessages = await dataConnection.GetTable() .Where(x => x.Topic == offset.Topic && x.Partition == offset.Partition && - x.Id > offset.LastProcessedId) + x.Number > offset.LastProcessedNumber) .OrderBy(x => x.Id) .Take(_outboxOptions.Value.BatchSize) .ToArrayAsyncLinqToDB(cancellationToken); @@ -111,7 +111,7 @@ await dataConnection.GetTable() var lastMessage = outboxMessages.Last(); await dataConnection.GetTable() .Where(x => x.Id == offset.Id) - .Set(x => x.LastProcessedId, lastMessage.Id) + .Set(x => x.LastProcessedNumber, lastMessage.Number) .Set(x => x.AvailableAfter, DateTimeOffset.UtcNow) .UpdateAsync(cancellationToken); diff --git a/src/Outbox.WebApi/EFCore/DbDesignTimeServices.cs b/src/Outbox.WebApi/EFCore/DbDesignTimeServices.cs new file mode 100644 index 0000000..96a9383 --- /dev/null +++ b/src/Outbox.WebApi/EFCore/DbDesignTimeServices.cs @@ -0,0 +1,7 @@ +using EFCore.MigrationExtensions.PostgreSQL; + +namespace Outbox.WebApi.EFCore; + +public class DbDesignTimeServices : CustomNpgsqlDesignTimeServices +{ +} \ No newline at end of file diff --git a/src/Outbox.WebApi/EFCore/DesignTimeDbContextFactory.cs b/src/Outbox.WebApi/EFCore/DesignTimeDbContextFactory.cs new file mode 100644 index 0000000..449bd03 --- /dev/null +++ b/src/Outbox.WebApi/EFCore/DesignTimeDbContextFactory.cs @@ -0,0 +1,23 @@ +using EFCore.MigrationExtensions.PostgreSQL; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Design; + +namespace Outbox.WebApi.EFCore; + +public class DesignTimeDbContextFactory : IDesignTimeDbContextFactory +{ + public AppDbContext CreateDbContext(string[] args) + { + Console.WriteLine("Using DesignTimeDbContextFactory"); + + var builder = new DbContextOptionsBuilder(); + + builder.UseSqlObjects(); + builder.UseSnakeCaseNamingConvention(); + + builder.UseNpgsql("Host=localhost;Port=5432;Database=outbox;Username=postgres;Password=postgres;", + options => options.MigrationsHistoryTable("_migrations", "outbox")); + + return new AppDbContext(builder.Options); + } +} \ No newline at end of file diff --git a/src/Outbox.WebApi/EFCore/IOutboxMessageContext.cs b/src/Outbox.WebApi/EFCore/IOutboxMessageContext.cs index c5c029f..ba5af05 100644 --- a/src/Outbox.WebApi/EFCore/IOutboxMessageContext.cs +++ b/src/Outbox.WebApi/EFCore/IOutboxMessageContext.cs @@ -39,9 +39,9 @@ public async Task SaveChangesAsync(DbTransaction transaction, CancellationToken foreach (var message in _messages) { var command = batch.CreateBatchCommand(); - command.CommandText = "insert into outbox.outbox_messages(topic, partition, type, key, payload, headers) values (@topic, @partition, @type, @key, @payload, @headers)"; - command.Parameters.AddWithValue("@topic", message.Topic); - command.Parameters.AddWithValue("@partition", message.Partition); + command.CommandText = "call outbox.insert_outbox_message(@ptopic, @ppartition, @type, @key, @payload, @headers)"; + command.Parameters.AddWithValue("@ptopic", message.Topic); + command.Parameters.AddWithValue("@ppartition", message.Partition); command.Parameters.AddWithValue("@type", message.Type); command.Parameters.AddWithValue("@key", message.Key); command.Parameters.AddWithValue("@payload", NpgsqlDbType.Jsonb, message.Payload); diff --git a/src/Outbox/AppDbContext.cs b/src/Outbox/AppDbContext.cs index fdd799b..c26a364 100644 --- a/src/Outbox/AppDbContext.cs +++ b/src/Outbox/AppDbContext.cs @@ -1,3 +1,4 @@ +using EFCore.MigrationExtensions.SqlObjects; using Microsoft.EntityFrameworkCore; using Outbox.Entities; @@ -11,11 +12,14 @@ public AppDbContext(DbContextOptions options) : base(options) public DbSet OutboxMessages { get; set; } public DbSet OutboxOffsets { get; set; } + public DbSet OutboxOffsetSequences { get; set; } protected override void OnModelCreating(ModelBuilder modelBuilder) { modelBuilder.HasDefaultSchema("outbox"); modelBuilder.ApplyConfigurationsFromAssembly(GetType().Assembly); + + modelBuilder.AddSqlObjects(folder: "Sql", assembly: GetType().Assembly); } } \ No newline at end of file diff --git a/src/Outbox/Entities/OutboxMessage.cs b/src/Outbox/Entities/OutboxMessage.cs index e24e548..b162aec 100644 --- a/src/Outbox/Entities/OutboxMessage.cs +++ b/src/Outbox/Entities/OutboxMessage.cs @@ -5,6 +5,7 @@ public class OutboxMessage public int Id { get; set; } public string Topic { get; set; } = null!; public int Partition { get; set; } + public int Number { get; set; } public string? Key { get; set; } public string Type { get; set; } = null!; //metadata public string Payload { get; set; } = null!; diff --git a/src/Outbox/Entities/OutboxOffset.cs b/src/Outbox/Entities/OutboxOffset.cs index 1b2d78f..4666488 100644 --- a/src/Outbox/Entities/OutboxOffset.cs +++ b/src/Outbox/Entities/OutboxOffset.cs @@ -5,7 +5,7 @@ public class OutboxOffset public int Id { get; set; } public string Topic { get; set; } = null!; public int Partition { get; set; } - public int LastProcessedId { get; set; } + public int LastProcessedNumber { get; set; } public DateTimeOffset AvailableAfter { get; set; } } diff --git a/src/Outbox/Entities/OutboxOffsetSequence.cs b/src/Outbox/Entities/OutboxOffsetSequence.cs new file mode 100644 index 0000000..a3982aa --- /dev/null +++ b/src/Outbox/Entities/OutboxOffsetSequence.cs @@ -0,0 +1,9 @@ +namespace Outbox.Entities; + +public class OutboxOffsetSequence +{ + public int Id { get; set; } + public string Topic { get; set; } = null!; + public int Partition { get; set; } + public int Value { get; set; } +} \ No newline at end of file diff --git a/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs b/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs index cf5b9df..f68e454 100644 --- a/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs +++ b/src/Outbox/EntityTypeConfigurations/OutboxMessageEntityTypeConfiguration.cs @@ -17,6 +17,6 @@ public void Configure(EntityTypeBuilder builder) builder.Property(x => x.Topic).HasMaxLength(128); builder.Property(x => x.CreatedAt).HasDefaultValueSql("now()"); - builder.HasIndex(x => new {x.Topic, x.Partition, x.Id}); + builder.HasIndex(x => new {x.Topic, x.Partition, x.Number}); } } \ No newline at end of file diff --git a/src/Outbox/EntityTypeConfigurations/OutboxOffsetEntityTypeConfiguration.cs b/src/Outbox/EntityTypeConfigurations/OutboxOffsetEntityTypeConfiguration.cs index bede6da..b46c8c1 100644 --- a/src/Outbox/EntityTypeConfigurations/OutboxOffsetEntityTypeConfiguration.cs +++ b/src/Outbox/EntityTypeConfigurations/OutboxOffsetEntityTypeConfiguration.cs @@ -18,14 +18,14 @@ public void Configure(EntityTypeBuilder builder) Id = 2, Topic = "topic-1", Partition = 0, - LastProcessedId = 0, + LastProcessedNumber = 0, AvailableAfter = DateTimeOffset.Parse("2025-05-18T12:00") }, new OutboxOffset { Id = 3, Topic = "topic-1", Partition = 1, - LastProcessedId = 0, + LastProcessedNumber = 0, AvailableAfter = DateTimeOffset.Parse("2025-05-18T12:00") }); } diff --git a/src/Outbox/EntityTypeConfigurations/OutboxOffsetSequenceEntityTypeConfiguration.cs b/src/Outbox/EntityTypeConfigurations/OutboxOffsetSequenceEntityTypeConfiguration.cs new file mode 100644 index 0000000..856a0d4 --- /dev/null +++ b/src/Outbox/EntityTypeConfigurations/OutboxOffsetSequenceEntityTypeConfiguration.cs @@ -0,0 +1,25 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Builders; +using Outbox.Entities; + +namespace Outbox.EntityTypeConfigurations; + +public class OutboxOffsetSequenceEntityTypeConfiguration : IEntityTypeConfiguration +{ + public void Configure(EntityTypeBuilder builder) + { + builder.Property(x => x.Topic).HasMaxLength(128); + + builder.HasData(new OutboxOffsetSequence + { + Id = 1, + Topic = "topic-1", + Partition = 0, + },new OutboxOffsetSequence + { + Id = 2, + Topic = "topic-1", + Partition = 1 + }); + } +} diff --git a/src/Outbox/Migrations/20250519062044_AddPartitionNumber.Designer.cs b/src/Outbox/Migrations/20250519062044_AddPartitionNumber.Designer.cs new file mode 100644 index 0000000..92405d6 --- /dev/null +++ b/src/Outbox/Migrations/20250519062044_AddPartitionNumber.Designer.cs @@ -0,0 +1,202 @@ +// +using System; +using System.Collections.Generic; +using EFCore.MigrationExtensions.SqlObjects; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; +using Outbox; + +#nullable disable + +namespace Outbox.Migrations +{ + [DbContext(typeof(AppDbContext))] + [Migration("20250519062044_AddPartitionNumber")] + partial class AddPartitionNumber + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder.Model.AddSqlObjects(new SqlObject[] + { + new("insert_outbox_message.sql", "CREATE OR REPLACE PROCEDURE outbox.insert_outbox_message(IN ptopic character varying, IN ppartition integer, IN type character varying, IN key character varying, IN payload jsonb, IN headers jsonb)\n LANGUAGE plpgsql\nAS $procedure$\nDECLARE\ncounter int;\nBEGIN\nupdate outbox.outbox_offset_sequences\nset value = value + 1\nwhere topic = ptopic and partition = ppartition\nreturning value into counter;\ninsert into outbox.outbox_messages(topic, partition, type, key, payload, headers, number)\nvalues (ptopic, ppartition, type, key, payload, headers, counter);\nEND;\n$procedure$\n;\n") + { + Order = 2147483647 + }, + }); + + modelBuilder + .HasDefaultSchema("outbox") + .HasAnnotation("ProductVersion", "9.0.3") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("Outbox.Entities.OutboxMessage", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("CreatedAt") + .ValueGeneratedOnAdd() + .HasColumnType("timestamp with time zone") + .HasColumnName("created_at") + .HasDefaultValueSql("now()"); + + b.Property>("Headers") + .IsRequired() + .HasColumnType("jsonb") + .HasColumnName("headers"); + + b.Property("Key") + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("key"); + + b.Property("Number") + .HasColumnType("integer") + .HasColumnName("number"); + + b.Property("Partition") + .HasColumnType("integer") + .HasColumnName("partition"); + + b.Property("Payload") + .IsRequired() + .HasColumnType("jsonb") + .HasColumnName("payload"); + + b.Property("Topic") + .IsRequired() + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("topic"); + + b.Property("Type") + .IsRequired() + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("type"); + + b.HasKey("Id") + .HasName("pk_outbox_messages"); + + b.HasIndex("Topic", "Partition", "Number") + .HasDatabaseName("ix_outbox_messages_topic_partition_number"); + + b.ToTable("outbox_messages", "outbox"); + }); + + modelBuilder.Entity("Outbox.Entities.OutboxOffset", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("AvailableAfter") + .ValueGeneratedOnAdd() + .HasColumnType("timestamp with time zone") + .HasColumnName("available_after") + .HasDefaultValueSql("now()"); + + b.Property("LastProcessedNumber") + .HasColumnType("integer") + .HasColumnName("last_processed_number"); + + b.Property("Partition") + .HasColumnType("integer") + .HasColumnName("partition"); + + b.Property("Topic") + .IsRequired() + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("topic"); + + b.HasKey("Id") + .HasName("pk_outbox_offsets"); + + b.HasIndex("Topic", "Partition") + .IsUnique() + .HasDatabaseName("ix_outbox_offsets_topic_partition"); + + b.ToTable("outbox_offsets", "outbox"); + + b.HasData( + new + { + Id = 2, + AvailableAfter = new DateTimeOffset(new DateTime(2025, 5, 18, 12, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 7, 0, 0, 0)), + LastProcessedNumber = 0, + Partition = 0, + Topic = "topic-1" + }, + new + { + Id = 3, + AvailableAfter = new DateTimeOffset(new DateTime(2025, 5, 18, 12, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 7, 0, 0, 0)), + LastProcessedNumber = 0, + Partition = 1, + Topic = "topic-1" + }); + }); + + modelBuilder.Entity("Outbox.Entities.OutboxOffsetSequence", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("Partition") + .HasColumnType("integer") + .HasColumnName("partition"); + + b.Property("Topic") + .IsRequired() + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("topic"); + + b.Property("Value") + .HasColumnType("integer") + .HasColumnName("value"); + + b.HasKey("Id") + .HasName("pk_outbox_offset_sequences"); + + b.ToTable("outbox_offset_sequences", "outbox"); + + b.HasData( + new + { + Id = 1, + Partition = 0, + Topic = "topic-1", + Value = 0 + }, + new + { + Id = 2, + Partition = 1, + Topic = "topic-1", + Value = 0 + }); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/Outbox/Migrations/20250519062044_AddPartitionNumber.cs b/src/Outbox/Migrations/20250519062044_AddPartitionNumber.cs new file mode 100644 index 0000000..8015199 --- /dev/null +++ b/src/Outbox/Migrations/20250519062044_AddPartitionNumber.cs @@ -0,0 +1,109 @@ +using EFCore.MigrationExtensions.SqlObjects; +using Microsoft.EntityFrameworkCore.Migrations; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; + +#nullable disable + +#pragma warning disable CA1814 // Prefer jagged arrays over multidimensional + +namespace Outbox.Migrations +{ + /// + public partial class AddPartitionNumber : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropIndex( + name: "ix_outbox_messages_topic_partition_id", + schema: "outbox", + table: "outbox_messages"); + + migrationBuilder.RenameColumn( + name: "last_processed_id", + schema: "outbox", + table: "outbox_offsets", + newName: "last_processed_number"); + + migrationBuilder.AddColumn( + name: "number", + schema: "outbox", + table: "outbox_messages", + type: "integer", + nullable: false, + defaultValue: 0); + + migrationBuilder.CreateTable( + name: "outbox_offset_sequences", + schema: "outbox", + columns: table => new + { + id = table.Column(type: "integer", nullable: false) + .Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn), + topic = table.Column(type: "character varying(128)", maxLength: 128, nullable: false), + partition = table.Column(type: "integer", nullable: false), + value = table.Column(type: "integer", nullable: false) + }, + constraints: table => + { + table.PrimaryKey("pk_outbox_offset_sequences", x => x.id); + }); + + migrationBuilder.InsertData( + schema: "outbox", + table: "outbox_offset_sequences", + columns: new[] { "id", "partition", "topic", "value" }, + values: new object[,] + { + { 1, 0, "topic-1", 0 }, + { 2, 1, "topic-1", 0 } + }); + + migrationBuilder.CreateIndex( + name: "ix_outbox_messages_topic_partition_number", + schema: "outbox", + table: "outbox_messages", + columns: new[] { "topic", "partition", "number" }); + + migrationBuilder.CreateOrUpdateSqlObject( + name: "insert_outbox_message.sql", + sqlCode: "CREATE OR REPLACE PROCEDURE outbox.insert_outbox_message(IN ptopic character varying, IN ppartition integer, IN type character varying, IN key character varying, IN payload jsonb, IN headers jsonb)\n LANGUAGE plpgsql\nAS $procedure$\nDECLARE\ncounter int;\nBEGIN\nupdate outbox.outbox_offset_sequences\nset value = value + 1\nwhere topic = ptopic and partition = ppartition\nreturning value into counter;\ninsert into outbox.outbox_messages(topic, partition, type, key, payload, headers, number)\nvalues (ptopic, ppartition, type, key, payload, headers, counter);\nEND;\n$procedure$\n;\n", + order: 2147483647); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropSqlObject( + name: "insert_outbox_message.sql", + sqlCode: "DROP PROCEDURE outbox.insert_outbox_message", + order: 2147483647); + + migrationBuilder.DropTable( + name: "outbox_offset_sequences", + schema: "outbox"); + + migrationBuilder.DropIndex( + name: "ix_outbox_messages_topic_partition_number", + schema: "outbox", + table: "outbox_messages"); + + migrationBuilder.DropColumn( + name: "number", + schema: "outbox", + table: "outbox_messages"); + + migrationBuilder.RenameColumn( + name: "last_processed_number", + schema: "outbox", + table: "outbox_offsets", + newName: "last_processed_id"); + + migrationBuilder.CreateIndex( + name: "ix_outbox_messages_topic_partition_id", + schema: "outbox", + table: "outbox_messages", + columns: new[] { "topic", "partition", "id" }); + } + } +} diff --git a/src/Outbox/Migrations/AppDbContextModelSnapshot.cs b/src/Outbox/Migrations/AppDbContextModelSnapshot.cs index 7338d08..4b9e6d1 100644 --- a/src/Outbox/Migrations/AppDbContextModelSnapshot.cs +++ b/src/Outbox/Migrations/AppDbContextModelSnapshot.cs @@ -1,6 +1,7 @@ // using System; using System.Collections.Generic; +using EFCore.MigrationExtensions.SqlObjects; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Storage.ValueConversion; @@ -17,6 +18,14 @@ partial class AppDbContextModelSnapshot : ModelSnapshot protected override void BuildModel(ModelBuilder modelBuilder) { #pragma warning disable 612, 618 + modelBuilder.Model.AddSqlObjects(new SqlObject[] + { + new("insert_outbox_message.sql", "CREATE OR REPLACE PROCEDURE outbox.insert_outbox_message(IN ptopic character varying, IN ppartition integer, IN type character varying, IN key character varying, IN payload jsonb, IN headers jsonb)\n LANGUAGE plpgsql\nAS $procedure$\nDECLARE\ncounter int;\nBEGIN\nupdate outbox.outbox_offset_sequences\nset value = value + 1\nwhere topic = ptopic and partition = ppartition\nreturning value into counter;\ninsert into outbox.outbox_messages(topic, partition, type, key, payload, headers, number)\nvalues (ptopic, ppartition, type, key, payload, headers, counter);\nEND;\n$procedure$\n;\n") + { + Order = 2147483647 + }, + }); + modelBuilder .HasDefaultSchema("outbox") .HasAnnotation("ProductVersion", "9.0.3") @@ -49,6 +58,10 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnType("character varying(128)") .HasColumnName("key"); + b.Property("Number") + .HasColumnType("integer") + .HasColumnName("number"); + b.Property("Partition") .HasColumnType("integer") .HasColumnName("partition"); @@ -73,8 +86,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasKey("Id") .HasName("pk_outbox_messages"); - b.HasIndex("Topic", "Partition", "Id") - .HasDatabaseName("ix_outbox_messages_topic_partition_id"); + b.HasIndex("Topic", "Partition", "Number") + .HasDatabaseName("ix_outbox_messages_topic_partition_number"); b.ToTable("outbox_messages", "outbox"); }); @@ -94,9 +107,9 @@ protected override void BuildModel(ModelBuilder modelBuilder) .HasColumnName("available_after") .HasDefaultValueSql("now()"); - b.Property("LastProcessedId") + b.Property("LastProcessedNumber") .HasColumnType("integer") - .HasColumnName("last_processed_id"); + .HasColumnName("last_processed_number"); b.Property("Partition") .HasColumnType("integer") @@ -122,7 +135,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) { Id = 2, AvailableAfter = new DateTimeOffset(new DateTime(2025, 5, 18, 12, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 7, 0, 0, 0)), - LastProcessedId = 0, + LastProcessedNumber = 0, Partition = 0, Topic = "topic-1" }, @@ -130,11 +143,56 @@ protected override void BuildModel(ModelBuilder modelBuilder) { Id = 3, AvailableAfter = new DateTimeOffset(new DateTime(2025, 5, 18, 12, 0, 0, 0, DateTimeKind.Unspecified), new TimeSpan(0, 7, 0, 0, 0)), - LastProcessedId = 0, + LastProcessedNumber = 0, Partition = 1, Topic = "topic-1" }); }); + + modelBuilder.Entity("Outbox.Entities.OutboxOffsetSequence", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("Partition") + .HasColumnType("integer") + .HasColumnName("partition"); + + b.Property("Topic") + .IsRequired() + .HasMaxLength(128) + .HasColumnType("character varying(128)") + .HasColumnName("topic"); + + b.Property("Value") + .HasColumnType("integer") + .HasColumnName("value"); + + b.HasKey("Id") + .HasName("pk_outbox_offset_sequences"); + + b.ToTable("outbox_offset_sequences", "outbox"); + + b.HasData( + new + { + Id = 1, + Partition = 0, + Topic = "topic-1", + Value = 0 + }, + new + { + Id = 2, + Partition = 1, + Topic = "topic-1", + Value = 0 + }); + }); #pragma warning restore 612, 618 } } diff --git a/src/Outbox/Outbox.csproj b/src/Outbox/Outbox.csproj index d4bc364..be31d6a 100644 --- a/src/Outbox/Outbox.csproj +++ b/src/Outbox/Outbox.csproj @@ -7,8 +7,13 @@ + + + + + diff --git a/src/Outbox/Sql/insert_outbox_message.sql b/src/Outbox/Sql/insert_outbox_message.sql new file mode 100644 index 0000000..ee9e7fa --- /dev/null +++ b/src/Outbox/Sql/insert_outbox_message.sql @@ -0,0 +1,15 @@ +CREATE OR REPLACE PROCEDURE outbox.insert_outbox_message(IN ptopic character varying, IN ppartition integer, IN type character varying, IN key character varying, IN payload jsonb, IN headers jsonb) + LANGUAGE plpgsql +AS $procedure$ +DECLARE +counter int; +BEGIN +update outbox.outbox_offset_sequences +set value = value + 1 +where topic = ptopic and partition = ppartition +returning value into counter; +insert into outbox.outbox_messages(topic, partition, type, key, payload, headers, number) +values (ptopic, ppartition, type, key, payload, headers, counter); +END; +$procedure$ +;