Skip to content

Commit

Permalink
feat: creates the ConsumerLagWorkerBalancer
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Sep 14, 2023
1 parent 6ff9fe1 commit 57be753
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 42 deletions.
42 changes: 7 additions & 35 deletions samples/KafkaFlow.Sample.BatchOperations/PrintConsoleMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,19 @@

namespace KafkaFlow.Sample.BatchOperations;

using System.Collections.Generic;
using System.Threading.Channels;
using KafkaFlow.Consumers;
using KafkaFlow.Observer;

internal class PrintConsoleMiddleware : IMessageMiddleware, ISubjectObserver<WorkerStoppedSubject>
internal class PrintConsoleMiddleware : IMessageMiddleware
{
private readonly Channel<IReadOnlyCollection<IMessageContext>> buffer = Channel.CreateBounded<IReadOnlyCollection<IMessageContext>>(5);
private readonly Task task;

public PrintConsoleMiddleware(IWorkerLifetimeContext workerLifetimeContext)
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
workerLifetimeContext.Worker.WorkerStopped.Subscribe(this);
this.task = CreateTask();
}
var batch = context.GetMessagesBatch();

private async Task CreateTask()
{
await foreach (var batch in this.buffer.Reader.ReadAllAsync())
foreach (var ctx in batch)
{
foreach (var context in batch)
{
Console.WriteLine(((SampleBatchMessage)context.Message.Value).Text);
Console.WriteLine(((SampleBatchMessage)ctx.Message.Value).Text);

context.ConsumerContext.StoreOffset();
Task.Delay(5000).ContinueWith(_ => ctx.ConsumerContext.StoreOffset());

await Task.Delay(100);
}
await Task.Delay(100);
}
}

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
var batch = context.GetMessagesBatch();

await this.buffer.Writer.WriteAsync(batch);
}

