From af5784b588e92e12196ee85ae8a77978082c1381 Mon Sep 17 00:00:00 2001 From: ArneD Date: Thu, 5 Dec 2024 09:16:24 +0100 Subject: [PATCH 1/2] fix(consumer): remove stopping token in messagehandler --- .../BackOfficeConsumer.cs | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs b/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs index 79641938..c1714e13 100644 --- a/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs +++ b/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs @@ -57,14 +57,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await _kafkaIdemIdompotencyConsumer.ConsumeContinuously(async (message, context) => { - _logger.LogInformation("Handling next message"); - - await commandHandlingProjector.ProjectAsync(commandHandler, message, stoppingToken).ConfigureAwait(false); - await backOfficeProjector.ProjectAsync(context, message, stoppingToken).ConfigureAwait(false); - - //CancellationToken.None to prevent halfway consumption - await context.SaveChangesAsync(CancellationToken.None); - + await HandleMessage(commandHandlingProjector, commandHandler, message, backOfficeProjector, context); }, stoppingToken); } catch (Exception) @@ -73,5 +66,17 @@ await _kafkaIdemIdompotencyConsumer.ConsumeContinuously(async (message, context) throw; } } + + private async Task HandleMessage(ConnectedProjector commandHandlingProjector, CommandHandler commandHandler, object message, + ConnectedProjector backOfficeProjector, ConsumerAddressContext context) + { + _logger.LogInformation("Handling next message"); + + await commandHandlingProjector.ProjectAsync(commandHandler, message, CancellationToken.None).ConfigureAwait(false); + await backOfficeProjector.ProjectAsync(context, message, CancellationToken.None).ConfigureAwait(false); + + //CancellationToken.None to prevent halfway consumption + await context.SaveChangesAsync(CancellationToken.None); + } } } From f9fa15c15d709d441456c8be216c06ad930ab710 Mon Sep 17 00:00:00 2001 From: ArneD Date: Thu, 5 Dec 2024 09:21:11 +0100 Subject: [PATCH 2/2] feat(consumer): add offset override --- paket.dependencies | 8 +- paket.lock | 8 +- .../Program.cs | 24 +--- .../appsettings.json | 2 - .../ConsumerAddressContext.cs | 7 +- ...241205082405_AddOffsetOverride.Designer.cs | 105 ++++++++++++++++++ .../20241205082405_AddOffsetOverride.cs | 47 ++++++++ .../ConsumerAddressContextModelSnapshot.cs | 22 +++- 8 files changed, 189 insertions(+), 34 deletions(-) create mode 100644 src/ParcelRegistry.Consumer.Address/Migrations/20241205082405_AddOffsetOverride.Designer.cs create mode 100644 src/ParcelRegistry.Consumer.Address/Migrations/20241205082405_AddOffsetOverride.cs diff --git a/paket.dependencies b/paket.dependencies index fbf30d6c..1d70bd5a 100755 --- a/paket.dependencies +++ b/paket.dependencies @@ -90,10 +90,10 @@ nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance.AcmIdm 21.14.1 nuget Be.Vlaanderen.Basisregisters.GrAr.Extracts 21.14.1 nuget Be.Vlaanderen.Basisregisters.GrAr.Oslo 21.14.1 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.0.1 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple 5.0.1 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 5.0.1 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 5.0.1 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.2.0 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple 5.2.0 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 5.2.0 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 5.2.0 nuget Be.Vlaanderen.Basisregisters.Shaperon 10.0.2 diff --git a/paket.lock b/paket.lock index 11f066e0..cc07f7ef 100644 --- a/paket.lock +++ b/paket.lock @@ -305,19 +305,19 @@ NUGET Be.Vlaanderen.Basisregisters.GrAr.Common (21.14.1) Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.14.1) Microsoft.CSharp (>= 4.7) - Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.0.1) + Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.2) AWSSDK.Core (>= 3.7.302.15) AWSSDK.SQS (>= 3.7.300.54) Microsoft.Extensions.Logging (>= 8.0) - Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (5.0.1) + Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (5.2) Confluent.Kafka (>= 2.3) Microsoft.EntityFrameworkCore.SqlServer (>= 8.0.2) Microsoft.Extensions.Logging (>= 8.0) Newtonsoft.Json (>= 13.0.3) - Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer (5.0.1) + Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer (5.2) Confluent.Kafka (>= 2.3) Newtonsoft.Json (>= 13.0.3) - Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple (5.0.1) + Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple (5.2) Confluent.Kafka (>= 2.3) Microsoft.Extensions.Logging (>= 8.0) Be.Vlaanderen.Basisregisters.Middleware.AddProblemJsonHeader (3.0) diff --git a/src/ParcelRegistry.Consumer.Address.Console/Program.cs b/src/ParcelRegistry.Consumer.Address.Console/Program.cs index 85f79982..38e96aad 100644 --- a/src/ParcelRegistry.Consumer.Address.Console/Program.cs +++ b/src/ParcelRegistry.Consumer.Address.Console/Program.cs @@ -13,6 +13,7 @@ namespace ParcelRegistry.Consumer.Address.Console using Be.Vlaanderen.Basisregisters.EventHandling; using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka; using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer; + using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.Extensions; using Destructurama; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; @@ -84,7 +85,7 @@ public static async Task Main(string[] args) )); services - .AddDbContextFactory((provider, options) => options + .AddDbContextFactory((_, options) => options .UseLoggerFactory(loggerFactory) .UseSqlServer(hostContext.Configuration.GetConnectionString("ConsumerAddress"), sqlServerOptions => { @@ -118,25 +119,8 @@ public static async Task Main(string[] args) hostContext.Configuration["Kafka:SaslUserName"], hostContext.Configuration["Kafka:SaslPassword"])); - var offset = hostContext.Configuration["AddressTopicOffset"]; - - if (!string.IsNullOrWhiteSpace(offset) && long.TryParse(offset, out var result)) - { - var ignoreDataCheck = hostContext.Configuration.GetValue("IgnoreAddressTopicOffsetDataCheck", false); - - if (!ignoreDataCheck) - { - using var ctx = c.Resolve(); - - if (ctx.AddressConsumerItems.Any()) - { - throw new InvalidOperationException( - $"Cannot set Kafka offset to {offset} because {nameof(ctx.AddressConsumerItems)} has data."); - } - } - - consumerOptions.ConfigureOffset(new Offset(result)); - } + using var ctx = c.Resolve(); + ctx.OverrideConfigureOffset(consumerOptions); return consumerOptions; }); diff --git a/src/ParcelRegistry.Consumer.Address.Console/appsettings.json b/src/ParcelRegistry.Consumer.Address.Console/appsettings.json index 1ce8dbbe..eca3427a 100644 --- a/src/ParcelRegistry.Consumer.Address.Console/appsettings.json +++ b/src/ParcelRegistry.Consumer.Address.Console/appsettings.json @@ -17,8 +17,6 @@ "GroupSuffix": "", "AddressTopic": "dev.address", - "AddressTopicOffset": "", - "IgnoreAddressTopicOffsetDataCheck": false, "BaseUrl": "https://api.staging-basisregisters.vlaanderen/", diff --git a/src/ParcelRegistry.Consumer.Address/ConsumerAddressContext.cs b/src/ParcelRegistry.Consumer.Address/ConsumerAddressContext.cs index 1faf4b13..27d8ac6b 100644 --- a/src/ParcelRegistry.Consumer.Address/ConsumerAddressContext.cs +++ b/src/ParcelRegistry.Consumer.Address/ConsumerAddressContext.cs @@ -5,6 +5,7 @@ namespace ParcelRegistry.Consumer.Address using System.IO; using System.Linq; using System.Reflection; + using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer; using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer; using Be.Vlaanderen.Basisregisters.ProjectionHandling.Runner.SqlServer.MigrationExtensions; using Microsoft.EntityFrameworkCore; @@ -15,11 +16,12 @@ namespace ParcelRegistry.Consumer.Address using Parcel.DataStructures; using ParcelRegistry.Infrastructure; - public class ConsumerAddressContext : SqlServerConsumerDbContext, IAddresses + public class ConsumerAddressContext : SqlServerConsumerDbContext, IAddresses, IOffsetOverrideDbSet { public override string ProcessedMessagesSchema => Schema.ConsumerAddress; - public DbSet AddressConsumerItems { get; set; } + public DbSet AddressConsumerItems => Set(); + public DbSet OffsetOverrides => Set(); // This needs to be here to please EF public ConsumerAddressContext() @@ -90,6 +92,7 @@ protected override void OnModelCreating(ModelBuilder modelBuilder) modelBuilder .ApplyConfigurationsFromAssembly(typeof(ConsumerAddressContext).GetTypeInfo().Assembly); + modelBuilder.ApplyConfiguration(new OffsetOverrideConfiguration(ProcessedMessagesSchema)); } } diff --git a/src/ParcelRegistry.Consumer.Address/Migrations/20241205082405_AddOffsetOverride.Designer.cs b/src/ParcelRegistry.Consumer.Address/Migrations/20241205082405_AddOffsetOverride.Designer.cs new file mode 100644 index 00000000..47632108 --- /dev/null +++ b/src/ParcelRegistry.Consumer.Address/Migrations/20241205082405_AddOffsetOverride.Designer.cs @@ -0,0 +1,105 @@ +// +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using NetTopologySuite.Geometries; +using ParcelRegistry.Consumer.Address; + +#nullable disable + +namespace ParcelRegistry.Consumer.Address.Migrations +{ + [DbContext(typeof(ConsumerAddressContext))] + [Migration("20241205082405_AddOffsetOverride")] + partial class AddOffsetOverride + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("ProductVersion", "8.0.3") + .HasAnnotation("Relational:MaxIdentifierLength", 128); + + SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder); + + modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.OffsetOverride", b => + { + b.Property("ConsumerGroupId") + .HasColumnType("nvarchar(450)"); + + b.Property("Configured") + .HasColumnType("bit"); + + b.Property("Offset") + .HasColumnType("bigint"); + + b.HasKey("ConsumerGroupId"); + + b.ToTable("OffsetOverrides", "ParcelRegistryConsumerAddress"); + }); + + modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.ProcessedMessage", b => + { + b.Property("IdempotenceKey") + .HasMaxLength(128) + .HasColumnType("nvarchar(128)"); + + b.Property("DateProcessed") + .HasColumnType("datetimeoffset"); + + b.HasKey("IdempotenceKey"); + + SqlServerKeyBuilderExtensions.IsClustered(b.HasKey("IdempotenceKey")); + + b.HasIndex("DateProcessed"); + + b.ToTable("ProcessedMessages", "ParcelRegistryConsumerAddress"); + }); + + modelBuilder.Entity("ParcelRegistry.Consumer.Address.AddressConsumerItem", b => + { + b.Property("AddressPersistentLocalId") + .HasColumnType("int"); + + b.Property("AddressId") + .HasColumnType("uniqueidentifier"); + + b.Property("GeometryMethod") + .IsRequired() + .HasColumnType("nvarchar(max)"); + + b.Property("GeometrySpecification") + .IsRequired() + .HasColumnType("nvarchar(max)"); + + b.Property("IsRemoved") + .HasColumnType("bit"); + + b.Property("Position") + .IsRequired() + .HasColumnType("sys.geometry"); + + b.Property("Status") + .IsRequired() + .HasColumnType("nvarchar(max)"); + + b.HasKey("AddressPersistentLocalId"); + + SqlServerKeyBuilderExtensions.IsClustered(b.HasKey("AddressPersistentLocalId")); + + b.HasIndex("AddressId"); + + b.HasIndex("IsRemoved"); + + b.HasIndex("Position"); + + b.ToTable("Addresses", "ParcelRegistryConsumerAddress"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/ParcelRegistry.Consumer.Address/Migrations/20241205082405_AddOffsetOverride.cs b/src/ParcelRegistry.Consumer.Address/Migrations/20241205082405_AddOffsetOverride.cs new file mode 100644 index 00000000..dd2ff133 --- /dev/null +++ b/src/ParcelRegistry.Consumer.Address/Migrations/20241205082405_AddOffsetOverride.cs @@ -0,0 +1,47 @@ +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace ParcelRegistry.Consumer.Address.Migrations +{ + /// + public partial class AddOffsetOverride : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.CreateTable( + name: "OffsetOverrides", + schema: "ParcelRegistryConsumerAddress", + columns: table => new + { + ConsumerGroupId = table.Column(type: "nvarchar(450)", nullable: false), + Offset = table.Column(type: "bigint", nullable: false), + Configured = table.Column(type: "bit", nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_OffsetOverrides", x => x.ConsumerGroupId); + }); + + migrationBuilder.CreateIndex( + name: "IX_Addresses_Position", + schema: "ParcelRegistryConsumerAddress", + table: "Addresses", + column: "Position"); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "OffsetOverrides", + schema: "ParcelRegistryConsumerAddress"); + + migrationBuilder.DropIndex( + name: "IX_Addresses_Position", + schema: "ParcelRegistryConsumerAddress", + table: "Addresses"); + } + } +} diff --git a/src/ParcelRegistry.Consumer.Address/Migrations/ConsumerAddressContextModelSnapshot.cs b/src/ParcelRegistry.Consumer.Address/Migrations/ConsumerAddressContextModelSnapshot.cs index a0e9aca5..8dd65749 100644 --- a/src/ParcelRegistry.Consumer.Address/Migrations/ConsumerAddressContextModelSnapshot.cs +++ b/src/ParcelRegistry.Consumer.Address/Migrations/ConsumerAddressContextModelSnapshot.cs @@ -18,10 +18,26 @@ protected override void BuildModel(ModelBuilder modelBuilder) { #pragma warning disable 612, 618 modelBuilder - .HasAnnotation("ProductVersion", "6.0.3") + .HasAnnotation("ProductVersion", "8.0.3") .HasAnnotation("Relational:MaxIdentifierLength", 128); - SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder, 1L, 1); + SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder); + + modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.OffsetOverride", b => + { + b.Property("ConsumerGroupId") + .HasColumnType("nvarchar(450)"); + + b.Property("Configured") + .HasColumnType("bit"); + + b.Property("Offset") + .HasColumnType("bigint"); + + b.HasKey("ConsumerGroupId"); + + b.ToTable("OffsetOverrides", "ParcelRegistryConsumerAddress"); + }); modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.ProcessedMessage", b => { @@ -76,6 +92,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasIndex("IsRemoved"); + b.HasIndex("Position"); + b.ToTable("Addresses", "ParcelRegistryConsumerAddress"); }); #pragma warning restore 612, 618