diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index e23b0de9be..121c0bea1c 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -18,7 +18,7 @@ package kafka.server import com.yammer.metrics.core.Meter -import io.aiven.inkless.control_plane.FindBatchRequest +import io.aiven.inkless.control_plane.FindBatchResponse import kafka.utils.Logging import java.util.concurrent.TimeUnit @@ -60,6 +60,7 @@ class DelayedFetch( minBytes: Option[Int] = None, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit, ) extends DelayedOperation(maxWaitMs.getOrElse(params.maxWaitMs)) with Logging { + var maybeBatchCoordinates: Option[Map[TopicIdPartition, FindBatchResponse]] = None override def toString: String = { s"DelayedFetch(params=$params" + @@ -153,7 +154,15 @@ class DelayedFetch( } } - tryCompleteDiskless(disklessFetchPartitionStatus) match { + // adjust the max bytes for diskless fetches based on the percentage of diskless partitions + // Complete the classic fetches first + val classicRequestsSize = classicFetchPartitionStatus.size.toFloat + val disklessRequestsSize = disklessFetchPartitionStatus.size.toFloat + val totalRequestsSize = classicRequestsSize + disklessRequestsSize + val disklessPercentage = disklessRequestsSize / totalRequestsSize + val disklessParams = replicaManager.fetchParamsWithNewMaxBytes(params, disklessPercentage) + + tryCompleteDiskless(disklessFetchPartitionStatus, disklessParams.maxBytes) match { case Some(disklessAccumulatedSize) => accumulatedSize += disklessAccumulatedSize case None => forceComplete() } @@ -174,53 +183,55 @@ class DelayedFetch( * Case D: The fetch offset is equal to the end offset, meaning that we have reached the end of the log * Upon completion, should return whatever data is available for each valid partition */ - private def tryCompleteDiskless(fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Option[Long] = { + private def tryCompleteDiskless( + fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)], + disklessMaxBytes: Int + ): Option[Long] = { var accumulatedSize = 0L val fetchPartitionStatusMap = fetchPartitionStatus.toMap - val requests = fetchPartitionStatus.map { case (topicIdPartition, fetchStatus) => - new FindBatchRequest(topicIdPartition, fetchStatus.startOffsetMetadata.messageOffset, fetchStatus.fetchInfo.maxBytes) - } - if (requests.isEmpty) return Some(0) - val response = try { - replicaManager.findDisklessBatches(requests, Int.MaxValue) + maybeBatchCoordinates = try { + Some(replicaManager.findDisklessBatches(fetchPartitionStatus, disklessMaxBytes)) } catch { case e: Throwable => error("Error while trying to find diskless batches on delayed fetch.", e) return None // Case C } - response.get.asScala.foreach { r => - r.errors() match { - case Errors.NONE => - if (r.batches().size() > 0) { - // Gather topic id partition from first batch. Same for all batches in the response. - val topicIdPartition = r.batches().get(0).metadata().topicIdPartition() - val endOffset = r.highWatermark() - - val fetchPartitionStatus = fetchPartitionStatusMap.get(topicIdPartition) - if (fetchPartitionStatus.isEmpty) { - warn(s"Fetch partition status for $topicIdPartition not found in delayed fetch $this.") - return None // Case C - } - - val fetchOffset = fetchPartitionStatus.get.startOffsetMetadata - // If the fetch offset is greater than the end offset, it means that the log has been truncated - // If it is equal to the end offset, it means that we have reached the end of the log - // If the fetch offset is less than the end offset, we can accumulate the size of the batches - if (fetchOffset.messageOffset > endOffset) { - // Truncation happened - debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") - return None // Case A - } else if (fetchOffset.messageOffset < endOffset) { - val bytesAvailable = r.estimatedByteSize(fetchOffset.messageOffset) - accumulatedSize += bytesAvailable // Case B: accumulate the size of the batches - } // Case D: same as fetchOffset == endOffset, no new data available + maybeBatchCoordinates match { + case Some(exists) => + exists.values.foreach { r => + r.errors() match { + case Errors.NONE => + if (r.batches().size() > 0) { + // Gather topic id partition from first batch. Same for all batches in the response. + val topicIdPartition = r.batches().get(0).metadata().topicIdPartition() + val endOffset = r.highWatermark() + + val fetchPartitionStatus = fetchPartitionStatusMap.get(topicIdPartition) + if (fetchPartitionStatus.isEmpty) { + warn(s"Fetch partition status for $topicIdPartition not found in delayed fetch $this.") + return None // Case C + } + + val fetchOffset = fetchPartitionStatus.get.startOffsetMetadata + // If the fetch offset is greater than the end offset, it means that the log has been truncated + // If it is equal to the end offset, it means that we have reached the end of the log + // If the fetch offset is less than the end offset, we can accumulate the size of the batches + if (fetchOffset.messageOffset > endOffset) { + // Truncation happened + debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") + return None // Case A + } else if (fetchOffset.messageOffset < endOffset) { + val bytesAvailable = r.estimatedByteSize(fetchOffset.messageOffset) + accumulatedSize += bytesAvailable // Case B: accumulate the size of the batches + } // Case D: same as fetchOffset == endOffset, no new data available + } + case _ => return None // Case C } - case _ => return None // Case C - } + } + case None => // Case D } - Some(accumulatedSize) } @@ -272,13 +283,16 @@ class DelayedFetch( if (disklessRequestsSize > 0) { // Classic fetches are complete, now handle diskless fetches - // adjust the max bytes for diskless fetches based on the percentage of diskless partitions - val disklessPercentage = disklessRequestsSize / totalRequestsSize - val disklessParams = replicaManager.fetchParamsWithNewMaxBytes(params, disklessPercentage) val disklessFetchInfos = disklessFetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo } - val disklessFetchResponseFuture = replicaManager.fetchDisklessMessages(disklessParams, disklessFetchInfos) + val batchCoordinates = maybeBatchCoordinates match { + case Some(batchCoordinates) => batchCoordinates + case None => + responseCallback(Seq.empty) + return + } + val disklessFetchResponseFuture = replicaManager.fetchDisklessMessages(batchCoordinates, disklessFetchInfos) // Combine the classic fetch results with the diskless fetch results disklessFetchResponseFuture.whenComplete { case (disklessFetchPartitionData, _) => diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 92a27aabf2..2bf72a3dcc 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1719,16 +1719,40 @@ class ReplicaManager(val config: KafkaConfig, } } - def findDisklessBatches(requests: Seq[FindBatchRequest], maxBytes: Int): Option[util.List[FindBatchResponse]] = { - inklessSharedState.map { sharedState => - sharedState.controlPlane().findBatches(requests.asJava, maxBytes, sharedState.config().maxBatchesPerPartitionToFind()) + def findDisklessBatches(fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)], maxBytes: Int): Map[TopicIdPartition, FindBatchResponse] = { + val requests = fetchPartitionStatus.map { case (topicIdPartition, fetchStatus) => + new FindBatchRequest(topicIdPartition, fetchStatus.startOffsetMetadata.messageOffset, fetchStatus.fetchInfo.maxBytes) + } + if (requests.isEmpty) return Map.empty + + val findBatchResponses = try { + inklessSharedState.map { sharedState => + sharedState.controlPlane().findBatches(requests.asJava, maxBytes, sharedState.config().maxBatchesPerPartitionToFind()) + } + } match { + case Some(responses) => responses + case None => + return Map.empty + } catch { + case e: Throwable => + // kala + trace("Error while trying to find diskless batches.", e) + return Map.empty } + + val topicPartitionToFindBatchResponse = collection.mutable.Map[TopicIdPartition, FindBatchResponse]() + for (i <- requests.indices) { + val request = requests(i) + val response = findBatchResponses.get(i) + topicPartitionToFindBatchResponse.update(request.topicIdPartition, response) + } + topicPartitionToFindBatchResponse; } - def fetchDisklessMessages(params: FetchParams, + def fetchDisklessMessages(batchCoordinates: Map[TopicIdPartition, FindBatchResponse], fetchInfos: Seq[(TopicIdPartition, PartitionData)]): CompletableFuture[Seq[(TopicIdPartition, FetchPartitionData)]] = { inklessFetchHandler match { - case Some(handler) => handler.handle(params, fetchInfos.toMap.asJava).thenApply(_.asScala.toSeq) + case Some(handler) => handler.handle(batchCoordinates.asJava, fetchInfos.toMap.asJava).thenApply(_.asScala.toSeq) case None => if (fetchInfos.nonEmpty) error(s"Received diskless fetch request for topics ${fetchInfos.map(_._1.topic()).distinct.mkString(", ")} but diskless fetch handler is not available. " + @@ -1830,6 +1854,8 @@ class ReplicaManager(val config: KafkaConfig, delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, (classicDelayedFetchKeys ++ disklessDelayedFetchKeys).asJava) } + // If there is nothing to fetch for classic topics, + // create delayed response and fetch possible diskless data there. if (classicFetchInfos.isEmpty) { delayedResponse(Seq.empty) return @@ -1894,9 +1920,18 @@ class ReplicaManager(val config: KafkaConfig, // In case of remote fetches, synchronously wait for diskless records and then perform the remote fetch. // This is currently a workaround to avoid modifying the DelayedRemoteFetch in order to correctly process // diskless fetches. + // Get diskless batch coordinates and hand over to fetching + val batchCoordinates = try { + findDisklessBatches(fetchPartitionStatus, Int.MaxValue) + } catch { + case e: Throwable => + error("Error while trying to find diskless batches on remote fetch.", e) + responseCallback(Seq.empty) + return + } + val disklessFetchResults = try { - val disklessParams = fetchParamsWithNewMaxBytes(params, disklessFetchInfos.size.toFloat / fetchInfos.size.toFloat) - val disklessResponsesFuture = fetchDisklessMessages(disklessParams, disklessFetchInfos) + val disklessResponsesFuture = fetchDisklessMessages(batchCoordinates, disklessFetchInfos) val response = disklessResponsesFuture.get(maxWaitMs, TimeUnit.MILLISECONDS) response.map { case (tp, data) => @@ -1933,8 +1968,11 @@ class ReplicaManager(val config: KafkaConfig, } } else { if (disklessFetchInfos.isEmpty && (bytesReadable >= params.minBytes || params.maxWaitMs <= 0)) { + // No remote fetch needed and not any diskless topics to be fetched. + // Response immediately. responseCallback(fetchPartitionData) } else { + // No remote fetch, requires fetching data from the diskless topics. delayedResponse(fetchPartitionStatus) } } diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index 23c30663d1..354ce3cc3e 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -16,7 +16,7 @@ */ package kafka.server -import io.aiven.inkless.control_plane.{BatchInfo, BatchMetadata, FindBatchRequest, FindBatchResponse} +import io.aiven.inkless.control_plane.{BatchInfo, BatchMetadata, FindBatchResponse} import java.util.{Collections, Optional, OptionalLong} import scala.collection.Seq @@ -213,6 +213,9 @@ class DelayedFetchTest { responseCallback = callback ) + val batchCoordinates = Map.empty[TopicIdPartition, FindBatchResponse] + when(replicaManager.findDisklessBatches(any(), anyInt())).thenReturn(batchCoordinates) + val partition: Partition = mock(classOf[Partition]) when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition) // Note that the high-watermark does not contain the complete metadata @@ -345,12 +348,13 @@ class DelayedFetchTest { ))) when(mockResponse.highWatermark()).thenReturn(endOffset) // endOffset < fetchOffset (truncation) - val future = Some(Collections.singletonList(mockResponse)) - when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future) + val batchCoordinates = Map((topicIdPartition, mockResponse)) + + when(replicaManager.findDisklessBatches(any(), anyInt())).thenReturn(batchCoordinates) // Mock fetchDisklessMessages for onComplete when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], any[Float])).thenAnswer(_.getArgument(0)) - when(replicaManager.fetchDisklessMessages(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) + when(replicaManager.fetchDisklessMessages(any[Map[TopicIdPartition, FindBatchResponse]], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) .thenReturn(CompletableFuture.completedFuture(Seq((topicIdPartition, mock(classOf[FetchPartitionData]))))) when(replicaManager.readFromLog( @@ -402,6 +406,7 @@ class DelayedFetchTest { fetchResultOpt = Some(responses) } + when(replicaManager.fetchParamsWithNewMaxBytes(any(), any())).thenReturn(fetchParams) val delayedFetch = new DelayedFetch( params = fetchParams, classicFetchPartitionStatus = Seq.empty, @@ -434,8 +439,8 @@ class DelayedFetchTest { when(mockResponse.highWatermark()).thenReturn(fetchOffset) // fetchOffset == endOffset (no new data) when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize) - val future = Some(Collections.singletonList(mockResponse)) - when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future) + val future = Map((topicIdPartition, mockResponse)) + when(replicaManager.findDisklessBatches(any[Seq[(TopicIdPartition, FetchPartitionStatus)]], anyInt())).thenReturn(future) when(replicaManager.readFromLog( fetchParams, @@ -451,7 +456,7 @@ class DelayedFetchTest { assertFalse(fetchResultOpt.isDefined) // Verify that estimatedByteSize is never called since fetchOffset == endOffset - verify(replicaManager, never()).fetchDisklessMessages(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]]) + verify(replicaManager, never()).fetchDisklessMessages(any[Map[TopicIdPartition, FindBatchResponse]], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]]) verify(mockResponse, never()).estimatedByteSize(anyLong()) } @@ -487,6 +492,7 @@ class DelayedFetchTest { fetchResultOpt = Some(responses) } + when(replicaManager.fetchParamsWithNewMaxBytes(any(), any())).thenReturn(fetchParams) val delayedFetch = new DelayedFetch( params = fetchParams, classicFetchPartitionStatus = Seq.empty, @@ -519,8 +525,8 @@ class DelayedFetchTest { when(mockResponse.highWatermark()).thenReturn(endOffset) // endOffset > fetchOffset (data available) when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize) - val future = Some(Collections.singletonList(mockResponse)) - when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future) + val batchCoordinates = Map((topicIdPartition, mockResponse)) + when(replicaManager.findDisklessBatches(any(), anyInt())).thenReturn(batchCoordinates) when(replicaManager.readFromLog( fetchParams, @@ -601,12 +607,12 @@ class DelayedFetchTest { when(mockResponse.highWatermark()).thenReturn(endOffset) // endOffset > fetchOffset (data available) when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize) - val future = Some(Collections.singletonList(mockResponse)) - when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future) + val batchCoordinates = Map((topicIdPartition, mockResponse)) + when(replicaManager.findDisklessBatches(any(), anyInt())).thenReturn(batchCoordinates) // Mock fetchDisklessMessages for onComplete when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0)) - when(replicaManager.fetchDisklessMessages(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) + when(replicaManager.fetchDisklessMessages(any[Map[TopicIdPartition, FindBatchResponse]], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) .thenReturn(CompletableFuture.completedFuture(Seq((topicIdPartition, mock(classOf[FetchPartitionData]))))) when(replicaManager.readFromLog( @@ -685,12 +691,12 @@ class DelayedFetchTest { ))) when(mockResponse.highWatermark()).thenReturn(600L) - val future = Some(Collections.singletonList(mockResponse)) - when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future) + val batchCoordinates = Map((topicIdPartition, mockResponse)) + when(replicaManager.findDisklessBatches(any(), anyInt())).thenReturn(batchCoordinates) // Mock fetchDisklessMessages for onComplete when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0)) - when(replicaManager.fetchDisklessMessages(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) + when(replicaManager.fetchDisklessMessages(any[Map[TopicIdPartition, FindBatchResponse]], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) .thenReturn(CompletableFuture.completedFuture(Seq((topicIdPartition, mock(classOf[FetchPartitionData]))))) when(replicaManager.readFromLog( diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 3e6dcc78cc..837b4fd9db 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -7098,9 +7098,10 @@ class ReplicaManagerTest { // and the response does not satisfy minBytes, it should be delayed in the purgatory // until the delayed fetch expires. replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback) - assertEquals(0, replicaManager.delayedFetchPurgatory.numDelayed()) + assertEquals(1, replicaManager.delayedFetchPurgatory.numDelayed()) latch.await(10, TimeUnit.SECONDS) // Wait for the delayed fetch to expire + assertEquals(0, replicaManager.delayedFetchPurgatory.numDelayed()) assertNotNull(responseData) assertEquals(2, responseData.size) assertEquals(disklessResponse(disklessTopicPartition), responseData(disklessTopicPartition)) diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java index fb10490dd5..c957a2bf0d 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; import org.slf4j.Logger; @@ -37,6 +36,7 @@ import java.util.stream.Collectors; import io.aiven.inkless.common.SharedState; +import io.aiven.inkless.control_plane.FindBatchResponse; public class FetchHandler implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(FetchHandler.class); @@ -50,12 +50,10 @@ public FetchHandler(final SharedState state) { state.objectKeyCreator(), state.keyAlignmentStrategy(), state.cache(), - state.controlPlane(), state.buildStorage(), state.brokerTopicStats(), state.config().fetchMetadataThreadPoolSize(), - state.config().fetchDataThreadPoolSize(), - state.config().maxBatchesPerPartitionToFind() + state.config().fetchDataThreadPoolSize() ) ); } @@ -65,14 +63,14 @@ public FetchHandler(final Reader reader) { } public CompletableFuture> handle( - final FetchParams params, + final Map batchCoordinates, final Map fetchInfos ) { if (fetchInfos.isEmpty()) { return CompletableFuture.completedFuture(Map.of()); } - final CompletableFuture> resultFuture = reader.fetch(params, fetchInfos); + final CompletableFuture> resultFuture = reader.fetch(batchCoordinates, fetchInfos); return resultFuture.handle((result, e) -> { if (result == null) { // We don't really expect this future to fail, but in case it does... diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FindBatchesException.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FindBatchesException.java deleted file mode 100644 index 3a74c338af..0000000000 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FindBatchesException.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Inkless - * Copyright (C) 2024 - 2025 Aiven OY - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package io.aiven.inkless.consume; - -public class FindBatchesException extends RuntimeException { - public FindBatchesException(Throwable cause) { - super(cause); - } -} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FindBatchesJob.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FindBatchesJob.java deleted file mode 100644 index 5791d0f6a4..0000000000 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FindBatchesJob.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Inkless - * Copyright (C) 2024 - 2025 Aiven OY - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package io.aiven.inkless.consume; - -import org.apache.kafka.common.TopicIdPartition; -import org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.server.storage.log.FetchParams; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Consumer; -import java.util.function.Supplier; - -import io.aiven.inkless.TimeUtils; -import io.aiven.inkless.control_plane.ControlPlane; -import io.aiven.inkless.control_plane.FindBatchRequest; -import io.aiven.inkless.control_plane.FindBatchResponse; - -public class FindBatchesJob implements Supplier> { - - private final Time time; - private final ControlPlane controlPlane; - private final FetchParams params; - private final Map fetchInfos; - private final int maxBatchesPerPartition; - private final Consumer durationCallback; - - public FindBatchesJob(Time time, - ControlPlane controlPlane, - FetchParams params, - Map fetchInfos, - int maxBatchesPerPartition, - Consumer durationCallback) { - this.time = time; - this.controlPlane = controlPlane; - this.params = params; - this.fetchInfos = fetchInfos; - this.maxBatchesPerPartition = maxBatchesPerPartition; - this.durationCallback = durationCallback; - } - - @Override - public Map get() { - return TimeUtils.measureDurationMsSupplier(time, this::doWork, durationCallback); - } - - private Map doWork() { - try { - List requests = new ArrayList<>(); - for (Map.Entry fetchInfo : fetchInfos.entrySet()) { - TopicIdPartition topicIdPartition = fetchInfo.getKey(); - requests.add(new FindBatchRequest(topicIdPartition, fetchInfo.getValue().fetchOffset, fetchInfo.getValue().maxBytes)); - } - - List responses = controlPlane.findBatches(requests, params.maxBytes, maxBatchesPerPartition); - - Map out = new HashMap<>(); - for (int i = 0; i < requests.size(); i++) { - FindBatchRequest request = requests.get(i); - FindBatchResponse response = responses.get(i); - out.put(request.topicIdPartition(), response); - } - return out; - } catch (Exception e) { - throw new FindBatchesException(e); - } - } -} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java index 1b04618951..2ae23ca58d 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -44,7 +43,7 @@ import io.aiven.inkless.common.InklessThreadFactory; import io.aiven.inkless.common.ObjectKeyCreator; import io.aiven.inkless.common.metrics.ThreadPoolMonitor; -import io.aiven.inkless.control_plane.ControlPlane; +import io.aiven.inkless.control_plane.FindBatchResponse; import io.aiven.inkless.storage_backend.common.ObjectFetcher; public class Reader implements AutoCloseable { @@ -54,9 +53,7 @@ public class Reader implements AutoCloseable { private final ObjectKeyCreator objectKeyCreator; private final KeyAlignmentStrategy keyAlignmentStrategy; private final ObjectCache cache; - private final ControlPlane controlPlane; private final ObjectFetcher objectFetcher; - private final int maxBatchesPerPartitionToFind; private final ExecutorService metadataExecutor; private final ExecutorService dataExecutor; private final InklessFetchMetrics fetchMetrics; @@ -65,25 +62,21 @@ public class Reader implements AutoCloseable { private ThreadPoolMonitor dataThreadPoolMonitor; public Reader( - Time time, - ObjectKeyCreator objectKeyCreator, - KeyAlignmentStrategy keyAlignmentStrategy, - ObjectCache cache, - ControlPlane controlPlane, - ObjectFetcher objectFetcher, - BrokerTopicStats brokerTopicStats, - int fetchMetadataThreadPoolSize, - int fetchDataThreadPoolSize, - int maxBatchesPerPartitionToFind + final Time time, + final ObjectKeyCreator objectKeyCreator, + final KeyAlignmentStrategy keyAlignmentStrategy, + final ObjectCache cache, + final ObjectFetcher objectFetcher, + final BrokerTopicStats brokerTopicStats, + final int fetchMetadataThreadPoolSize, + final int fetchDataThreadPoolSize ) { this( time, objectKeyCreator, keyAlignmentStrategy, cache, - controlPlane, objectFetcher, - maxBatchesPerPartitionToFind, Executors.newFixedThreadPool(fetchMetadataThreadPoolSize, new InklessThreadFactory("inkless-fetch-metadata-", false)), Executors.newFixedThreadPool(fetchDataThreadPoolSize, new InklessThreadFactory("inkless-fetch-data-", false)), brokerTopicStats @@ -91,24 +84,20 @@ public Reader( } public Reader( - Time time, - ObjectKeyCreator objectKeyCreator, - KeyAlignmentStrategy keyAlignmentStrategy, - ObjectCache cache, - ControlPlane controlPlane, - ObjectFetcher objectFetcher, - int maxBatchesPerPartitionToFind, - ExecutorService metadataExecutor, - ExecutorService dataExecutor, - BrokerTopicStats brokerTopicStats + final Time time, + final ObjectKeyCreator objectKeyCreator, + final KeyAlignmentStrategy keyAlignmentStrategy, + final ObjectCache cache, + final ObjectFetcher objectFetcher, + final ExecutorService metadataExecutor, + final ExecutorService dataExecutor, + final BrokerTopicStats brokerTopicStats ) { this.time = time; this.objectKeyCreator = objectKeyCreator; this.keyAlignmentStrategy = keyAlignmentStrategy; this.cache = cache; - this.controlPlane = controlPlane; this.objectFetcher = objectFetcher; - this.maxBatchesPerPartitionToFind = maxBatchesPerPartitionToFind; this.metadataExecutor = metadataExecutor; this.dataExecutor = dataExecutor; this.fetchMetrics = new InklessFetchMetrics(time, cache); @@ -123,84 +112,68 @@ public Reader( } public CompletableFuture> fetch( - final FetchParams params, - final Map fetchInfos + final Map batchCoordinates, + final Map fetchInfos ) { final Instant startAt = TimeUtils.durationMeasurementNow(time); fetchMetrics.fetchStarted(fetchInfos.size()); - final var batchCoordinates = CompletableFuture.supplyAsync( - new FindBatchesJob( + return CompletableFuture.supplyAsync( + new FetchPlanner( time, - controlPlane, - params, - fetchInfos, - maxBatchesPerPartitionToFind, - fetchMetrics::findBatchesFinished + objectKeyCreator, + keyAlignmentStrategy, + cache, + objectFetcher, + dataExecutor, + batchCoordinates, + fetchMetrics ), metadataExecutor - ); - return batchCoordinates.thenApply( - coordinates -> - new FetchPlanner( - time, - objectKeyCreator, - keyAlignmentStrategy, - cache, - objectFetcher, - dataExecutor, - coordinates, - fetchMetrics - ).get() - ) - .thenCombineAsync(batchCoordinates, (fileExtents, coordinates) -> - new FetchCompleter( + ).thenApply(fileExtents -> { + return new FetchCompleter( time, objectKeyCreator, fetchInfos, - coordinates, + batchCoordinates, fileExtents, fetchMetrics::fetchCompletionFinished - ).get() - ) - .whenComplete((topicIdPartitionFetchPartitionDataMap, throwable) -> { - // Mark broker side fetch metrics - if (throwable != null) { - LOGGER.warn("Fetch failed", throwable); - for (final var entry : fetchInfos.entrySet()) { - final String topic = entry.getKey().topic(); - brokerTopicStats.allTopicsStats().failedFetchRequestRate().mark(); - brokerTopicStats.topicStats(topic).failedFetchRequestRate().mark(); - } - // Check if the exception was caused by a fetch related exception and increment the relevant metric - if (throwable instanceof CompletionException) { - // Finding batches fails on the initial stage - if (throwable.getCause() instanceof FindBatchesException) { - fetchMetrics.findBatchesFailed(); - } else if (throwable.getCause() instanceof FetchException) { - // but storage-related exceptions are wrapped twice as they happen within the fetch completer - final Throwable fetchException = throwable.getCause(); - if (fetchException.getCause() instanceof FileFetchException) { - fetchMetrics.fileFetchFailed(); - } else if (fetchException.getCause() instanceof CacheFetchException) { - fetchMetrics.cacheFetchFailed(); - } + ).get(); + }).whenComplete((topicIdPartitionFetchPartitionDataMap, throwable) -> { + // Mark broker side fetch metrics + if (throwable != null) { + LOGGER.warn("Fetch failed", throwable); + for (final var entry : fetchInfos.entrySet()) { + final String topic = entry.getKey().topic(); + brokerTopicStats.allTopicsStats().failedFetchRequestRate().mark(); + brokerTopicStats.topicStats(topic).failedFetchRequestRate().mark(); + } + // Check if the exception was caused by a fetch related exception and increment the relevant metric + if (throwable instanceof CompletionException) { + if (throwable.getCause() instanceof FetchException) { + // but storage-related exceptions are wrapped twice as they happen within the fetch completer + final Throwable fetchException = throwable.getCause(); + if (fetchException.getCause() instanceof FileFetchException) { + fetchMetrics.fileFetchFailed(); + } else if (fetchException.getCause() instanceof CacheFetchException) { + fetchMetrics.cacheFetchFailed(); } } - fetchMetrics.fetchFailed(); - } else { - for (final var entry : topicIdPartitionFetchPartitionDataMap.entrySet()) { - final String topic = entry.getKey().topic(); - if (entry.getValue().error == Errors.NONE) { - brokerTopicStats.allTopicsStats().totalFetchRequestRate().mark(); - brokerTopicStats.topicStats(topic).totalFetchRequestRate().mark(); - } else { - brokerTopicStats.allTopicsStats().failedFetchRequestRate().mark(); - brokerTopicStats.topicStats(topic).failedFetchRequestRate().mark(); - } + } + fetchMetrics.fetchFailed(); + } else { + for (final var entry : topicIdPartitionFetchPartitionDataMap.entrySet()) { + final String topic = entry.getKey().topic(); + if (entry.getValue().error == Errors.NONE) { + brokerTopicStats.allTopicsStats().totalFetchRequestRate().mark(); + brokerTopicStats.topicStats(topic).totalFetchRequestRate().mark(); + } else { + brokerTopicStats.allTopicsStats().failedFetchRequestRate().mark(); + brokerTopicStats.topicStats(topic).failedFetchRequestRate().mark(); } - fetchMetrics.fetchCompleted(startAt); } - }); + fetchMetrics.fetchCompleted(startAt); + } + }); } @Override diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchHandlerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchHandlerTest.java index e42b945474..29c864ce35 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchHandlerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchHandlerTest.java @@ -25,8 +25,6 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.server.storage.log.FetchIsolation; -import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; import org.junit.jupiter.api.Test; @@ -36,6 +34,7 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.OptionalInt; @@ -63,16 +62,12 @@ public class FetchHandlerTest { public void readerFutureFailed() throws Exception { when(reader.fetch(any(), any())).thenReturn(CompletableFuture.failedFuture(new RuntimeException())); try (FetchHandler handler = new FetchHandler(reader)) { - final FetchParams params = new FetchParams(fetchVersion, - -1, -1, -1, -1, - FetchIsolation.LOG_END, Optional.empty()); - final Map fetchInfos = Map.of( topicIdPartition, new FetchRequest.PartitionData(inklessUuid, 0, 0, 1024, Optional.empty()) ); - final var result = handler.handle(params, fetchInfos).get(); + final var result = handler.handle(Collections.emptyMap(), fetchInfos).get(); assertThat(result).hasSize(1); assertThat(result.get(topicIdPartition)).satisfies(data -> { @@ -102,17 +97,12 @@ public void readerFutureSuccess() throws Exception { ); when(reader.fetch(any(), any())).thenReturn(CompletableFuture.completedFuture(value)); try (FetchHandler handler = new FetchHandler(reader)) { - - final FetchParams params = new FetchParams(fetchVersion, - -1, -1, -1, -1, - FetchIsolation.LOG_END, Optional.empty()); - final Map fetchInfos = Map.of( topicIdPartition, new FetchRequest.PartitionData(inklessUuid, 0, 0, 1024, Optional.empty()) ); - final var result = handler.handle(params, fetchInfos).get(); + final var result = handler.handle(Collections.emptyMap(), fetchInfos).get(); assertThat(result).hasSize(1); assertThat(result.get(topicIdPartition)).satisfies(data -> { @@ -141,16 +131,12 @@ public void readerFutureSuccessEmpty() throws Exception { when(reader.fetch(any(), any())).thenReturn(CompletableFuture.completedFuture(value)); try (FetchHandler handler = new FetchHandler(reader)) { - final FetchParams params = new FetchParams(fetchVersion, - -1, -1, -1, -1, - FetchIsolation.LOG_END, Optional.empty()); - final Map fetchInfos = Map.of( topicIdPartition, new FetchRequest.PartitionData(inklessUuid, 0, 0, 1024, Optional.empty()) ); - final var result = handler.handle(params, fetchInfos).get(); + final var result = handler.handle(Collections.emptyMap(), fetchInfos).get(); assertThat(result).hasSize(1); assertThat(result.get(topicIdPartition)).satisfies(data -> { @@ -164,13 +150,9 @@ public void readerFutureSuccessEmpty() throws Exception { @Test public void emptyRequest() throws Exception { try (FetchHandler handler = new FetchHandler(reader)) { - final FetchParams params = new FetchParams(fetchVersion, - -1, -1, -1, -1, - FetchIsolation.LOG_END, Optional.empty()); - final Map fetchInfos = Map.of(); - final var result = handler.handle(params, fetchInfos).get(); + final var result = handler.handle(Collections.emptyMap(), fetchInfos).get(); assertThat(result).hasSize(0); } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/FindBatchesJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FindBatchesJobTest.java deleted file mode 100644 index 6707c0d104..0000000000 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FindBatchesJobTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Inkless - * Copyright (C) 2024 - 2025 Aiven OY - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package io.aiven.inkless.consume; - -import org.apache.kafka.common.TopicIdPartition; -import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.server.storage.log.FetchParams; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.junit.jupiter.MockitoSettings; -import org.mockito.quality.Strictness; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import io.aiven.inkless.control_plane.BatchInfo; -import io.aiven.inkless.control_plane.BatchMetadata; -import io.aiven.inkless.control_plane.ControlPlane; -import io.aiven.inkless.control_plane.FindBatchRequest; -import io.aiven.inkless.control_plane.FindBatchResponse; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -@MockitoSettings(strictness = Strictness.STRICT_STUBS) -public class FindBatchesJobTest { - - private final Time time = new MockTime(); - private final int maxBatchesPerPartition = 0; - - @Mock - private ControlPlane controlPlane; - @Mock - private FetchParams params; - - @Captor - ArgumentCaptor> requestCaptor; - - Uuid topicId = Uuid.randomUuid(); - static final String OBJECT_KEY_MAIN_PART = "a"; - TopicIdPartition partition0 = new TopicIdPartition(topicId, 0, "diskless-topic"); - - @Test - public void findSingleBatch() { - Map fetchInfos = Map.of( - partition0, new FetchRequest.PartitionData(topicId, 0, 0, 1000, Optional.empty()) - ); - int logStartOffset = 0; - long logAppendTimestamp = 10L; - long maxBatchTimestamp = 20L; - int highWatermark = 1; - Map coordinates = Map.of( - partition0, FindBatchResponse.success(List.of( - new BatchInfo(1L, OBJECT_KEY_MAIN_PART, BatchMetadata.of(partition0, 0, 10, 0, 0, logAppendTimestamp, maxBatchTimestamp, TimestampType.CREATE_TIME)) - ), logStartOffset, highWatermark) - ); - FindBatchesJob job = new FindBatchesJob(time, controlPlane, params, fetchInfos, maxBatchesPerPartition, durationMs -> {}); - when(controlPlane.findBatches(requestCaptor.capture(), anyInt(), anyInt())).thenReturn(new ArrayList<>(coordinates.values())); - Map result = job.get(); - - assertThat(result).isEqualTo(coordinates); - } - -} diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java index 3f6345ea3c..109bb89ccf 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java @@ -77,8 +77,8 @@ public class ReaderTest { @Test public void testReaderEmptyRequests() throws IOException { - try(final var reader = new Reader(time, OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, controlPlane, objectFetcher, 0, metadataExecutor, dataExecutor, new BrokerTopicStats())) { - final CompletableFuture> fetch = reader.fetch(fetchParams, Collections.emptyMap()); + try(final var reader = new Reader(time, OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, objectFetcher, metadataExecutor, dataExecutor, new BrokerTopicStats())) { + final CompletableFuture> fetch = reader.fetch(Collections.emptyMap(), Collections.emptyMap()); verify(metadataExecutor, atLeastOnce()).execute(any()); verifyNoInteractions(dataExecutor); assertThat(fetch.join()).isEqualTo(Collections.emptyMap()); @@ -87,7 +87,7 @@ public void testReaderEmptyRequests() throws IOException { @Test public void testClose() throws Exception { - final var reader = new Reader(time, OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, controlPlane, objectFetcher, 0, metadataExecutor, dataExecutor, new BrokerTopicStats()); + final var reader = new Reader(time, OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, objectFetcher, metadataExecutor, dataExecutor, new BrokerTopicStats()); reader.close(); verify(metadataExecutor, atLeastOnce()).shutdown(); verify(dataExecutor, atLeastOnce()).shutdown(); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java index 31f583593a..6ed8a1d00b 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java @@ -28,8 +28,6 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.common.RequestLocal; -import org.apache.kafka.server.storage.log.FetchIsolation; -import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -297,15 +295,25 @@ private Map> read(final FetchHandler fetchHa private void readIteration(final FetchHandler fetchHandler, final ConcurrentHashMap fetchPositions, final ConcurrentMap> records) throws InterruptedException, ExecutionException, TimeoutException { - final FetchParams params = new FetchParams(FETCH_VERSION, - -1, -1, -1, -1, - FetchIsolation.LOG_END, Optional.empty()); - final Map fetchInfos = ALL_TOPIC_ID_PARTITIONS.stream().collect(Collectors.toMap( tidp -> tidp, tidp -> new FetchRequest.PartitionData(TOPIC_ID_0, fetchPositions.get(tidp), 0, 1024 * 1024, Optional.empty()) )); - final Map fetchResult = fetchHandler.handle(params, fetchInfos).get(2L, TimeUnit.SECONDS); + + final List requests = ALL_TOPIC_ID_PARTITIONS.stream().map(tidp -> { + return new FindBatchRequest(tidp, fetchPositions.get(tidp), 1024 * 1024); + }).toList(); + + final List findBatchResponses = sharedState.controlPlane().findBatches(requests, 1024 * 1024, sharedState.config().maxBatchesPerPartitionToFind()); + + final Map batchCoordinates = new HashMap<>(); + for (int i = 0; i < requests.size(); i++) { + final FindBatchRequest request = requests.get(i); + final FindBatchResponse response = findBatchResponses.get(i); + batchCoordinates.put(request.topicIdPartition(), response); + } + + final Map fetchResult = fetchHandler.handle(batchCoordinates, fetchInfos).get(2L, TimeUnit.SECONDS); for (final var entry : fetchResult.entrySet()) { final var tidp = entry.getKey(); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java index bafd7b95ec..eea6d60e3f 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java @@ -27,8 +27,6 @@ import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.common.RequestLocal; -import org.apache.kafka.server.storage.log.FetchIsolation; -import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -316,16 +314,25 @@ private Map> read(final FetchHandler fetchHa private void readIteration(final FetchHandler fetchHandler, final ConcurrentHashMap fetchPositions, final ConcurrentMap> records) throws InterruptedException, ExecutionException, TimeoutException { - final FetchParams params = new FetchParams(FETCH_VERSION, - -1, -1, -1, -1, - FetchIsolation.LOG_END, Optional.empty()); - final Map fetchInfos = ALL_TOPIC_ID_PARTITIONS.stream().collect(Collectors.toMap( tidp -> tidp, tidp -> new FetchRequest.PartitionData(TOPIC_ID_0, fetchPositions.get(tidp), 0, 1024 * 1024, Optional.empty()) )); - final Map fetchResult = fetchHandler.handle(params, fetchInfos).get(2L, TimeUnit.SECONDS); + final List requests = ALL_TOPIC_ID_PARTITIONS.stream().map(tidp -> { + return new FindBatchRequest(tidp, fetchPositions.get(tidp), 1024 * 1024); + }).toList(); + + final List findBatchResponses = sharedState.controlPlane().findBatches(requests, 1024 * 1024, sharedState.config().maxBatchesPerPartitionToFind()); + + final Map batchCoordinates = new HashMap<>(); + for (int i = 0; i < requests.size(); i++) { + final FindBatchRequest request = requests.get(i); + final FindBatchResponse response = findBatchResponses.get(i); + batchCoordinates.put(request.topicIdPartition(), response); + } + + final Map fetchResult = fetchHandler.handle(batchCoordinates, fetchInfos).get(2L, TimeUnit.SECONDS); for (final var entry : fetchResult.entrySet()) { final var tidp = entry.getKey(); boolean isEmpty = true;