Skip to content

Commit

Permalink
refactor: rename StoreOffset to Complete
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Sep 27, 2023
1 parent 685b3af commit 18e25e1
Show file tree
Hide file tree
Showing 43 changed files with 401 additions and 361 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
catch (Exception exception)
{
context.ConsumerContext.ShouldStoreOffset = false;
context.ConsumerContext.AutoMessageCompletion = false;
this.logHandler.Error("Error handling message", exception,
new
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,11 @@ IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
where T : class, IDistributionStrategy;

/// <summary>
/// Offsets will be stored after the execution of the handler and middlewares automatically, this is the default behaviour
/// Configures the consumer for manual message completion.
/// The client should call the <see cref="IConsumerContext.Complete"/> to mark the message processing as finished
/// </summary>
/// <returns></returns>
IConsumerConfigurationBuilder WithAutoStoreOffsets();

/// <summary>
/// The client should call the <see cref="IConsumerContext.StoreOffset()"/>
/// </summary>
/// <returns></returns>
IConsumerConfigurationBuilder WithManualStoreOffsets();
IConsumerConfigurationBuilder WithManualMessageCompletion();

/// <summary>
/// No offsets will be stored on Kafka
Expand Down
12 changes: 12 additions & 0 deletions src/KafkaFlow.Abstractions/Configuration/WorkersCountContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ namespace KafkaFlow.Configuration
/// </summary>
public class WorkersCountContext
{
/// <summary>
/// Initializes a new instance of the <see cref="WorkersCountContext"/> class.
/// </summary>
/// <param name="consumerName">The consumer's name</param>
/// <param name="consumerGroupId">The consumer's group id</param>
/// <param name="assignedTopicsPartitions">The consumer's assigned partition</param>
public WorkersCountContext(
string consumerName,
string consumerGroupId,
Expand All @@ -17,8 +23,14 @@ public WorkersCountContext(
this.AssignedTopicsPartitions = assignedTopicsPartitions;
}

/// <summary>
/// Gets the consumer's name
/// </summary>
public string ConsumerName { get; }

/// <summary>
/// Gets the consumer's group id
/// </summary>
public string ConsumerGroupId { get; }

/// <summary>
Expand Down
33 changes: 19 additions & 14 deletions src/KafkaFlow.Abstractions/IConsumerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,14 @@ public interface IConsumerContext
DateTime MessageTimestamp { get; }

/// <summary>
/// Gets or sets a value indicating whether if the framework should store the current offset in the end when auto store offset is used
/// Gets or sets a value indicating whether if the framework should auto complete the message in the end
/// </summary>
bool ShouldStoreOffset { get; set; }
bool AutoMessageCompletion { get; set; }

/// <summary>
/// Gets or sets a value indicating whether if the message offset must be stored when the message is marked as completed
/// </summary>
bool StoreOffsetOnCompletion { get; set; }

/// <summary>
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
Expand All @@ -74,12 +79,19 @@ public interface IConsumerContext
IDependencyResolver WorkerDependencyResolver { get; }

/// <summary>
/// Stores the message offset to eventually be committed. After this call, the framework considers the
/// message processing as finished and releases resources associated with the message.
/// By default, this method is automatically called when the message processing ends, unless
/// the consumer is set to manual store offsets or the <see cref="ShouldStoreOffset"/> flag is set to false.
/// Gets a Task that completes when the <see cref="Complete"/> method is invoked,
/// indicating the end of message processing. This allows async operations
/// to wait for the message to be fully processed and its offset stored.
/// </summary>
void StoreOffset();
Task<TopicPartitionOffset> Completion { get; }

/// <summary>
/// Signals the completion of message processing and stores the message offset to eventually be committed.
/// After this call, the framework marks the message processing as finished and releases resources associated with the message.
/// By default, this method is automatically invoked when message processing concludes, unless
/// the consumer is configured for manual message completion or the <see cref="AutoMessageCompletion"/> flag is set to false.
/// </summary>
void Complete();

/// <summary>
/// Get offset watermark data
Expand All @@ -96,12 +108,5 @@ public interface IConsumerContext
/// Resume Kafka's message fetch
/// </summary>
void Resume();

/// <summary>
/// Gets a Task that completes when the <see cref="StoreOffset"/> method is invoked,
/// indicating the end of message processing. This allows async operations
/// to wait for the message to be fully processed and its offset stored.
/// </summary>
Task<TopicPartitionOffset> Completion { get; }
}
}
11 changes: 0 additions & 11 deletions src/KafkaFlow.Abstractions/IMessageContext.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace KafkaFlow
{
using System;

/// <summary>
/// A context that contains the message and metadata
/// </summary>
Expand Down Expand Up @@ -34,21 +32,12 @@ public interface IMessageContext
/// </summary>
IDependencyResolver DependencyResolver { get; }


/// <summary>
/// Creates a new <see cref="IMessageContext"/> with the new message
/// </summary>
/// <param name="key">The new message key</param>
/// <param name="value">The new message value</param>
/// <returns>A new message context containing the new values</returns>
IMessageContext SetMessage(object key, object value);

/// <summary>
/// Deprecated
/// </summary>
/// <param name="message">key</param>
/// <returns></returns>
[Obsolete("This method should no longer be used, use the " + nameof(SetMessage) + "() instead.", true)]
IMessageContext TransformMessage(object message);
}
}
7 changes: 0 additions & 7 deletions src/KafkaFlow.Abstractions/MiddlewareLifetime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@ public enum MiddlewareLifetime
/// </summary>
Singleton,

/// <summary>
/// Obsolete. Please use Message instead. Indicates a new middleware instance is instantiated for each message scope.
/// This instance will be disposed when the message scope ends.
/// </summary>
[Obsolete("Use Message lifetime instead")]
Scoped,

/// <summary>
/// Indicates a new middleware instance is instantiated for each individual message scope, ensuring isolated processing.
/// This instance will be disposed when the message scope ends.
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaFlow.Admin/MemoryTelemetryStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private void TryCleanItems()
return;
}

