Skip to content

Commit 6f19730

Browse files
authored
KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory. (apache#13535)
This change includes: (1) recognizing fetch requests with out-of-range local log offsets; (2) adding a fetch implementation for the data lying in the range of [logStartOffset, localLogStartOffset]; (3) adding a new purgatory for async remote read requests, which are served through a specific thread pool. We have an extended version of remote fetch that can fetch from multiple remote partitions in parallel, which we will raise as a follow-up PR. A few tests for the newly introduced changes are added in this PR. There are some tests available for these scenarios in 2.8.x; once they are refactored against the trunk changes, we will add them in follow-up PRs. Other contributors: Kamal Chandraprakash <kchandraprakash@uber.com> - further improvements and a few added tests; Luke Chen <showuon@gmail.com> - added a few test cases for these changes. PS: This functionality is pulled out from internal branches that contain other functionality related to the feature in 2.8.x. We did not pull in all the changes because that would make the PR huge and burdensome to review, and it would also need other metrics, minor enhancements (including perf), and minor changes done for tests. So, we will try to have follow-up PRs to cover all of those. Reviewers: Jun Rao <junrao@gmail.com>, Alexandre Dupriez <alexandre.dupriez@gmail.com>, Divij Vaidya <diviv@amazon.com>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>
1 parent d944ef1 commit 6f19730

20 files changed

+1107
-109
lines changed

checkstyle/suppressions.xml

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
4040
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
4141
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
42+
<suppress checks="NPathComplexity|ClassFanOutComplexity" files="RemoteLogManager.java"/>
4243
<suppress checks="MethodLength"
4344
files="(KafkaClusterTestKit).java"/>
4445

core/src/main/java/kafka/log/remote/RemoteLogManager.java

+264-34
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package kafka.log.remote;
18+
19+
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
20+
import org.apache.kafka.common.utils.LogContext;
21+
import org.apache.kafka.storage.internals.log.FetchDataInfo;
22+
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
23+
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
24+
import org.slf4j.Logger;
25+
26+
import java.util.Optional;
27+
import java.util.concurrent.Callable;
28+
import java.util.function.Consumer;
29+
30+
/**
 * A {@link Callable} task that performs a single remote-storage read.
 *
 * <p>When run, it asks the {@link RemoteLogManager} to read the data described by a
 * {@link RemoteStorageFetchInfo}, wraps either the fetched data or the thrown exception in a
 * {@link RemoteLogReadResult}, and passes that result to the supplied callback. The task itself
 * always returns {@code null}; the outcome is delivered only through the callback.
 */
public class RemoteLogReader implements Callable<Void> {
31+
private final Logger logger;
32+
private final RemoteStorageFetchInfo fetchInfo;
33+
private final RemoteLogManager rlm;
34+
private final Consumer<RemoteLogReadResult> callback;
35+
36+
/**
 * Creates a reader task.
 *
 * @param fetchInfo description of the remote read to perform; passed unchanged to {@code rlm.read}
 * @param rlm       manager used to perform the read
 * @param callback  receives the {@link RemoteLogReadResult} once the read has succeeded or failed
 */
public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
37+
RemoteLogManager rlm,
38+
Consumer<RemoteLogReadResult> callback) {
39+
this.fetchInfo = fetchInfo;
40+
this.rlm = rlm;
41+
this.callback = callback;
42+
// The log prefix is the name of whichever thread invokes logPrefix().
// NOTE(review): when LogContext evaluates logPrefix() is not visible here - confirm whether the
// prefix reflects the constructing thread or the thread that later runs call().
logger = new LogContext() {
43+
@Override
44+
public String logPrefix() {
45+
return "[" + Thread.currentThread().getName() + "]";
46+
}
47+
}.logger(RemoteLogReader.class);
48+
}
49+
50+
/**
 * Reads from remote storage and reports the outcome to the callback.
 *
 * <p>Exceptions thrown by the read are not propagated: they are captured in the
 * {@link RemoteLogReadResult} handed to the callback.
 *
 * @return always {@code null}
 */
@Override
51+
public Void call() {
52+
RemoteLogReadResult result;
53+
try {
54+
logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
55+
56+
FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
57+
// Success: data present, error absent.
result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
58+
} catch (OffsetOutOfRangeException e) {
59+
// An out-of-range offset is handed back to the callback without being logged.
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
60+
} catch (Exception e) {
61+
// Any other failure is logged at error level and then handed back the same way.
logger.error("Error occurred while reading the remote data for {}", fetchInfo.topicPartition, e);
62+
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
63+
}
64+
65+
// This message is logged on both the success and the failure paths.
logger.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
66+
callback.accept(result);
67+
68+
return null;
69+
}
70+
}

core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java

