Skip to content

Commit

Permalink
Include FireMessageConsumeCompletedAsync in complete method
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Oct 9, 2023
1 parent 5e86a51 commit 2cf0fab
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/KafkaFlow.Abstractions/IConsumerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public interface IConsumerContext
/// By default, this method is automatically invoked when message processing concludes, unless
/// the consumer is configured for manual message completion or the <see cref="AutoMessageCompletion"/> flag is set to false.
/// </summary>
void Complete();
void Complete(IMessageContext context);

/// <summary>
/// Get offset watermark data
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate nex
{
foreach (var messageContext in localBatch)
{
messageContext.ConsumerContext.Complete();
messageContext.ConsumerContext.Complete(context);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public async Task AddAsync_LessThanBatchSize_CallNextOnTimeout()
this.timesNextWasCalled.Should().Be(0);
await this.WaitBatchTimeoutAsync();
this.timesNextWasCalled.Should().Be(1);
consumerContext.Verify(x => x.Complete(), Times.Once);
consumerContext.Verify(x => x.Complete(context.Object), Times.Once);
}

[TestMethod]
Expand Down Expand Up @@ -115,7 +115,7 @@ public async Task AddAsync_ExactlyBatchSize_CallNextInstantly()
// Assert
this.timesNextWasCalled.Should().Be(1);
this.nextContext.GetMessagesBatch().Should().HaveCount(BatchSize);
consumerContext.Verify(x => x.Complete(), Times.Exactly(BatchSize));
consumerContext.Verify(x => x.Complete(contextMock.Object), Times.Exactly(BatchSize));
}

[TestMethod]
Expand Down Expand Up @@ -144,11 +144,11 @@ public async Task AddAsync_MoreThanBatchSize_CallNextInstantlyThenCallWhenTimeou
// Assert
this.timesNextWasCalled.Should().Be(1);
this.nextContext.GetMessagesBatch().Should().HaveCount(BatchSize);
consumerContext.Verify(x => x.Complete(), Times.Exactly(BatchSize));
consumerContext.Verify(x => x.Complete(contextMock.Object), Times.Exactly(BatchSize));

await this.WaitBatchTimeoutAsync();
this.timesNextWasCalled.Should().Be(2);
consumerContext.Verify(x => x.Complete(), Times.Exactly(BatchSize + 1));
consumerContext.Verify(x => x.Complete(contextMock.Object), Times.Exactly(BatchSize + 1));
}

[TestMethod]
Expand Down
10 changes: 8 additions & 2 deletions src/KafkaFlow/Consumers/ConsumerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ internal class ConsumerContext : IConsumerContext
private readonly IOffsetManager offsetManager;
private readonly IConsumerWorker worker;
private readonly IDependencyResolverScope messageDependencyScope;
private readonly GlobalEvents globalEvents;

public ConsumerContext(
IConsumer consumer,
IOffsetManager offsetManager,
ConsumeResult<byte[], byte[]> kafkaResult,
IConsumerWorker worker,
IDependencyResolverScope messageDependencyScope,
IDependencyResolver consumerDependencyResolver)
IDependencyResolver consumerDependencyResolver,
GlobalEvents globalEvents)
{
this.ConsumerDependencyResolver = consumerDependencyResolver;
this.consumer = consumer;
Expand All @@ -33,6 +35,7 @@ public ConsumerContext(
kafkaResult.Partition.Value,
kafkaResult.Offset.Value);
this.MessageTimestamp = kafkaResult.Message.Timestamp.UtcDateTime;
this.globalEvents = globalEvents;
}

public string ConsumerName => this.consumer.Configuration.ConsumerName;
Expand Down Expand Up @@ -63,8 +66,11 @@ public ConsumerContext(

public Task<TopicPartitionOffset> Completion => this.completionSource.Task;

public void Complete()
public void Complete(IMessageContext context)
{
this.globalEvents.FireMessageConsumeCompletedAsync(
new MessageEventContext(context));

if (this.ShouldStoreOffset)
{
this.offsetManager.MarkAsProcessed(this);
Expand Down
5 changes: 1 addition & 4 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,10 @@ await this.middlewareExecutor
{
if (context.ConsumerContext.AutoMessageCompletion)
{
context.ConsumerContext.Complete();
context.ConsumerContext.Complete(context);
}

await this.workerProcessingEnded.FireAsync(context);

await this.globalEvents.FireMessageConsumeCompletedAsync(
new MessageEventContext(context));
}
}
catch (Exception ex)
Expand Down
6 changes: 5 additions & 1 deletion src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal class ConsumerWorkerPool : IConsumerWorkerPool
private readonly ILogHandler logHandler;
private readonly Factory<IDistributionStrategy> distributionStrategyFactory;
private readonly IOffsetCommitter offsetCommitter;
private readonly GlobalEvents globalEvents;

private readonly Event workerPoolStoppedSubject;

Expand Down Expand Up @@ -47,6 +48,8 @@ public ConsumerWorkerPool(
logHandler);

this.offsetCommitter.PendingOffsetsStatisticsHandlers.AddRange(consumer.Configuration.PendingOffsetsStatisticsHandlers);

this.globalEvents = this.consumerDependencyResolver.Resolve<GlobalEvents>();
}

public int CurrentWorkersCount { get; private set; }
Expand Down Expand Up @@ -161,7 +164,8 @@ private MessageContext CreateMessageContext(ConsumeResult<byte[], byte[]> messag
message,
worker,
messageDependencyScope,
this.consumerDependencyResolver),
this.consumerDependencyResolver,
this.globalEvents),
null);
return context;
}
Expand Down

0 comments on commit 2cf0fab

Please sign in to comment.