From 453996e3895a926cfc4b7387dce9bcfdc7c76ace Mon Sep 17 00:00:00 2001 From: Filipe Esch Date: Mon, 4 Dec 2023 12:45:53 +0000 Subject: [PATCH] fix: solve Offset Manager and Worker racing condition when stopping --- src/KafkaFlow/Consumers/ConsumerWorker.cs | 6 ++---- src/KafkaFlow/Consumers/ConsumerWorkerPool.cs | 11 ++++------ src/KafkaFlow/Consumers/IConsumerWorker.cs | 2 +- src/KafkaFlow/Consumers/PartitionOffsets.cs | 20 +++++++++++++------ 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index a4c1fcbc9..7098897c7 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -59,11 +59,9 @@ public ConsumerWorker( public IEvent WorkerProcessingEnded => _workerProcessingEnded; - public ValueTask EnqueueAsync( - IMessageContext context, - CancellationToken stopCancellationToken) + public ValueTask EnqueueAsync(IMessageContext context) { - return _messagesBuffer.Writer.WriteAsync(context, stopCancellationToken); + return _messagesBuffer.Writer.WriteAsync(context, CancellationToken.None); } public Task StartAsync() diff --git a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs index 59846df75..8a00079f3 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs @@ -127,7 +127,7 @@ public async Task StopAsync() public async Task EnqueueAsync(ConsumeResult message, CancellationToken stopCancellationToken) { - await _startedTaskSource.Task.ConfigureAwait(false); + await _startedTaskSource.Task; var worker = (IConsumerWorker)await _distributionStrategy .GetWorkerAsync( @@ -136,8 +136,7 @@ public async Task EnqueueAsync(ConsumeResult message, Cancellati message.Topic, message.Partition.Value, message.Message.Key, - stopCancellationToken)) - .ConfigureAwait(false); + stopCancellationToken)); if (worker is null) { @@ -146,11 +145,9 @@ public async Task EnqueueAsync(ConsumeResult message, Cancellati var context = this.CreateMessageContext(message, worker); - await worker - .EnqueueAsync(context, stopCancellationToken) - .ConfigureAwait(false); - _offsetManager.Enqueue(context.ConsumerContext); + + await worker.EnqueueAsync(context); } private MessageContext CreateMessageContext(ConsumeResult message, IConsumerWorker worker) diff --git a/src/KafkaFlow/Consumers/IConsumerWorker.cs b/src/KafkaFlow/Consumers/IConsumerWorker.cs index 11c544d0a..ce25991a2 100644 --- a/src/KafkaFlow/Consumers/IConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/IConsumerWorker.cs @@ -10,7 +10,7 @@ internal interface IConsumerWorker : IWorker, IDisposable IDependencyResolver WorkerDependencyResolver { get; } - ValueTask EnqueueAsync(IMessageContext context, CancellationToken stopCancellationToken); + ValueTask EnqueueAsync(IMessageContext context); Task StartAsync(); diff --git a/src/KafkaFlow/Consumers/PartitionOffsets.cs b/src/KafkaFlow/Consumers/PartitionOffsets.cs index 9a4eada09..827e7f818 100644 --- a/src/KafkaFlow/Consumers/PartitionOffsets.cs +++ b/src/KafkaFlow/Consumers/PartitionOffsets.cs @@ -42,16 +42,24 @@ public bool TryDequeue(IConsumerContext context) { this.DequeuedContext = _receivedContexts.First.Value; _receivedContexts.RemoveFirst(); - } - while (_receivedContexts.Count > 0 && _processedContexts.Remove(_receivedContexts.First.Value.Offset)); + } while (_receivedContexts.Count > 0 && _processedContexts.Remove(_receivedContexts.First.Value.Offset)); } return true; } - public Task WaitContextsCompletionAsync() => Task.WhenAll( - _receivedContexts - .Select(x => x.Completion) - .ToList()); + public Task WaitContextsCompletionAsync() + { + List tasks; + + lock (_receivedContexts) + { + tasks = _receivedContexts + .Select(x => (Task)x.Completion) + .ToList(); + } + + return Task.WhenAll(tasks); + } } }