Skip to content

Commit

Permalink
Fix consumer committing offsets for bugfix release (#97)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Stejskal <xstejs24@gmail.com>
  • Loading branch information
Frawless authored Jun 20, 2024
1 parent a8235b5 commit a0fbf40
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions clients/src/main/java/io/strimzi/kafka/KafkaConsumerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ private void checkAndReceiveMessages() {

public void consumeMessages() {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
int recordProcessed = 0;

for (ConsumerRecord<String, String> record : records) {
LOGGER.info("Received message:");
Expand All @@ -114,9 +115,14 @@ public void consumeMessages() {
LOGGER.info("\t\tkey: {}, value: {}", header.key(), new String(header.value()));
}
}
consumedMessages++;
recordProcessed++;
}

consumer.commitSync();
try {
consumer.commitSync();
consumedMessages += recordProcessed;
} catch (Exception ex) {
LOGGER.warn("Failed to commit the offset: {}", ex.getMessage());
}
}
}

0 comments on commit a0fbf40

Please sign in to comment.