diff --git a/src/KafkaFlow.Abstractions/IEvent.cs b/src/KafkaFlow.Abstractions/IEvent.cs index f9a055b4f..0a6617bc3 100644 --- a/src/KafkaFlow.Abstractions/IEvent.cs +++ b/src/KafkaFlow.Abstractions/IEvent.cs @@ -8,12 +8,19 @@ /// public interface IEvent { + /// + /// Subscribes to the async event. + /// + /// The handler to be called when the async event is fired. + /// Event subscription reference + IEventSubscription Subscribe(Func handler); + /// /// Subscribes to the event. /// /// The handler to be called when the event is fired. /// Event subscription reference - IEventSubscription Subscribe(Func handler); + IEventSubscription Subscribe(Action handler); } /// @@ -22,11 +29,18 @@ public interface IEvent /// The argument expected by the event. public interface IEvent { + /// + /// Subscribes to the async event. + /// + /// The handler to be called when the async event is fired. + /// Event subscription reference + IEventSubscription Subscribe(Func handler); + /// /// Subscribes to the event. /// /// The handler to be called when the event is fired. /// Event subscription reference - IEventSubscription Subscribe(Func handler); + IEventSubscription Subscribe(Action handler); } } \ No newline at end of file diff --git a/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json b/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json new file mode 100644 index 000000000..a3f79fedf --- /dev/null +++ b/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json @@ -0,0 +1,12 @@ +{ + "profiles": { + "KafkaFlow.Admin.Dashboard": { + "commandName": "Project", + "launchBrowser": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + }, + "applicationUrl": "https://localhost:63908;http://localhost:63909" + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs b/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs index 4a73f4f4d..69bc10b42 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs @@ -1,13 +1,18 @@ namespace KafkaFlow.IntegrationTests.Core.Middlewares { + using System.Diagnostics; using System.Threading.Tasks; using KafkaFlow.IntegrationTests.Core.Handlers; + using KafkaFlow.OpenTelemetry; internal class GzipMiddleware : IMessageMiddleware { public async Task Invoke(IMessageContext context, MiddlewareDelegate next) { - MessageStorage.Add((byte[]) context.Message.Value); + var source = new ActivitySource(KafkaFlowInstrumentation.ActivitySourceName); + using var activity = source.StartActivity("integration-test", ActivityKind.Internal); + + MessageStorage.Add((byte[])context.Message.Value); await next(context); } } diff --git a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs index 05a61f306..53c3f2ec3 100644 --- a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs +++ b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs @@ -57,7 +57,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans await producer.ProduceAsync(null, message); // Assert - var (producerSpan, consumerSpan) = await this.WaitForSpansAsync(); + var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync(); Assert.IsNotNull(this.exportedItems); Assert.IsNull(producerSpan.ParentId); @@ -65,6 +65,35 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId); } + [TestMethod] + public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity() + { + // Arrange + var provider = await this.GetServiceProvider(); + MessageStorage.Clear(); + + using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource("KafkaFlow.OpenTelemetry") + .AddInMemoryExporter(this.exportedItems) + .Build(); + + var producer = provider.GetRequiredService>(); + var message = this.fixture.Create(); + + // Act + await producer.ProduceAsync(null, message); + + // Assert + var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync(); + + Assert.IsNotNull(this.exportedItems); + Assert.IsNull(producerSpan.ParentId); + Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId); + Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId); + Assert.AreEqual(internalSpan.TraceId, consumerSpan.TraceId); + Assert.AreEqual(internalSpan.ParentSpanId, consumerSpan.SpanId); + } + [TestMethod] public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsPropagatedFromTestActivityToConsumer() { @@ -98,7 +127,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp await producer.ProduceAsync(null, message); // Assert - var (producerSpan, consumerSpan) = await this.WaitForSpansAsync(); + var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync(); Assert.IsNotNull(this.exportedItems); Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId); @@ -182,9 +211,9 @@ await Policy .ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned)); } - private async Task<(Activity producerSpan, Activity consumerSpan)> WaitForSpansAsync() + private async Task<(Activity producerSpan, Activity consumerSpan, Activity internalSpan)> WaitForSpansAsync() { - Activity producerSpan = null, consumerSpan = null; + Activity producerSpan = null, consumerSpan = null, internalSpan = null; await Policy .HandleResult(isAvailable => !isAvailable) @@ -193,11 +222,12 @@ await Policy { producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer); consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer); + internalSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Internal); return Task.FromResult(producerSpan != null && consumerSpan != null); }); - return (producerSpan, consumerSpan); + return (producerSpan, consumerSpan, internalSpan); } } } diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs index cea4febc2..bae4f525b 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs @@ -17,7 +17,7 @@ internal static class OpenTelemetryConsumerEventsHandler private const string AttributeMessagingKafkaSourcePartition = "messaging.kafka.source.partition"; private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; - public static Task OnConsumeStarted(IMessageContext context) + public static void OnConsumeStarted(IMessageContext context) { try { @@ -50,21 +50,17 @@ public static Task OnConsumeStarted(IMessageContext context) { // If there is any failure, do not propagate the context. } - - return Task.CompletedTask; } - public static Task OnConsumeCompleted(IMessageContext context) + public static void OnConsumeCompleted(IMessageContext context) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { activity?.Dispose(); } - - return Task.CompletedTask; } - public static Task OnConsumeError(IMessageContext context, Exception ex) + public static void OnConsumeError(IMessageContext context, Exception ex) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { @@ -73,8 +69,6 @@ public static Task OnConsumeError(IMessageContext context, Exception ex) activity?.Dispose(); } - - return Task.CompletedTask; } private static IEnumerable ExtractTraceContextIntoBasicProperties(IMessageContext context, string key) diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs index 403e257d1..a7fa35d6a 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs @@ -16,7 +16,7 @@ internal static class OpenTelemetryProducerEventsHandler private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition"; private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; - public static Task OnProducerStarted(IMessageContext context) + public static void OnProducerStarted(IMessageContext context) { try { @@ -60,21 +60,17 @@ public static Task OnProducerStarted(IMessageContext context) { // If there is any failure, do not propagate the context. } - - return Task.CompletedTask; } - public static Task OnProducerCompleted(IMessageContext context) + public static void OnProducerCompleted(IMessageContext context) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { activity?.Dispose(); } - - return Task.CompletedTask; } - public static Task OnProducerError(IMessageContext context, Exception ex) + public static void OnProducerError(IMessageContext context, Exception ex) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { @@ -83,8 +79,6 @@ public static Task OnProducerError(IMessageContext context, Exception ex) activity?.Dispose(); } - - return Task.CompletedTask; } private static void InjectTraceContextIntoBasicProperties(IMessageContext context, string key, string value) diff --git a/src/KafkaFlow/Event.cs b/src/KafkaFlow/Event.cs index 44d73ad48..c965e8ef4 100644 --- a/src/KafkaFlow/Event.cs +++ b/src/KafkaFlow/Event.cs @@ -2,13 +2,15 @@ { using System; using System.Collections.Generic; + using System.Linq; using System.Threading.Tasks; internal class Event : IEvent { private readonly ILogHandler logHandler; - private readonly List> handlers = new(); + private readonly List> asyncHandlers = new(); + private readonly List> syncHandlers = new(); public Event(ILogHandler logHandler) { @@ -17,33 +19,64 @@ public Event(ILogHandler logHandler) public IEventSubscription Subscribe(Func handler) { - if (!this.handlers.Contains(handler)) + return this.Subscribe(this.asyncHandlers, handler); + } + + public IEventSubscription Subscribe(Action handler) + { + return this.Subscribe(this.syncHandlers, handler); + } + + internal Task FireAsync(TArg arg) + { + this.InvokeSyncHandlers(arg); + return this.InvokeAsyncHandlers(arg); + } + + private IEventSubscription Subscribe(List handlersList, T handler) + { + if (!handlersList.Contains(handler)) { - this.handlers.Add(handler); + handlersList.Add(handler); } - return new EventSubscription(() => this.handlers.Remove(handler)); + return new EventSubscription(() => handlersList.Remove(handler)); } - internal async Task FireAsync(TArg arg) + private async Task InvokeAsyncHandlers(TArg arg) { - foreach (var handler in this.handlers) + foreach (var handler in this.asyncHandlers.Where(h => h is not null)) { try { - if (handler is null) - { - continue; - } - await handler.Invoke(arg); } - catch (Exception e) + catch (Exception ex) + { + this.LogHandlerOnError(ex); + } + } + } + + private void InvokeSyncHandlers(TArg arg) + { + foreach (var handler in this.syncHandlers.Where(h => h is not null)) + { + try { - this.logHandler.Error("Error firing event", e, new { Event = this.GetType().Name }); + handler.Invoke(arg); + } + catch (Exception ex) + { + this.LogHandlerOnError(ex); } } } + + private void LogHandlerOnError(Exception ex) + { + this.logHandler.Error("Error firing event", ex, new { Event = this.GetType().Name }); + } } internal class Event : IEvent @@ -57,6 +90,8 @@ public Event(ILogHandler logHandler) public IEventSubscription Subscribe(Func handler) => this.evt.Subscribe(_ => handler.Invoke()); + public IEventSubscription Subscribe(Action handler) => this.evt.Subscribe(_ => handler.Invoke()); + internal Task FireAsync() => this.evt.FireAsync(null); } } \ No newline at end of file