Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Nov 22, 2023
1 parent a45ea76 commit 6729912
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private async Task ProcessMessageAsync(ConsumeResult<byte[], byte[]> message, Ca
message.TopicPartitionOffset,
() => scope.Dispose());

await this.globalEvents.FireMessageConsumeStartedAsync(new MessageEventContext(context));
this.globalEvents.FireMessageConsumeStarted(new MessageEventContext(context));

await this.middlewareExecutor
.Execute(scope.Resolver, context, _ => Task.CompletedTask)
Expand Down
6 changes: 5 additions & 1 deletion src/KafkaFlow/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ public IEventSubscription Subscribe(Action<TArg> handler)

internal Task FireAsync(TArg arg)
{
this.InvokeSyncHandlers(arg);
return this.InvokeAsyncHandlers(arg);
}

internal void Fire(TArg arg)
{
this.InvokeSyncHandlers(arg);
}

private IEventSubscription Subscribe<T>(List<T> handlersList, T handler)
{
if (!handlersList.Contains(handler))
Expand Down
3 changes: 3 additions & 0 deletions src/KafkaFlow/GlobalEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public GlobalEvents(ILogHandler log)
public Task FireMessageConsumeStartedAsync(MessageEventContext context)
=> this.messageConsumeStarted.FireAsync(context);

public void FireMessageConsumeStarted(MessageEventContext context)
=> this.messageConsumeStarted.Fire(context);

public Task FireMessageConsumeErrorAsync(MessageErrorEventContext context)
=> this.messageConsumeError.FireAsync(context);

Expand Down

0 comments on commit 6729912

Please sign in to comment.