+8
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import kafka.server.DelayedFetch;
2727
import kafka.server.DelayedOperationPurgatory;
2828
import kafka.server.DelayedProduce;
29+
import kafka.server.DelayedRemoteFetch;
2930
import kafka.server.KafkaConfig;
3031
import kafka.server.MetadataCache;
3132
import kafka.server.QuotaFactory.QuotaManagers;
@@ -61,6 +62,7 @@ public class ReplicaManagerBuilder {
6162
private Optional<DelayedOperationPurgatory<DelayedFetch>> delayedFetchPurgatory = Optional.empty();
6263
private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty();
6364
private Optional<DelayedOperationPurgatory<DelayedElectLeader>> delayedElectLeaderPurgatory = Optional.empty();
65+
private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>> delayedRemoteFetchPurgatory = Optional.empty();
6466
private Optional<String> threadNamePrefix = Optional.empty();
6567
private Long brokerEpoch = -1L;
6668
private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager = Optional.empty();
@@ -140,6 +142,11 @@ public ReplicaManagerBuilder setDelayedFetchPurgatory(DelayedOperationPurgatory<
140142
return this;
141143
}
142144

145+
/**
 * Sets the purgatory used for {@code DelayedRemoteFetch} operations.
 *
 * <p>The value is stored via {@link Optional#of}, so passing {@code null} throws a
 * {@link NullPointerException}.
 *
 * @param delayedRemoteFetchPurgatory the purgatory to use; must not be {@code null}
 * @return this builder, to allow call chaining
 */
public ReplicaManagerBuilder setDelayedRemoteFetchPurgatory(DelayedOperationPurgatory<DelayedRemoteFetch> delayedRemoteFetchPurgatory) {
146+
this.delayedRemoteFetchPurgatory = Optional.of(delayedRemoteFetchPurgatory);
147+
return this;
148+
}
149+
143150
public ReplicaManagerBuilder setDelayedDeleteRecordsPurgatory(DelayedOperationPurgatory<DelayedDeleteRecords> delayedDeleteRecordsPurgatory) {
144151
this.delayedDeleteRecordsPurgatory = Optional.of(delayedDeleteRecordsPurgatory);
145152
return this;
@@ -189,6 +196,7 @@ public ReplicaManager build() {
189196
OptionConverters.toScala(delayedFetchPurgatory),
190197
OptionConverters.toScala(delayedDeleteRecordsPurgatory),
191198
OptionConverters.toScala(delayedElectLeaderPurgatory),
199+
OptionConverters.toScala(delayedRemoteFetchPurgatory),
192200
OptionConverters.toScala(threadNamePrefix),
193201
() -> brokerEpoch,
194202
OptionConverters.toScala(addPartitionsToTxnManager));

core/src/main/scala/kafka/cluster/Partition.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ class Partition(val topicPartition: TopicPartition,
526526
leaderReplicaIdOpt.filter(_ == localBrokerId)
527527
}
528528

529-
private def localLogWithEpochOrThrow(
529+
def localLogWithEpochOrThrow(
530530
currentLeaderEpoch: Optional[Integer],
531531
requireLeader: Boolean
532532
): UnifiedLog = {

core/src/main/scala/kafka/server/BrokerServer.scala

+1
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ class BrokerServer(
270270
isShuttingDown = isShuttingDown,
271271
zkClient = None,
272272
threadNamePrefix = None, // The ReplicaManager only runs on the broker, and already includes the ID in thread names.
273+
delayedRemoteFetchPurgatoryParam = None,
273274
brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
274275
addPartitionsToTxnManager = Some(addPartitionsToTxnManager)
275276
)

core/src/main/scala/kafka/server/DelayedFetch.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ class DelayedFetch(
163163
tp -> status.fetchInfo
164164
}
165165

166-
val logReadResults = replicaManager.readFromLocalLog(
166+
val logReadResults = replicaManager.readFromLog(
167167
params,
168168
fetchInfos,
169169
quota,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.server
19+
20+
import org.apache.kafka.common.TopicIdPartition
21+
import org.apache.kafka.common.errors._
22+
import org.apache.kafka.common.protocol.Errors
23+
import org.apache.kafka.storage.internals.log.{FetchParams, FetchPartitionData, LogOffsetMetadata, RemoteLogReadResult, RemoteStorageFetchInfo}
24+
25+
import java.util.concurrent.{CompletableFuture, Future}
26+
import java.util.{Optional, OptionalInt, OptionalLong}
27+
import scala.collection._
28+
29+
/**
30+
* A remote fetch operation that can be created by the replica manager and watched
31+
* in the remote fetch operation purgatory
32+
*/
33+
// Constructor parameters, as used within this class:
//   remoteFetchTask      - the submitted remote read task; cancelled in onExpiration()
//   remoteFetchResult    - future holding the remote read outcome; its completion satisfies "Case c"
//                          in tryComplete() and its value is read in onComplete()
//   remoteFetchInfo      - identifies the topic partition whose response is built from the remote result
//   fetchPartitionStatus - partitions whose local state is re-checked on every tryComplete()
//   fetchParams          - supplies maxWaitMs, which is the timeout of this delayed operation
//   localReadResults     - per-partition local read results; all but the remote partition are
//                          returned unchanged
//   replicaManager       - used to look up partitions in tryComplete()
//   responseCallback     - invoked exactly once per onComplete() with the assembled per-partition data
class DelayedRemoteFetch(remoteFetchTask: Future[Void],
34+
remoteFetchResult: CompletableFuture[RemoteLogReadResult],
35+
remoteFetchInfo: RemoteStorageFetchInfo,
36+
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
37+
fetchParams: FetchParams,
38+
localReadResults: Seq[(TopicIdPartition, LogReadResult)],
39+
replicaManager: ReplicaManager,
40+
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
41+
extends DelayedOperation(fetchParams.maxWaitMs) {
42+
43+
/**
44+
* The operation can be completed if:
45+
*
46+
* Case a: This broker is no longer the leader of the partition it tries to fetch
47+
* Case b: This broker does not know the partition it tries to fetch
48+
* Case c: The remote storage read request completed (succeeded or failed)
49+
* Case d: The partition is in an offline log directory on this broker
50+
*
51+
* Upon completion, should return whatever data is available for each valid partition
52+
*/
53+
override def tryComplete(): Boolean = {
54+
fetchPartitionStatus.foreach {
55+
case (topicPartition, fetchStatus) =>
56+
val fetchOffset = fetchStatus.startOffsetMetadata
57+
try {
58+
// Partitions whose start offset metadata is unknown are not looked up.
if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
59+
// Called only for the exceptions it may throw; the returned partition is discarded.
replicaManager.getPartitionOrException(topicPartition.topicPartition())
60+
}
61+
} catch {
62+
case _: KafkaStorageException => // Case d
63+
debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchParams immediately")
64+
return forceComplete()
65+
case _: UnknownTopicOrPartitionException => // Case b
66+
debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchParams immediately")
67+
return forceComplete()
68+
case _: NotLeaderOrFollowerException => // Case a
69+
debug("Broker is no longer the leader or follower of %s, satisfy %s immediately".format(topicPartition, fetchParams))
70+
return forceComplete()
71+
}
72+
}
73+
if (remoteFetchResult.isDone) // Case c
74+
forceComplete()
75+
else
76+
false
77+
}
78+
79+
override def onExpiration(): Unit = {
80+
// cancel the remote storage read task, if it has not been executed yet
81+
val cancelled = remoteFetchTask.cancel(true)
82+
// The message previously read "task for for RemoteStorageFetchInfo"; the duplicated word is removed.
if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
83+
}
84+
85+
/**
86+
* Upon completion, read whatever data is available and pass to the complete callback
87+
*/
88+
override def onComplete(): Unit = {
89+
// The remote result is used only for the partition named by remoteFetchInfo, and only when the
// remote read has finished, the local read reported no error, and the local result is flagged
// with a delayed remote storage fetch. Every other partition returns its local read result.
val fetchPartitionData = localReadResults.map { case (tp, result) =>
90+
if (tp.topicPartition().equals(remoteFetchInfo.topicPartition)
91+
&& remoteFetchResult.isDone
92+
&& result.error == Errors.NONE
93+
&& result.info.delayedRemoteStorageFetch.isPresent) {
94+
if (remoteFetchResult.get.error.isPresent) {
95+
// The remote read failed: respond with a read result built from that error.
tp -> ReplicaManager.createLogReadResult(remoteFetchResult.get.error.get).toFetchPartitionData(false)
96+
} else {
97+
// The remote read succeeded: records and aborted transactions come from the remote data,
// the remaining fields come from the local read result.
val info = remoteFetchResult.get.fetchDataInfo.get
98+
tp -> new FetchPartitionData(
99+
result.error,
100+
result.highWatermark,
101+
result.leaderLogStartOffset,
102+
info.records,
103+
Optional.empty(),
104+
if (result.lastStableOffset.isDefined) OptionalLong.of(result.lastStableOffset.get) else OptionalLong.empty(),
105+
info.abortedTransactions,
106+
if (result.preferredReadReplica.isDefined) OptionalInt.of(result.preferredReadReplica.get) else OptionalInt.empty(),
107+
false)
108+
}
109+
} else {
110+
tp -> result.toFetchPartitionData(false)
111+
}
112+
}
113+
114+
responseCallback(fetchPartitionData)
115+
}
116+
}

core/src/main/scala/kafka/server/KafkaServer.scala

+1
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,7 @@ class KafkaServer(
629629
brokerTopicStats = brokerTopicStats,
630630
isShuttingDown = isShuttingDown,
631631
zkClient = Some(zkClient),
632+
delayedRemoteFetchPurgatoryParam = None,
632633
threadNamePrefix = threadNamePrefix,
633634
brokerEpochSupplier = brokerEpochSupplier,
634635
addPartitionsToTxnManager = Some(addPartitionsToTxnManager))

0 commit comments

Comments
 (0)