From bba2d281ed0ead513a119a53774306f673082e94 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Sat, 18 May 2024 00:15:25 +0300 Subject: [PATCH 1/7] #29 and #30 change the error catching mechanism to topic-partition-based and apply topic-based parallel programming to optimize program running time --- .../Implementations/ConfigurationService.cs | 1 + .../Implementations/KafkaRetryJobService.cs | 110 +++++++++++------- 2 files changed, 68 insertions(+), 43 deletions(-) diff --git a/src/Services/Implementations/ConfigurationService.cs b/src/Services/Implementations/ConfigurationService.cs index 4a166b3..1b1391d 100644 --- a/src/Services/Implementations/ConfigurationService.cs +++ b/src/Services/Implementations/ConfigurationService.cs @@ -40,6 +40,7 @@ public ConfigurationService(IConfiguration configuration) public int? MessageTimeoutMs => GetValue("ProducerMessageTimeoutMs"); public int? RequestTimeoutMs => GetValue("ProducerRequestTimeoutMs"); public int? MessageMaxBytes => GetValue("ProducerMessageMaxBytes"); + public int MaxLevelParallelism => GetValue("MaxLevelParallelism") ?? 1; private string GetValueOrThrowInvalidConfigException(string configName) { diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index 101f6fb..3203f0c 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -1,7 +1,9 @@ using System; using System.Collections.Generic; +using System.Collections.Immutable; using System.Linq; using System.Text.RegularExpressions; +using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; using KafkaRetry.Job.Services.Interfaces; @@ -26,43 +28,71 @@ public KafkaRetryJobService(IKafkaService kafkaService, public async Task MoveMessages() { _logService.LogApplicationStarted(); - - using var assignedConsumer = _kafkaService.BuildKafkaConsumer(); + var adminClient = _kafkaService.BuildAdminClient(); var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(120)); adminClient.Dispose(); - var errorTopicPartitionsWithLag = GetErrorTopicInfosFromCluster(assignedConsumer, metadata); - var errorTopics = errorTopicPartitionsWithLag.Select(p => p.Item1.Topic).Distinct().ToList(); + var assignedConsumerPool = new ThreadLocal>(() => _kafkaService.BuildKafkaConsumer()); + var producerPool = new ThreadLocal>(() => _kafkaService.BuildKafkaProducer()); + + var errorTopicsWithLag = GetErrorTopicInfosFromCluster(assignedConsumerPool.Value, metadata); + var errorTopics = errorTopicsWithLag.Keys.ToList(); _logService.LogMatchingErrorTopics(errorTopics); + + var consumerCommitStrategy= _kafkaService.GetConsumerCommitStrategy(); + + var messageConsumeLimit = _configuration.MessageConsumeLimitPerTopicPartition; + if (messageConsumeLimit <= 0) + { + _logService.LogMessageConsumeLimitIsZero(); + return; + } + + var maxDegreeOfParallelism = _configuration.MaxLevelParallelism; + var semaphore = new SemaphoreSlim(maxDegreeOfParallelism); + var tasks = new List(); + + foreach (var (_, topicPartitionsWithLag) in errorTopicsWithLag) + { + await semaphore.WaitAsync(); + tasks.Add(Task.Run(async () => + { + await MoveMessagesForTopic(topicPartitionsWithLag, assignedConsumerPool, producerPool, + consumerCommitStrategy); + semaphore.Release(); + })); + } - using var producer = _kafkaService.BuildKafkaProducer(); - - var utcNow = DateTime.UtcNow; + Task.WaitAll(tasks.ToArray()); + assignedConsumerPool.Value.Dispose(); + + _logService.LogApplicationIsClosing(); + } - var consumerCommitStrategy= _kafkaService.GetConsumerCommitStrategy(); + private async Task MoveMessagesForTopic( + List<(TopicPartition, long)> topicPartitionsWithLag, + ThreadLocal> assignedConsumerPool, + ThreadLocal> producerPool, + Action, ConsumeResult> consumerCommitStrategy + ) { + var assignedConsumer = assignedConsumerPool.Value; + var producer = producerPool.Value; - try + foreach (var (topicPartition, lag) in topicPartitionsWithLag) { - var messageConsumeLimit = _configuration.MessageConsumeLimitPerTopicPartition; - if (messageConsumeLimit <= 0) + var errorTopic = topicPartition.Topic; + if (lag <= 0) { - _logService.LogMessageConsumeLimitIsZero(); - return; + continue; } - foreach (var (topicPartition, lag) in errorTopicPartitionsWithLag) + try { - if (lag <= 0) - { - continue; - } - - var messageConsumeLimitForTopicPartition = messageConsumeLimit; + var messageConsumeLimitForTopicPartition = _configuration.MessageConsumeLimitPerTopicPartition; _logService.LogStartOfSubscribingTopicPartition(topicPartition); - var errorTopic = topicPartition.Topic; var currentLag = lag; assignedConsumer.Assign(topicPartition); @@ -78,15 +108,7 @@ public async Task MoveMessages() currentLag -= 1; messageConsumeLimitForTopicPartition -= 1; - - var resultDate = result.Message.Timestamp.UtcDateTime; - - if (utcNow < resultDate) - { - _logService.LogNewMessageArrived(utcNow); - break; - } - + result.Message.Timestamp = new Timestamp(DateTime.UtcNow); var retryTopic = GetRetryTopicName(result, errorTopic); @@ -97,21 +119,18 @@ public async Task MoveMessages() consumerCommitStrategy.Invoke(assignedConsumer, result); } + + assignedConsumer.Unassign(); _logService.LogEndOfSubscribingTopicPartition(topicPartition); } - - assignedConsumer.Unassign(); - - } - catch (Exception e) - { - _logService.LogError(e); - assignedConsumer.Unassign(); - throw; + catch (Exception e) + { + _logService.LogError(e); + assignedConsumer.Unassign(); + throw; + } } - - _logService.LogApplicationIsClosing(); } private string GetRetryTopicName(ConsumeResult result , string errorTopic ) @@ -122,7 +141,7 @@ private string GetRetryTopicName(ConsumeResult result , string er errorTopic.ReplaceAtEnd(_configuration.ErrorSuffix, _configuration.RetrySuffix); } - private List<(TopicPartition, long)> GetErrorTopicInfosFromCluster(IConsumer assignedConsumer, Metadata metadata) + private IDictionary> GetErrorTopicInfosFromCluster(IConsumer assignedConsumer, Metadata metadata) { _logService.LogFetchingErrorTopicInfoStarted(); @@ -142,7 +161,12 @@ private string GetRetryTopicName(ConsumeResult result , string er var watermark = assignedConsumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(5)); var lag = tpo.Offset >= 0 ? watermark.High - tpo.Offset : watermark.High - watermark.Low; return (tpo.TopicPartition, lag); - }).ToList(); + }) + .GroupBy(t => t.Item1.Topic) + .ToImmutableDictionary( + t => t.Key, + t => t.ToList() + ); _logService.LogFetchingErrorTopicInfoFinished(); From 6c4fa0dc0e4b3d53840798da0d49b73064d60204 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Sun, 19 May 2024 17:23:11 +0300 Subject: [PATCH 2/7] #29 and #44 fixes async programming race condition issues and limits the message consumption based on topics intead of topic partitions --- .../Implementations/ConfigurationService.cs | 2 +- .../Implementations/KafkaRetryJobService.cs | 70 +++++++++---------- src/Services/Implementations/KafkaService.cs | 63 +++++++++++++++-- src/Services/Interfaces/IKafkaService.cs | 9 ++- 4 files changed, 100 insertions(+), 44 deletions(-) diff --git a/src/Services/Implementations/ConfigurationService.cs b/src/Services/Implementations/ConfigurationService.cs index 1b1391d..45d1ff1 100644 --- a/src/Services/Implementations/ConfigurationService.cs +++ b/src/Services/Implementations/ConfigurationService.cs @@ -21,7 +21,7 @@ public ConfigurationService(IConfiguration configuration) public string RetrySuffix => GetValueOrThrowInvalidConfigException("RetrySuffix"); public string RetryTopicNameInHeader => GetValue("RetryTopicNameInHeader"); - public long MessageConsumeLimitPerTopicPartition => GetValue("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue; + public long MessageConsumeLimitPerTopic => GetValue("MessageConsumeLimitPerTopic") ?? Int64.MaxValue; public bool? EnableAutoCommit => GetValue("EnableAutoCommit") ?? false; public bool? EnableAutoOffsetStore => GetValue("EnableAutoOffsetStore") ?? false; diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index 3203f0c..417734b 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -29,21 +29,19 @@ public async Task MoveMessages() { _logService.LogApplicationStarted(); - var adminClient = _kafkaService.BuildAdminClient(); + var adminClient = _kafkaService.GetKafkaAdminClient(); var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(120)); - adminClient.Dispose(); - - var assignedConsumerPool = new ThreadLocal>(() => _kafkaService.BuildKafkaConsumer()); - var producerPool = new ThreadLocal>(() => _kafkaService.BuildKafkaProducer()); - - var errorTopicsWithLag = GetErrorTopicInfosFromCluster(assignedConsumerPool.Value, metadata); + _kafkaService.ReleaseKafkaAdminClient(ref adminClient); + + var consumer = _kafkaService.GetKafkaConsumer(); + var errorTopicsWithLag = GetErrorTopicInfosFromCluster(consumer, metadata); var errorTopics = errorTopicsWithLag.Keys.ToList(); _logService.LogMatchingErrorTopics(errorTopics); var consumerCommitStrategy= _kafkaService.GetConsumerCommitStrategy(); - var messageConsumeLimit = _configuration.MessageConsumeLimitPerTopicPartition; + var messageConsumeLimit = _configuration.MessageConsumeLimitPerTopic; if (messageConsumeLimit <= 0) { _logService.LogMessageConsumeLimitIsZero(); @@ -59,47 +57,45 @@ public async Task MoveMessages() await semaphore.WaitAsync(); tasks.Add(Task.Run(async () => { - await MoveMessagesForTopic(topicPartitionsWithLag, assignedConsumerPool, producerPool, - consumerCommitStrategy); + await MoveMessagesForTopic(topicPartitionsWithLag, consumerCommitStrategy); semaphore.Release(); })); } - - Task.WaitAll(tasks.ToArray()); - assignedConsumerPool.Value.Dispose(); + + await Task.WhenAll(tasks.ToArray()); _logService.LogApplicationIsClosing(); } private async Task MoveMessagesForTopic( List<(TopicPartition, long)> topicPartitionsWithLag, - ThreadLocal> assignedConsumerPool, - ThreadLocal> producerPool, Action, ConsumeResult> consumerCommitStrategy - ) { - var assignedConsumer = assignedConsumerPool.Value; - var producer = producerPool.Value; + ) + { + var consumer = _kafkaService.GetKafkaConsumer(); + var producer = _kafkaService.GetKafkaProducer(); + + var messageConsumeLimitPerTopic = _configuration.MessageConsumeLimitPerTopic; foreach (var (topicPartition, lag) in topicPartitionsWithLag) { - var errorTopic = topicPartition.Topic; if (lag <= 0) { continue; } - + var errorTopic = topicPartition.Topic; + try { - var messageConsumeLimitForTopicPartition = _configuration.MessageConsumeLimitPerTopicPartition; _logService.LogStartOfSubscribingTopicPartition(topicPartition); - + var currentLag = lag; - - assignedConsumer.Assign(topicPartition); - while (currentLag > 0 && messageConsumeLimitForTopicPartition > 0) + consumer.Assign(topicPartition); + + while (currentLag > 0 && messageConsumeLimitPerTopic > 0) { - var result = assignedConsumer.Consume(TimeSpan.FromSeconds(3)); + var result = consumer.Consume(TimeSpan.FromSeconds(3)); if (result is null) { @@ -107,29 +103,31 @@ Action, ConsumeResult> consumerCommitStr } currentLag -= 1; - messageConsumeLimitForTopicPartition -= 1; - + messageConsumeLimitPerTopic -= 1; + result.Message.Timestamp = new Timestamp(DateTime.UtcNow); var retryTopic = GetRetryTopicName(result, errorTopic); - + _logService.LogProducingMessage(result, errorTopic, retryTopic); - + await producer.ProduceAsync(retryTopic, result.Message); - consumerCommitStrategy.Invoke(assignedConsumer, result); + consumerCommitStrategy.Invoke(consumer, result); } - - assignedConsumer.Unassign(); _logService.LogEndOfSubscribingTopicPartition(topicPartition); } catch (Exception e) { _logService.LogError(e); - assignedConsumer.Unassign(); - throw; - } + } + finally + { + consumer.Unassign(); + _kafkaService.ReleaseKafkaConsumer(ref consumer); + _kafkaService.ReleaseKafkaProducer(ref producer); + } } } diff --git a/src/Services/Implementations/KafkaService.cs b/src/Services/Implementations/KafkaService.cs index 9560275..9741bf9 100644 --- a/src/Services/Implementations/KafkaService.cs +++ b/src/Services/Implementations/KafkaService.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using Confluent.Kafka; using KafkaRetry.Job.Helpers.KafkaConfigs; using KafkaRetry.Job.Services.Interfaces; @@ -8,13 +9,34 @@ namespace KafkaRetry.Job.Services.Implementations; public class KafkaService : IKafkaService { private readonly ConfigurationService _configuration; + private readonly ConcurrentBag> _consumers = new(); + private readonly ConcurrentBag> _producers = new(); + private readonly ConcurrentBag _adminClients = new(); public KafkaService(ConfigurationService configuration) { _configuration = configuration; } - public IConsumer BuildKafkaConsumer() + ~KafkaService() + { + while (_consumers.TryTake(out var consumer)) + { + consumer.Dispose(); + } + + while (_producers.TryTake(out var producer)) + { + producer.Dispose(); + } + + while (_adminClients.TryTake(out var adminClient)) + { + adminClient.Dispose(); + } + } + + private IConsumer BuildKafkaConsumer() { var bootstrapServers = _configuration.BootstrapServers; var groupId = _configuration.GroupId; @@ -24,7 +46,7 @@ public IConsumer BuildKafkaConsumer() return consumerBuilder.Build(); } - public IProducer BuildKafkaProducer() + private IProducer BuildKafkaProducer() { var bootstrapServers = _configuration.BootstrapServers; var producerConfig = CreateProducerConfig(bootstrapServers); @@ -32,8 +54,8 @@ public IProducer BuildKafkaProducer() return producerBuilder.Build(); } - - public IAdminClient BuildAdminClient() + + private IAdminClient BuildAdminClient() { var bootstrapServers = _configuration.BootstrapServers; var adminClientConfig = CreateAdminClientConfig(bootstrapServers); @@ -41,6 +63,39 @@ public IAdminClient BuildAdminClient() return adminClientBuilder.Build(); } + + public IConsumer GetKafkaConsumer() + { + return _consumers.TryTake(out var consumer) ? consumer : BuildKafkaConsumer(); + } + + public IProducer GetKafkaProducer() + { + return _producers.TryTake(out var producer) ? producer : BuildKafkaProducer(); + } + + public IAdminClient GetKafkaAdminClient() + { + return _adminClients.TryTake(out var adminClient) ? adminClient : BuildAdminClient(); + } + + public void ReleaseKafkaConsumer(ref IConsumer consumer) + { + _consumers.Add(consumer); + consumer = null; + } + + public void ReleaseKafkaProducer(ref IProducer producer) + { + _producers.Add(producer); + producer = null; + } + + public void ReleaseKafkaAdminClient(ref IAdminClient adminClient) + { + _adminClients.Add(adminClient); + adminClient = null; + } private ClientConfig CreateClientConfig(string bootstrapServers) { diff --git a/src/Services/Interfaces/IKafkaService.cs b/src/Services/Interfaces/IKafkaService.cs index 46f9c1a..0b6cb91 100644 --- a/src/Services/Interfaces/IKafkaService.cs +++ b/src/Services/Interfaces/IKafkaService.cs @@ -5,9 +5,12 @@ namespace KafkaRetry.Job.Services.Interfaces { public interface IKafkaService { - IConsumer BuildKafkaConsumer(); - IProducer BuildKafkaProducer(); - IAdminClient BuildAdminClient(); + public IConsumer GetKafkaConsumer(); + public IProducer GetKafkaProducer(); + public IAdminClient GetKafkaAdminClient(); + public void ReleaseKafkaConsumer(ref IConsumer consumer); + public void ReleaseKafkaProducer(ref IProducer producer); + public void ReleaseKafkaAdminClient(ref IAdminClient adminClient); public Action, ConsumeResult> GetConsumerCommitStrategy(); } } \ No newline at end of file From e160087c71d6b64cfbf7424c8693a5b8c304086a Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Sun, 19 May 2024 20:04:13 +0300 Subject: [PATCH 3/7] #29 continues to consume messages if the topic partition lag is not cleared yet --- src/Services/Implementations/KafkaRetryJobService.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index 417734b..9269772 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -99,7 +99,7 @@ Action, ConsumeResult> consumerCommitStr if (result is null) { - break; + continue; } currentLag -= 1; From e60a4c032c676c3c578d69968ecbee481564d64d Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Sun, 19 May 2024 20:36:46 +0300 Subject: [PATCH 4/7] #29 increases message consume timeout limit --- src/Services/Implementations/KafkaRetryJobService.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index 9269772..d899a88 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -95,7 +95,7 @@ Action, ConsumeResult> consumerCommitStr while (currentLag > 0 && messageConsumeLimitPerTopic > 0) { - var result = consumer.Consume(TimeSpan.FromSeconds(3)); + var result = consumer.Consume(TimeSpan.FromSeconds(10)); if (result is null) { From 2eecccfc6718c26212df1d63668a982b99ee73c4 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Mon, 20 May 2024 23:37:18 +0300 Subject: [PATCH 5/7] #29 prevents to initialize kafka consumers for empty topic partitions --- src/Services/Implementations/KafkaRetryJobService.cs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index d899a88..a9d98ce 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -72,9 +72,6 @@ private async Task MoveMessagesForTopic( Action, ConsumeResult> consumerCommitStrategy ) { - var consumer = _kafkaService.GetKafkaConsumer(); - var producer = _kafkaService.GetKafkaProducer(); - var messageConsumeLimitPerTopic = _configuration.MessageConsumeLimitPerTopic; foreach (var (topicPartition, lag) in topicPartitionsWithLag) @@ -83,6 +80,8 @@ Action, ConsumeResult> consumerCommitStr { continue; } + var consumer = _kafkaService.GetKafkaConsumer(); + var producer = _kafkaService.GetKafkaProducer(); var errorTopic = topicPartition.Topic; try @@ -95,7 +94,7 @@ Action, ConsumeResult> consumerCommitStr while (currentLag > 0 && messageConsumeLimitPerTopic > 0) { - var result = consumer.Consume(TimeSpan.FromSeconds(10)); + var result = consumer.Consume(TimeSpan.FromSeconds(3)); if (result is null) { From 5d99eb11ff267416b686b8385297bda89abdd5e0 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Tue, 28 May 2024 16:52:20 +0300 Subject: [PATCH 6/7] #29 #30 #44 update versions in changelog and gitlab-ci --- .github/workflows/publish.yml | 2 +- .gitlab-ci.yml | 2 +- CHANGELOG.MD | 10 ++++++++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 5e21781..d8dffe4 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -19,7 +19,7 @@ jobs: name: Build & push Docker image with: image: kafka-retry-job - tags: 1.12.3, latest + tags: 1.12.4, latest registry: ghcr.io username: ${{ secrets.GHCR_USERNAME }} password: ${{ secrets.GHCR_TOKEN }} diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 2a3db68..4d48127 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,6 +1,6 @@ variables: - VERSION: "1.12.3" + VERSION: "1.12.4" DOCKER_IMAGE_VERSION: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH:$VERSION DOCKER_IMAGE_LATEST: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH diff --git a/CHANGELOG.MD b/CHANGELOG.MD index 5e85107..f567f63 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -1,5 +1,15 @@ # Changelog +## [1.12.4](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-03) + +- [\#29](https://github.com/Trendyol/kafka-retry-job/issues/29) Add async programming to consume message in parallel +- [\#30](https://github.com/Trendyol/kafka-retry-job/issues/30) Catch exception based on Partitions +- [\#44](https://github.com/Trendyol/kafka-retry-job/issues/44) Limit message consumption based on Topic instead of Topic Partition + +**Merged pull requests:** + +- Pull Request for the issues #29, #30, #44 [\#43](https://github.com/Trendyol/kafka-retry-job/pull/43) ([ahmetfurkankavraz](https://github.com/ahmetfurkankavraz)) + ## [1.12.3](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-03) - [\#15](https://github.com/Trendyol/kafka-retry-job/issues/15) Fix integration tests From a3412ff6b855628484e55bfb4180717528d63ea5 Mon Sep 17 00:00:00 2001 From: "furkan.kavraz" Date: Mon, 29 Jul 2024 20:59:35 +0300 Subject: [PATCH 7/7] #29 uses try finally block to avoid deadlock for async calls --- src/Services/Implementations/KafkaRetryJobService.cs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/Services/Implementations/KafkaRetryJobService.cs b/src/Services/Implementations/KafkaRetryJobService.cs index a9d98ce..ac0d6ce 100644 --- a/src/Services/Implementations/KafkaRetryJobService.cs +++ b/src/Services/Implementations/KafkaRetryJobService.cs @@ -57,8 +57,14 @@ public async Task MoveMessages() await semaphore.WaitAsync(); tasks.Add(Task.Run(async () => { - await MoveMessagesForTopic(topicPartitionsWithLag, consumerCommitStrategy); - semaphore.Release(); + try + { + await MoveMessagesForTopic(topicPartitionsWithLag, consumerCommitStrategy); + } + finally + { + semaphore.Release(); + } })); }