public async Task OnNotification()
{
this.buffer.Writer.Complete();
await this.task;
}
}
4 changes: 2 additions & 2 deletions samples/KafkaFlow.Sample.BatchOperations/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
.UseConsoleLog()
.AddCluster(
cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.WithBrokers(new[] { "localhost:9094" })
.CreateTopicIfNotExists(batchTestTopic, 1, 1)
.CreateTopicIfNotExists("kafka-flow.admin", 1, 1)
.EnableAdminMessages("kafka-flow.admin")
Expand All @@ -36,7 +36,7 @@
.Topic(batchTestTopic)
.WithGroupId("kafka-flow-sample")
.WithName("my-consumer")
.WithBufferSize(100)
.WithBufferSize(10)
.WithWorkersCount(1)
.WithManualStoreOffsets()
.AddMiddlewares(
Expand Down
158 changes: 158 additions & 0 deletions src/KafkaFlow/Consumers/WorkersBalancers/ConsumerLagWorkerBalancer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
namespace KafkaFlow.Consumers.WorkersBalancers
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using KafkaFlow.Clusters;
using KafkaFlow.Configuration;

internal class ConsumerLagWorkerBalancer
{
private readonly IClusterManager clusterManager;
private readonly IConsumerAccessor consumerAccessor;
private readonly ILogHandler logHandler;
private readonly int totalConsumerWorkers;
private readonly int minInstanceWorkers;
private readonly int maxInstanceWorkers;

public ConsumerLagWorkerBalancer(
IClusterManager clusterManager,
IConsumerAccessor consumerAccessor,
ILogHandler logHandler,
int totalConsumerWorkers,
int minInstanceWorkers,
int maxInstanceWorkers)
{
this.clusterManager = clusterManager;
this.consumerAccessor = consumerAccessor;
this.logHandler = logHandler;
this.totalConsumerWorkers = totalConsumerWorkers;
this.minInstanceWorkers = minInstanceWorkers;
this.maxInstanceWorkers = maxInstanceWorkers;
}

public async Task<int> GetWorkersCountAsync(WorkersCountContext context)
{
var workers = await this.CalculateAsync(context);

this.logHandler.Info(
"New workers count calculated",
new
{
Workers = workers,
Consumer = context.ConsumerName,
});

return workers;
}

private static long CalculateMyPartitionsLag(
WorkersCountContext context,
IEnumerable<(string Topic, int Partition, long Lag)> partitionsLag)
{
return partitionsLag
.Where(
partitionLag => context.AssignedTopicsPartitions
.Any(
topic => topic.Name == partitionLag.Topic &&
topic.Partitions.Any(p => p == partitionLag.Partition)))
.Sum(partitionLag => partitionLag.Lag);
}

private static IReadOnlyList<(string Topic, int Partition, long Lag)> CalculatePartitionsLag(
IEnumerable<(string Topic, int Partition, long Offset)> lastOffsets,
IEnumerable<TopicPartitionOffset> currentPartitionsOffset)
{
return lastOffsets
.Select(
last =>
{
var currentOffset = currentPartitionsOffset
.Where(current => current.Topic == last.Topic && current.Partition == last.Partition)
.Select(current => current.Offset)
.FirstOrDefault();
var lastOffset = Math.Max(0, last.Offset);
currentOffset = Math.Max(0, currentOffset);
return (last.Topic, last.Partition, lastOffset - currentOffset);
})
.ToList();
}

private async Task<int> CalculateAsync(WorkersCountContext context)
{
try
{
if (!context.AssignedTopicsPartitions.Any())
{
return 1;
}

var topicsMetadata = await this.GetTopicsMetadataAsync(context);

var lastOffsets = this.GetPartitionsLastOffset(context.ConsumerName, topicsMetadata);

var partitionsOffset = await this.clusterManager.GetConsumerGroupOffsetsAsync(
context.ConsumerGroupId,
context.AssignedTopicsPartitions.Select(t => t.Name));

var partitionsLag = CalculatePartitionsLag(lastOffsets, partitionsOffset);
var instanceLag = CalculateMyPartitionsLag(context, partitionsLag);

decimal totalConsumerLag = partitionsLag.Sum(p => p.Lag);

var ratio = instanceLag / Math.Max(1, totalConsumerLag);

var workers = (int)Math.Round(this.totalConsumerWorkers * ratio);

workers = Math.Min(workers, this.maxInstanceWorkers);
workers = Math.Max(workers, this.minInstanceWorkers);

return workers;
}
catch (Exception e)
{
this.logHandler.Error(
"Error calculating new workers count, using 1 as fallback",
e,
new
{
context.ConsumerName,
});

return 1;
}
}

private IEnumerable<(string TopicName, int Partition, long Offset)> GetPartitionsLastOffset(
string consumerName,
IEnumerable<(string Name, TopicMetadata Metadata)> topicsMetadata)
{
var consumer = this.consumerAccessor[consumerName];

return topicsMetadata.SelectMany(
topic => topic.Metadata.Partitions.Select(
partition => (
topic.Name,
partition.Id,
consumer
.QueryWatermarkOffsets(new(topic.Name, new(partition.Id)), TimeSpan.FromSeconds(30))
.High
.Value)));
}

private async Task<IReadOnlyList<(string Name, TopicMetadata Metadata)>> GetTopicsMetadataAsync(WorkersCountContext context)
{
var topicsMetadata = new List<(string Name, TopicMetadata Metadata)>(context.AssignedTopicsPartitions.Count);

foreach (var topic in context.AssignedTopicsPartitions)
{
topicsMetadata.Add((topic.Name, await this.clusterManager.GetTopicMetadataAsync(topic.Name)));
}

return topicsMetadata;
}
}
}
64 changes: 59 additions & 5 deletions src/KafkaFlow/Extensions/ConfigurationBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ namespace KafkaFlow
using System;
using System.Collections.Generic;
using Confluent.Kafka;
using KafkaFlow.Clusters;
using KafkaFlow.Configuration;
using KafkaFlow.Consumers;
using KafkaFlow.Consumers.WorkersBalancers;

/// <summary>
/// Provides extension methods over <see cref="IConsumerConfigurationBuilder"/> and <see cref="IProducerConfigurationBuilder"/>
Expand All @@ -18,7 +21,7 @@ public static class ConfigurationBuilderExtensions
/// <returns></returns>
public static IProducerConfigurationBuilder WithProducerConfig(this IProducerConfigurationBuilder builder, ProducerConfig config)
{
return ((ProducerConfigurationBuilder) builder).WithProducerConfig(config);
return ((ProducerConfigurationBuilder)builder).WithProducerConfig(config);
}

/// <summary>
Expand All @@ -40,7 +43,7 @@ public static IProducerConfigurationBuilder WithCompression(
CompressionType compressionType,
int? compressionLevel = -1)
{
return ((ProducerConfigurationBuilder) builder).WithCompression(compressionType, compressionLevel);
return ((ProducerConfigurationBuilder)builder).WithCompression(compressionType, compressionLevel);
}

/// <summary>
Expand All @@ -51,7 +54,7 @@ public static IProducerConfigurationBuilder WithCompression(
/// <returns></returns>
public static IConsumerConfigurationBuilder WithConsumerConfig(this IConsumerConfigurationBuilder builder, ConsumerConfig config)
{
return ((ConsumerConfigurationBuilder) builder).WithConsumerConfig(config);
return ((ConsumerConfigurationBuilder)builder).WithConsumerConfig(config);
}

/// <summary>
Expand Down Expand Up @@ -105,7 +108,7 @@ public static IConsumerConfigurationBuilder WithCustomFactory(
this IConsumerConfigurationBuilder builder,
ConsumerCustomFactory decoratorFactory)
{
return ((ConsumerConfigurationBuilder) builder).WithCustomFactory(decoratorFactory);
return ((ConsumerConfigurationBuilder)builder).WithCustomFactory(decoratorFactory);
}

/// <summary>
Expand All @@ -118,7 +121,58 @@ public static IProducerConfigurationBuilder WithCustomFactory(
this IProducerConfigurationBuilder builder,
ProducerCustomFactory decoratorFactory)
{
return ((ProducerConfigurationBuilder) builder).WithCustomFactory(decoratorFactory);
return ((ProducerConfigurationBuilder)builder).WithCustomFactory(decoratorFactory);
}

/// <summary>
/// Configures the consumer to use the consumer's lag as a metric for dynamically calculating the number of workers for each application instance.
/// </summary>
/// <param name="builder">The consumer's configuration builder.</param>
/// <param name="totalWorkers">The total number of workers to be distributed across all application instances. The sum of workers across all instances will approximate this number.</param>
/// <param name="minInstanceWorkers">The minimum number of workers for each application instance.</param>
/// <param name="maxInstanceWorkers">The maximum number of workers for each application instance.</param>
/// <param name="evaluationInterval">The interval at which the number of workers will be recalculated based on consumer's lag.</param>
/// <returns></returns>
public static IConsumerConfigurationBuilder WithConsumerLagWorkerBalancer(
this IConsumerConfigurationBuilder builder,
int totalWorkers,
int minInstanceWorkers,
int maxInstanceWorkers,
TimeSpan evaluationInterval)
{
return builder.WithWorkersCount(
(context, resolver) =>
new ConsumerLagWorkerBalancer(
resolver.Resolve<IClusterManager>(),
resolver.Resolve<IConsumerAccessor>(),
resolver.Resolve<ILogHandler>(),
totalWorkers,
minInstanceWorkers,
maxInstanceWorkers)
.GetWorkersCountAsync(context),
evaluationInterval);
}

/// <summary>
/// Configures the consumer to use the consumer's lag as a metric for dynamically calculating the number of workers for each application instance.
/// The number of workers will be re-evaluated every 5 minutes.
/// </summary>
/// <param name="builder">The consumer's configuration builder.</param>
/// <param name="totalWorkers">The total number of workers to be distributed across all application instances. The sum of workers across all instances will approximate this number.</param>
/// <param name="minInstanceWorkers">The minimum number of workers for each application instance.</param>
/// <param name="maxInstanceWorkers">The maximum number of workers for each application instance.</param>
/// <returns></returns>
public static IConsumerConfigurationBuilder WithConsumerLagWorkerBalancer(
this IConsumerConfigurationBuilder builder,
int totalWorkers,
int minInstanceWorkers,
int maxInstanceWorkers)
{
return builder.WithConsumerLagWorkerBalancer(
totalWorkers,
minInstanceWorkers,
maxInstanceWorkers,
TimeSpan.FromMinutes(5));
}
}
}

0 comments on commit 57be753

Please sign in to comment.