diff --git a/src/KafkaFlow.Abstractions/IEvent.cs b/src/KafkaFlow.Abstractions/IEvent.cs index 0a6617bc3..f9a055b4f 100644 --- a/src/KafkaFlow.Abstractions/IEvent.cs +++ b/src/KafkaFlow.Abstractions/IEvent.cs @@ -8,19 +8,12 @@ /// public interface IEvent { - /// - /// Subscribes to the async event. - /// - /// The handler to be called when the async event is fired. - /// Event subscription reference - IEventSubscription Subscribe(Func handler); - /// /// Subscribes to the event. /// /// The handler to be called when the event is fired. /// Event subscription reference - IEventSubscription Subscribe(Action handler); + IEventSubscription Subscribe(Func handler); } /// @@ -29,18 +22,11 @@ public interface IEvent /// The argument expected by the event. public interface IEvent { - /// - /// Subscribes to the async event. - /// - /// The handler to be called when the async event is fired. - /// Event subscription reference - IEventSubscription Subscribe(Func handler); - /// /// Subscribes to the event. /// /// The handler to be called when the event is fired. /// Event subscription reference - IEventSubscription Subscribe(Action handler); + IEventSubscription Subscribe(Func handler); } } \ No newline at end of file diff --git a/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json b/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json deleted file mode 100644 index a3f79fedf..000000000 --- a/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "profiles": { - "KafkaFlow.Admin.Dashboard": { - "commandName": "Project", - "launchBrowser": true, - "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" - }, - "applicationUrl": "https://localhost:63908;http://localhost:63909" - } - } -} \ No newline at end of file diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs index bae4f525b..cea4febc2 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs @@ -17,7 +17,7 @@ internal static class OpenTelemetryConsumerEventsHandler private const string AttributeMessagingKafkaSourcePartition = "messaging.kafka.source.partition"; private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; - public static void OnConsumeStarted(IMessageContext context) + public static Task OnConsumeStarted(IMessageContext context) { try { @@ -50,17 +50,21 @@ public static void OnConsumeStarted(IMessageContext context) { // If there is any failure, do not propagate the context. } + + return Task.CompletedTask; } - public static void OnConsumeCompleted(IMessageContext context) + public static Task OnConsumeCompleted(IMessageContext context) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { activity?.Dispose(); } + + return Task.CompletedTask; } - public static void OnConsumeError(IMessageContext context, Exception ex) + public static Task OnConsumeError(IMessageContext context, Exception ex) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { @@ -69,6 +73,8 @@ public static void OnConsumeError(IMessageContext context, Exception ex) activity?.Dispose(); } + + return Task.CompletedTask; } private static IEnumerable ExtractTraceContextIntoBasicProperties(IMessageContext context, string key) diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs index a7fa35d6a..403e257d1 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs @@ -16,7 +16,7 @@ internal static class OpenTelemetryProducerEventsHandler private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition"; private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; - public static void OnProducerStarted(IMessageContext context) + public static Task OnProducerStarted(IMessageContext context) { try { @@ -60,17 +60,21 @@ public static void OnProducerStarted(IMessageContext context) { // If there is any failure, do not propagate the context. } + + return Task.CompletedTask; } - public static void OnProducerCompleted(IMessageContext context) + public static Task OnProducerCompleted(IMessageContext context) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { activity?.Dispose(); } + + return Task.CompletedTask; } - public static void OnProducerError(IMessageContext context, Exception ex) + public static Task OnProducerError(IMessageContext context, Exception ex) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { @@ -79,6 +83,8 @@ public static void OnProducerError(IMessageContext context, Exception ex) activity?.Dispose(); } + + return Task.CompletedTask; } private static void InjectTraceContextIntoBasicProperties(IMessageContext context, string key, string value) diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index 29e269959..cce7308ea 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -127,7 +127,7 @@ private async Task ProcessMessageAsync(ConsumeResult message, Ca message.TopicPartitionOffset, () => scope.Dispose()); - this.globalEvents.FireMessageConsumeStarted(new MessageEventContext(context)); + await this.globalEvents.FireMessageConsumeStartedAsync(new MessageEventContext(context)); await this.middlewareExecutor .Execute(scope.Resolver, context, _ => Task.CompletedTask) diff --git a/src/KafkaFlow/Event.cs b/src/KafkaFlow/Event.cs index 0c6682f48..39432d58e 100644 --- a/src/KafkaFlow/Event.cs +++ b/src/KafkaFlow/Event.cs @@ -9,8 +9,7 @@ internal class Event : IEvent { private readonly ILogHandler logHandler; - private readonly List> asyncHandlers = new(); - private readonly List> syncHandlers = new(); + private readonly List> handlers = new(); public Event(ILogHandler logHandler) { @@ -19,61 +18,43 @@ public Event(ILogHandler logHandler) public IEventSubscription Subscribe(Func handler) { - return this.Subscribe(this.asyncHandlers, handler); - } - - public IEventSubscription Subscribe(Action handler) - { - return this.Subscribe(this.syncHandlers, handler); - } + if (handler is null) + { + throw new ArgumentNullException("Handler cannot be null"); + } - internal Task FireAsync(TArg arg) - { - return this.InvokeAsyncHandlers(arg); - } + if (!this.handlers.Contains(handler)) + { + this.handlers.Add(handler); + } - internal void Fire(TArg arg) - { - this.InvokeSyncHandlers(arg); + return new EventSubscription(() => this.handlers.Remove(handler)); } - private IEventSubscription Subscribe(List handlersList, T handler) + internal Task FireAsync(TArg arg) { - if (!handlersList.Contains(handler)) - { - handlersList.Add(handler); - } + var tasks = this.handlers + .Select(handler => this.ProcessHandler(handler, arg)); - return new EventSubscription(() => handlersList.Remove(handler)); + return Task.WhenAll(tasks); } - private async Task InvokeAsyncHandlers(TArg arg) + private Task ProcessHandler(Func handler, TArg arg) { - foreach (var handler in this.asyncHandlers.Where(h => h is not null)) + try { - try - { - await handler.Invoke(arg); - } - catch (Exception ex) + return handler.Invoke(arg).ContinueWith(t => { - this.LogHandlerOnError(ex); - } + if (t.IsFaulted) + { + this.LogHandlerOnError(t.Exception); + } + }); } - } - - private void InvokeSyncHandlers(TArg arg) - { - foreach (var handler in this.syncHandlers.Where(h => h is not null)) + catch (Exception ex) { - try - { - handler.Invoke(arg); - } - catch (Exception ex) - { - this.LogHandlerOnError(ex); - } + this.LogHandlerOnError(ex); + return Task.CompletedTask; } } @@ -94,8 +75,6 @@ public Event(ILogHandler logHandler) public IEventSubscription Subscribe(Func handler) => this.evt.Subscribe(_ => handler.Invoke()); - public IEventSubscription Subscribe(Action handler) => this.evt.Subscribe(_ => handler.Invoke()); - internal Task FireAsync() => this.evt.FireAsync(null); } } \ No newline at end of file diff --git a/src/KafkaFlow/GlobalEvents.cs b/src/KafkaFlow/GlobalEvents.cs index 3b09aef5d..c4e4212a3 100644 --- a/src/KafkaFlow/GlobalEvents.cs +++ b/src/KafkaFlow/GlobalEvents.cs @@ -37,9 +37,6 @@ public GlobalEvents(ILogHandler log) public Task FireMessageConsumeStartedAsync(MessageEventContext context) => this.messageConsumeStarted.FireAsync(context); - public void FireMessageConsumeStarted(MessageEventContext context) - => this.messageConsumeStarted.Fire(context); - public Task FireMessageConsumeErrorAsync(MessageErrorEventContext context) => this.messageConsumeError.FireAsync(context);