Skip to content

Commit

Permalink
feat: evolve worker distribution strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Oct 31, 2023
1 parent d5a9c21 commit f56d8e2
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,19 @@ IConsumerConfigurationBuilder WithWorkersCount(
/// <summary>
/// Sets the strategy to choose a worker when a message arrives
/// </summary>
/// <typeparam name="T">A class that implements the <see cref="IDistributionStrategy"/> interface</typeparam>
/// <typeparam name="T">A class that implements the <see cref="IWorkerDistributionStrategy"/> interface</typeparam>
/// <param name="factory">A factory to create the instance</param>
/// <returns></returns>
IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>(Factory<T> factory)
where T : class, IDistributionStrategy;
IConsumerConfigurationBuilder WithWorkerDistributionStrategy<T>(Factory<T> factory)
where T : class, IWorkerDistributionStrategy;

/// <summary>
/// Sets the strategy to choose a worker when a message arrives
/// </summary>
/// <typeparam name="T">A class that implements the <see cref="IDistributionStrategy"/> interface</typeparam>
/// <typeparam name="T">A class that implements the <see cref="IWorkerDistributionStrategy"/> interface</typeparam>
/// <returns></returns>
IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
where T : class, IDistributionStrategy;
where T : class, IWorkerDistributionStrategy;

/// <summary>
/// Configures the consumer for manual message completion.
Expand Down
26 changes: 0 additions & 26 deletions src/KafkaFlow.Abstractions/IDistributionStrategy.cs

This file was deleted.

23 changes: 23 additions & 0 deletions src/KafkaFlow.Abstractions/IWorkerDistributionStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace KafkaFlow;

using System.Collections.Generic;
using System.Threading.Tasks;

/// <summary>
/// An interface used to create a distribution strategy
/// </summary>
public interface IWorkerDistributionStrategy
{
/// <summary>
/// Initializes the distribution strategy, this method is called when a consumer is started
/// </summary>
/// <param name="workers">List of workers to be initialized</param>
void Initialize(IReadOnlyList<IWorker> workers);

/// <summary>
/// Retrieves an available worker based on the provided distribution strategy context.
/// </summary>
/// <param name="context">The distribution strategy context containing message and consumer details.</param>
/// <returns>The selected <see cref="IWorker"/> instance.</returns>
ValueTask<IWorker> GetWorkerAsync(WorkerDistributionStrategy context);
}
5 changes: 5 additions & 0 deletions src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,9 @@
<Description>Contains all KafkaFlow extendable interfaces</Description>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="System.Memory" Version="4.5.5" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
</ItemGroup>

</Project>
57 changes: 57 additions & 0 deletions src/KafkaFlow.Abstractions/WorkerDistributionStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
namespace KafkaFlow;

using System;
using System.Threading;

/// <summary>
/// Represents a strategy context for distributing workers based on specific message and consumer details.
/// </summary>
public ref struct WorkerDistributionStrategy
{
/// <summary>
/// Initializes a new instance of the <see cref="WorkerDistributionStrategy"/> struct.
/// </summary>
/// <param name="consumerName">Name of the consumer.</param>
/// <param name="topic">Topic associated with the message.</param>
/// <param name="partition">Partition of the topic.</param>
/// <param name="rawMessageKey">Raw key of the message.</param>
/// <param name="consumerStoppedCancellationToken">A cancellation token that is cancelled when the consumer has stopped</param>
public WorkerDistributionStrategy(
string consumerName,
string topic,
int partition,
ReadOnlyMemory<byte>? rawMessageKey,
CancellationToken consumerStoppedCancellationToken)
{
this.ConsumerName = consumerName;
this.Topic = topic;
this.Partition = partition;
this.RawMessageKey = rawMessageKey;
this.ConsumerStoppedCancellationToken = consumerStoppedCancellationToken;
}

/// <summary>
/// Gets the name of the consumer.
/// </summary>
public string ConsumerName { get; }

/// <summary>
/// Gets the topic associated with the message.
/// </summary>
public string Topic { get; }

/// <summary>
/// Gets the partition number of the topic.
/// </summary>
public int Partition { get; }

/// <summary>
/// Gets the raw key of the message.
/// </summary>
public ReadOnlyMemory<byte>? RawMessageKey { get; }

/// <summary>
/// Gets the cancellation token that is cancelled when the consumer has stopped
/// </summary>
public CancellationToken ConsumerStoppedCancellationToken { get; }
}
4 changes: 2 additions & 2 deletions src/KafkaFlow/Configuration/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public ConsumerConfiguration(
TimeSpan workersCountEvaluationInterval,
int bufferSize,
TimeSpan workerStopTimeout,
Factory<IDistributionStrategy> distributionStrategyFactory,
Factory<IWorkerDistributionStrategy> distributionStrategyFactory,
IReadOnlyList<MiddlewareConfiguration> middlewaresConfigurations,
bool autoMessageCompletion,
bool noStoreOffsets,
Expand Down Expand Up @@ -69,7 +69,7 @@ public ConsumerConfiguration(
"The value must be greater than 0");
}

public Factory<IDistributionStrategy> DistributionStrategyFactory { get; }
public Factory<IWorkerDistributionStrategy> DistributionStrategyFactory { get; }

public IReadOnlyList<MiddlewareConfiguration> MiddlewaresConfigurations { get; }

Expand Down
8 changes: 4 additions & 4 deletions src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuild
private ConsumerInitialState initialState = ConsumerInitialState.Running;
private int statisticsInterval;

private Factory<IDistributionStrategy> distributionStrategyFactory = _ => new BytesSumDistributionStrategy();
private Factory<IWorkerDistributionStrategy> distributionStrategyFactory = _ => new BytesSumDistributionStrategy();
private TimeSpan autoCommitInterval = TimeSpan.FromSeconds(5);

private ConsumerCustomFactory customFactory = (consumer, _) => consumer;
Expand Down Expand Up @@ -157,15 +157,15 @@ public IConsumerConfigurationBuilder WithWorkerStopTimeout(TimeSpan timeout)
return this;
}

public IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>(Factory<T> factory)
where T : class, IDistributionStrategy
public IConsumerConfigurationBuilder WithWorkerDistributionStrategy<T>(Factory<T> factory)
where T : class, IWorkerDistributionStrategy
{
this.distributionStrategyFactory = factory;
return this;
}

public IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
where T : class, IDistributionStrategy
where T : class, IWorkerDistributionStrategy
{
this.DependencyConfigurator.AddTransient<T>();
this.distributionStrategyFactory = resolver => resolver.Resolve<T>();
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow/Configuration/IConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IConsumerConfiguration
/// <summary>
/// Gets the consumer worker distribution strategy
/// </summary>
Factory<IDistributionStrategy> DistributionStrategyFactory { get; }
Factory<IWorkerDistributionStrategy> DistributionStrategyFactory { get; }

/// <summary>
/// Gets the consumer middlewares configurations
Expand Down
14 changes: 10 additions & 4 deletions src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ internal class ConsumerWorkerPool : IConsumerWorkerPool
private readonly IDependencyResolver consumerDependencyResolver;
private readonly IMiddlewareExecutor middlewareExecutor;
private readonly ILogHandler logHandler;
private readonly Factory<IDistributionStrategy> distributionStrategyFactory;
private readonly Factory<IWorkerDistributionStrategy> distributionStrategyFactory;
private readonly IOffsetCommitter offsetCommitter;

private readonly Event workerPoolStoppedSubject;

private TaskCompletionSource<object> startedTaskSource = new();
private List<IConsumerWorker> workers = new();

private IDistributionStrategy distributionStrategy;
private IWorkerDistributionStrategy distributionStrategy;
private IOffsetManager offsetManager;

public ConsumerWorkerPool(
Expand Down Expand Up @@ -85,7 +85,7 @@ await Task.WhenAll(
.ConfigureAwait(false);

this.distributionStrategy = this.distributionStrategyFactory(this.consumerDependencyResolver);
this.distributionStrategy.Init(this.workers.AsReadOnly());
this.distributionStrategy.Initialize(this.workers.AsReadOnly());

this.startedTaskSource.TrySetResult(null);
}
Expand Down Expand Up @@ -130,7 +130,13 @@ public async Task EnqueueAsync(ConsumeResult<byte[], byte[]> message, Cancellati
await this.startedTaskSource.Task.ConfigureAwait(false);

var worker = (IConsumerWorker)await this.distributionStrategy
.GetWorkerAsync(message.Message.Key, stopCancellationToken)
.GetWorkerAsync(
new WorkerDistributionStrategy(
this.consumer.Configuration.ConsumerName,
message.Topic,
message.Partition.Value,
message.Message.Key,
stopCancellationToken))
.ConfigureAwait(false);

if (worker is null)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,44 +1,42 @@
namespace KafkaFlow.Consumers.DistributionStrategies
namespace KafkaFlow.Consumers.DistributionStrategies;

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

/// <summary>
/// This strategy sums all bytes in the partition key and apply a mod operator with the total number of workers, the resulting number is the worker ID to be chosen
/// This algorithm is fast and creates a good work balance. Messages with the same partition key are always delivered in the same worker, so, message order is guaranteed
/// Set an optimal message buffer value to avoid idle workers (it will depends how many messages with the same partition key are consumed)
/// </summary>
public class BytesSumDistributionStrategy : IWorkerDistributionStrategy
{
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
private IReadOnlyList<IWorker> workers;

/// <summary>
/// This strategy sums all bytes in the partition key and apply a mod operator with the total number of workers, the resulting number is the worker ID to be chosen
/// This algorithm is fast and creates a good work balance. Messages with the same partition key are always delivered in the same worker, so, message order is guaranteed
/// Set an optimal message buffer value to avoid idle workers (it will depends how many messages with the same partition key are consumed)
/// </summary>
public class BytesSumDistributionStrategy : IDistributionStrategy
/// <inheritdoc />
public void Initialize(IReadOnlyList<IWorker> workers)
{
private IReadOnlyList<IWorker> workers;
this.workers = workers;
}

/// <inheritdoc />
public void Init(IReadOnlyList<IWorker> workers)
/// <inheritdoc />
public ValueTask<IWorker> GetWorkerAsync(WorkerDistributionStrategy context)
{
if (context.RawMessageKey is null || this.workers.Count == 1)
{
this.workers = workers;
return new ValueTask<IWorker>(this.workers[0]);
}

/// <inheritdoc />
public Task<IWorker> GetWorkerAsync(byte[] partitionKey, CancellationToken cancellationToken)
{
if (partitionKey is null || this.workers.Count == 1)
{
return Task.FromResult(this.workers[0]);
}
var bytesSum = 0;

var bytesSum = 0;

for (int i = 0; i < partitionKey.Length; i++)
{
bytesSum += partitionKey[i];
}

return Task.FromResult(
cancellationToken.IsCancellationRequested
? null
: this.workers.ElementAtOrDefault(bytesSum % this.workers.Count));
for (var i = 0; i < context.RawMessageKey.Value.Length; i++)
{
bytesSum += context.RawMessageKey.Value.Span[i];
}

return new ValueTask<IWorker>(
context.ConsumerStoppedCancellationToken.IsCancellationRequested
? null
: this.workers.ElementAtOrDefault(bytesSum % this.workers.Count));
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
namespace KafkaFlow.Consumers.DistributionStrategies
namespace KafkaFlow.Consumers.DistributionStrategies;

using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

/// <summary>
/// This strategy chooses the first free worker to process the message. When a worker finishes the processing, it notifies the worker pool that it is free to get a new message
/// This is the fastest and resource-friendly strategy (the message buffer is not used) but messages with the same partition key can be delivered in different workers, so, no message order guarantee
/// </summary>
public class FreeWorkerDistributionStrategy : IWorkerDistributionStrategy
{
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
private readonly Channel<IWorker> freeWorkers = Channel.CreateUnbounded<IWorker>();

/// <summary>
/// This strategy chooses the first free worker to process the message. When a worker finishes the processing, it notifies the worker pool that it is free to get a new message
/// This is the fastest and resource-friendly strategy (the message buffer is not used) but messages with the same partition key can be delivered in different workers, so, no message order guarantee
/// </summary>
public class FreeWorkerDistributionStrategy : IDistributionStrategy
/// <inheritdoc />
public void Initialize(IReadOnlyList<IWorker> workers)
{
private readonly Channel<IWorker> freeWorkers = Channel.CreateUnbounded<IWorker>();

/// <inheritdoc />
public void Init(IReadOnlyList<IWorker> workers)
foreach (var worker in workers)
{
foreach (var worker in workers)
{
worker.WorkerProcessingEnded.Subscribe(_ => Task.FromResult(this.freeWorkers.Writer.WriteAsync(worker)));
this.freeWorkers.Writer.TryWrite(worker);
}
worker.WorkerProcessingEnded.Subscribe(_ => Task.FromResult(this.freeWorkers.Writer.WriteAsync(worker)));
this.freeWorkers.Writer.TryWrite(worker);
}
}

/// <inheritdoc />
public Task<IWorker> GetWorkerAsync(byte[] partitionKey, CancellationToken cancellationToken)
{
return this.freeWorkers.Reader.ReadAsync(cancellationToken).AsTask();
}
/// <inheritdoc />
public ValueTask<IWorker> GetWorkerAsync(WorkerDistributionStrategy context)
{
return this.freeWorkers.Reader.ReadAsync(context.ConsumerStoppedCancellationToken);
}
}

0 comments on commit f56d8e2

Please sign in to comment.