From 6729912648e8e56fe9ba0cbef8d329d9ace4f1c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Wed, 22 Nov 2023 15:20:42 +0000 Subject: [PATCH] test --- src/KafkaFlow/Consumers/ConsumerWorker.cs | 2 +- src/KafkaFlow/Event.cs | 6 +++++- src/KafkaFlow/GlobalEvents.cs | 3 +++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index cce7308ea..29e269959 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -127,7 +127,7 @@ private async Task ProcessMessageAsync(ConsumeResult message, Ca message.TopicPartitionOffset, () => scope.Dispose()); - await this.globalEvents.FireMessageConsumeStartedAsync(new MessageEventContext(context)); + this.globalEvents.FireMessageConsumeStarted(new MessageEventContext(context)); await this.middlewareExecutor .Execute(scope.Resolver, context, _ => Task.CompletedTask) diff --git a/src/KafkaFlow/Event.cs b/src/KafkaFlow/Event.cs index c965e8ef4..0c6682f48 100644 --- a/src/KafkaFlow/Event.cs +++ b/src/KafkaFlow/Event.cs @@ -29,10 +29,14 @@ public IEventSubscription Subscribe(Action handler) internal Task FireAsync(TArg arg) { - this.InvokeSyncHandlers(arg); return this.InvokeAsyncHandlers(arg); } + internal void Fire(TArg arg) + { + this.InvokeSyncHandlers(arg); + } + private IEventSubscription Subscribe(List handlersList, T handler) { if (!handlersList.Contains(handler)) diff --git a/src/KafkaFlow/GlobalEvents.cs b/src/KafkaFlow/GlobalEvents.cs index c4e4212a3..3b09aef5d 100644 --- a/src/KafkaFlow/GlobalEvents.cs +++ b/src/KafkaFlow/GlobalEvents.cs @@ -37,6 +37,9 @@ public GlobalEvents(ILogHandler log) public Task FireMessageConsumeStartedAsync(MessageEventContext context) => this.messageConsumeStarted.FireAsync(context); + public void FireMessageConsumeStarted(MessageEventContext context) + => this.messageConsumeStarted.Fire(context); + public Task FireMessageConsumeErrorAsync(MessageErrorEventContext context) => this.messageConsumeError.FireAsync(context);