[Bug Report]: BatchConsumer with ManualMessageCompletion does not commit all messages #578

Open
1 task done
Gunth opened this issue Jun 24, 2024 · 0 comments
Labels
bug Something isn't working

Prerequisites

  • I have searched issues to ensure it has not already been reported

Description

Hi there,

I'm trying to consume my Kafka messages in batches and complete them myself. To do this, I've added a consumer with a batching configuration and the WithManualMessageCompletion feature.

In my handler, I complete each message in the batch and also the message that contains the batch (not sure if that's needed). I see logs saying "KafkaFlow: Offsets committed ...", but when I restart my service it reprocesses a lot of messages that were already processed in the previous run.

If I don't use WithManualMessageCompletion(), all messages are correctly committed and no messages come back on the next run.

Did I miss something, or should ManualMessageCompletion not be used with batching?

G.

Steps to reproduce

Add the consumer:

var config = new ConsumerConfig()
{
    BootstrapServers = configuration.GetValue<string>(Config.Properties.KafkaBrokers),
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.OAuthBearer,
    ClientId = "xyz",
};

.AddConsumer(consumer => consumer
    .WithConsumerConfig(new ConsumerConfig(config))
    .Topics([finalTopic, importTopic, globalTopic])
    .WithGroupId(topicConsumerGroup)
    .WithManualMessageCompletion()
    .AddMiddlewares(middlewares => middlewares
        .AddBatching(
            configuration.GetValue<int>(
                Config.Properties.KafkaMessageTrackerWorkersBatchSize,
                Config.Defaults.KafkaMessageTrackerWorkersBatchSize),
            TimeSpan.FromSeconds(5))
        .Add(resolver => new TrackingEventProcessor())
    )
)

Add the handler (middleware) that consumes the messages:

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
    var messagesContext = context.GetMessagesBatch();

    // Do some work with the batch...

    foreach (var messageContext in messagesContext)
    {
        messageContext.ConsumerContext.Complete();
    }

    context.ConsumerContext.Complete();

    await next(context);
}
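
To help narrow this down, it may be useful to log which offsets each batch actually covers per partition before completing them. The helper below is only a sketch: `BatchOffsetSummary` and `Describe` are hypothetical names, not part of KafkaFlow, and the helper works on plain (topic, partition, offset) tuples so it has no library dependency.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical diagnostic helper (not a KafkaFlow API).
public static class BatchOffsetSummary
{
    // Groups the (topic, partition, offset) triples of one batch and reports,
    // per partition, the lowest and highest offset, how many distinct offsets
    // were seen, and how many offsets inside that range are missing.
    public static IReadOnlyList<string> Describe(
        IEnumerable<(string Topic, int Partition, long Offset)> completed)
    {
        return completed
            .GroupBy(m => (m.Topic, m.Partition))
            .OrderBy(g => g.Key.Topic, StringComparer.Ordinal)
            .ThenBy(g => g.Key.Partition)
            .Select(g =>
            {
                long min = g.Min(m => m.Offset);
                long max = g.Max(m => m.Offset);
                int count = g.Select(m => m.Offset).Distinct().Count();
                long missing = (max - min + 1) - count;
                return $"{g.Key.Topic}[{g.Key.Partition}] offsets {min}..{max}, completed {count}, missing {missing}";
            })
            .ToList();
    }
}
```

In the handler above, the input could be built from each `messageContext.ConsumerContext`, assuming it exposes `Topic`, `Partition` and `Offset` for the message, and the resulting lines written to the log next to the "Offsets committed" entries for comparison.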

Expected behavior

All messages should be committed when they are completed.

Actual behavior

Some of them are committed, but not all.
When sending 110 messages to Kafka, 23 come back on each new execution of the service.
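
One general Kafka property that might be relevant here: the committed position of a partition is a single offset, and a restarted consumer resumes reading from it. The sketch below is a simplified model of that behavior, not KafkaFlow's actual implementation. It assumes the position only advances over a contiguous run of completed offsets, which has not been confirmed for this issue. `CommitModel` and `NextOffsetToRead` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Simplified model (assumption, not KafkaFlow code).
public static class CommitModel
{
    // Returns the offset a restarted consumer would read next: the first
    // offset at or after firstOffset that has NOT been completed.
    // Completed offsets beyond that gap do not move the position forward.
    public static long NextOffsetToRead(long firstOffset, IEnumerable<long> completedOffsets)
    {
        var completed = new HashSet<long>(completedOffsets);
        long next = firstOffset;
        while (completed.Contains(next))
        {
            next++;
        }
        return next;
    }
}
```

For example, if offsets 0 to 9 of a partition were delivered and every one except 6 was completed, this model returns 6, so offsets 6 to 9 would be read again after a restart even though 7, 8 and 9 had already been completed. If something similar happens here, a single offset that is never completed in a partition could explain a fixed number of messages coming back on every run.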

KafkaFlow version

3.8.0

@Gunth Gunth added the bug Something isn't working label Jun 24, 2024