From 4cffda098228ec3e03bb80db4b74cae7bbc9dd02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Tue, 10 Oct 2023 14:40:44 +0100 Subject: [PATCH] GlobalEvents tests. Not finished yet --- .../Properties/launchSettings.json | 12 ++++++++ .../ConsumerTest.cs | 2 ++ .../Core/Bootstrapper.cs | 26 ++++++++++++++-- .../ProducerTest.cs | 30 +++++++++++++++++++ .../KafkaConfigurationBuilder.cs | 2 +- src/KafkaFlow/Consumers/ConsumerWorker.cs | 3 ++ 6 files changed, 72 insertions(+), 3 deletions(-) create mode 100644 src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json diff --git a/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json b/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json new file mode 100644 index 000000000..b1df5e8b5 --- /dev/null +++ b/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json @@ -0,0 +1,12 @@ +{ + "profiles": { + "KafkaFlow.Admin.Dashboard": { + "commandName": "Project", + "launchBrowser": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + }, + "applicationUrl": "https://localhost:52566;http://localhost:52567" + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.IntegrationTests/ConsumerTest.cs b/src/KafkaFlow.IntegrationTests/ConsumerTest.cs index d80752cf3..be0999574 100644 --- a/src/KafkaFlow.IntegrationTests/ConsumerTest.cs +++ b/src/KafkaFlow.IntegrationTests/ConsumerTest.cs @@ -22,6 +22,8 @@ public class ConsumerTest [TestInitialize] public void Setup() { + Bootstrapper.GlobalEvents = observers => { }; + this.provider = Bootstrapper.GetServiceProvider(); MessageStorage.Clear(); } diff --git a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs index 293d8c60b..158c520be 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs @@ -3,6 +3,7 @@ namespace KafkaFlow.IntegrationTests.Core using System; using System.IO; using System.Threading; + using System.Threading.Tasks; using Confluent.Kafka; using Confluent.SchemaRegistry; using Confluent.SchemaRegistry.Serdes; @@ -11,6 +12,7 @@ namespace KafkaFlow.IntegrationTests.Core using global::Microsoft.Extensions.Hosting; using KafkaFlow.Compressor; using KafkaFlow.Compressor.Gzip; + using KafkaFlow.Configuration; using KafkaFlow.IntegrationTests.Core.Handlers; using KafkaFlow.IntegrationTests.Core.Messages; using KafkaFlow.IntegrationTests.Core.Middlewares; @@ -42,9 +44,28 @@ internal static class Bootstrapper private const string ProtobufGzipTopicName = "test-protobuf-gzip"; private const string ProtobufGzipTopicName2 = "test-protobuf-gzip-2"; private const string AvroTopicName = "test-avro"; + private static Action globalEvents; private static readonly Lazy LazyProvider = new(SetupProvider); + public static Action GlobalEvents + { + get + { + if(globalEvents == null) + { + return _ => { }; + } + + return globalEvents; + } + + set + { + globalEvents = value; + } + } + public static IServiceProvider GetServiceProvider() => LazyProvider.Value; private static IServiceProvider SetupProvider() @@ -87,7 +108,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection ConsumerConfig defaultConfig = new() { - Acks = Confluent.Kafka.Acks.All, + Acks = Acks.All, AllowAutoCreateTopics = false, AutoCommitIntervalMs = 5000, AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Latest, @@ -332,7 +353,8 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .DefaultTopic(GzipTopicName) .AddMiddlewares( middlewares => middlewares - .AddCompressor())))); + .AddCompressor()))) + .SubscribeGlobalEvents(GlobalEvents)); services.AddSingleton(); services.AddSingleton(); diff --git a/src/KafkaFlow.IntegrationTests/ProducerTest.cs b/src/KafkaFlow.IntegrationTests/ProducerTest.cs index 00f4e58db..13ceb0fde 100644 --- a/src/KafkaFlow.IntegrationTests/ProducerTest.cs +++ b/src/KafkaFlow.IntegrationTests/ProducerTest.cs @@ -1,10 +1,13 @@ namespace KafkaFlow.IntegrationTests { using System; + using System.Linq; using System.Threading.Tasks; using AutoFixture; + using KafkaFlow.Consumers; using KafkaFlow.IntegrationTests.Core; using KafkaFlow.IntegrationTests.Core.Handlers; + using KafkaFlow.IntegrationTests.Core.Messages; using KafkaFlow.IntegrationTests.Core.Producers; using Microsoft.Extensions.DependencyInjection; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -16,9 +19,19 @@ public class ProducerTest private IServiceProvider provider; + private IMessageContext messageContext; + [TestInitialize] public void Setup() { + Bootstrapper.GlobalEvents = observers => + { + observers.MessageProduceStarted.Subscribe(eventContext => + { + this.messageContext = eventContext.MessageContext; + return Task.CompletedTask; + }); + }; this.provider = Bootstrapper.GetServiceProvider(); MessageStorage.Clear(); } @@ -36,5 +49,22 @@ public async Task ProduceNullKeyTest() // Assert await MessageStorage.AssertMessageAsync(message); } + + [TestMethod] + public void SubscribeGlobalEventsTest() + { + var consumer = this.provider.GetRequiredService().All.First(); + var producer = this.provider.GetRequiredService>(); + var messages = this.fixture.CreateMany(1).ToList(); + + // Act + messages.ForEach(m => producer.Produce(m.Id.ToString(), m)); + + var x = messages.FirstOrDefault(); + + // Assert + Assert.IsNotNull(this.messageContext); + Assert.AreEqual(this.messageContext.Message.Key, messages.FirstOrDefault().Id.ToString()); + } } } diff --git a/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs b/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs index 52c265da2..0a22fef6f 100644 --- a/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs @@ -47,7 +47,7 @@ public KafkaConfiguration Build() .AddSingleton(new ConsumerAccessor()) .AddSingleton(new ConsumerManagerFactory()) .AddSingleton() - .AddSingleton(r => + .AddSingleton(r => { var logHandler = r.Resolve(); diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index da1e21683..c8f315827 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -41,6 +41,9 @@ public ConsumerWorker( middlewareContext.Worker = this; middlewareContext.Consumer = consumer; + this.workerStoppingEvent = new Event(logHandler); + this.workerStoppedEvent = new Event(logHandler); + this.workerProcessingEnded = new Event(logHandler); } public int Id { get; }