From 2cf0fabe4e8a4c64922b9ddc1f383562b0cf91c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Mon, 9 Oct 2023 15:34:07 +0100 Subject: [PATCH] Include FireMessageConsumeCompletedAsync in complete method --- src/KafkaFlow.Abstractions/IConsumerContext.cs | 2 +- src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs | 2 +- .../BatchConsume/BatchConsumeMiddlewareTests.cs | 8 ++++---- src/KafkaFlow/Consumers/ConsumerContext.cs | 10 ++++++++-- src/KafkaFlow/Consumers/ConsumerWorker.cs | 5 +---- src/KafkaFlow/Consumers/ConsumerWorkerPool.cs | 6 +++++- 6 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/KafkaFlow.Abstractions/IConsumerContext.cs b/src/KafkaFlow.Abstractions/IConsumerContext.cs index b185d9c2b..f07b84042 100644 --- a/src/KafkaFlow.Abstractions/IConsumerContext.cs +++ b/src/KafkaFlow.Abstractions/IConsumerContext.cs @@ -91,7 +91,7 @@ public interface IConsumerContext /// By default, this method is automatically invoked when message processing concludes, unless /// the consumer is configured for manual message completion or the flag is set to false. /// - void Complete(); + void Complete(IMessageContext context); /// /// Get offset watermark data diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs b/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs index a6f0621df..3c26971c8 100644 --- a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs +++ b/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs @@ -138,7 +138,7 @@ private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate nex { foreach (var messageContext in localBatch) { - messageContext.ConsumerContext.Complete(); + messageContext.ConsumerContext.Complete(context); } } } diff --git a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs index 2269db974..e1583c5eb 100644 --- a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs @@ -86,7 +86,7 @@ public async Task AddAsync_LessThanBatchSize_CallNextOnTimeout() this.timesNextWasCalled.Should().Be(0); await this.WaitBatchTimeoutAsync(); this.timesNextWasCalled.Should().Be(1); - consumerContext.Verify(x => x.Complete(), Times.Once); + consumerContext.Verify(x => x.Complete(context.Object), Times.Once); } [TestMethod] @@ -115,7 +115,7 @@ public async Task AddAsync_ExactlyBatchSize_CallNextInstantly() // Assert this.timesNextWasCalled.Should().Be(1); this.nextContext.GetMessagesBatch().Should().HaveCount(BatchSize); - consumerContext.Verify(x => x.Complete(), Times.Exactly(BatchSize)); + consumerContext.Verify(x => x.Complete(contextMock.Object), Times.Exactly(BatchSize)); } [TestMethod] @@ -144,11 +144,11 @@ public async Task AddAsync_MoreThanBatchSize_CallNextInstantlyThenCallWhenTimeou // Assert this.timesNextWasCalled.Should().Be(1); this.nextContext.GetMessagesBatch().Should().HaveCount(BatchSize); - consumerContext.Verify(x => x.Complete(), Times.Exactly(BatchSize)); + consumerContext.Verify(x => x.Complete(contextMock.Object), Times.Exactly(BatchSize)); await this.WaitBatchTimeoutAsync(); this.timesNextWasCalled.Should().Be(2); - consumerContext.Verify(x => x.Complete(), Times.Exactly(BatchSize + 1)); + consumerContext.Verify(x => x.Complete(contextMock.Object), Times.Exactly(BatchSize + 1)); } [TestMethod] diff --git a/src/KafkaFlow/Consumers/ConsumerContext.cs b/src/KafkaFlow/Consumers/ConsumerContext.cs index 794dd7031..dc1c38ef6 100644 --- a/src/KafkaFlow/Consumers/ConsumerContext.cs +++ b/src/KafkaFlow/Consumers/ConsumerContext.cs @@ -13,6 +13,7 @@ internal class ConsumerContext : IConsumerContext private readonly IOffsetManager offsetManager; private readonly IConsumerWorker worker; private readonly IDependencyResolverScope messageDependencyScope; + private readonly GlobalEvents globalEvents; public ConsumerContext( IConsumer consumer, @@ -20,7 +21,8 @@ public ConsumerContext( ConsumeResult kafkaResult, IConsumerWorker worker, IDependencyResolverScope messageDependencyScope, - IDependencyResolver consumerDependencyResolver) + IDependencyResolver consumerDependencyResolver, + GlobalEvents globalEvents) { this.ConsumerDependencyResolver = consumerDependencyResolver; this.consumer = consumer; @@ -33,6 +35,7 @@ public ConsumerContext( kafkaResult.Partition.Value, kafkaResult.Offset.Value); this.MessageTimestamp = kafkaResult.Message.Timestamp.UtcDateTime; + this.globalEvents = globalEvents; } public string ConsumerName => this.consumer.Configuration.ConsumerName; @@ -63,8 +66,11 @@ public ConsumerContext( public Task Completion => this.completionSource.Task; - public void Complete() + public void Complete(IMessageContext context) { + this.globalEvents.FireMessageConsumeCompletedAsync( + new MessageEventContext(context)); + if (this.ShouldStoreOffset) { this.offsetManager.MarkAsProcessed(this); diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index 1b54b635e..da1e21683 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -153,13 +153,10 @@ await this.middlewareExecutor { if (context.ConsumerContext.AutoMessageCompletion) { - context.ConsumerContext.Complete(); + context.ConsumerContext.Complete(context); } await this.workerProcessingEnded.FireAsync(context); - - await this.globalEvents.FireMessageConsumeCompletedAsync( - new MessageEventContext(context)); } } catch (Exception ex) diff --git a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs index 5774d2c9e..970acfc79 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs @@ -16,6 +16,7 @@ internal class ConsumerWorkerPool : IConsumerWorkerPool private readonly ILogHandler logHandler; private readonly Factory distributionStrategyFactory; private readonly IOffsetCommitter offsetCommitter; + private readonly GlobalEvents globalEvents; private readonly Event workerPoolStoppedSubject; @@ -47,6 +48,8 @@ public ConsumerWorkerPool( logHandler); this.offsetCommitter.PendingOffsetsStatisticsHandlers.AddRange(consumer.Configuration.PendingOffsetsStatisticsHandlers); + + this.globalEvents = this.consumerDependencyResolver.Resolve(); } public int CurrentWorkersCount { get; private set; } @@ -161,7 +164,8 @@ private MessageContext CreateMessageContext(ConsumeResult messag message, worker, messageDependencyScope, - this.consumerDependencyResolver), + this.consumerDependencyResolver, + this.globalEvents), null); return context; }