this.lastCleanDate = this.dateTimeProvider.Now;
this.lastCleanDate = this.dateTimeProvider.UtcNow;

this.CleanExpiredItems();
}
Expand All @@ -70,6 +70,6 @@ private void CleanExpiredItems()
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool NeedsCleaning() => this.dateTimeProvider.Now - this.lastCleanDate > this.cleanRunInterval;
private bool NeedsCleaning() => this.dateTimeProvider.UtcNow - this.lastCleanDate > this.cleanRunInterval;
}
}
2 changes: 1 addition & 1 deletion src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static IConsumerMiddlewareConfigurationBuilder BatchConsume(
{
return builder.Add(
resolver => new BatchConsumeMiddleware(
resolver.Resolve<IWorkerLifetimeContext>(),
resolver.Resolve<IConsumerMiddlewareContext>(),
batchSize,
batchTimeout,
resolver.Resolve<ILogHandler>()),
Expand Down
4 changes: 1 addition & 3 deletions src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ public BatchConsumeMessageContext(
public IDependencyResolver DependencyResolver => this.batchDependencyScope.Resolver;

public IMessageContext SetMessage(object key, object value) =>
throw new NotSupportedException($"{nameof(BatchConsumeMessageContext)} does not allow change the message");

public IMessageContext TransformMessage(object message) => throw new NotImplementedException();
throw new NotSupportedException($"{nameof(BatchConsumeMessageContext)} does not allow to change the message");

public void Dispose() => this.batchDependencyScope.Dispose();
}
Expand Down
23 changes: 13 additions & 10 deletions src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal class BatchConsumeMiddleware
private Task<Task> dispatchTask;

public BatchConsumeMiddleware(
IWorkerLifetimeContext workerContext,
IConsumerMiddlewareContext middlewareContext,
int batchSize,
TimeSpan batchTimeout,
ILogHandler logHandler)
Expand All @@ -35,9 +35,9 @@ public BatchConsumeMiddleware(
this.batchTimeout = batchTimeout;
this.logHandler = logHandler;
this.batch = new(batchSize);
this.consumerConfiguration = workerContext.Consumer.Configuration;
this.consumerConfiguration = middlewareContext.Consumer.Configuration;

workerContext.Worker.WorkerStopped.Subscribe(this);
middlewareContext.Worker.WorkerStopped.Subscribe(this);
}

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
Expand All @@ -46,7 +46,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)

try
{
context.ConsumerContext.ShouldStoreOffset = false;
context.ConsumerContext.AutoMessageCompletion = false;

this.batch.Add(context);

Expand Down Expand Up @@ -118,7 +118,10 @@ private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate nex
}
catch (OperationCanceledException) when (context.ConsumerContext.WorkerStopped.IsCancellationRequested)
{
return;
foreach (var messageContext in localBatch)
{
messageContext.ConsumerContext.StoreOffsetOnCompletion = false;
}
}
catch (Exception ex)
{
Expand All @@ -136,13 +139,13 @@ private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate nex
{
this.batch.Clear();
this.dispatchSemaphore.Release();
}

if (this.consumerConfiguration.AutoStoreOffsets)
{
foreach (var messageContext in localBatch)
if (this.consumerConfiguration.AutoMessageCompletion)
{
messageContext.ConsumerContext.StoreOffset();
foreach (var messageContext in localBatch)
{
messageContext.ConsumerContext.Complete();
}
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions src/KafkaFlow.BatchConsume/KafkaFlow.BatchConsume.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,5 @@
<ProjectReference Include="..\KafkaFlow\KafkaFlow.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.1" />
</ItemGroup>

</Project>
1 change: 0 additions & 1 deletion src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
/// <summary>
/// A GZIP message compressor
/// </summary>
[Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public class GzipMessageCompressor : IMessageCompressor
{
/// <inheritdoc />
Expand Down
4 changes: 0 additions & 4 deletions src/KafkaFlow.Compressor/ConfigurationBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ public static class ConfigurationBuilderExtensions
/// <param name="middlewares">The middleware configuration builder</param>
/// <typeparam name="T">The compressor type</typeparam>
/// <returns></returns>
[Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public static IConsumerMiddlewareConfigurationBuilder AddCompressor<T>(this IConsumerMiddlewareConfigurationBuilder middlewares)
where T : class, IMessageCompressor
{
Expand All @@ -29,7 +28,6 @@ public static IConsumerMiddlewareConfigurationBuilder AddCompressor<T>(this ICon
/// <typeparam name="T">The compressor type that implements <see cref="IMessageCompressor"/></typeparam>
/// <param name="factory">A factory to create the <see cref="IMessageCompressor"/> instance</param>
/// <returns></returns>
[Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public static IConsumerMiddlewareConfigurationBuilder AddCompressor<T>(
this IConsumerMiddlewareConfigurationBuilder middlewares,
Factory<T> factory)
Expand All @@ -45,7 +43,6 @@ public static IConsumerMiddlewareConfigurationBuilder AddCompressor<T>(
/// <param name="middlewares">The middleware configuration builder</param>
/// <typeparam name="T">The compressor type that implements <see cref="IMessageCompressor"/></typeparam>
/// <returns></returns>
[Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public static IProducerMiddlewareConfigurationBuilder AddCompressor<T>(this IProducerMiddlewareConfigurationBuilder middlewares)
where T : class, IMessageCompressor
{
Expand All @@ -61,7 +58,6 @@ public static IProducerMiddlewareConfigurationBuilder AddCompressor<T>(this IPro
/// <typeparam name="T">The compressor type that implements <see cref="IMessageCompressor"/></typeparam>
/// <param name="factory">A factory to create the <see cref="IMessageCompressor"/> instance</param>
/// <returns></returns>
[Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public static IProducerMiddlewareConfigurationBuilder AddCompressor<T>(
this IProducerMiddlewareConfigurationBuilder middlewares,
Factory<T> factory)
Expand Down
Loading

0 comments on commit 18e25e1

Please sign in to comment.