From 31159bd0c64aca98a30bdc9b7507dfbbadd08c5d Mon Sep 17 00:00:00 2001 From: Valentin Troadec Date: Sun, 12 Nov 2023 17:36:09 +0100 Subject: [PATCH] fix(RecordRepository): fix copy infinite loop when record added during copy --- .../akhq/repositories/RecordRepository.java | 32 ++++++++++++++- src/test/java/org/akhq/KafkaTestCluster.java | 12 ++++++ .../repositories/RecordRepositoryTest.java | 41 +++++++++++++++++++ 3 files changed, 84 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java index f8833cd73..bd95b301a 100644 --- a/src/main/java/org/akhq/repositories/RecordRepository.java +++ b/src/main/java/org/akhq/repositories/RecordRepository.java @@ -1035,12 +1035,17 @@ public CopyResult copy(Topic fromTopic, String toClusterId, Topic toTopic, List< ); } + Map partitionsLastOffsetMap = fromTopic.getPartitions() + .stream() + .collect(Collectors.toMap(Function.identity(), Partition::getLastOffset)); + boolean samePartition = toTopic.getPartitions().size() == fromTopic.getPartitions().size(); KafkaProducer producer = kafkaModule.getProducer(toClusterId); ConsumerRecords records; do { - records = this.poll(consumer); + records = this.pollAndFilter(consumer, partitionsLastOffsetMap); + for (ConsumerRecord record : records) { System.out.println(record.offset() + "-" + record.partition()); @@ -1064,6 +1069,31 @@ public CopyResult copy(Topic fromTopic, String toClusterId, Topic toTopic, List< return new CopyResult(counter); } + /** + * Polls the records and filters them with a maximum offset + * + * @param consumer + * @param partitionsLastOffsetMap key : partition, value : the maximum offset we want to reach + * @return filtered records after polled. 
And an empty one if there are no records polled + * or if every record has been filtered + */ + private ConsumerRecords pollAndFilter(KafkaConsumer consumer, Map partitionsLastOffsetMap) { + ConsumerRecords records = this.poll(consumer); + return new ConsumerRecords<>(partitionsLastOffsetMap.entrySet() + .stream() + .map(entry -> + { + // We filter records by partition + TopicPartition topicPartition = new TopicPartition(entry.getKey().getTopic(), entry.getKey().getId()); + return Map.entry(topicPartition, records.records(topicPartition) + .stream() + .filter(consumerRecord -> consumerRecord.offset() < entry.getValue()) + .collect(Collectors.toList())); + } + ).filter(entry -> !entry.getValue().isEmpty()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + } + @ToString @EqualsAndHashCode @AllArgsConstructor diff --git a/src/test/java/org/akhq/KafkaTestCluster.java b/src/test/java/org/akhq/KafkaTestCluster.java index 543b4df17..eedbba9cf 100644 --- a/src/test/java/org/akhq/KafkaTestCluster.java +++ b/src/test/java/org/akhq/KafkaTestCluster.java @@ -45,6 +45,8 @@ public class KafkaTestCluster implements Runnable { public static final String CLUSTER_ID = "test"; public static final String TOPIC_RANDOM = "random"; + public static final String TOPIC_COPY = "copy"; + public static final String TOPIC_COPY_DLT = "copy.dlt"; public static final String TOPIC_TOBE_EMPTIED = "emptied"; public static final String TOPIC_COMPACTED = "compacted"; public static final String TOPIC_EMPTY = "empty"; @@ -279,6 +281,16 @@ private void injectTestData() throws InterruptedException, ExecutionException { } log.debug("Random topic created"); + // random data + testUtils.createTopic(TOPIC_COPY_DLT, 3, (short) 1); + for (int partition = 0; partition < 3; partition++) { + testUtils.produceRecords(randomDatas(50, 0), TOPIC_COPY_DLT, partition); + } + log.debug("DLT copy created"); + + testUtils.createTopic(TOPIC_COPY, 3, (short) 1); + log.debug("Copy created"); + // random 
data to be emptied testUtils.createTopic(TOPIC_TOBE_EMPTIED, 3, (short) 1); for (int partition = 0; partition < 3; partition++) { diff --git a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java index 8e4d37c0f..deab62a1f 100644 --- a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java +++ b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java @@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j; import org.akhq.AbstractTest; import org.akhq.KafkaTestCluster; +import org.akhq.controllers.TopicController; import org.akhq.models.Record; import org.akhq.models.Schema; import org.akhq.models.Topic; @@ -23,6 +24,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; @@ -322,4 +324,43 @@ private int searchAll(RecordRepository.Options options) throws ExecutionExceptio return size.get(); } + + @Test + void copy() throws ExecutionException, InterruptedException, RestClientException, IOException { + + RecordRepository.Options optionsFrom = new RecordRepository.Options(environment, KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_COPY_DLT); + RecordRepository.Options optionsTo = new RecordRepository.Options(environment, KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_COPY); + optionsTo.setSize(500); + + Topic topicFrom = topicRepository.findByName(optionsFrom.getClusterId(), optionsFrom.getTopic()); + String toClusterId = KafkaTestCluster.CLUSTER_ID; + Topic topicTo = topicRepository.findByName(optionsTo.getClusterId(), optionsTo.getTopic()); + + List offsets = topicTo.getPartitions() + .stream() + .map(partition -> new TopicController.OffsetCopy(partition.getId(), partition.getLastOffset())) + .collect(Collectors.toList()); + + // We simulate 
the case where a record is added to the source topic after the copy method has started + this.repository.produce( + KafkaTestCluster.CLUSTER_ID, + KafkaTestCluster.TOPIC_COPY_DLT, + Optional.of("value"), + Collections.emptyList(), + Optional.of("key"), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty() + ); + + RecordRepository.CopyResult copyResult = this.repository.copy(topicFrom, toClusterId, topicTo, offsets, optionsFrom); + + log.info("Copied " + copyResult.records + " records"); + + assertEquals(150, copyResult.records); + + // We empty the topic to restore it to its initial state + this.repository.emptyTopic(KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_COPY); + } }