From 8d13d7d92d78119b715c87cd5fbb8417585eb80d Mon Sep 17 00:00:00 2001 From: Filipe Esch Date: Wed, 13 Sep 2023 15:26:59 +0100 Subject: [PATCH] refactor: rename StoreOffset to Complete --- .../PauseConsumerOnExceptionMiddleware.cs | 2 +- .../IConsumerConfigurationBuilder.cs | 11 +++----- .../Configuration/WorkersCountContext.cs | 12 +++++++++ .../IConsumerContext.cs | 16 ++++++------ .../BatchConsumeExtensions.cs | 2 +- .../BatchConsumeMiddleware.cs | 8 +++--- .../BatchConsumeMiddlewareTests.cs | 8 +++--- .../ConsumerConfigurationBuilderTests.cs | 2 +- .../PartitionOffsetsTests.cs | 26 +++++++++---------- .../Configuration/ConsumerConfiguration.cs | 6 ++--- .../ConsumerConfigurationBuilder.cs | 14 +++------- .../Configuration/IConsumerConfiguration.cs | 4 +-- .../KafkaConfigurationBuilder.cs | 4 +-- src/KafkaFlow/ConsumerManagerFactory.cs | 12 +++++---- src/KafkaFlow/Consumers/Consumer.cs | 5 ++++ src/KafkaFlow/Consumers/ConsumerContext.cs | 5 ++-- src/KafkaFlow/Consumers/ConsumerManager.cs | 4 --- ...ontext.cs => ConsumerMiddlewareContext.cs} | 2 +- src/KafkaFlow/Consumers/ConsumerWorker.cs | 10 +++---- .../Consumers/IWorkerLifetimeContext.cs | 5 ++-- src/KafkaFlow/Consumers/OffsetCommitter.cs | 2 -- src/KafkaFlow/Consumers/OffsetManager.cs | 4 +-- src/KafkaFlow/Consumers/PartitionOffsets.cs | 8 +++--- src/KafkaFlow/KafkaBus.cs | 6 ++--- 24 files changed, 92 insertions(+), 86 deletions(-) rename src/KafkaFlow/Consumers/{WorkerLifetimeContext.cs => ConsumerMiddlewareContext.cs} (65%) diff --git a/samples/KafkaFlow.Sample.PauseConsumerOnError/PauseConsumerOnExceptionMiddleware.cs b/samples/KafkaFlow.Sample.PauseConsumerOnError/PauseConsumerOnExceptionMiddleware.cs index d78811488..3daaa5a54 100644 --- a/samples/KafkaFlow.Sample.PauseConsumerOnError/PauseConsumerOnExceptionMiddleware.cs +++ b/samples/KafkaFlow.Sample.PauseConsumerOnError/PauseConsumerOnExceptionMiddleware.cs @@ -21,7 +21,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next) } catch (Exception exception) { - context.ConsumerContext.ShouldStoreOffset = false; + context.ConsumerContext.AutoCompleteMessage = false; this.logHandler.Error("Error handling message", exception, new { diff --git a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs index 3f48c485d..68ac372f1 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs @@ -150,16 +150,11 @@ IConsumerConfigurationBuilder WithWorkDistributionStrategy() where T : class, IDistributionStrategy; /// - /// Offsets will be stored after the execution of the handler and middlewares automatically, this is the default behaviour + /// Configures the consumer for manual message completion. + /// The client should call the to mark the message processing as finished /// /// - IConsumerConfigurationBuilder WithAutoStoreOffsets(); - - /// - /// The client should call the - /// - /// - IConsumerConfigurationBuilder WithManualStoreOffsets(); + IConsumerConfigurationBuilder WithManualMessageCompletion(); /// /// No offsets will be stored on Kafka diff --git a/src/KafkaFlow.Abstractions/Configuration/WorkersCountContext.cs b/src/KafkaFlow.Abstractions/Configuration/WorkersCountContext.cs index 1a476f02f..a54a4b82f 100644 --- a/src/KafkaFlow.Abstractions/Configuration/WorkersCountContext.cs +++ b/src/KafkaFlow.Abstractions/Configuration/WorkersCountContext.cs @@ -7,6 +7,12 @@ namespace KafkaFlow.Configuration /// public class WorkersCountContext { + /// + /// Initializes a new instance of the class. + /// + /// The consumer's name + /// The consumer's group id + /// The consumer's assigned partition public WorkersCountContext( string consumerName, string consumerGroupId, @@ -17,8 +23,14 @@ public WorkersCountContext( this.AssignedTopicsPartitions = assignedTopicsPartitions; } + /// + /// Gets the consumer's name + /// public string ConsumerName { get; } + /// + /// Gets the consumer's group id + /// public string ConsumerGroupId { get; } /// diff --git a/src/KafkaFlow.Abstractions/IConsumerContext.cs b/src/KafkaFlow.Abstractions/IConsumerContext.cs index 44b57e47c..5e5d2c1e9 100644 --- a/src/KafkaFlow.Abstractions/IConsumerContext.cs +++ b/src/KafkaFlow.Abstractions/IConsumerContext.cs @@ -55,9 +55,9 @@ public interface IConsumerContext DateTime MessageTimestamp { get; } /// - /// Gets or sets a value indicating whether if the framework should store the current offset in the end when auto store offset is used + /// Gets or sets a value indicating whether if the framework should auto complete the message in the end /// - bool ShouldStoreOffset { get; set; } + bool AutoCompleteMessage { get; set; } /// /// Gets an instance of IDependencyResolver which provides methods to resolve dependencies. @@ -74,12 +74,12 @@ public interface IConsumerContext IDependencyResolver WorkerDependencyResolver { get; } /// - /// Stores the message offset to eventually be committed. After this call, the framework considers the - /// message processing as finished and releases resources associated with the message. - /// By default, this method is automatically called when the message processing ends, unless - /// the consumer is set to manual store offsets or the flag is set to false. + /// Signals the completion of message processing and stores the message offset to eventually be committed. + /// After this call, the framework marks the message processing as finished and releases resources associated with the message. + /// By default, this method is automatically invoked when message processing concludes, unless + /// the consumer is configured for manual message completion or the flag is set to false. /// - void StoreOffset(); + void Complete(); /// /// Get offset watermark data @@ -98,7 +98,7 @@ public interface IConsumerContext void Resume(); /// - /// Gets a Task that completes when the method is invoked, + /// Gets a Task that completes when the method is invoked, /// indicating the end of message processing. This allows async operations /// to wait for the message to be fully processed and its offset stored. /// diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs b/src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs index 932c66a51..9e3326156 100644 --- a/src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs +++ b/src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs @@ -24,7 +24,7 @@ public static IConsumerMiddlewareConfigurationBuilder BatchConsume( { return builder.Add( resolver => new BatchConsumeMiddleware( - resolver.Resolve(), + resolver.Resolve(), batchSize, batchTimeout, resolver.Resolve()), diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs b/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs index 6f007a4ba..24117ba8d 100644 --- a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs +++ b/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs @@ -26,7 +26,7 @@ internal class BatchConsumeMiddleware private Task dispatchTask; public BatchConsumeMiddleware( - IWorkerLifetimeContext workerContext, + IConsumerMiddlewareContext workerContext, int batchSize, TimeSpan batchTimeout, ILogHandler logHandler) @@ -46,7 +46,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next) try { - context.ConsumerContext.ShouldStoreOffset = false; + context.ConsumerContext.AutoCompleteMessage = false; this.batch.Add(context); @@ -138,11 +138,11 @@ private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate nex this.dispatchSemaphore.Release(); } - if (this.consumerConfiguration.AutoStoreOffsets) + if (this.consumerConfiguration.AutoMessageCompletion) { foreach (var messageContext in localBatch) { - messageContext.ConsumerContext.StoreOffset(); + messageContext.ConsumerContext.Complete(); } } } diff --git a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs index 5e75f3c4d..0a16eeffb 100644 --- a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs @@ -54,7 +54,7 @@ public async Task AddAsync_LessThanBatchSize_CallNextOnTimeout() this.timesNextWasCalled.Should().Be(0); await this.WaitBatchTimeoutAsync(); this.timesNextWasCalled.Should().Be(1); - consumerContext.Verify(x => x.StoreOffset(), Times.Once); + consumerContext.Verify(x => x.Complete(), Times.Once); } [TestMethod] @@ -79,7 +79,7 @@ public async Task AddAsync_ExactlyBatchSize_CallNextInstantly() // Assert this.timesNextWasCalled.Should().Be(1); this.nextContext.GetMessagesBatch().Should().HaveCount(BatchSize); - consumerContext.Verify(x => x.StoreOffset(), Times.Exactly(BatchSize)); + consumerContext.Verify(x => x.Complete(), Times.Exactly(BatchSize)); } [TestMethod] @@ -104,11 +104,11 @@ public async Task AddAsync_MoreThanBatchSize_CallNextInstantlyThenCallWhenTimeou // Assert this.timesNextWasCalled.Should().Be(1); this.nextContext.GetMessagesBatch().Should().HaveCount(BatchSize); - consumerContext.Verify(x => x.StoreOffset(), Times.Exactly(BatchSize)); + consumerContext.Verify(x => x.Complete(), Times.Exactly(BatchSize)); await this.WaitBatchTimeoutAsync(); this.timesNextWasCalled.Should().Be(2); - consumerContext.Verify(x => x.StoreOffset(), Times.Exactly(BatchSize + 1)); + consumerContext.Verify(x => x.Complete(), Times.Exactly(BatchSize + 1)); } [TestMethod] diff --git a/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs b/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs index 16e06241c..51e387ffb 100644 --- a/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs +++ b/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs @@ -98,7 +98,7 @@ public void Build_AllCalls_ReturnPassedValues() .WithWorkersCount(workers) .WithGroupId(groupId) .WithAutoOffsetReset(offsetReset) - .WithManualStoreOffsets() + .WithManualMessageCompletion() .WithAutoCommitIntervalMs(autoCommitInterval) .WithMaxPollIntervalMs(maxPollIntervalMs) .WithConsumerConfig(consumerConfig) diff --git a/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs b/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs index fdd46c766..323f0088c 100644 --- a/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs +++ b/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs @@ -20,7 +20,7 @@ public void ShouldUpdate_WithoutAddingOffsets_ThrowsException() var offsets = new PartitionOffsets(); // Act - Func act = () => offsets.ShouldCommit(1, out _); + Func act = () => offsets.IsCommitAllowed(1, out _); // Assert act.Should().Throw(); @@ -34,7 +34,7 @@ public void ShouldUpdateOffset_NextOffset_ShouldUpdate() offsets.Enqueue(1); // Act - var shouldUpdate = offsets.ShouldCommit(1, out var offset); + var shouldUpdate = offsets.IsCommitAllowed(1, out var offset); // Assert Assert.IsTrue(shouldUpdate); @@ -49,7 +49,7 @@ public void ShouldUpdateOffset_WithOneGap_ShouldNotUpdate() offsets.Enqueue(1); // Act - var shouldUpdate = offsets.ShouldCommit(2, out var offset); + var shouldUpdate = offsets.IsCommitAllowed(2, out var offset); // Assert Assert.IsFalse(shouldUpdate); @@ -76,63 +76,63 @@ public void ShouldUpdateOffset_WithManyGaps_ShouldUpdate() { new { - ShouldUpdateResult = offsets.ShouldCommit(7, out long offset), + ShouldUpdateResult = offsets.IsCommitAllowed(7, out long offset), ShouldUpdateExpected = false, LastOffsetResult = offset, LastOffsetExpected = -1, }, new { - ShouldUpdateResult = offsets.ShouldCommit(1, out offset), + ShouldUpdateResult = offsets.IsCommitAllowed(1, out offset), ShouldUpdateExpected = true, LastOffsetResult = offset, LastOffsetExpected = 1, }, new { - ShouldUpdateResult = offsets.ShouldCommit(2, out offset), + ShouldUpdateResult = offsets.IsCommitAllowed(2, out offset), ShouldUpdateExpected = true, LastOffsetResult = offset, LastOffsetExpected = 2, }, new { - ShouldUpdateResult = offsets.ShouldCommit(20, out offset), + ShouldUpdateResult = offsets.IsCommitAllowed(20, out offset), ShouldUpdateExpected = false, LastOffsetResult = offset, LastOffsetExpected = -1, }, new { - ShouldUpdateResult = offsets.ShouldCommit(5, out offset), + ShouldUpdateResult = offsets.IsCommitAllowed(5, out offset), ShouldUpdateExpected = false, LastOffsetResult = offset, LastOffsetExpected = -1, }, new { - ShouldUpdateResult = offsets.ShouldCommit(8, out offset), + ShouldUpdateResult = offsets.IsCommitAllowed(8, out offset), ShouldUpdateExpected = false, LastOffsetResult = offset, LastOffsetExpected = -1, }, new { - ShouldUpdateResult = offsets.ShouldCommit(4, out offset), + ShouldUpdateResult = offsets.IsCommitAllowed(4, out offset), ShouldUpdateExpected = true, LastOffsetResult = offset, LastOffsetExpected = 8, }, new { - ShouldUpdateResult = offsets.ShouldCommit(15, out offset), + ShouldUpdateResult = offsets.IsCommitAllowed(15, out offset), ShouldUpdateExpected = true, LastOffsetResult = offset, LastOffsetExpected = 20, }, new { - ShouldUpdateResult = offsets.ShouldCommit(50, out offset), + ShouldUpdateResult = offsets.IsCommitAllowed(50, out offset), ShouldUpdateExpected = true, LastOffsetResult = offset, LastOffsetExpected = 50, @@ -174,7 +174,7 @@ public void ShouldUpdateOffset_WithManyConcurrentOffsets_ShouldUpdate() { waitHandle.WaitOne(); - if (target.ShouldCommit(offset, out var lastProcessedOffset)) + if (target.IsCommitAllowed(offset, out var lastProcessedOffset)) { offsetsCommitted.Add(lastProcessedOffset); } diff --git a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs index 5638f5492..f3ae30c46 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs @@ -22,7 +22,7 @@ public ConsumerConfiguration( TimeSpan workerStopTimeout, Factory distributionStrategyFactory, IReadOnlyList middlewaresConfigurations, - bool autoStoreOffsets, + bool autoMessageCompletion, bool noStoreOffsets, ConsumerInitialState initialState, TimeSpan autoCommitInterval, @@ -44,7 +44,7 @@ public ConsumerConfiguration( distributionStrategyFactory ?? throw new ArgumentNullException(nameof(distributionStrategyFactory)); this.MiddlewaresConfigurations = middlewaresConfigurations ?? throw new ArgumentNullException(nameof(middlewaresConfigurations)); - this.AutoStoreOffsets = autoStoreOffsets; + this.AutoMessageCompletion = autoMessageCompletion; this.NoStoreOffsets = noStoreOffsets; this.InitialState = initialState; this.AutoCommitInterval = autoCommitInterval; @@ -94,7 +94,7 @@ public ConsumerConfiguration( public TimeSpan WorkerStopTimeout { get; } - public bool AutoStoreOffsets { get; } + public bool AutoMessageCompletion { get; } public bool NoStoreOffsets { get; } diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index 59dd3a844..784a02200 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -32,7 +32,7 @@ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuild private TimeSpan workersCountEvaluationInterval = TimeSpan.FromMinutes(5); private int bufferSize; private TimeSpan workerStopTimeout = TimeSpan.FromSeconds(30); - private bool autoStoreOffsets = true; + private bool autoMessageCompletion = true; private bool noStoreOffsets; private ConsumerInitialState initialState = ConsumerInitialState.Running; private int statisticsInterval; @@ -174,15 +174,9 @@ public IConsumerConfigurationBuilder WithWorkDistributionStrategy() return this; } - public IConsumerConfigurationBuilder WithAutoStoreOffsets() + public IConsumerConfigurationBuilder WithManualMessageCompletion() { - this.autoStoreOffsets = true; - return this; - } - - public IConsumerConfigurationBuilder WithManualStoreOffsets() - { - this.autoStoreOffsets = false; + this.autoMessageCompletion = false; return this; } @@ -273,7 +267,7 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration) this.workerStopTimeout, this.distributionStrategyFactory, middlewareConfiguration, - this.autoStoreOffsets, + this.autoMessageCompletion, this.noStoreOffsets, this.initialState, this.autoCommitInterval, diff --git a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs index ef44288eb..a2b2acb32 100644 --- a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs +++ b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs @@ -72,9 +72,9 @@ public interface IConsumerConfiguration TimeSpan WorkerStopTimeout { get; } /// - /// Gets a value indicating whether if the application should store store at the end + /// Gets a value indicating whether if the application should manual complete the message at the end /// - bool AutoStoreOffsets { get; } + bool AutoMessageCompletion { get; } /// /// Gets a value indicating that no offsets will be stored on Kafka diff --git a/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs b/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs index dc2128d4b..07c855b78 100644 --- a/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs @@ -46,8 +46,8 @@ public KafkaConfiguration Build() .AddSingleton(new ConsumerAccessor()) .AddSingleton(new ConsumerManagerFactory()) .AddSingleton() - .AddScoped() - .AddScoped(r => r.Resolve()); + .AddScoped() + .AddScoped(r => r.Resolve()); return configuration; } diff --git a/src/KafkaFlow/ConsumerManagerFactory.cs b/src/KafkaFlow/ConsumerManagerFactory.cs index cea52a8a7..a3c752bc2 100644 --- a/src/KafkaFlow/ConsumerManagerFactory.cs +++ b/src/KafkaFlow/ConsumerManagerFactory.cs @@ -5,17 +5,19 @@ namespace KafkaFlow internal class ConsumerManagerFactory : IConsumerManagerFactory { - public IConsumerManager Create(IConsumerConfiguration configuration, IDependencyResolver resolver) + public IConsumerManager Create(IConsumerConfiguration configuration, IDependencyResolver consumerDependencyResolver) { - var logHandler = resolver.Resolve(); + var logHandler = consumerDependencyResolver.Resolve(); - var consumer = configuration.CustomFactory(new Consumer(configuration, resolver, logHandler), resolver); + var consumer = configuration.CustomFactory( + new Consumer(configuration, consumerDependencyResolver, logHandler), + consumerDependencyResolver); var middlewareExecutor = new MiddlewareExecutor(configuration.MiddlewaresConfigurations); var consumerWorkerPool = new ConsumerWorkerPool( consumer, - resolver, + consumerDependencyResolver, middlewareExecutor, configuration, logHandler); @@ -31,7 +33,7 @@ public IConsumerManager Create(IConsumerConfiguration configuration, IDependency consumer, consumerWorkerPool, feeder, - resolver, + consumerDependencyResolver, logHandler); return consumerManager; diff --git a/src/KafkaFlow/Consumers/Consumer.cs b/src/KafkaFlow/Consumers/Consumer.cs index 9c9ffb2ff..e19583156 100644 --- a/src/KafkaFlow/Consumers/Consumer.cs +++ b/src/KafkaFlow/Consumers/Consumer.cs @@ -54,6 +54,11 @@ public Consumer( this.OnPartitionsRevoked((resolver, _, topicPartitions) => handler(resolver, topicPartitions)); } + var middlewareContext = this.dependencyResolver.Resolve(); + + middlewareContext.Worker = null; + middlewareContext.Consumer = this; + this.RegisterLogErrorHandler(); } diff --git a/src/KafkaFlow/Consumers/ConsumerContext.cs b/src/KafkaFlow/Consumers/ConsumerContext.cs index 356d001f9..0e6a7e397 100644 --- a/src/KafkaFlow/Consumers/ConsumerContext.cs +++ b/src/KafkaFlow/Consumers/ConsumerContext.cs @@ -27,6 +27,7 @@ public ConsumerContext( this.offsetManager = offsetManager; this.worker = worker; this.messageDependencyScope = messageDependencyScope; + this.AutoCompleteMessage = this.consumer.Configuration.AutoMessageCompletion; this.TopicPartitionOffset = new TopicPartitionOffset( kafkaResult.Topic, kafkaResult.Partition.Value, @@ -54,13 +55,13 @@ public ConsumerContext( public string GroupId => this.consumer.Configuration.GroupId; - public bool ShouldStoreOffset { get; set; } = true; + public bool AutoCompleteMessage { get; set; } public DateTime MessageTimestamp { get; } public Task Completion => this.completionSource.Task; - public void StoreOffset() + public void Complete() { this.offsetManager.MarkAsProcessed(this); this.messageDependencyScope.Dispose(); diff --git a/src/KafkaFlow/Consumers/ConsumerManager.cs b/src/KafkaFlow/Consumers/ConsumerManager.cs index 96a73e7b3..a8c34bea3 100644 --- a/src/KafkaFlow/Consumers/ConsumerManager.cs +++ b/src/KafkaFlow/Consumers/ConsumerManager.cs @@ -63,10 +63,6 @@ public async Task StopAsync() this.Consumer.Dispose(); } - public void Dispose() - { - } - private void StopEvaluateWorkerCountTimer() => this.evaluateWorkersCountTimer?.Change(Timeout.Infinite, Timeout.Infinite); private void StartEvaluateWorkerCountTimer() => this.evaluateWorkersCountTimer?.Change( diff --git a/src/KafkaFlow/Consumers/WorkerLifetimeContext.cs b/src/KafkaFlow/Consumers/ConsumerMiddlewareContext.cs similarity index 65% rename from src/KafkaFlow/Consumers/WorkerLifetimeContext.cs rename to src/KafkaFlow/Consumers/ConsumerMiddlewareContext.cs index a31812b2d..82faf1e34 100644 --- a/src/KafkaFlow/Consumers/WorkerLifetimeContext.cs +++ b/src/KafkaFlow/Consumers/ConsumerMiddlewareContext.cs @@ -1,6 +1,6 @@ namespace KafkaFlow.Consumers { - internal class WorkerLifetimeContext : IWorkerLifetimeContext + internal class ConsumerMiddlewareContext : IConsumerMiddlewareContext { public IWorker Worker { get; set; } diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index d33335bbd..94e73fb47 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -39,10 +39,10 @@ public ConsumerWorker( this.workerStoppingSubject = new(logHandler); this.workerStoppedSubject = new(logHandler); - var workerContext = this.workerDependencyResolverScope.Resolver.Resolve(); + var middlewareContext = this.workerDependencyResolverScope.Resolver.Resolve(); - workerContext.Worker = this; - workerContext.Consumer = consumer; + middlewareContext.Worker = this; + middlewareContext.Consumer = consumer; } public int Id { get; } @@ -152,9 +152,9 @@ await this.middlewareExecutor }); } - if (this.consumer.Configuration.AutoStoreOffsets && context.ConsumerContext.ShouldStoreOffset) + if (context.ConsumerContext.AutoCompleteMessage) { - context.ConsumerContext.StoreOffset(); + context.ConsumerContext.Complete(); } this.onMessageFinishedHandler?.Invoke(); diff --git a/src/KafkaFlow/Consumers/IWorkerLifetimeContext.cs b/src/KafkaFlow/Consumers/IWorkerLifetimeContext.cs index 73ea0391b..cdb5f40aa 100644 --- a/src/KafkaFlow/Consumers/IWorkerLifetimeContext.cs +++ b/src/KafkaFlow/Consumers/IWorkerLifetimeContext.cs @@ -1,12 +1,13 @@ namespace KafkaFlow.Consumers { /// - /// Provides access to the current consumer worker context. This interface only returns values when inside a middleware with Worker lifetime; otherwise, it will return null. + /// Provides access to the current consumer's middleware context. /// - public interface IWorkerLifetimeContext + public interface IConsumerMiddlewareContext { /// /// Gets the current worker in the context. + /// This property only returns values when inside a middleware with Worker lifetime; otherwise, it will return null. /// IWorker Worker { get; } diff --git a/src/KafkaFlow/Consumers/OffsetCommitter.cs b/src/KafkaFlow/Consumers/OffsetCommitter.cs index 7c9d5eb70..8f58d08f4 100644 --- a/src/KafkaFlow/Consumers/OffsetCommitter.cs +++ b/src/KafkaFlow/Consumers/OffsetCommitter.cs @@ -38,8 +38,6 @@ public OffsetCommitter( this.logHandler = logHandler; } - public void FlushOffsets() => this.CommitHandler(); - public void MarkAsProcessed(TopicPartitionOffset tpo) { this.offsetsToCommit.AddOrUpdate( diff --git a/src/KafkaFlow/Consumers/OffsetManager.cs b/src/KafkaFlow/Consumers/OffsetManager.cs index a4751fa9f..42d4c8288 100644 --- a/src/KafkaFlow/Consumers/OffsetManager.cs +++ b/src/KafkaFlow/Consumers/OffsetManager.cs @@ -29,9 +29,9 @@ public void MarkAsProcessed(IConsumerContext context) lock (offsets) { - if (offsets.ShouldCommit(context, out var lastProcessedContext)) + if (offsets.IsCommitAllowed(context)) { - this.committer.MarkAsProcessed(lastProcessedContext.TopicPartitionOffset); + this.committer.MarkAsProcessed(offsets.ReadyToCommitContext.TopicPartitionOffset); } } } diff --git a/src/KafkaFlow/Consumers/PartitionOffsets.cs b/src/KafkaFlow/Consumers/PartitionOffsets.cs index 9ab9dbae8..b39b60245 100644 --- a/src/KafkaFlow/Consumers/PartitionOffsets.cs +++ b/src/KafkaFlow/Consumers/PartitionOffsets.cs @@ -10,6 +10,8 @@ internal class PartitionOffsets private readonly SortedDictionary processedContexts = new(); private readonly LinkedList receivedContexts = new(); + public IConsumerContext ReadyToCommitContext { get; private set; } + public void Enqueue(IConsumerContext context) { lock (this.receivedContexts) @@ -18,9 +20,9 @@ public void Enqueue(IConsumerContext context) } } - public bool ShouldCommit(IConsumerContext context, out IConsumerContext lastProcessedContext) + public bool IsCommitAllowed(IConsumerContext context) { - lastProcessedContext = null; + this.ReadyToCommitContext = null; lock (this.receivedContexts) { @@ -38,7 +40,7 @@ public bool ShouldCommit(IConsumerContext context, out IConsumerContext lastProc do { - lastProcessedContext = this.receivedContexts.First.Value; + this.ReadyToCommitContext = this.receivedContexts.First.Value; this.receivedContexts.RemoveFirst(); } while (this.receivedContexts.Count > 0 && this.processedContexts.Remove(this.receivedContexts.First.Value.Offset)); } diff --git a/src/KafkaFlow/KafkaBus.cs b/src/KafkaFlow/KafkaBus.cs index bc0f128b7..8ac7a699c 100644 --- a/src/KafkaFlow/KafkaBus.cs +++ b/src/KafkaFlow/KafkaBus.cs @@ -50,16 +50,16 @@ public async Task StartAsync(CancellationToken stopCancellationToken = default) foreach (var consumerConfiguration in cluster.Consumers) { - var dependencyScope = this.dependencyResolver.CreateScope(); + var consumerDependencyScope = this.dependencyResolver.CreateScope(); var consumerManager = - this.consumerManagerFactory.Create(consumerConfiguration, dependencyScope.Resolver); + this.consumerManagerFactory.Create(consumerConfiguration, consumerDependencyScope.Resolver); this.consumerManagers.Add(consumerManager); this.Consumers.Add( new MessageConsumer( consumerManager, - dependencyScope.Resolver.Resolve())); + consumerDependencyScope.Resolver.Resolve())); if (consumerConfiguration.InitialState == ConsumerInitialState.Running) {