From c48df05339fd7ff4499da56164f6e066bec58c8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Mon, 20 Nov 2023 11:12:53 +0000 Subject: [PATCH] fix: opentelemetry broken traces Activity.Current is a static variable that uses AsyncLocal internally, which means that it flows into children async calls, but not back to the caller (message consumer and producer). With the previous implementation, the Activity.Current is null after the produce and consume of messages, which means that all spans created after it will generate new trace information because the context is not being propagated. This change fixes the problem firing sync events and making the OpenTelemetry handlers subscribe them. --- src/KafkaFlow.Abstractions/IEvent.cs | 4 ++ .../Properties/launchSettings.json | 12 ++++ .../Core/Middlewares/GzipMiddleware.cs | 7 ++- .../OpenTelemetryTests.cs | 40 ++++++++++-- .../OpenTelemetryConsumerEventsHandler.cs | 12 +--- .../OpenTelemetryProducerEventsHandler.cs | 12 +--- src/KafkaFlow/Event.cs | 61 +++++++++++++++---- 7 files changed, 111 insertions(+), 37 deletions(-) create mode 100644 src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json diff --git a/src/KafkaFlow.Abstractions/IEvent.cs b/src/KafkaFlow.Abstractions/IEvent.cs index f9a055b4f..a5f70a688 100644 --- a/src/KafkaFlow.Abstractions/IEvent.cs +++ b/src/KafkaFlow.Abstractions/IEvent.cs @@ -14,6 +14,8 @@ public interface IEvent /// The handler to be called when the event is fired. /// Event subscription reference IEventSubscription Subscribe(Func handler); + + IEventSubscription Subscribe(Action handler); } /// @@ -28,5 +30,7 @@ public interface IEvent /// The handler to be called when the event is fired. /// Event subscription reference IEventSubscription Subscribe(Func handler); + + IEventSubscription Subscribe(Action handler); } } \ No newline at end of file diff --git a/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json b/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json new file mode 100644 index 000000000..a3f79fedf --- /dev/null +++ b/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json @@ -0,0 +1,12 @@ +{ + "profiles": { + "KafkaFlow.Admin.Dashboard": { + "commandName": "Project", + "launchBrowser": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + }, + "applicationUrl": "https://localhost:63908;http://localhost:63909" + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs b/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs index 4a73f4f4d..69bc10b42 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs @@ -1,13 +1,18 @@ namespace KafkaFlow.IntegrationTests.Core.Middlewares { + using System.Diagnostics; using System.Threading.Tasks; using KafkaFlow.IntegrationTests.Core.Handlers; + using KafkaFlow.OpenTelemetry; internal class GzipMiddleware : IMessageMiddleware { public async Task Invoke(IMessageContext context, MiddlewareDelegate next) { - MessageStorage.Add((byte[]) context.Message.Value); + var source = new ActivitySource(KafkaFlowInstrumentation.ActivitySourceName); + using var activity = source.StartActivity("integration-test", ActivityKind.Internal); + + MessageStorage.Add((byte[])context.Message.Value); await next(context); } } diff --git a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs index 05a61f306..53c3f2ec3 100644 --- a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs +++ b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs @@ -57,7 +57,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans await producer.ProduceAsync(null, message); // Assert - var (producerSpan, consumerSpan) = await this.WaitForSpansAsync(); + var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync(); Assert.IsNotNull(this.exportedItems); Assert.IsNull(producerSpan.ParentId); @@ -65,6 +65,35 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId); } + [TestMethod] + public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity() + { + // Arrange + var provider = await this.GetServiceProvider(); + MessageStorage.Clear(); + + using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource("KafkaFlow.OpenTelemetry") + .AddInMemoryExporter(this.exportedItems) + .Build(); + + var producer = provider.GetRequiredService>(); + var message = this.fixture.Create(); + + // Act + await producer.ProduceAsync(null, message); + + // Assert + var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync(); + + Assert.IsNotNull(this.exportedItems); + Assert.IsNull(producerSpan.ParentId); + Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId); + Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId); + Assert.AreEqual(internalSpan.TraceId, consumerSpan.TraceId); + Assert.AreEqual(internalSpan.ParentSpanId, consumerSpan.SpanId); + } + [TestMethod] public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsPropagatedFromTestActivityToConsumer() { @@ -98,7 +127,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp await producer.ProduceAsync(null, message); // Assert - var (producerSpan, consumerSpan) = await this.WaitForSpansAsync(); + var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync(); Assert.IsNotNull(this.exportedItems); Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId); @@ -182,9 +211,9 @@ await Policy .ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned)); } - private async Task<(Activity producerSpan, Activity consumerSpan)> WaitForSpansAsync() + private async Task<(Activity producerSpan, Activity consumerSpan, Activity internalSpan)> WaitForSpansAsync() { - Activity producerSpan = null, consumerSpan = null; + Activity producerSpan = null, consumerSpan = null, internalSpan = null; await Policy .HandleResult(isAvailable => !isAvailable) @@ -193,11 +222,12 @@ await Policy { producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer); consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer); + internalSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Internal); return Task.FromResult(producerSpan != null && consumerSpan != null); }); - return (producerSpan, consumerSpan); + return (producerSpan, consumerSpan, internalSpan); } } } diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs index cea4febc2..bae4f525b 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs @@ -17,7 +17,7 @@ internal static class OpenTelemetryConsumerEventsHandler private const string AttributeMessagingKafkaSourcePartition = "messaging.kafka.source.partition"; private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; - public static Task OnConsumeStarted(IMessageContext context) + public static void OnConsumeStarted(IMessageContext context) { try { @@ -50,21 +50,17 @@ public static Task OnConsumeStarted(IMessageContext context) { // If there is any failure, do not propagate the context. } - - return Task.CompletedTask; } - public static Task OnConsumeCompleted(IMessageContext context) + public static void OnConsumeCompleted(IMessageContext context) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { activity?.Dispose(); } - - return Task.CompletedTask; } - public static Task OnConsumeError(IMessageContext context, Exception ex) + public static void OnConsumeError(IMessageContext context, Exception ex) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { @@ -73,8 +69,6 @@ public static Task OnConsumeError(IMessageContext context, Exception ex) activity?.Dispose(); } - - return Task.CompletedTask; } private static IEnumerable ExtractTraceContextIntoBasicProperties(IMessageContext context, string key) diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs index 403e257d1..a7fa35d6a 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs @@ -16,7 +16,7 @@ internal static class OpenTelemetryProducerEventsHandler private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition"; private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; - public static Task OnProducerStarted(IMessageContext context) + public static void OnProducerStarted(IMessageContext context) { try { @@ -60,21 +60,17 @@ public static Task OnProducerStarted(IMessageContext context) { // If there is any failure, do not propagate the context. } - - return Task.CompletedTask; } - public static Task OnProducerCompleted(IMessageContext context) + public static void OnProducerCompleted(IMessageContext context) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { activity?.Dispose(); } - - return Task.CompletedTask; } - public static Task OnProducerError(IMessageContext context, Exception ex) + public static void OnProducerError(IMessageContext context, Exception ex) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { @@ -83,8 +79,6 @@ public static Task OnProducerError(IMessageContext context, Exception ex) activity?.Dispose(); } - - return Task.CompletedTask; } private static void InjectTraceContextIntoBasicProperties(IMessageContext context, string key, string value) diff --git a/src/KafkaFlow/Event.cs b/src/KafkaFlow/Event.cs index 44d73ad48..c965e8ef4 100644 --- a/src/KafkaFlow/Event.cs +++ b/src/KafkaFlow/Event.cs @@ -2,13 +2,15 @@ { using System; using System.Collections.Generic; + using System.Linq; using System.Threading.Tasks; internal class Event : IEvent { private readonly ILogHandler logHandler; - private readonly List> handlers = new(); + private readonly List> asyncHandlers = new(); + private readonly List> syncHandlers = new(); public Event(ILogHandler logHandler) { @@ -17,33 +19,64 @@ public Event(ILogHandler logHandler) public IEventSubscription Subscribe(Func handler) { - if (!this.handlers.Contains(handler)) + return this.Subscribe(this.asyncHandlers, handler); + } + + public IEventSubscription Subscribe(Action handler) + { + return this.Subscribe(this.syncHandlers, handler); + } + + internal Task FireAsync(TArg arg) + { + this.InvokeSyncHandlers(arg); + return this.InvokeAsyncHandlers(arg); + } + + private IEventSubscription Subscribe(List handlersList, T handler) + { + if (!handlersList.Contains(handler)) { - this.handlers.Add(handler); + handlersList.Add(handler); } - return new EventSubscription(() => this.handlers.Remove(handler)); + return new EventSubscription(() => handlersList.Remove(handler)); } - internal async Task FireAsync(TArg arg) + private async Task InvokeAsyncHandlers(TArg arg) { - foreach (var handler in this.handlers) + foreach (var handler in this.asyncHandlers.Where(h => h is not null)) { try { - if (handler is null) - { - continue; - } - await handler.Invoke(arg); } - catch (Exception e) + catch (Exception ex) + { + this.LogHandlerOnError(ex); + } + } + } + + private void InvokeSyncHandlers(TArg arg) + { + foreach (var handler in this.syncHandlers.Where(h => h is not null)) + { + try { - this.logHandler.Error("Error firing event", e, new { Event = this.GetType().Name }); + handler.Invoke(arg); + } + catch (Exception ex) + { + this.LogHandlerOnError(ex); } } } + + private void LogHandlerOnError(Exception ex) + { + this.logHandler.Error("Error firing event", ex, new { Event = this.GetType().Name }); + } } internal class Event : IEvent @@ -57,6 +90,8 @@ public Event(ILogHandler logHandler) public IEventSubscription Subscribe(Func handler) => this.evt.Subscribe(_ => handler.Invoke()); + public IEventSubscription Subscribe(Action handler) => this.evt.Subscribe(_ => handler.Invoke()); + internal Task FireAsync() => this.evt.FireAsync(null); } } \ No newline at end of file