From 050f236552718b1990b16015f47fcab62bebaede Mon Sep 17 00:00:00 2001 From: jvandaal Date: Thu, 5 Dec 2024 09:29:29 +0100 Subject: [PATCH] feat: configure consumer offset without deploy --- paket.dependencies | 8 +- paket.lock | 8 +- .../Program.cs | 29 +---- .../ConsumerAddressContext.cs | 10 +- ...41205082856_AddOffsetOverrides.Designer.cs | 105 ++++++++++++++++++ .../20241205082856_AddOffsetOverrides.cs | 47 ++++++++ .../ConsumerAddressContextModelSnapshot.cs | 22 +++- 7 files changed, 192 insertions(+), 37 deletions(-) create mode 100644 src/ParcelRegistry.Consumer.Address/Migrations/20241205082856_AddOffsetOverrides.Designer.cs create mode 100644 src/ParcelRegistry.Consumer.Address/Migrations/20241205082856_AddOffsetOverrides.cs diff --git a/paket.dependencies b/paket.dependencies index fbf30d6c..1d70bd5a 100755 --- a/paket.dependencies +++ b/paket.dependencies @@ -90,10 +90,10 @@ nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance.AcmIdm 21.14.1 nuget Be.Vlaanderen.Basisregisters.GrAr.Extracts 21.14.1 nuget Be.Vlaanderen.Basisregisters.GrAr.Oslo 21.14.1 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.0.1 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple 5.0.1 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 5.0.1 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 5.0.1 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.2.0 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple 5.2.0 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 5.2.0 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 5.2.0 nuget Be.Vlaanderen.Basisregisters.Shaperon 10.0.2 diff --git a/paket.lock b/paket.lock index 11f066e0..cc07f7ef 100644 --- a/paket.lock +++ b/paket.lock @@ -305,19 +305,19 @@ NUGET Be.Vlaanderen.Basisregisters.GrAr.Common (21.14.1) Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.14.1) Microsoft.CSharp (>= 4.7) - Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.0.1) + Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.2) AWSSDK.Core (>= 3.7.302.15) AWSSDK.SQS (>= 3.7.300.54) Microsoft.Extensions.Logging (>= 8.0) - Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (5.0.1) + Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (5.2) Confluent.Kafka (>= 2.3) Microsoft.EntityFrameworkCore.SqlServer (>= 8.0.2) Microsoft.Extensions.Logging (>= 8.0) Newtonsoft.Json (>= 13.0.3) - Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer (5.0.1) + Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer (5.2) Confluent.Kafka (>= 2.3) Newtonsoft.Json (>= 13.0.3) - Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple (5.0.1) + Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple (5.2) Confluent.Kafka (>= 2.3) Microsoft.Extensions.Logging (>= 8.0) Be.Vlaanderen.Basisregisters.Middleware.AddProblemJsonHeader (3.0) diff --git a/src/ParcelRegistry.Consumer.Address.Console/Program.cs b/src/ParcelRegistry.Consumer.Address.Console/Program.cs index 85f79982..6357d5a2 100644 --- a/src/ParcelRegistry.Consumer.Address.Console/Program.cs +++ b/src/ParcelRegistry.Consumer.Address.Console/Program.cs @@ -2,7 +2,6 @@ namespace ParcelRegistry.Consumer.Address.Console { using System; using System.IO; - using System.Linq; using System.Threading; using System.Threading.Tasks; using Api.BackOffice.Abstractions; @@ -13,6 +12,7 @@ namespace ParcelRegistry.Consumer.Address.Console using Be.Vlaanderen.Basisregisters.EventHandling; using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka; using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer; + using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.Extensions; using Destructurama; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; @@ -103,7 +103,7 @@ public static async Task Main(string[] args) builder.Register(c => { - var bootstrapServers = hostContext.Configuration["Kafka:BootstrapServers"]; + var bootstrapServers = hostContext.Configuration["Kafka:BootstrapServers"]!; var topic = $"{hostContext.Configuration["AddressTopic"]}" ?? throw new ArgumentException("Configuration has no AddressTopic."); var suffix = hostContext.Configuration["GroupSuffix"]; var consumerGroupId = $"ParcelRegistry.BackOfficeConsumer.{topic}{suffix}"; @@ -115,28 +115,11 @@ public static async Task Main(string[] args) EventsJsonSerializerSettingsProvider.CreateSerializerSettings()); consumerOptions.ConfigureSaslAuthentication(new SaslAuthentication( - hostContext.Configuration["Kafka:SaslUserName"], - hostContext.Configuration["Kafka:SaslPassword"])); + hostContext.Configuration["Kafka:SaslUserName"]!, + hostContext.Configuration["Kafka:SaslPassword"]!)); - var offset = hostContext.Configuration["AddressTopicOffset"]; - - if (!string.IsNullOrWhiteSpace(offset) && long.TryParse(offset, out var result)) - { - var ignoreDataCheck = hostContext.Configuration.GetValue("IgnoreAddressTopicOffsetDataCheck", false); - - if (!ignoreDataCheck) - { - using var ctx = c.Resolve(); - - if (ctx.AddressConsumerItems.Any()) - { - throw new InvalidOperationException( - $"Cannot set Kafka offset to {offset} because {nameof(ctx.AddressConsumerItems)} has data."); - } - } - - consumerOptions.ConfigureOffset(new Offset(result)); - } + using var ctx = c.Resolve(); + ctx.OverrideConfigureOffset(consumerOptions); return consumerOptions; }); diff --git a/src/ParcelRegistry.Consumer.Address/ConsumerAddressContext.cs b/src/ParcelRegistry.Consumer.Address/ConsumerAddressContext.cs index 1faf4b13..721d1f69 100644 --- a/src/ParcelRegistry.Consumer.Address/ConsumerAddressContext.cs +++ b/src/ParcelRegistry.Consumer.Address/ConsumerAddressContext.cs @@ -5,6 +5,7 @@ namespace ParcelRegistry.Consumer.Address using System.IO; using System.Linq; using System.Reflection; + using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer; using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer; using Be.Vlaanderen.Basisregisters.ProjectionHandling.Runner.SqlServer.MigrationExtensions; using Microsoft.EntityFrameworkCore; @@ -15,11 +16,12 @@ namespace ParcelRegistry.Consumer.Address using Parcel.DataStructures; using ParcelRegistry.Infrastructure; - public class ConsumerAddressContext : SqlServerConsumerDbContext, IAddresses + public class ConsumerAddressContext : SqlServerConsumerDbContext, IAddresses, IOffsetOverrideDbSet { public override string ProcessedMessagesSchema => Schema.ConsumerAddress; - public DbSet AddressConsumerItems { get; set; } + public DbSet AddressConsumerItems => Set(); + public DbSet OffsetOverrides => Set(); // This needs to be here to please EF public ConsumerAddressContext() @@ -88,8 +90,8 @@ protected override void OnModelCreating(ModelBuilder modelBuilder) { base.OnModelCreating(modelBuilder); - modelBuilder - .ApplyConfigurationsFromAssembly(typeof(ConsumerAddressContext).GetTypeInfo().Assembly); + modelBuilder.ApplyConfigurationsFromAssembly(typeof(ConsumerAddressContext).GetTypeInfo().Assembly); + modelBuilder.ApplyConfiguration(new OffsetOverrideConfiguration(Schema.ConsumerAddress)); } } diff --git a/src/ParcelRegistry.Consumer.Address/Migrations/20241205082856_AddOffsetOverrides.Designer.cs b/src/ParcelRegistry.Consumer.Address/Migrations/20241205082856_AddOffsetOverrides.Designer.cs new file mode 100644 index 00000000..43162ba8 --- /dev/null +++ b/src/ParcelRegistry.Consumer.Address/Migrations/20241205082856_AddOffsetOverrides.Designer.cs @@ -0,0 +1,105 @@ +// +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using NetTopologySuite.Geometries; +using ParcelRegistry.Consumer.Address; + +#nullable disable + +namespace ParcelRegistry.Consumer.Address.Migrations +{ + [DbContext(typeof(ConsumerAddressContext))] + [Migration("20241205082856_AddOffsetOverrides")] + partial class AddOffsetOverrides + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("ProductVersion", "8.0.3") + .HasAnnotation("Relational:MaxIdentifierLength", 128); + + SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder); + + modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.OffsetOverride", b => + { + b.Property("ConsumerGroupId") + .HasColumnType("nvarchar(450)"); + + b.Property("Configured") + .HasColumnType("bit"); + + b.Property("Offset") + .HasColumnType("bigint"); + + b.HasKey("ConsumerGroupId"); + + b.ToTable("OffsetOverrides", "ParcelRegistryConsumerAddress"); + }); + + modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.ProcessedMessage", b => + { + b.Property("IdempotenceKey") + .HasMaxLength(128) + .HasColumnType("nvarchar(128)"); + + b.Property("DateProcessed") + .HasColumnType("datetimeoffset"); + + b.HasKey("IdempotenceKey"); + + SqlServerKeyBuilderExtensions.IsClustered(b.HasKey("IdempotenceKey")); + + b.HasIndex("DateProcessed"); + + b.ToTable("ProcessedMessages", "ParcelRegistryConsumerAddress"); + }); + + modelBuilder.Entity("ParcelRegistry.Consumer.Address.AddressConsumerItem", b => + { + b.Property("AddressPersistentLocalId") + .HasColumnType("int"); + + b.Property("AddressId") + .HasColumnType("uniqueidentifier"); + + b.Property("GeometryMethod") + .IsRequired() + .HasColumnType("nvarchar(max)"); + + b.Property("GeometrySpecification") + .IsRequired() + .HasColumnType("nvarchar(max)"); + + b.Property("IsRemoved") + .HasColumnType("bit"); + + b.Property("Position") + .IsRequired() + .HasColumnType("sys.geometry"); + + b.Property("Status") + .IsRequired() + .HasColumnType("nvarchar(max)"); + + b.HasKey("AddressPersistentLocalId"); + + SqlServerKeyBuilderExtensions.IsClustered(b.HasKey("AddressPersistentLocalId")); + + b.HasIndex("AddressId"); + + b.HasIndex("IsRemoved"); + + b.HasIndex("Position"); + + b.ToTable("Addresses", "ParcelRegistryConsumerAddress"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/ParcelRegistry.Consumer.Address/Migrations/20241205082856_AddOffsetOverrides.cs b/src/ParcelRegistry.Consumer.Address/Migrations/20241205082856_AddOffsetOverrides.cs new file mode 100644 index 00000000..bf1dcfe4 --- /dev/null +++ b/src/ParcelRegistry.Consumer.Address/Migrations/20241205082856_AddOffsetOverrides.cs @@ -0,0 +1,47 @@ +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace ParcelRegistry.Consumer.Address.Migrations +{ + /// + public partial class AddOffsetOverrides : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.CreateTable( + name: "OffsetOverrides", + schema: "ParcelRegistryConsumerAddress", + columns: table => new + { + ConsumerGroupId = table.Column(type: "nvarchar(450)", nullable: false), + Offset = table.Column(type: "bigint", nullable: false), + Configured = table.Column(type: "bit", nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_OffsetOverrides", x => x.ConsumerGroupId); + }); + + migrationBuilder.CreateIndex( + name: "IX_Addresses_Position", + schema: "ParcelRegistryConsumerAddress", + table: "Addresses", + column: "Position"); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "OffsetOverrides", + schema: "ParcelRegistryConsumerAddress"); + + migrationBuilder.DropIndex( + name: "IX_Addresses_Position", + schema: "ParcelRegistryConsumerAddress", + table: "Addresses"); + } + } +} diff --git a/src/ParcelRegistry.Consumer.Address/Migrations/ConsumerAddressContextModelSnapshot.cs b/src/ParcelRegistry.Consumer.Address/Migrations/ConsumerAddressContextModelSnapshot.cs index a0e9aca5..8dd65749 100644 --- a/src/ParcelRegistry.Consumer.Address/Migrations/ConsumerAddressContextModelSnapshot.cs +++ b/src/ParcelRegistry.Consumer.Address/Migrations/ConsumerAddressContextModelSnapshot.cs @@ -18,10 +18,26 @@ protected override void BuildModel(ModelBuilder modelBuilder) { #pragma warning disable 612, 618 modelBuilder - .HasAnnotation("ProductVersion", "6.0.3") + .HasAnnotation("ProductVersion", "8.0.3") .HasAnnotation("Relational:MaxIdentifierLength", 128); - SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder, 1L, 1); + SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder); + + modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.OffsetOverride", b => + { + b.Property("ConsumerGroupId") + .HasColumnType("nvarchar(450)"); + + b.Property("Configured") + .HasColumnType("bit"); + + b.Property("Offset") + .HasColumnType("bigint"); + + b.HasKey("ConsumerGroupId"); + + b.ToTable("OffsetOverrides", "ParcelRegistryConsumerAddress"); + }); modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.ProcessedMessage", b => { @@ -76,6 +92,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("IsRemoved"); + b.HasIndex("Position"); + b.ToTable("Addresses", "ParcelRegistryConsumerAddress"); }); #pragma warning restore 612, 618