Skip to content

Commit

Permalink
Include consumer and producer test
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Oct 11, 2023
1 parent 23c3745 commit a14f601
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="OpenTelemetry.Exporter.InMemory" Version="1.6.0" />
</ItemGroup>

<ItemGroup Condition="$([MSBuild]::IsOsPlatform('OSX'))">
Expand All @@ -37,6 +38,7 @@
<ProjectReference Include="..\KafkaFlow.Compressor.Gzip\KafkaFlow.Compressor.Gzip.csproj" />
<ProjectReference Include="..\KafkaFlow.Compressor\KafkaFlow.Compressor.csproj" />
<ProjectReference Include="..\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj" />
<ProjectReference Include="..\KafkaFlow.OpenTelemetry\KafkaFlow.OpenTelemetry.csproj" />
<ProjectReference Include="..\KafkaFlow.Serializer.JsonCore\KafkaFlow.Serializer.JsonCore.csproj" />
<ProjectReference Include="..\KafkaFlow.Serializer.ProtobufNet\KafkaFlow.Serializer.ProtobufNet.csproj" />
<ProjectReference Include="..\KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro\KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj" />
Expand Down
139 changes: 139 additions & 0 deletions src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
namespace KafkaFlow.IntegrationTests
{
using System;
using System.IO;
using System.Threading;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using global::Microsoft.Extensions.Configuration;
using global::Microsoft.Extensions.Hosting;
using KafkaFlow.Configuration;
using KafkaFlow.IntegrationTests.Core.Handlers;
using KafkaFlow.IntegrationTests.Core.Producers;
using KafkaFlow.Serializer.SchemaRegistry;
using KafkaFlow.TypedHandler;
using AutoOffsetReset = KafkaFlow.AutoOffsetReset;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Threading.Tasks;
using KafkaFlow.IntegrationTests.Core;
using AutoFixture;
using System.Collections.Generic;
using System.Diagnostics;
using global::OpenTelemetry;
using global::OpenTelemetry.Trace;
using Microsoft.Extensions.DependencyInjection;
using KafkaFlow.Compressor;
using KafkaFlow.Compressor.Gzip;
using KafkaFlow.IntegrationTests.Core.Middlewares;

[TestClass]
public class OpenTelemetryTests
{
private readonly Fixture fixture = new();

private List<Activity> exportedItems;

private IServiceProvider provider;

[TestInitialize]
public void Setup()
{
this.exportedItems = new List<Activity>();
//this.BuildTracerProvider();

Check warning on line 42 in src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs#L42

Remove this commented out code.
this.provider = this.GetServiceProvider();
MessageStorage.Clear();
}

[TestMethod]
public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpansAreCreatedCorrectly()
{
// Arrange
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource("KafkaFlow")
.AddInMemoryExporter(this.exportedItems)
.Build();

var producer = this.provider.GetRequiredService<IMessageProducer<GzipProducer>>();
var message = this.fixture.Create<byte[]>();

// Act
await producer.ProduceAsync(null, message);

// Assert
await Task.Delay(8000).ConfigureAwait(false);

Assert.IsNotNull(this.exportedItems);
var producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer);
var consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer);

Assert.IsNull(producerSpan.ParentId);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
}

private IServiceProvider GetServiceProvider()
{
string topicName = "Otel";

var builder = Host
.CreateDefaultBuilder()
.ConfigureAppConfiguration(
(_, config) =>
{
config
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile(
"conf/appsettings.json",
false,
true)
.AddEnvironmentVariables();
})
.ConfigureServices((context, services) =>
services.AddKafka(
kafka => kafka
.UseLogHandler<TraceLogHandler>()
.AddCluster(
cluster => cluster
.WithBrokers(context.Configuration.GetValue<string>("Kafka:Brokers").Split(';'))
.CreateTopicIfNotExists(topicName, 1, 1)
.AddProducer<GzipProducer>(
producer => producer
.DefaultTopic(topicName)
.AddMiddlewares(
middlewares => middlewares
.AddCompressor<GzipMessageCompressor>()))
.AddConsumer(
consumer => consumer
.Topic(topicName)
.WithGroupId(topicName)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddCompressor<GzipMessageCompressor>()
.Add<GzipMiddleware>())))
.AddOpenTelemetryInstrumentation()))
.UseDefaultServiceProvider(
(_, options) =>
{
options.ValidateScopes = true;
options.ValidateOnBuild = true;
});

var host = builder.Build();
var bus = host.Services.CreateKafkaBus();
bus.StartAsync().GetAwaiter().GetResult();

// Wait partition assignment
Thread.Sleep(10000);

return host.Services;
}

private void BuildTracerProvider()

Check failure on line 134 in src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs#L134

Add a nested comment explaining why this method is empty, throw a 'NotSupportedException' or complete the implementation.
{

}
}
}
7 changes: 0 additions & 7 deletions src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ public static Task OnConsumeStarted(IMessageContext context)
{
SetConsumerTags(context, activity);
}

Console.WriteLine(activity.TraceId);
Console.WriteLine(activity.SpanId);
Console.WriteLine(activity.ParentSpanId);
}
catch

Check notice on line 40 in src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerObserver.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerObserver.cs#L40

Handle the exception or explain in a comment why it can be ignored.
{

Check warning on line 41 in src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerObserver.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerObserver.cs#L41

Either remove or fill this block of code.
Expand All @@ -51,9 +47,6 @@ public static Task OnConsumeStarted(IMessageContext context)
public static Task OnConsumeCompleted(IMessageContext context)
{
var activity = context.Items[KafkaFlowActivitySourceHelper.ActivityString] as Activity;
Console.WriteLine(activity.TraceId);
Console.WriteLine(activity.SpanId);
Console.WriteLine(activity.ParentSpanId);

activity?.Dispose();

Expand Down
4 changes: 0 additions & 4 deletions src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ public static Task OnProducerStarted(IMessageContext context)
{
SetProducerTags(context, activity);
}

Console.WriteLine(activity.TraceId);
Console.WriteLine(activity.SpanId);
Console.WriteLine(activity.ParentSpanId);
}
catch

Check notice on line 54 in src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerObserver.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerObserver.cs#L54

Handle the exception or explain in a comment why it can be ignored.
{

Check warning on line 55 in src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerObserver.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerObserver.cs#L55

Either remove or fill this block of code.
Expand Down

0 comments on commit a14f601

Please sign in to comment.