Skip to content

Commit

Permalink
GlobalEvents tests. Not finished yet
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Oct 10, 2023
1 parent 1cb2e06 commit 4cffda0
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 3 deletions.
12 changes: 12 additions & 0 deletions src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"profiles": {
"KafkaFlow.Admin.Dashboard": {
"commandName": "Project",
"launchBrowser": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "https://localhost:52566;http://localhost:52567"
}
}
}
2 changes: 2 additions & 0 deletions src/KafkaFlow.IntegrationTests/ConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public class ConsumerTest
[TestInitialize]
public void Setup()
{
Bootstrapper.GlobalEvents = observers => { };

this.provider = Bootstrapper.GetServiceProvider();
MessageStorage.Clear();
}
Expand Down
26 changes: 24 additions & 2 deletions src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace KafkaFlow.IntegrationTests.Core
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
Expand All @@ -11,6 +12,7 @@ namespace KafkaFlow.IntegrationTests.Core
using global::Microsoft.Extensions.Hosting;
using KafkaFlow.Compressor;
using KafkaFlow.Compressor.Gzip;
using KafkaFlow.Configuration;
using KafkaFlow.IntegrationTests.Core.Handlers;
using KafkaFlow.IntegrationTests.Core.Messages;
using KafkaFlow.IntegrationTests.Core.Middlewares;
Expand Down Expand Up @@ -42,9 +44,28 @@ internal static class Bootstrapper
private const string ProtobufGzipTopicName = "test-protobuf-gzip";
private const string ProtobufGzipTopicName2 = "test-protobuf-gzip-2";
private const string AvroTopicName = "test-avro";
private static Action<IGlobalEvents> globalEvents;

private static readonly Lazy<IServiceProvider> LazyProvider = new(SetupProvider);

public static Action<IGlobalEvents> GlobalEvents
{
get
{
if(globalEvents == null)
{
return _ => { };
}

return globalEvents;
}

set
{
globalEvents = value;
}
}

public static IServiceProvider GetServiceProvider() => LazyProvider.Value;

private static IServiceProvider SetupProvider()
Expand Down Expand Up @@ -87,7 +108,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection

ConsumerConfig defaultConfig = new()
{
Acks = Confluent.Kafka.Acks.All,
Acks = Acks.All,
AllowAutoCreateTopics = false,
AutoCommitIntervalMs = 5000,
AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Latest,
Expand Down Expand Up @@ -332,7 +353,8 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.DefaultTopic(GzipTopicName)
.AddMiddlewares(
middlewares => middlewares
.AddCompressor<GzipMessageCompressor>()))));
.AddCompressor<GzipMessageCompressor>())))
.SubscribeGlobalEvents(GlobalEvents));

services.AddSingleton<JsonProducer>();
services.AddSingleton<JsonGzipProducer>();
Expand Down
30 changes: 30 additions & 0 deletions src/KafkaFlow.IntegrationTests/ProducerTest.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
namespace KafkaFlow.IntegrationTests
{
using System;
using System.Linq;
using System.Threading.Tasks;
using AutoFixture;
using KafkaFlow.Consumers;
using KafkaFlow.IntegrationTests.Core;
using KafkaFlow.IntegrationTests.Core.Handlers;
using KafkaFlow.IntegrationTests.Core.Messages;
using KafkaFlow.IntegrationTests.Core.Producers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand All @@ -16,9 +19,19 @@ public class ProducerTest

private IServiceProvider provider;

private IMessageContext messageContext;

[TestInitialize]
public void Setup()
{
Bootstrapper.GlobalEvents = observers =>
{
observers.MessageProduceStarted.Subscribe(eventContext =>
{
this.messageContext = eventContext.MessageContext;
return Task.CompletedTask;
});
};
this.provider = Bootstrapper.GetServiceProvider();
MessageStorage.Clear();
}
Expand All @@ -36,5 +49,22 @@ public async Task ProduceNullKeyTest()
// Assert
await MessageStorage.AssertMessageAsync(message);
}

[TestMethod]
public void SubscribeGlobalEventsTest()
{
var consumer = this.provider.GetRequiredService<IConsumerAccessor>().All.First();

Check notice on line 56 in src/KafkaFlow.IntegrationTests/ProducerTest.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.IntegrationTests/ProducerTest.cs#L56

Remove the unused local variable 'consumer'.
var producer = this.provider.GetRequiredService<IMessageProducer<ProtobufGzipProducer>>();
var messages = this.fixture.CreateMany<TestMessage1>(1).ToList();

// Act
messages.ForEach(m => producer.Produce(m.Id.ToString(), m));

var x = messages.FirstOrDefault();

Check notice on line 63 in src/KafkaFlow.IntegrationTests/ProducerTest.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.IntegrationTests/ProducerTest.cs#L63

Remove the unused local variable 'x'.

// Assert
Assert.IsNotNull(this.messageContext);
Assert.AreEqual(this.messageContext.Message.Key, messages.FirstOrDefault().Id.ToString());
}
}
}
2 changes: 1 addition & 1 deletion src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public KafkaConfiguration Build()
.AddSingleton<IConsumerAccessor>(new ConsumerAccessor())
.AddSingleton<IConsumerManagerFactory>(new ConsumerManagerFactory())
.AddSingleton<IClusterManagerAccessor, ClusterManagerAccessor>()
.AddSingleton<IGlobalEvents>(r =>
.AddSingleton(r =>
{
var logHandler = r.Resolve<ILogHandler>();
Expand Down
3 changes: 3 additions & 0 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public ConsumerWorker(

middlewareContext.Worker = this;
middlewareContext.Consumer = consumer;
this.workerStoppingEvent = new Event(logHandler);
this.workerStoppedEvent = new Event(logHandler);
this.workerProcessingEnded = new Event<IMessageContext>(logHandler);
}

public int Id { get; }
Expand Down

0 comments on commit 4cffda0

Please sign in to comment.