fix(topicdata): fix copy infinite loop when record added during copy (#…
valTroadec authored Dec 6, 2023
1 parent 829f364 commit addb48a
Showing 2 changed files with 65 additions and 1 deletion.
src/main/java/org/akhq/repositories/RecordRepository.java (31 additions, 1 deletion)

@@ -1035,12 +1035,17 @@ public CopyResult copy(Topic fromTopic, String toClusterId, Topic toTopic, List<
             );
         }
 
+        Map<Partition, Long> partitionsLastOffsetMap = fromTopic.getPartitions()
+            .stream()
+            .collect(Collectors.toMap(Function.identity(), Partition::getLastOffset));
+
         boolean samePartition = toTopic.getPartitions().size() == fromTopic.getPartitions().size();
 
         KafkaProducer<byte[], byte[]> producer = kafkaModule.getProducer(toClusterId);
         ConsumerRecords<byte[], byte[]> records;
         do {
-            records = this.poll(consumer);
+            records = this.pollAndFilter(consumer, partitionsLastOffsetMap);
 
             for (ConsumerRecord<byte[], byte[]> record : records) {
                 System.out.println(record.offset() + "-" + record.partition());

@@ -1064,6 +1069,31 @@ public CopyResult copy(Topic fromTopic, String toClusterId, Topic toTopic, List<
         return new CopyResult(counter);
     }
 
+    /**
+     * Polls the records and filters them with a maximum offset
+     *
+     * @param consumer
+     * @param partitionsLastOffsetMap key : partition, value : the maximum offset we want to reach
+     * @return filtered records after polled. And an empty one if there are no records polled
+     * or if every record has been filtered
+     */
+    private ConsumerRecords<byte[], byte[]> pollAndFilter(KafkaConsumer<byte[], byte[]> consumer, Map<Partition, Long> partitionsLastOffsetMap) {
+        ConsumerRecords<byte[], byte[]> records = this.poll(consumer);
+        return new ConsumerRecords<>(partitionsLastOffsetMap.entrySet()
+            .stream()
+            .map(entry ->
+                {
+                    // We filter records by partition
+                    TopicPartition topicPartition = new TopicPartition(entry.getKey().getTopic(), entry.getKey().getId());
+                    return Map.entry(topicPartition, records.records(topicPartition)
+                        .stream()
+                        .filter(consumerRecord -> consumerRecord.offset() < entry.getValue())
+                        .collect(Collectors.toList()));
+                }
+            ).filter(entry -> !entry.getValue().isEmpty())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+    }
+
     @ToString
     @EqualsAndHashCode
     @AllArgsConstructor
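The essence of the fix: the copy loop keeps polling until the source is drained, so a record produced into the source topic mid-copy (including by the copy itself, when source and destination are the same topic) can keep the loop alive indefinitely. The patch therefore snapshots each partition's end offset before copying and discards any polled record at or beyond that snapshot. Below is a minimal standalone sketch of the same pattern, written against plain kafka-clients types instead of AKHQ's Partition model; the class name, method name, and 500 ms poll timeout are illustrative assumptions, not AKHQ code.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch of the offset-capping pattern, not AKHQ code.
final class BoundedPoll {

    // Poll once, then keep only records strictly below the end offset captured
    // before the copy started; partitions whose surviving list is empty are
    // dropped, so an empty result genuinely means the pre-copy data is exhausted.
    static ConsumerRecords<byte[], byte[]> pollBelow(
        KafkaConsumer<byte[], byte[]> consumer,
        Map<TopicPartition, Long> endOffsetSnapshot
    ) {
        ConsumerRecords<byte[], byte[]> polled = consumer.poll(Duration.ofMillis(500));
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> kept = endOffsetSnapshot.entrySet()
            .stream()
            .map(entry -> Map.entry(
                entry.getKey(),
                polled.records(entry.getKey())
                    .stream()
                    // the snapshot is the next offset to be written, so '<' keeps
                    // exactly the records that existed when the copy began
                    .filter(record -> record.offset() < entry.getValue())
                    .collect(Collectors.toList())
            ))
            .filter(entry -> !entry.getValue().isEmpty())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return new ConsumerRecords<>(kept);
    }
}

As in the patch's pollAndFilter, the strict "<" comparison treats the snapshotted offset as an exclusive upper bound, matching the consumerRecord.offset() < entry.getValue() filter above.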
src/test/java/org/akhq/repositories/RecordRepositoryTest.java (34 additions, 0 deletions)

@@ -6,6 +6,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.akhq.AbstractTest;
 import org.akhq.KafkaTestCluster;
+import org.akhq.controllers.TopicController;
 import org.akhq.models.Record;
 import org.akhq.models.Schema;
 import org.akhq.models.Topic;
@@ -23,6 +24,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
@@ -322,4 +324,36 @@ private int searchAll(RecordRepository.Options options) throws ExecutionException

         return size.get();
     }
+
+    @Test
+    void copy() throws ExecutionException, InterruptedException, RestClientException, IOException {
+
+        RecordRepository.Options optionsFromAndTo = new RecordRepository.Options(environment, KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_RANDOM);
+
+        Topic topicFromAndTo = topicRepository.findByName(optionsFromAndTo.getClusterId(), optionsFromAndTo.getTopic());
+
+        List<TopicController.OffsetCopy> offsets = topicFromAndTo.getPartitions()
+            .stream()
+            .map(partition -> new TopicController.OffsetCopy(partition.getId(), partition.getLastOffset()))
+            .collect(Collectors.toList());
+
+        // We simulate the case a record has been added after the method copy has been used
+        this.repository.produce(
+            KafkaTestCluster.CLUSTER_ID,
+            KafkaTestCluster.TOPIC_RANDOM,
+            Optional.of("value"),
+            Collections.emptyList(),
+            Optional.of("key"),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty()
+        );
+
+        RecordRepository.CopyResult copyResult = this.repository.copy(topicFromAndTo, KafkaTestCluster.CLUSTER_ID, topicFromAndTo, offsets, optionsFromAndTo);
+
+        log.info("Copied " + copyResult.records + " records");
+
+        assertEquals(300, copyResult.records);
+    }
 }
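The test copies TOPIC_RANDOM onto itself: it snapshots the partition offsets, produces one extra record, then runs the copy and asserts that exactly the 300 records present at snapshot time are copied, so the late record is filtered out instead of being copied and re-consumed endlessly. For intuition, here is a schematic of the failure mode the test guards against, assuming the pre-fix loop exited only once a poll returned no records; this is an illustrative reconstruction, not the actual AKHQ loop.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;

// Illustrative reconstruction of the pre-fix failure mode, not AKHQ code.
final class SelfCopyLoop {

    // When source and destination are the same topic, every record written by
    // the producer lands back in the topic the consumer is reading, so the
    // "no more records" exit condition below is never reached.
    static void copy(KafkaConsumer<byte[], byte[]> consumer,
                     KafkaProducer<byte[], byte[]> producer,
                     String topic) {
        ConsumerRecords<byte[], byte[]> records;
        do {
            records = consumer.poll(Duration.ofMillis(500));
            records.forEach(record ->
                producer.send(new ProducerRecord<>(topic, record.key(), record.value())));
        } while (!records.isEmpty());
    }
}

With the end-offset cap in place, the records such a loop writes back all sit at or above the snapshot, so they are filtered out on the next poll and the loop terminates.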
