Skip to content

Commit

Permalink
fix: solve Offset Manager and Worker racing condition when stopping
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Dec 6, 2023
1 parent 40f6bac commit 453996e
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 18 deletions.
6 changes: 2 additions & 4 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,9 @@ public ConsumerWorker(

public IEvent<IMessageContext> WorkerProcessingEnded => _workerProcessingEnded;

public ValueTask EnqueueAsync(
IMessageContext context,
CancellationToken stopCancellationToken)
public ValueTask EnqueueAsync(IMessageContext context)
{
return _messagesBuffer.Writer.WriteAsync(context, stopCancellationToken);
return _messagesBuffer.Writer.WriteAsync(context, CancellationToken.None);
}

public Task StartAsync()
Expand Down
11 changes: 4 additions & 7 deletions src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public async Task StopAsync()

public async Task EnqueueAsync(ConsumeResult<byte[], byte[]> message, CancellationToken stopCancellationToken)
{
await _startedTaskSource.Task.ConfigureAwait(false);
await _startedTaskSource.Task;

var worker = (IConsumerWorker)await _distributionStrategy
.GetWorkerAsync(
Expand All @@ -136,8 +136,7 @@ public async Task EnqueueAsync(ConsumeResult<byte[], byte[]> message, Cancellati
message.Topic,
message.Partition.Value,
message.Message.Key,
stopCancellationToken))
.ConfigureAwait(false);
stopCancellationToken));

if (worker is null)
{
Expand All @@ -146,11 +145,9 @@ public async Task EnqueueAsync(ConsumeResult<byte[], byte[]> message, Cancellati

var context = this.CreateMessageContext(message, worker);

await worker
.EnqueueAsync(context, stopCancellationToken)
.ConfigureAwait(false);

_offsetManager.Enqueue(context.ConsumerContext);

await worker.EnqueueAsync(context);
}

private MessageContext CreateMessageContext(ConsumeResult<byte[], byte[]> message, IConsumerWorker worker)
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow/Consumers/IConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ internal interface IConsumerWorker : IWorker, IDisposable

IDependencyResolver WorkerDependencyResolver { get; }

ValueTask EnqueueAsync(IMessageContext context, CancellationToken stopCancellationToken);
ValueTask EnqueueAsync(IMessageContext context);

Task StartAsync();

Expand Down
20 changes: 14 additions & 6 deletions src/KafkaFlow/Consumers/PartitionOffsets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,24 @@ public bool TryDequeue(IConsumerContext context)
{
this.DequeuedContext = _receivedContexts.First.Value;
_receivedContexts.RemoveFirst();
}
while (_receivedContexts.Count > 0 && _processedContexts.Remove(_receivedContexts.First.Value.Offset));
} while (_receivedContexts.Count > 0 && _processedContexts.Remove(_receivedContexts.First.Value.Offset));
}

return true;
}

public Task WaitContextsCompletionAsync() => Task.WhenAll(
_receivedContexts
.Select(x => x.Completion)
.ToList());
public Task WaitContextsCompletionAsync()
{
List<Task> tasks;

lock (_receivedContexts)
{
tasks = _receivedContexts
.Select(x => (Task)x.Completion)
.ToList();
}

return Task.WhenAll(tasks);
}
}
}

0 comments on commit 453996e

Please sign in to comment.