diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index cce7308ea..29e269959 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -127,7 +127,7 @@ private async Task ProcessMessageAsync(ConsumeResult message, Ca message.TopicPartitionOffset, () => scope.Dispose()); - await this.globalEvents.FireMessageConsumeStartedAsync(new MessageEventContext(context)); + this.globalEvents.FireMessageConsumeStarted(new MessageEventContext(context)); await this.middlewareExecutor .Execute(scope.Resolver, context, _ => Task.CompletedTask) diff --git a/src/KafkaFlow/Event.cs b/src/KafkaFlow/Event.cs index c965e8ef4..0c6682f48 100644 --- a/src/KafkaFlow/Event.cs +++ b/src/KafkaFlow/Event.cs @@ -29,10 +29,14 @@ public IEventSubscription Subscribe(Action handler) internal Task FireAsync(TArg arg) { - this.InvokeSyncHandlers(arg); return this.InvokeAsyncHandlers(arg); } + internal void Fire(TArg arg) + { + this.InvokeSyncHandlers(arg); + } + private IEventSubscription Subscribe(List handlersList, T handler) { if (!handlersList.Contains(handler)) diff --git a/src/KafkaFlow/GlobalEvents.cs b/src/KafkaFlow/GlobalEvents.cs index c4e4212a3..3b09aef5d 100644 --- a/src/KafkaFlow/GlobalEvents.cs +++ b/src/KafkaFlow/GlobalEvents.cs @@ -37,6 +37,9 @@ public GlobalEvents(ILogHandler log) public Task FireMessageConsumeStartedAsync(MessageEventContext context) => this.messageConsumeStarted.FireAsync(context); + public void FireMessageConsumeStarted(MessageEventContext context) + => this.messageConsumeStarted.Fire(context); + public Task FireMessageConsumeErrorAsync(MessageErrorEventContext context) => this.messageConsumeError.FireAsync(context);