44 changes: 44 additions & 0 deletions core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -17,6 +17,8 @@

package kafka.server

import io.aiven.inkless.control_plane.InitLogDisklessStartOffsetRequest

import java.util.{Collections, Properties}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
@@ -53,10 +55,52 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
val logs = logManager.logsByTopic(topic)
val wasRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
val wasCopyDisabled = logs.exists(_.config.remoteLogCopyDisable())
val wasDisklessEnabled = logs.exists(_.config.disklessEnable())

logManager.updateTopicConfig(topic, topicConfig, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled,
wasRemoteLogEnabled)
maybeUpdateRemoteLogComponents(topic, logs, wasRemoteLogEnabled, wasCopyDisabled)
maybeInitializeDisklessLog(topic, logs, wasDisklessEnabled)
}

/**
* Initialize the diskless log in the control plane for topics that are being migrated
* from classic (local disk) storage to diskless storage.
*
* This is called when a topic's diskless.enable config changes to true.
* Only the leader partitions need to initialize the diskless log, using each partition's
* log end offset as its diskless start offset.
*/
private[server] def maybeInitializeDisklessLog(topic: String,
logs: Seq[UnifiedLog],
wasDisklessEnabled: Boolean): Unit = {
val isDisklessEnabled = logs.exists(_.config.disklessEnable())

// Only initialize if diskless is being enabled (was false, now true) and we have leader partitions
if (isDisklessEnabled && !wasDisklessEnabled) {
val leaderPartitions = logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).filter(_.isLeader)

if (leaderPartitions.nonEmpty) {
replicaManager.getInklessSharedState.foreach { sharedState =>
val topicId = replicaManager.metadataCache.getTopicId(topic)

// Create a request for each leader partition with its own offsets
val requests = leaderPartitions.flatMap { partition =>
logs.find(_.topicPartition == partition.topicPartition).map { log =>
val logStartOffset = log.logStartOffset
val disklessStartOffset = log.logEndOffset
info(s"Initializing diskless log for partition ${partition.topicPartition} with topicId $topicId, " +
s"logStartOffset $logStartOffset, disklessStartOffset $disklessStartOffset")
new InitLogDisklessStartOffsetRequest(topicId, topic, partition.topicPartition.partition(), logStartOffset, disklessStartOffset)
}
}

if (requests.nonEmpty) {
sharedState.controlPlane().initLogDisklessStartOffset(requests.toSet.asJava)
}
}
}
}
}
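
To make the migration semantics concrete, here is a minimal sketch of the request built above for a single migrated partition; the topic name and offsets are hypothetical:

// A partition that lived as a classic topic with local offsets [200, 1500):
// offsets below the old log end offset remain readable from local segments,
// while offset 1500 onwards is produced to and fetched from diskless storage.
val request = new InitLogDisklessStartOffsetRequest(
  topicId,    // Uuid resolved from the metadata cache
  "payments", // topic name (hypothetical)
  0,          // partition
  200L,       // logStartOffset: oldest offset still held locally
  1500L)      // disklessStartOffset: the partition's current log end offset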

private[server] def maybeUpdateRemoteLogComponents(topic: String,
96 changes: 85 additions & 11 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,7 +19,7 @@ package kafka.server
import com.yammer.metrics.core.Meter
import io.aiven.inkless.common.SharedState
import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler}
import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse, GetDisklessLogRequest, InitLogDisklessStartOffsetRequest, MetadataView}
import io.aiven.inkless.delete.{DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer}
import io.aiven.inkless.merge.FileMerger
import io.aiven.inkless.produce.AppendHandler
@@ -272,6 +272,8 @@ class ReplicaManager(val config: KafkaConfig,
config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests))

private val _inklessMetadataView: MetadataView = inklessMetadataView.getOrElse(new InklessMetadataView(metadataCache.asInstanceOf[KRaftMetadataCache], () => config.extractLogConfigMap))
def getInklessMetadataView: MetadataView = _inklessMetadataView
def getInklessSharedState: Option[SharedState] = inklessSharedState
private val inklessAppendHandler: Option[AppendHandler] = inklessSharedState.map(new AppendHandler(_))
private val inklessFetchHandler: Option[FetchHandler] = inklessSharedState.map(new FetchHandler(_))
private val inklessFetchOffsetHandler: Option[FetchOffsetHandler] = inklessSharedState.map(new FetchOffsetHandler(_))
@@ -1776,16 +1778,17 @@ class ReplicaManager(val config: KafkaConfig,
fetchInfos: Seq[(TopicIdPartition, PartitionData)],
quota: ReplicaQuota,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit): Unit = {
if (fetchInfos.isEmpty) {
responseCallback(Seq.empty)
return
}

val (disklessTopicFetchInfos, initialClassicFetchInfos) = fetchInfos.partition { case (k, _) => _inklessMetadataView.isDisklessTopic(k.topic()) }
inklessSharedState match {
case None =>
if (disklessTopicFetchInfos.nonEmpty) {
error(s"Received diskless fetch request for topics ${disklessTopicFetchInfos.map(_._1.topic()).distinct.mkString(", ")} but diskless storage system is not enabled. " +
s"Replying with an empty response.")
responseCallback(Seq.empty)
return
@@ -1794,7 +1797,7 @@ class ReplicaManager(val config: KafkaConfig,
}

// Older fetch versions (<13) don't have topicId in the request -- backfill it for backward compatibility
val disklessTopicFetchInfosWithTopicId = disklessTopicFetchInfos.map { disklessFetchInfo =>
val (topicIdPartition, partitionData) = disklessFetchInfo
if (topicIdPartition.topicId().equals(Uuid.ZERO_UUID)) {
_inklessMetadataView.getTopicId(topicIdPartition.topic()) match {
@@ -1810,14 +1813,46 @@ class ReplicaManager(val config: KafkaConfig,
}
}
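
For illustration, the backfill above rewrites an old-protocol partition roughly as follows; resolvedTopicId stands in for the id returned by the metadata view, and the topic name is hypothetical:

// Fetch versions below 13 leave the topic id zeroed out on the wire:
val fromOldClient = new TopicIdPartition(Uuid.ZERO_UUID, new TopicPartition("orders", 3))
// The map above replaces it with an equivalent partition carrying the real id:
val backfilled = new TopicIdPartition(resolvedTopicId, fromOldClient.topicPartition())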

// For diskless topics, check if the fetch offset is before disklessStartOffset
// If so, redirect to classic fetch (for migrated topics with local data)
val (disklessFetchInfos, classicFetchInfos) = inklessSharedState match {
case Some(sharedState) if disklessTopicFetchInfosWithTopicId.nonEmpty =>
val disklessLogRequests = disklessTopicFetchInfosWithTopicId.map { case (tp, _) =>
new GetDisklessLogRequest(tp.topicId(), tp.partition())
}.toList.asJava

val disklessLogResponses = sharedState.controlPlane().getDisklessLog(disklessLogRequests).asScala
val responseMap = disklessLogResponses.map(r => (r.topicId(), r.partition()) -> r).toMap

val (needsClassicFetch, canDisklessFetch) = disklessTopicFetchInfosWithTopicId.partition { case (tp, partitionData) =>
responseMap.get((tp.topicId(), tp.partition())) match {
case Some(response) if response.error() == Errors.NONE && response.disklessStartOffset() != null =>
// If fetch offset is before disklessStartOffset, use classic fetch
partitionData.fetchOffset < response.disklessStartOffset()
case _ =>
// If partition not found or no disklessStartOffset, use diskless fetch
false
}
}
(canDisklessFetch, initialClassicFetchInfos ++ needsClassicFetch)
case _ =>
(disklessTopicFetchInfosWithTopicId, initialClassicFetchInfos)
}

// The follower-fetch guard below is disabled by this change; previously, diskless
// topics in a follower fetch were rejected with an empty response.
// if (params.isFromFollower && disklessFetchInfos.nonEmpty) {
//   warn("Diskless topics are not supported for follower fetch requests. " +
//     s"Request from follower ${params.replicaId} contains diskless topics: ${disklessFetchInfos.map(_._1.topic()).mkString(", ")}")
//   responseCallback(Seq.empty)
//   return
// }
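
The partitioning above comes down to one comparison per partition. A minimal sketch of that predicate, with a worked example assuming a hypothetical disklessStartOffset of 1500:

// A migrated partition is served from the classic log below its diskless start
// offset and from diskless storage at that offset and beyond.
def routedToClassic(fetchOffset: Long, disklessStartOffset: java.lang.Long): Boolean =
  disklessStartOffset != null && fetchOffset < disklessStartOffset

routedToClassic(900L, 1500L)  // true: local segments still hold this range
routedToClassic(1500L, 1500L) // false: handled by the diskless fetch path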

// Override maxWaitMs and minBytes with a lower bound when there are diskless fetches; otherwise keep the consumer-provided values.
val maxWaitMs = if (disklessFetchInfos.nonEmpty) Math.max(config.disklessFetchMaxWaitMs.toLong, params.maxWaitMs) else params.maxWaitMs
val minBytes = if (disklessFetchInfos.nonEmpty) Math.max(config.disklessFetchMinBytes, params.minBytes) else params.minBytes
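
Concretely, Math.max acts here as a floor on the consumer-supplied values; with a hypothetical disklessFetchMaxWaitMs of 500 ms:

val raisedWait = Math.max(500L, 100L)  // 500: a short consumer wait is raised to the floor
val passedWait = Math.max(500L, 2000L) // 2000: a larger consumer value passes through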
Expand Down Expand Up @@ -1860,6 +1895,7 @@ class ReplicaManager(val config: KafkaConfig,

// check if this fetch request can be satisfied right away
val logReadResults = readFromLog(classicParams, classicFetchInfos, quota, readFromPurgatory = false)
var bytesReadable: Long = 0
var errorReadingData = false

Expand Down Expand Up @@ -3115,7 +3151,11 @@ class ReplicaManager(val config: KafkaConfig,
"local leaders.")
replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet)
localLeaders.foreachEntry { (tp, info) =>
if (_inklessMetadataView.isDisklessTopic(tp.topic())) {
// For diskless topics, check if this is a migrated topic with local logs
// If so, initialize the diskless log in the control plane
maybeInitializeDisklessLogForMigratedTopic(tp, info.topicId)
} else {
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
try {
val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
Expand All @@ -3134,6 +3174,40 @@ class ReplicaManager(val config: KafkaConfig,
markPartitionOffline(tp)
}
}
}
}
}

/**
* For topics that were migrated from classic to diskless storage, we need to ensure
* the diskless log is initialized in the control plane when this broker becomes leader.
*
* This handles the edge case where:
* 1. A topic was created as classic (diskless.enable=false)
* 2. The config was changed to diskless.enable=true
* 3. Another broker processed the config change
* 4. This broker becomes leader later
*
* In this case, the TopicConfigHandler won't fire on this broker, so we need to
* initialize the diskless log here. Since both paths may run for the same partition,
* this assumes initLogDisklessStartOffset tolerates repeated initialization.
*/
private def maybeInitializeDisklessLogForMigratedTopic(tp: TopicPartition, topicId: Uuid): Unit = {
// Check if we have local logs for this topic (indicating it was a classic topic that was migrated)
val logs = logManager.logsByTopic(tp.topic())
if (logs.nonEmpty) {
inklessSharedState.foreach { sharedState =>
logs.find(_.topicPartition == tp).foreach { log =>
val logStartOffset = log.logStartOffset
val disklessStartOffset = log.logEndOffset

stateChangeLogger.info(s"Initializing diskless log for migrated topic ${tp.topic()} " +
s"partition ${tp.partition()} with topicId $topicId, " +
s"logStartOffset $logStartOffset, disklessStartOffset $disklessStartOffset")

val request = new InitLogDisklessStartOffsetRequest(topicId, tp.topic(), tp.partition(), logStartOffset, disklessStartOffset)
sharedState.controlPlane().initLogDisklessStartOffset(java.util.Set.of(request))
}
}
}
}
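
A sketch of the edge case this method covers, reusing the method's topicId and sharedState and assuming hypothetical brokers, topic, and offsets:

// t0: "orders"-0 is classic; this broker keeps a local log spanning [0, 420)
// t1: diskless.enable=true is processed while another broker is leader, so only
//     that broker's TopicConfigHandler performs the initialization
// t2: leadership moves here; the local TopicConfigHandler never fired, so
//     applyLocalLeadersDelta reaches this method and issues:
val request = new InitLogDisklessStartOffsetRequest(topicId, "orders", 0, 0L, 420L)
sharedState.controlPlane().initLogDisklessStartOffset(java.util.Set.of(request))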
