Skip to content

Commit

Permalink
refactor: rename StoreOffset to Complete
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Sep 14, 2023
1 parent 57be753 commit 8d13d7d
Show file tree
Hide file tree
Showing 24 changed files with 92 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
catch (Exception exception)
{
context.ConsumerContext.ShouldStoreOffset = false;
context.ConsumerContext.AutoCompleteMessage = false;
this.logHandler.Error("Error handling message", exception,
new
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,11 @@ IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
where T : class, IDistributionStrategy;

/// <summary>
/// Offsets will be stored after the execution of the handler and middlewares automatically, this is the default behaviour
/// Configures the consumer for manual message completion.
/// The client should call the <see cref="IConsumerContext.Complete"/> to mark the message processing as finished
/// </summary>
/// <returns></returns>
IConsumerConfigurationBuilder WithAutoStoreOffsets();

/// <summary>
/// The client should call the <see cref="IConsumerContext.StoreOffset()"/>
/// </summary>
/// <returns></returns>
IConsumerConfigurationBuilder WithManualStoreOffsets();
IConsumerConfigurationBuilder WithManualMessageCompletion();

/// <summary>
/// No offsets will be stored on Kafka
Expand Down
12 changes: 12 additions & 0 deletions src/KafkaFlow.Abstractions/Configuration/WorkersCountContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ namespace KafkaFlow.Configuration
/// </summary>
public class WorkersCountContext
{
/// <summary>
/// Initializes a new instance of the <see cref="WorkersCountContext"/> class.
/// </summary>
/// <param name="consumerName">The consumer's name</param>
/// <param name="consumerGroupId">The consumer's group id</param>
/// <param name="assignedTopicsPartitions">The consumer's assigned partition</param>
public WorkersCountContext(
string consumerName,
string consumerGroupId,
Expand All @@ -17,8 +23,14 @@ public WorkersCountContext(
this.AssignedTopicsPartitions = assignedTopicsPartitions;
}

/// <summary>
/// Gets the consumer's name
/// </summary>
public string ConsumerName { get; }

/// <summary>
/// Gets the consumer's group id
/// </summary>
public string ConsumerGroupId { get; }

/// <summary>
Expand Down
16 changes: 8 additions & 8 deletions src/KafkaFlow.Abstractions/IConsumerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public interface IConsumerContext
DateTime MessageTimestamp { get; }

/// <summary>
/// Gets or sets a value indicating whether if the framework should store the current offset in the end when auto store offset is used
/// Gets or sets a value indicating whether if the framework should auto complete the message in the end
/// </summary>
bool ShouldStoreOffset { get; set; }
bool AutoCompleteMessage { get; set; }

/// <summary>
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
Expand All @@ -74,12 +74,12 @@ public interface IConsumerContext
IDependencyResolver WorkerDependencyResolver { get; }

/// <summary>
/// Stores the message offset to eventually be committed. After this call, the framework considers the
/// message processing as finished and releases resources associated with the message.
/// By default, this method is automatically called when the message processing ends, unless
/// the consumer is set to manual store offsets or the <see cref="ShouldStoreOffset"/> flag is set to false.
/// Signals the completion of message processing and stores the message offset to eventually be committed.
/// After this call, the framework marks the message processing as finished and releases resources associated with the message.
/// By default, this method is automatically invoked when message processing concludes, unless
/// the consumer is configured for manual message completion or the <see cref="AutoCompleteMessage"/> flag is set to false.
/// </summary>
void StoreOffset();
void Complete();

/// <summary>
/// Get offset watermark data
Expand All @@ -98,7 +98,7 @@ public interface IConsumerContext
void Resume();

/// <summary>
/// Gets a Task that completes when the <see cref="StoreOffset"/> method is invoked,
/// Gets a Task that completes when the <see cref="Complete"/> method is invoked,
/// indicating the end of message processing. This allows async operations
/// to wait for the message to be fully processed and its offset stored.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static IConsumerMiddlewareConfigurationBuilder BatchConsume(
{
return builder.Add(
resolver => new BatchConsumeMiddleware(
resolver.Resolve<IWorkerLifetimeContext>(),
resolver.Resolve<IConsumerMiddlewareContext>(),
batchSize,
batchTimeout,
resolver.Resolve<ILogHandler>()),
Expand Down
8 changes: 4 additions & 4 deletions src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal class BatchConsumeMiddleware
private Task<Task> dispatchTask;

public BatchConsumeMiddleware(
IWorkerLifetimeContext workerContext,
IConsumerMiddlewareContext workerContext,
int batchSize,
TimeSpan batchTimeout,
ILogHandler logHandler)
Expand All @@ -46,7 +46,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)

try
{
context.ConsumerContext.ShouldStoreOffset = false;
context.ConsumerContext.AutoCompleteMessage = false;

this.batch.Add(context);

Expand Down Expand Up @@ -138,11 +138,11 @@ private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate nex
this.dispatchSemaphore.Release();
}

if (this.consumerConfiguration.AutoStoreOffsets)
if (this.consumerConfiguration.AutoMessageCompletion)
{
foreach (var messageContext in localBatch)
{
messageContext.ConsumerContext.StoreOffset();
messageContext.ConsumerContext.Complete();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public async Task AddAsync_LessThanBatchSize_CallNextOnTimeout()
this.timesNextWasCalled.Should().Be(0);
await this.WaitBatchTimeoutAsync();
this.timesNextWasCalled.Should().Be(1);
consumerContext.Verify(x => x.StoreOffset(), Times.Once);
consumerContext.Verify(x => x.Complete(), Times.Once);
}

[TestMethod]
Expand All @@ -79,7 +79,7 @@ public async Task AddAsync_ExactlyBatchSize_CallNextInstantly()
// Assert
this.timesNextWasCalled.Should().Be(1);
this.nextContext.GetMessagesBatch().Should().HaveCount(BatchSize);
consumerContext.Verify(x => x.StoreOffset(), Times.Exactly(BatchSize));
consumerContext.Verify(x => x.Complete(), Times.Exactly(BatchSize));
}

[TestMethod]
Expand All @@ -104,11 +104,11 @@ public async Task AddAsync_MoreThanBatchSize_CallNextInstantlyThenCallWhenTimeou
// Assert
this.timesNextWasCalled.Should().Be(1);
this.nextContext.GetMessagesBatch().Should().HaveCount(BatchSize);
consumerContext.Verify(x => x.StoreOffset(), Times.Exactly(BatchSize));
consumerContext.Verify(x => x.Complete(), Times.Exactly(BatchSize));

await this.WaitBatchTimeoutAsync();
this.timesNextWasCalled.Should().Be(2);
consumerContext.Verify(x => x.StoreOffset(), Times.Exactly(BatchSize + 1));
consumerContext.Verify(x => x.Complete(), Times.Exactly(BatchSize + 1));
}

[TestMethod]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void Build_AllCalls_ReturnPassedValues()
.WithWorkersCount(workers)
.WithGroupId(groupId)
.WithAutoOffsetReset(offsetReset)
.WithManualStoreOffsets()
.WithManualMessageCompletion()
.WithAutoCommitIntervalMs(autoCommitInterval)
.WithMaxPollIntervalMs(maxPollIntervalMs)
.WithConsumerConfig(consumerConfig)
Expand Down
26 changes: 13 additions & 13 deletions src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public void ShouldUpdate_WithoutAddingOffsets_ThrowsException()
var offsets = new PartitionOffsets();

// Act
Func<bool> act = () => offsets.ShouldCommit(1, out _);
Func<bool> act = () => offsets.IsCommitAllowed(1, out _);

// Assert
act.Should().Throw<InvalidOperationException>();
Expand All @@ -34,7 +34,7 @@ public void ShouldUpdateOffset_NextOffset_ShouldUpdate()
offsets.Enqueue(1);

// Act
var shouldUpdate = offsets.ShouldCommit(1, out var offset);
var shouldUpdate = offsets.IsCommitAllowed(1, out var offset);

// Assert
Assert.IsTrue(shouldUpdate);
Expand All @@ -49,7 +49,7 @@ public void ShouldUpdateOffset_WithOneGap_ShouldNotUpdate()
offsets.Enqueue(1);

// Act
var shouldUpdate = offsets.ShouldCommit(2, out var offset);
var shouldUpdate = offsets.IsCommitAllowed(2, out var offset);

// Assert
Assert.IsFalse(shouldUpdate);
Expand All @@ -76,63 +76,63 @@ public void ShouldUpdateOffset_WithManyGaps_ShouldUpdate()
{
new
{
ShouldUpdateResult = offsets.ShouldCommit(7, out long offset),
ShouldUpdateResult = offsets.IsCommitAllowed(7, out long offset),
ShouldUpdateExpected = false,
LastOffsetResult = offset,
LastOffsetExpected = -1,
},
new
{
ShouldUpdateResult = offsets.ShouldCommit(1, out offset),
ShouldUpdateResult = offsets.IsCommitAllowed(1, out offset),
ShouldUpdateExpected = true,
LastOffsetResult = offset,
LastOffsetExpected = 1,
},
new
{
ShouldUpdateResult = offsets.ShouldCommit(2, out offset),
ShouldUpdateResult = offsets.IsCommitAllowed(2, out offset),
ShouldUpdateExpected = true,
LastOffsetResult = offset,
LastOffsetExpected = 2,
},
new
{
ShouldUpdateResult = offsets.ShouldCommit(20, out offset),
ShouldUpdateResult = offsets.IsCommitAllowed(20, out offset),
ShouldUpdateExpected = false,
LastOffsetResult = offset,
LastOffsetExpected = -1,
},
new
{
ShouldUpdateResult = offsets.ShouldCommit(5, out offset),
ShouldUpdateResult = offsets.IsCommitAllowed(5, out offset),
ShouldUpdateExpected = false,
LastOffsetResult = offset,
LastOffsetExpected = -1,
},
new
{
ShouldUpdateResult = offsets.ShouldCommit(8, out offset),
ShouldUpdateResult = offsets.IsCommitAllowed(8, out offset),
ShouldUpdateExpected = false,
LastOffsetResult = offset,
LastOffsetExpected = -1,
},
new
{
ShouldUpdateResult = offsets.ShouldCommit(4, out offset),
ShouldUpdateResult = offsets.IsCommitAllowed(4, out offset),
ShouldUpdateExpected = true,
LastOffsetResult = offset,
LastOffsetExpected = 8,
},
new
{
ShouldUpdateResult = offsets.ShouldCommit(15, out offset),
ShouldUpdateResult = offsets.IsCommitAllowed(15, out offset),
ShouldUpdateExpected = true,
LastOffsetResult = offset,
LastOffsetExpected = 20,
},
new
{
ShouldUpdateResult = offsets.ShouldCommit(50, out offset),
ShouldUpdateResult = offsets.IsCommitAllowed(50, out offset),
ShouldUpdateExpected = true,
LastOffsetResult = offset,
LastOffsetExpected = 50,
Expand Down Expand Up @@ -174,7 +174,7 @@ public void ShouldUpdateOffset_WithManyConcurrentOffsets_ShouldUpdate()
{
waitHandle.WaitOne();
if (target.ShouldCommit(offset, out var lastProcessedOffset))
if (target.IsCommitAllowed(offset, out var lastProcessedOffset))
{
offsetsCommitted.Add(lastProcessedOffset);
}
Expand Down
6 changes: 3 additions & 3 deletions src/KafkaFlow/Configuration/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public ConsumerConfiguration(
TimeSpan workerStopTimeout,
Factory<IDistributionStrategy> distributionStrategyFactory,
IReadOnlyList<MiddlewareConfiguration> middlewaresConfigurations,
bool autoStoreOffsets,
bool autoMessageCompletion,
bool noStoreOffsets,
ConsumerInitialState initialState,
TimeSpan autoCommitInterval,
Expand All @@ -44,7 +44,7 @@ public ConsumerConfiguration(
distributionStrategyFactory ?? throw new ArgumentNullException(nameof(distributionStrategyFactory));
this.MiddlewaresConfigurations =
middlewaresConfigurations ?? throw new ArgumentNullException(nameof(middlewaresConfigurations));
this.AutoStoreOffsets = autoStoreOffsets;
this.AutoMessageCompletion = autoMessageCompletion;
this.NoStoreOffsets = noStoreOffsets;
this.InitialState = initialState;
this.AutoCommitInterval = autoCommitInterval;
Expand Down Expand Up @@ -94,7 +94,7 @@ public ConsumerConfiguration(

public TimeSpan WorkerStopTimeout { get; }

public bool AutoStoreOffsets { get; }
public bool AutoMessageCompletion { get; }

public bool NoStoreOffsets { get; }

Expand Down
14 changes: 4 additions & 10 deletions src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuild
private TimeSpan workersCountEvaluationInterval = TimeSpan.FromMinutes(5);
private int bufferSize;
private TimeSpan workerStopTimeout = TimeSpan.FromSeconds(30);
private bool autoStoreOffsets = true;
private bool autoMessageCompletion = true;
private bool noStoreOffsets;
private ConsumerInitialState initialState = ConsumerInitialState.Running;
private int statisticsInterval;
Expand Down Expand Up @@ -174,15 +174,9 @@ public IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
return this;
}

public IConsumerConfigurationBuilder WithAutoStoreOffsets()
public IConsumerConfigurationBuilder WithManualMessageCompletion()
{
this.autoStoreOffsets = true;
return this;
}

public IConsumerConfigurationBuilder WithManualStoreOffsets()
{
this.autoStoreOffsets = false;
this.autoMessageCompletion = false;
return this;
}

Expand Down Expand Up @@ -273,7 +267,7 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
this.workerStopTimeout,
this.distributionStrategyFactory,
middlewareConfiguration,
this.autoStoreOffsets,
this.autoMessageCompletion,
this.noStoreOffsets,
this.initialState,
this.autoCommitInterval,
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaFlow/Configuration/IConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public interface IConsumerConfiguration
TimeSpan WorkerStopTimeout { get; }

/// <summary>
/// Gets a value indicating whether if the application should store store at the end
/// Gets a value indicating whether if the application should manual complete the message at the end
/// </summary>
bool AutoStoreOffsets { get; }
bool AutoMessageCompletion { get; }

/// <summary>
/// Gets a value indicating that no offsets will be stored on Kafka
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public KafkaConfiguration Build()
.AddSingleton<IConsumerAccessor>(new ConsumerAccessor())
.AddSingleton<IConsumerManagerFactory>(new ConsumerManagerFactory())
.AddSingleton<IClusterManagerAccessor, ClusterManagerAccessor>()
.AddScoped<WorkerLifetimeContext>()
.AddScoped<IWorkerLifetimeContext>(r => r.Resolve<WorkerLifetimeContext>());
.AddScoped<ConsumerMiddlewareContext>()
.AddScoped<IConsumerMiddlewareContext>(r => r.Resolve<ConsumerMiddlewareContext>());

return configuration;
}
Expand Down
Loading

0 comments on commit 8d13d7d

Please sign in to comment.