Skip to content

Commit

Permalink
tests: create activity on consuming message
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Nov 14, 2023
1 parent d6d3f7e commit 3e926e7
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
namespace KafkaFlow.IntegrationTests.Core.Middlewares
{
using System.Diagnostics;
using System.Threading.Tasks;
using KafkaFlow.IntegrationTests.Core.Handlers;

internal class GzipMiddleware : IMessageMiddleware
{
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
using var activity = ActivitySourceAccessor.ActivitySource.StartActivity("integration-test", ActivityKind.Internal);

This comment has been minimized.

Copy link
@joelfoliveira

joelfoliveira Nov 14, 2023

Contributor

Can we create a Middleware just for this?
This way it is reusable in other tests that are not using the GzipMiddleware


MessageStorage.Add((byte[]) context.Message.Value);
await next(context);
}
Expand Down
39 changes: 34 additions & 5 deletions src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,42 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
await producer.ProduceAsync(null, message);

// Assert
var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.IsNull(producerSpan.ParentId);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
}

[TestMethod]
public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity()
{
// Arrange
var provider = await this.GetServiceProvider();
MessageStorage.Clear();

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource("KafkaFlow.OpenTelemetry")
.AddInMemoryExporter(this.exportedItems)
.Build();

var producer = provider.GetRequiredService<IMessageProducer<GzipProducer>>();
var message = this.fixture.Create<byte[]>();

// Act
await producer.ProduceAsync(null, message);

// Assert
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.IsNull(producerSpan.ParentId);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
Assert.AreEqual(internalSpan.TraceId, consumerSpan.TraceId);
}

[TestMethod]
public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsPropagatedFromTestActivityToConsumer()
{
Expand Down Expand Up @@ -97,7 +125,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp
await producer.ProduceAsync(null, message);

// Assert
var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Expand Down Expand Up @@ -181,9 +209,9 @@ await Policy
.ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned));
}

private async Task<(Activity producerSpan, Activity consumerSpan)> WaitForSpansAsync()
private async Task<(Activity producerSpan, Activity consumerSpan, Activity internalSpan)> WaitForSpansAsync()
{
Activity producerSpan = null, consumerSpan = null;
Activity producerSpan = null, consumerSpan = null, internalSpan = null;

await Policy
.HandleResult<bool>(isAvailable => !isAvailable)
Expand All @@ -192,11 +220,12 @@ await Policy
{
producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer);
consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer);
internalSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Internal);
return Task.FromResult(producerSpan != null && consumerSpan != null);
});

return (producerSpan, consumerSpan);
return (producerSpan, consumerSpan, internalSpan);
}
}
}

0 comments on commit 3e926e7

Please sign in to comment.