diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 3e82db2909..d6b3404203 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -17,6 +17,8 @@ package kafka.server +import io.aiven.inkless.control_plane.InitLogDisklessStartOffsetRequest + import java.util.{Collections, Properties} import kafka.server.QuotaFactory.QuotaManagers import kafka.utils.Logging @@ -53,10 +55,52 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, val logs = logManager.logsByTopic(topic) val wasRemoteLogEnabled = logs.exists(_.remoteLogEnabled()) val wasCopyDisabled = logs.exists(_.config.remoteLogCopyDisable()) + val wasDisklessEnabled = logs.exists(_.config.disklessEnable()) logManager.updateTopicConfig(topic, topicConfig, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled, wasRemoteLogEnabled) maybeUpdateRemoteLogComponents(topic, logs, wasRemoteLogEnabled, wasCopyDisabled) + maybeInitializeDisklessLog(topic, logs, wasDisklessEnabled) + } + + /** + * Initialize the diskless log in the control plane for topics that are being migrated + * from classic (local disk) storage to diskless storage. + * + * This is called when a topic's diskless.enable config changes to true. + * Only the leader partitions need to initialize the diskless log, using each partition's + * log end offset as its diskless start offset. + */ + private[server] def maybeInitializeDisklessLog(topic: String, + logs: Seq[UnifiedLog], + wasDisklessEnabled: Boolean): Unit = { + val isDisklessEnabled = logs.exists(_.config.disklessEnable()) + + // Only initialize if diskless is being enabled (was false, now true) and we have leader partitions + if (isDisklessEnabled && !wasDisklessEnabled) { + val leaderPartitions = logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).filter(_.isLeader) + + if (leaderPartitions.nonEmpty) { + replicaManager.getInklessSharedState.foreach { sharedState => + val topicId = replicaManager.metadataCache.getTopicId(topic) + + // Create a request for each leader partition with its own offsets + val requests = leaderPartitions.flatMap { partition => + logs.find(_.topicPartition == partition.topicPartition).map { log => + val logStartOffset = log.logStartOffset + val disklessStartOffset = log.logEndOffset + info(s"Initializing diskless log for partition ${partition.topicPartition} with topicId $topicId, " + + s"logStartOffset $logStartOffset, disklessStartOffset $disklessStartOffset") + new InitLogDisklessStartOffsetRequest(topicId, topic, partition.topicPartition.partition(), logStartOffset, disklessStartOffset) + } + } + + if (requests.nonEmpty) { + sharedState.controlPlane().initLogDisklessStartOffset(requests.toSet.asJava) + } + } + } + } } private[server] def maybeUpdateRemoteLogComponents(topic: String, diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1c793038a2..3ab9784dc8 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -19,7 +19,7 @@ package kafka.server import com.yammer.metrics.core.Meter import io.aiven.inkless.common.SharedState import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler} -import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse, MetadataView} +import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, 
FindBatchResponse, GetDisklessLogRequest, InitLogDisklessStartOffsetRequest, MetadataView} import io.aiven.inkless.delete.{DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer} import io.aiven.inkless.merge.FileMerger import io.aiven.inkless.produce.AppendHandler @@ -272,6 +272,8 @@ class ReplicaManager(val config: KafkaConfig, config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests)) private val _inklessMetadataView: MetadataView = inklessMetadataView.getOrElse(new InklessMetadataView(metadataCache.asInstanceOf[KRaftMetadataCache], () => config.extractLogConfigMap)) + def getInklessMetadataView: MetadataView = _inklessMetadataView + def getInklessSharedState: Option[SharedState] = inklessSharedState private val inklessAppendHandler: Option[AppendHandler] = inklessSharedState.map(new AppendHandler(_)) private val inklessFetchHandler: Option[FetchHandler] = inklessSharedState.map(new FetchHandler(_)) private val inklessFetchOffsetHandler: Option[FetchOffsetHandler] = inklessSharedState.map(new FetchOffsetHandler(_)) @@ -1776,16 +1778,17 @@ class ReplicaManager(val config: KafkaConfig, fetchInfos: Seq[(TopicIdPartition, PartitionData)], quota: ReplicaQuota, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit): Unit = { +// println(fetchInfos) if (fetchInfos.isEmpty) { responseCallback(Seq.empty) return } - val (disklessFetchInfosWithoutTopicId, classicFetchInfos) = fetchInfos.partition { case (k, _) => _inklessMetadataView.isDisklessTopic(k.topic()) } + val (disklessTopicFetchInfos, initialClassicFetchInfos) = fetchInfos.partition { case (k, _) => _inklessMetadataView.isDisklessTopic(k.topic()) } inklessSharedState match { case None => - if (disklessFetchInfosWithoutTopicId.nonEmpty) { - error(s"Received diskless fetch request for topics ${disklessFetchInfosWithoutTopicId.map(_._1.topic()).distinct.mkString(", ")} but diskless storage system is not enabled. " + + if (disklessTopicFetchInfos.nonEmpty) { + error(s"Received diskless fetch request for topics ${disklessTopicFetchInfos.map(_._1.topic()).distinct.mkString(", ")} but diskless storage system is not enabled. 
" + s"Replying an empty response.") responseCallback(Seq.empty) return @@ -1794,7 +1797,7 @@ class ReplicaManager(val config: KafkaConfig, } // Older fetch versions (<13) don't have topicId in the request -- backfill it for backward compatibility - val disklessFetchInfos = disklessFetchInfosWithoutTopicId.map { disklessFetchInfo => + val disklessTopicFetchInfosWithTopicId = disklessTopicFetchInfos.map { disklessFetchInfo => val (topicIdPartition, partitionData) = disklessFetchInfo if (topicIdPartition.topicId().equals(Uuid.ZERO_UUID)) { _inklessMetadataView.getTopicId(topicIdPartition.topic()) match { @@ -1810,14 +1813,46 @@ class ReplicaManager(val config: KafkaConfig, } } + // For diskless topics, check if the fetch offset is before disklessStartOffset + // If so, redirect to classic fetch (for migrated topics with local data) + val (disklessFetchInfos, classicFetchInfos) = inklessSharedState match { + case Some(sharedState) if disklessTopicFetchInfosWithTopicId.nonEmpty => + val disklessLogRequests = disklessTopicFetchInfosWithTopicId.map { case (tp, _) => + new GetDisklessLogRequest(tp.topicId(), tp.partition()) + }.toList.asJava + + val disklessLogResponses = sharedState.controlPlane().getDisklessLog(disklessLogRequests).asScala + val responseMap = disklessLogResponses.map(r => (r.topicId(), r.partition()) -> r).toMap + + val (needsClassicFetch, canDisklessFetch) = disklessTopicFetchInfosWithTopicId.partition { case (tp, partitionData) => + responseMap.get((tp.topicId(), tp.partition())) match { + case Some(response) if response.error() == Errors.NONE && response.disklessStartOffset() != null => + // If fetch offset is before disklessStartOffset, use classic fetch + partitionData.fetchOffset < response.disklessStartOffset() + case _ => + // If partition not found or no disklessStartOffset, use diskless fetch + false + } + } +// println("canDisklessFetch: " + canDisklessFetch) +// println("needsClassicFetch: " + needsClassicFetch) - if (params.isFromFollower && disklessFetchInfos.nonEmpty) { - warn("Diskless topics are not supported for follower fetch requests. " + - s"Request from follower ${params.replicaId} contains diskless topics: ${disklessFetchInfos.map(_._1.topic()).mkString(", ")}") - responseCallback(Seq.empty) - return + (canDisklessFetch, initialClassicFetchInfos ++ needsClassicFetch) + case _ => + (disklessTopicFetchInfosWithTopicId, initialClassicFetchInfos) } +// println("disklessFetchInfos: " + disklessFetchInfos) +// println("classicFetchInfos: " + classicFetchInfos) + + +// if (params.isFromFollower && disklessFetchInfos.nonEmpty) { +// warn("Diskless topics are not supported for follower fetch requests. " + +// s"Request from follower ${params.replicaId} contains diskless topics: ${disklessFetchInfos.map(_._1.topic()).mkString(", ")}") +// responseCallback(Seq.empty) +// return +// } + // Override maxWaitMs and minBytes with lower-bound if there are diskless fetches. Otherwise, leave the consumer-provided values. 
val maxWaitMs = if (disklessFetchInfos.nonEmpty) Math.max(config.disklessFetchMaxWaitMs.toLong, params.maxWaitMs) else params.maxWaitMs val minBytes = if (disklessFetchInfos.nonEmpty) Math.max(config.disklessFetchMinBytes, params.minBytes) else params.minBytes @@ -1860,6 +1895,7 @@ class ReplicaManager(val config: KafkaConfig, // check if this fetch request can be satisfied right away val logReadResults = readFromLog(classicParams, classicFetchInfos, quota, readFromPurgatory = false) +// println("logReadResult: " + logReadResults) var bytesReadable: Long = 0 var errorReadingData = false @@ -3115,7 +3151,11 @@ class ReplicaManager(val config: KafkaConfig, "local leaders.") replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet) localLeaders.foreachEntry { (tp, info) => - if (!_inklessMetadataView.isDisklessTopic(tp.topic())) + if (_inklessMetadataView.isDisklessTopic(tp.topic())) { + // For diskless topics, check if this is a migrated topic with local logs + // If so, initialize the diskless log in the control plane + maybeInitializeDisklessLogForMigratedTopic(tp, info.topicId) + } else { getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) => try { val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew) @@ -3134,6 +3174,40 @@ class ReplicaManager(val config: KafkaConfig, markPartitionOffline(tp) } } + } + } + } + + /** + * For topics that were migrated from classic to diskless storage, we need to ensure + * the diskless log is initialized in the control plane when this broker becomes leader. + * + * This handles the edge case where: + * 1. A topic was created as classic (diskless.enable=false) + * 2. The config was changed to diskless.enable=true + * 3. Another broker processed the config change + * 4. This broker becomes leader later + * + * In this case, the TopicConfigHandler won't fire on this broker, so we need to + * initialize the diskless log here. + */ + private def maybeInitializeDisklessLogForMigratedTopic(tp: TopicPartition, topicId: Uuid): Unit = { + // Check if we have local logs for this topic (indicating it was a classic topic that was migrated) + val logs = logManager.logsByTopic(tp.topic()) + if (logs.nonEmpty) { + inklessSharedState.foreach { sharedState => + logs.find(_.topicPartition == tp).foreach { log => + val logStartOffset = log.logStartOffset + val disklessStartOffset = log.logEndOffset + + stateChangeLogger.info(s"Initializing diskless log for migrated topic ${tp.topic()} " + + s"partition ${tp.partition()} with topicId $topicId, " + + s"logStartOffset $logStartOffset, disklessStartOffset $disklessStartOffset") + + val request = new InitLogDisklessStartOffsetRequest(topicId, tp.topic(), tp.partition(), logStartOffset, disklessStartOffset) + sharedState.controlPlane().initLogDisklessStartOffset(java.util.Set.of(request)) + } + } } } diff --git a/core/src/test/java/kafka/server/ClassicToDisklessMigrationTest.java b/core/src/test/java/kafka/server/ClassicToDisklessMigrationTest.java new file mode 100644 index 0000000000..962bf39cf1 --- /dev/null +++ b/core/src/test/java/kafka/server/ClassicToDisklessMigrationTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.test.KafkaClusterTestKit; +import org.apache.kafka.common.test.TestKitNodes; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.server.config.ReplicationConfigs; +import org.apache.kafka.server.config.ServerConfigs; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.aiven.inkless.config.InklessConfig; +import io.aiven.inkless.control_plane.postgres.PostgresControlPlane; +import io.aiven.inkless.control_plane.postgres.PostgresControlPlaneConfig; +import io.aiven.inkless.storage_backend.s3.S3Storage; +import io.aiven.inkless.storage_backend.s3.S3StorageConfig; +import io.aiven.inkless.test_utils.InklessPostgreSQLContainer; +import io.aiven.inkless.test_utils.MinioContainer; +import io.aiven.inkless.test_utils.PostgreSQLTestContainer; +import io.aiven.inkless.test_utils.S3TestContainer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Testcontainers +public class ClassicToDisklessMigrationTest { + @Container + protected static InklessPostgreSQLContainer pgContainer = PostgreSQLTestContainer.container(); + @Container + protected static MinioContainer s3Container = S3TestContainer.minio(); + + private static final Logger log = LoggerFactory.getLogger(ClassicToDisklessMigrationTest.class); + + 
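As a possible complement to the end-to-end produce/consume checks in the test below, the control plane could also be queried directly after the config flip to confirm that the diskless start offset was initialized to the classic log end offset. A minimal sketch, assuming access to a broker's ControlPlane instance (obtaining it from the test kit is not shown and is an assumption, as is the generic signature of getDisklessLog):

    import java.util.List;

    import org.apache.kafka.common.Uuid;

    import io.aiven.inkless.control_plane.ControlPlane;
    import io.aiven.inkless.control_plane.GetDisklessLogRequest;
    import io.aiven.inkless.control_plane.GetDisklessLogResponse;

    import static org.junit.jupiter.api.Assertions.assertEquals;

    // Hypothetical assertion helper, not part of the patch: checks that a migrated partition's
    // diskless start offset recorded in the control plane matches the expected classic log end offset.
    final class MigrationAssertionSketch {
        static void assertDisklessStartOffset(final ControlPlane controlPlane,
                                              final Uuid topicId,
                                              final int partition,
                                              final long expectedDisklessStartOffset) {
            final List<GetDisklessLogResponse> responses =
                controlPlane.getDisklessLog(List.of(new GetDisklessLogRequest(topicId, partition)));
            assertEquals(1, responses.size());
            assertEquals(Long.valueOf(expectedDisklessStartOffset), responses.get(0).disklessStartOffset());
        }
    }

This mirrors what InMemoryControlPlane and the Postgres GetDisklessLogJob report for a migrated partition elsewhere in this patch.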
private static final int LEADER_BROKER_ID = 0; + private static final int FOLLOWER_BROKER_ID = 1; + + private KafkaClusterTestKit cluster; + + @BeforeEach + public void setup(final TestInfo testInfo) throws Exception { + s3Container.createBucket(testInfo); + pgContainer.createDatabase(testInfo); + + // Configure broker.rack for each broker to enable fetch from follower + Map> perServerProperties = Map.of( + LEADER_BROKER_ID, Map.of(ServerConfigs.BROKER_RACK_CONFIG, String.valueOf(LEADER_BROKER_ID)), + FOLLOWER_BROKER_ID, Map.of(ServerConfigs.BROKER_RACK_CONFIG, String.valueOf(FOLLOWER_BROKER_ID)) + ); + + final TestKitNodes nodes = new TestKitNodes.Builder() + .setCombined(true) + .setNumBrokerNodes(2) + .setNumControllerNodes(1) + .setPerServerProperties(perServerProperties) + .build(); + cluster = new KafkaClusterTestKit.Builder(nodes) + .setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") + .setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, "true") + // Enable fetch from follower + .setConfigProp(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, "org.apache.kafka.common.replica.RackAwareReplicaSelector") + // PG control plane config + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.USERNAME_CONFIG, PostgreSQLTestContainer.USERNAME) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.PASSWORD_CONFIG, PostgreSQLTestContainer.PASSWORD) + // S3 storage config + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_BACKEND_CLASS_CONFIG, S3Storage.class.getName()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_BUCKET_NAME_CONFIG, s3Container.getBucketName()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_REGION_CONFIG, s3Container.getRegion()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_ENDPOINT_URL_CONFIG, s3Container.getEndpoint()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_PATH_STYLE_ENABLED_CONFIG, "true") + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_ACCESS_KEY_ID_CONFIG, s3Container.getAccessKey()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey()) + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey()) + // Decrease cache block bytes to test cache split due to alignment + .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONSUME_CACHE_BLOCK_BYTES_CONFIG, 16 * 1024) + .build(); + cluster.format(); + cluster.startup(); + cluster.waitForReadyBrokers(); + } + + @AfterEach + public void teardown() throws Exception { + cluster.close(); + } + + @Test + public void migrateClassicToDisklessTopic() throws Exception { + Map clientConfigs = new HashMap<>(); + clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + clientConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, String.valueOf(true)); + 
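+        // High linger and a large batch size so the producer sends records in bigger batches rather than one at a time.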
clientConfigs.put(ProducerConfig.LINGER_MS_CONFIG, "1000"); + clientConfigs.put(ProducerConfig.BATCH_SIZE_CONFIG, "100000"); + clientConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + clientConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + clientConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + clientConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + // by default is latest and nothing would get consumed. + clientConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AutoOffsetResetStrategy.EARLIEST.name()); + clientConfigs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5000000"); + String topicName = "classic-to-diskless-topic"; + int numRecordsBeforeMigration = 250; + int numRecordsAfterMigration = 250; + int totalRecords = numRecordsBeforeMigration + numRecordsAfterMigration; + + // Step 1: Create topic with diskless.enable=false (classic mode) and replication factor 2 + try (Admin admin = AdminClient.create(clientConfigs)) { + final NewTopic topic = new NewTopic(topicName, 1, (short) 2) + .configs(Map.of( + TopicConfig.DISKLESS_ENABLE_CONFIG, "false", + TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.CREATE_TIME.name + )); + CreateTopicsResult topics = admin.createTopics(Collections.singletonList(topic)); + topics.all().get(10, TimeUnit.SECONDS); + } + + // Step 2: Produce messages to classic topic + final long now = System.currentTimeMillis(); + int nextOffset = produceRecords(clientConfigs, topicName, now, 0, numRecordsBeforeMigration); + + // Step 3: Consume messages from classic topic to verify it works + consumeWithSubscription(TimestampType.CREATE_TIME, clientConfigs, topicName, now, numRecordsBeforeMigration); + + // Step 4: Migrate topic to diskless mode by changing config + try (Admin admin = AdminClient.create(clientConfigs)) { + ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName); + AlterConfigOp alterConfigOp = new AlterConfigOp( + new ConfigEntry(TopicConfig.DISKLESS_ENABLE_CONFIG, "true"), + AlterConfigOp.OpType.SET + ); + admin.incrementalAlterConfigs(Map.of(topicResource, Collections.singletonList(alterConfigOp))) + .all().get(10, TimeUnit.SECONDS); + } + + // Step 5: Produce more messages after migration + produceRecords(clientConfigs, topicName, now, nextOffset, numRecordsAfterMigration); + + // Step 6: Consume from the beginning to verify all messages are available + consumeWithSubscription(TimestampType.CREATE_TIME, clientConfigs, topicName, now, totalRecords); + } + + private static int produceRecords(Map clientConfigs, String topicName, long timestamp, int startOffset, int numRecords) { + AtomicInteger recordsProduced = new AtomicInteger(); + try (Producer producer = new KafkaProducer<>(clientConfigs)) { + for (int i = 0; i < numRecords; i++) { + int offset = startOffset + i; + byte[] value = String.valueOf(offset).getBytes(); + final ProducerRecord record = new ProducerRecord<>(topicName, 0, timestamp, null, value); + producer.send(record, (metadata, exception) -> { + if (exception != null) { + log.error("Failed to send record", exception); + } else { + log.info("Committed value at offset {} at {}", metadata.offset(), timestamp); + recordsProduced.incrementAndGet(); + } + }); + } + producer.flush(); + } + assertEquals(numRecords, recordsProduced.get()); + return startOffset + numRecords; + } + + private static void 
consumeWithSubscription(TimestampType timestampType, Map clientConfigs, String topicName, long now, int numRecords) { + final Map consumerConfigs = new HashMap<>(clientConfigs); + consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, java.util.UUID.randomUUID().toString()); + // Set client.rack to match the follower broker's rack to enable fetch from follower + consumerConfigs.put(ConsumerConfig.CLIENT_RACK_CONFIG, String.valueOf(FOLLOWER_BROKER_ID)); + int recordsConsumed; + try (Consumer consumer = new KafkaConsumer<>(consumerConfigs)) { + consumer.subscribe(Collections.singletonList(topicName)); + recordsConsumed = poll(consumer, timestampType, now, numRecords); + } + assertEquals(numRecords, recordsConsumed); + } + + private static int poll(Consumer consumer, TimestampType timestampType, long now, int expectedRecords) { + int recordsConsumed = 0; + long deadline = System.currentTimeMillis() + 30_000; + while (recordsConsumed < expectedRecords && System.currentTimeMillis() < deadline) { + ConsumerRecords poll = consumer.poll(Duration.ofSeconds(5)); + for (ConsumerRecord record : poll) { + log.info("Received record {} at {}", recordsConsumed, record.timestamp()); + switch (timestampType) { + case CREATE_TIME -> assertEquals(now, record.timestamp()); + case LOG_APPEND_TIME -> assertTrue(record.timestamp() > now); + } + recordsConsumed++; + } + } + return recordsConsumed; + } +} diff --git a/docker/examples/docker-compose-files/inkless/docker-compose.yml b/docker/examples/docker-compose-files/inkless/docker-compose.yml index 2a1dd0739c..c0473de914 100644 --- a/docker/examples/docker-compose-files/inkless/docker-compose.yml +++ b/docker/examples/docker-compose-files/inkless/docker-compose.yml @@ -11,7 +11,9 @@ services: KAFKA_PROCESS_ROLES: "broker,controller" KAFKA_NODE_ID: 1 # Add rack to the broker to simulate AZ placement - # KAFKA_BROKER_RACK: "az1" + KAFKA_BROKER_RACK: "az1" + # Enable fetch from follower with rack-aware replica selector + KAFKA_REPLICA_SELECTOR_CLASS: "org.apache.kafka.common.replica.RackAwareReplicaSelector" KAFKA_CONTROLLER_QUORUM_VOTERS: "1@broker:29090,2@broker2:29090" KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT" KAFKA_LISTENERS: "CONTROLLER://:29090,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092" @@ -67,7 +69,7 @@ services: <<: *base-broker-env KAFKA_NODE_ID: 2 # Add rack to the broker to simulate AZ placement - # KAFKA_BROKER_RACK: "az2" + KAFKA_BROKER_RACK: "az2" KAFKA_LISTENERS: "CONTROLLER://:29090,PLAINTEXT_HOST://:9093,PLAINTEXT://:19093" KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT_HOST://localhost:9093,PLAINTEXT://broker2:19093" diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java index a008b02154..35370be50f 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java @@ -55,6 +55,8 @@ List findBatches( void createTopicAndPartitions(Set requests); + void initLogDisklessStartOffset(Set requests); + List deleteRecords(List requests); void deleteTopics(Set topicIds); @@ -94,6 +96,8 @@ static ControlPlane create(final InklessConfig config, final Time time) { boolean isSafeToDeleteFile(String objectKeyPath); + List getDisklessLog(List requests); + // used for testing purposes only List getLogInfo(List requests); } diff --git 
a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogRequest.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogRequest.java new file mode 100644 index 0000000000..a84b15e64f --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogRequest.java @@ -0,0 +1,24 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.control_plane; + +import org.apache.kafka.common.Uuid; + +public record GetDisklessLogRequest(Uuid topicId, + int partition) { +} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogResponse.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogResponse.java new file mode 100644 index 0000000000..d148b941f2 --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/GetDisklessLogResponse.java @@ -0,0 +1,42 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +package io.aiven.inkless.control_plane; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; + +public record GetDisklessLogResponse(Uuid topicId, + int partition, + Errors error, + long logStartOffset, + long highWatermark, + Long disklessStartOffset) { + public static final long INVALID_OFFSET = -1L; + + public static GetDisklessLogResponse success(final Uuid topicId, + final int partition, + final long logStartOffset, + final long highWatermark, + final Long disklessStartOffset) { + return new GetDisklessLogResponse(topicId, partition, Errors.NONE, logStartOffset, highWatermark, disklessStartOffset); + } + + public static GetDisklessLogResponse unknownTopicOrPartition(final Uuid topicId, final int partition) { + return new GetDisklessLogResponse(topicId, partition, Errors.UNKNOWN_TOPIC_OR_PARTITION, INVALID_OFFSET, INVALID_OFFSET, null); + } +} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java index aa99e79e84..e1192d9f36 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java @@ -90,6 +90,26 @@ public synchronized void createTopicAndPartitions(final Set requests) { + for (final InitLogDisklessStartOffsetRequest request : requests) { + final TopicIdPartition topicIdPartition = new TopicIdPartition( + request.topicId(), request.partition(), request.topicName()); + + // Only create if not already exists + if (!logs.containsKey(topicIdPartition)) { + LOGGER.info("Initializing {} with logStartOffset {}, disklessStartOffset {}", + topicIdPartition, request.logStartOffset(), request.disklessStartOffset()); + final LogInfo logInfo = new LogInfo(); + logInfo.logStartOffset = request.logStartOffset(); + logInfo.highWatermark = request.disklessStartOffset(); + logInfo.disklessStartOffset = request.disklessStartOffset(); + logs.put(topicIdPartition, logInfo); + batches.putIfAbsent(topicIdPartition, new TreeMap<>()); + } + } + } + @Override protected synchronized Iterator commitFileForValidRequests( final String objectKey, @@ -644,6 +664,27 @@ public boolean isSafeToDeleteFile(String objectKeyPath) { return !files.containsKey(objectKeyPath); } + @Override + public synchronized List getDisklessLog(final List requests) { + final List result = new ArrayList<>(); + for (final GetDisklessLogRequest request : requests) { + final TopicIdPartition tidp = findTopicIdPartition(request.topicId(), request.partition()); + final LogInfo logInfo; + if (tidp == null || (logInfo = logs.get(tidp)) == null) { + result.add(GetDisklessLogResponse.unknownTopicOrPartition(request.topicId(), request.partition())); + } else { + result.add(GetDisklessLogResponse.success( + request.topicId(), + request.partition(), + logInfo.logStartOffset, + logInfo.highWatermark, + logInfo.disklessStartOffset + )); + } + } + return result; + } + @Override public synchronized List getLogInfo(final List requests) { final List result = new ArrayList<>(); @@ -679,6 +720,7 @@ private static class LogInfo { long logStartOffset = 0; long highWatermark = 0; long byteSize = 0; + Long disklessStartOffset = null; } private static class FileInfo { diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InitLogDisklessStartOffsetRequest.java 
b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InitLogDisklessStartOffsetRequest.java new file mode 100644 index 0000000000..adc83501d9 --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/InitLogDisklessStartOffsetRequest.java @@ -0,0 +1,27 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.control_plane; + +import org.apache.kafka.common.Uuid; + +public record InitLogDisklessStartOffsetRequest(Uuid topicId, + String topicName, + int partition, + long logStartOffset, + long disklessStartOffset) { +} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJob.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJob.java new file mode 100644 index 0000000000..53588c48c2 --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/GetDisklessLogJob.java @@ -0,0 +1,114 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +package io.aiven.inkless.control_plane.postgres; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.Time; + +import org.jooq.Configuration; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.Row2; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.function.Consumer; + +import io.aiven.inkless.control_plane.GetDisklessLogRequest; +import io.aiven.inkless.control_plane.GetDisklessLogResponse; +import io.aiven.inkless.control_plane.postgres.converters.UUIDtoUuidConverter; + +import static org.jooq.generated.Tables.LOGS; +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.name; +import static org.jooq.impl.DSL.row; +import static org.jooq.impl.DSL.values; + +public class GetDisklessLogJob implements Callable> { + private static final Field REQUEST_TOPIC_ID = field(name("topic_id"), LOGS.TOPIC_ID.getDataType()); + private static final Field REQUEST_PARTITION = field(name("partition"), LOGS.PARTITION.getDataType()); + + private final Time time; + private final DSLContext jooqCtx; + private final List requests; + private final Consumer durationCallback; + + public GetDisklessLogJob(final Time time, + final DSLContext jooqCtx, + final List requests, + final Consumer durationCallback) { + this.time = time; + this.jooqCtx = jooqCtx; + this.requests = requests; + this.durationCallback = durationCallback; + } + + @Override + public List call() throws Exception { + return JobUtils.run(this::runOnce, time, durationCallback); + } + + private List runOnce() throws Exception { + return jooqCtx.transactionResult((final Configuration conf) -> { + final DSLContext context = conf.dsl(); + + final UUIDtoUuidConverter uuidConverter = new UUIDtoUuidConverter(); + final var requestRows = requests.stream() + .map(req -> row(uuidConverter.to(req.topicId()), req.partition())) + .toArray(Row2[]::new); + @SuppressWarnings("unchecked") + final var requestsTable = values(requestRows) + .as("requests", REQUEST_TOPIC_ID.getName(), REQUEST_PARTITION.getName()); + + final var select = context.select( + requestsTable.field(REQUEST_TOPIC_ID), + requestsTable.field(REQUEST_PARTITION), + LOGS.LOG_START_OFFSET, + LOGS.HIGH_WATERMARK, + LOGS.DISKLESS_START_OFFSET + ).from(requestsTable) + .leftJoin(LOGS).on(LOGS.TOPIC_ID.eq(requestsTable.field(REQUEST_TOPIC_ID)) + .and(LOGS.PARTITION.eq(requestsTable.field(REQUEST_PARTITION)))); + + final List responses = new ArrayList<>(); + try (final var cursor = select.fetchSize(1000).fetchLazy()) { + for (final var record : cursor) { + // The synthetic table stores raw java.util.UUID, need to convert explicitly + final UUID rawTopicId = (UUID) (Object) record.get(requestsTable.field(REQUEST_TOPIC_ID)); + final Uuid topicId = uuidConverter.from(rawTopicId); + final Integer partition = record.get(requestsTable.field(REQUEST_PARTITION)); + final Long logStartOffset = record.get(LOGS.LOG_START_OFFSET); + if (logStartOffset == null) { + responses.add(GetDisklessLogResponse.unknownTopicOrPartition(topicId, partition)); + } else { + responses.add(GetDisklessLogResponse.success( + topicId, + partition, + logStartOffset, + record.get(LOGS.HIGH_WATERMARK), + record.get(LOGS.DISKLESS_START_OFFSET) + )); + } + } + } + return responses; + }); + } +} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/InitLogDisklessStartOffsetJob.java 
b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/InitLogDisklessStartOffsetJob.java new file mode 100644 index 0000000000..8a55b2e18e --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/InitLogDisklessStartOffsetJob.java @@ -0,0 +1,79 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.control_plane.postgres; + +import org.apache.kafka.common.utils.Time; + +import org.jooq.Configuration; +import org.jooq.DSLContext; + +import java.util.Set; +import java.util.function.Consumer; + +import io.aiven.inkless.control_plane.InitLogDisklessStartOffsetRequest; + +import static org.jooq.generated.Tables.LOGS; + +public class InitLogDisklessStartOffsetJob implements Runnable { + private final Time time; + private final DSLContext jooqCtx; + private final Set requests; + private final Consumer durationCallback; + + InitLogDisklessStartOffsetJob(final Time time, + final DSLContext jooqCtx, + final Set requests, + final Consumer durationCallback) { + this.time = time; + this.jooqCtx = jooqCtx; + this.requests = requests; + this.durationCallback = durationCallback; + } + + @Override + public void run() { + if (requests.isEmpty()) { + return; + } + JobUtils.run(this::runOnce, time, durationCallback); + } + + private void runOnce() { + jooqCtx.transaction((final Configuration conf) -> { + var insertStep = conf.dsl().insertInto(LOGS, + LOGS.TOPIC_ID, + LOGS.PARTITION, + LOGS.TOPIC_NAME, + LOGS.LOG_START_OFFSET, + LOGS.HIGH_WATERMARK, + LOGS.BYTE_SIZE, + LOGS.DISKLESS_START_OFFSET); + for (final var request : requests) { + insertStep = insertStep.values( + request.topicId(), + request.partition(), + request.topicName(), + request.logStartOffset(), + request.disklessStartOffset(), + 0L, + request.disklessStartOffset()); + } + insertStep.onConflictDoNothing().execute(); + }); + } +} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java index e76c869315..1e647d8613 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java @@ -57,8 +57,11 @@ import io.aiven.inkless.control_plane.FileToDelete; import io.aiven.inkless.control_plane.FindBatchRequest; import io.aiven.inkless.control_plane.FindBatchResponse; +import io.aiven.inkless.control_plane.GetDisklessLogRequest; +import io.aiven.inkless.control_plane.GetDisklessLogResponse; import io.aiven.inkless.control_plane.GetLogInfoRequest; import io.aiven.inkless.control_plane.GetLogInfoResponse; +import io.aiven.inkless.control_plane.InitLogDisklessStartOffsetRequest; import io.aiven.inkless.control_plane.ListOffsetsRequest; import 
io.aiven.inkless.control_plane.ListOffsetsResponse; import io.aiven.inkless.control_plane.MergedFileBatch; @@ -149,6 +152,12 @@ public void createTopicAndPartitions(final Set new TopicsAndPartitionsCreateJob(time, jobsJooqCtx, requests, pgMetrics::onTopicCreateCompleted).run(); } + @Override + public void initLogDisklessStartOffset(final Set requests) { + // Expected to be performed synchronously + new InitLogDisklessStartOffsetJob(time, jobsJooqCtx, requests, pgMetrics::onInitLogDisklessStartOffsetCompleted).run(); + } + @Override protected Iterator commitFileForValidRequests( final String objectKey, @@ -333,6 +342,20 @@ public boolean isSafeToDeleteFile(String objectKeyPath) { } } + @Override + public List getDisklessLog(final List requests) { + try { + final GetDisklessLogJob job = new GetDisklessLogJob(time, readJooqCtx, requests, pgMetrics::onGetDisklessLogCompleted); + return job.call(); + } catch (final Exception e) { + if (e instanceof ControlPlaneException) { + throw (ControlPlaneException) e; + } else { + throw new ControlPlaneException("Failed to get diskless log", e); + } + } + } + @Override public List getLogInfo(final List requests) { try { diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java index 3f8d55738e..4fe7085577 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java @@ -36,6 +36,7 @@ public class PostgresControlPlaneMetrics implements Closeable { private final QueryMetrics commitFileMetrics = new QueryMetrics("CommitFile"); private final QueryMetrics commitFileMergeWorkItemMetrics = new QueryMetrics("CommitFileMergeWorkItem"); private final QueryMetrics topicCreateMetrics = new QueryMetrics("TopicCreate"); + private final QueryMetrics initLogDisklessStartOffsetMetrics = new QueryMetrics("InitLogDisklessStartOffset"); private final QueryMetrics topicDeleteMetrics = new QueryMetrics("TopicDelete"); private final QueryMetrics fileDeleteMetrics = new QueryMetrics("FilesDelete"); private final QueryMetrics listOffsetsMetrics = new QueryMetrics("ListOffsets"); @@ -46,6 +47,7 @@ public class PostgresControlPlaneMetrics implements Closeable { private final QueryMetrics releaseFileMergeWorkItemMetrics = new QueryMetrics("ReleaseFileMergeWorkItem"); private final QueryMetrics safeDeleteFileCheckMetrics = new QueryMetrics("SafeDeleteFileCheck"); private final QueryMetrics getLogInfoMetrics = new QueryMetrics("GetLogInfo"); + private final QueryMetrics getDisklessLogMetrics = new QueryMetrics("GetDisklessLog"); public PostgresControlPlaneMetrics(Time time) { this.time = Objects.requireNonNull(time, "time cannot be null"); @@ -75,6 +77,10 @@ public void onTopicCreateCompleted(Long duration) { topicCreateMetrics.record(duration); } + public void onInitLogDisklessStartOffsetCompleted(Long duration) { + initLogDisklessStartOffsetMetrics.record(duration); + } + public void onFilesDeleteCompleted(Long duration) { fileDeleteMetrics.record(duration); } @@ -111,6 +117,10 @@ public void onGetLogInfoCompleted(Long duration) { getLogInfoMetrics.record(duration); } + public void onGetDisklessLogCompleted(Long duration) { + getDisklessLogMetrics.record(duration); + } + @Override public void close() { findBatchesMetrics.remove(); @@ -118,6 +128,7 @@ public void 
close() { commitFileMetrics.remove(); commitFileMergeWorkItemMetrics.remove(); topicCreateMetrics.remove(); + initLogDisklessStartOffsetMetrics.remove(); topicDeleteMetrics.remove(); fileDeleteMetrics.remove(); listOffsetsMetrics.remove(); @@ -128,6 +139,7 @@ public void close() { releaseFileMergeWorkItemMetrics.remove(); safeDeleteFileCheckMetrics.remove(); getLogInfoMetrics.remove(); + getDisklessLogMetrics.remove(); } private class QueryMetrics { diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJob.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJob.java index 6e77d0aad4..c699f04dc8 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJob.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJob.java @@ -69,10 +69,11 @@ private void runOnce() { LOGS.TOPIC_NAME, LOGS.LOG_START_OFFSET, LOGS.HIGH_WATERMARK, - LOGS.BYTE_SIZE); + LOGS.BYTE_SIZE, + LOGS.DISKLESS_START_OFFSET); for (final var request : requests) { for (int partition = 0; partition < request.numPartitions(); partition++) { - insertStep = insertStep.values(request.topicId(), partition, request.topicName(), 0L, 0L, 0L); + insertStep = insertStep.values(request.topicId(), partition, request.topicName(), 0L, 0L, 0L, 0L); } } final int rowsInserted = insertStep.onConflictDoNothing().execute(); diff --git a/storage/inkless/src/main/resources/db/migration/V11__Add_diskless_offsets_to_logs.sql b/storage/inkless/src/main/resources/db/migration/V11__Add_diskless_offsets_to_logs.sql new file mode 100644 index 0000000000..1d6cc731c9 --- /dev/null +++ b/storage/inkless/src/main/resources/db/migration/V11__Add_diskless_offsets_to_logs.sql @@ -0,0 +1,3 @@ +-- Copyright (c) 2024-2025 Aiven, Helsinki, Finland. 
https://aiven.io/ +ALTER TABLE logs ADD COLUMN diskless_start_offset offset_nullable_t DEFAULT NULL; +ALTER TABLE logs ADD COLUMN diskless_end_offset offset_nullable_t DEFAULT NULL; \ No newline at end of file diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/CommitFileJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/CommitFileJobTest.java index 77daba9322..4bfd1388ec 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/CommitFileJobTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/CommitFileJobTest.java @@ -112,9 +112,9 @@ void simpleCommit() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, null, null), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, null, null), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L, null, null) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -168,9 +168,9 @@ void commitMultipleFiles() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 159L, 111L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L + 245, 322L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 159L, 111L, null, null), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L + 245, 322L, null, null), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L, null, null) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -215,9 +215,9 @@ void nonExistentPartition() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, null, null), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, null, null), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L, null, null) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -249,9 +249,9 @@ void simpleIdempotentCommit() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, null, null), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, null, null), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 27L, 50L, null, null) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -283,9 +283,9 @@ void inSequenceCommit() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 27L, 150L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, null, null), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 27L, 150L, null, null), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, null, null) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -325,9 +325,9 @@ void outOfOrderCommit(int 
lastBatchSequence, int firstBatchSequence) { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, null, null), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, null, null), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, null, null) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) @@ -357,9 +357,9 @@ void outOfOrderCommitNewEpoch() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, null, null), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 0L, 0L, null, null), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, null, null) ); // The file will be deleted because its only batch is rejected. @@ -389,9 +389,9 @@ void invalidProducerEpoch() { assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())) .containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 0L, 0L, 0L, null, null), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 0L, 15L, 100L, null, null), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, null, null) ); assertThat(DBUtils.getAllFiles(pgContainer.getDataSource())) diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteRecordsJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteRecordsJobTest.java index b6eb541544..7762890f39 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteRecordsJobTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteRecordsJobTest.java @@ -185,10 +185,10 @@ void deleteRecordsFromMultipleTopics(final List order) { assertThat(responses).containsExactlyElementsOf(expectedResponses); assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder( - new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 18L, 36L, (long) file2Batch1Size + file3Batch1Size), - new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 24L, 24L, 0L), - new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L), - new LogsRecord(TOPIC_ID_2, 0, TOPIC_2, 0L, 24L, (long) file2Batch2Size + file3Batch3Size) + new LogsRecord(TOPIC_ID_0, 0, TOPIC_0, 18L, 36L, (long) file2Batch1Size + file3Batch1Size, null, null), + new LogsRecord(TOPIC_ID_0, 1, TOPIC_0, 24L, 24L, 0L, null, null), + new LogsRecord(TOPIC_ID_1, 0, TOPIC_1, 0L, 0L, 0L, null, null), + new LogsRecord(TOPIC_ID_2, 0, TOPIC_2, 0L, 24L, (long) file2Batch2Size + file3Batch3Size, null, null) ); assertThat(DBUtils.getAllBatches(pgContainer.getDataSource())).containsExactlyInAnyOrder( diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteTopicJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteTopicJobTest.java index 522d8d6592..f25e5ec4e7 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteTopicJobTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/DeleteTopicJobTest.java @@ 
-149,7 +149,7 @@ void deleteMultipleTopics() { // The logs of the deleted topics must be gone, i.e. only TOPIC_2 remains. assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactly( - new LogsRecord(TOPIC_ID_2, 0, TOPIC_2, 0L, 24L, (long) file2Batch2Size + file3Batch3Size) + new LogsRecord(TOPIC_ID_2, 0, TOPIC_2, 0L, 24L, (long) file2Batch2Size + file3Batch3Size, null, null) ); // The batches of the deleted topics must be gone, i.e. only TOPIC_2 remains. diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/InitLogDisklessStartOffsetJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/InitLogDisklessStartOffsetJobTest.java new file mode 100644 index 0000000000..c793de728a --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/InitLogDisklessStartOffsetJobTest.java @@ -0,0 +1,164 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.control_plane.postgres; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.Time; + +import org.jooq.DSLContext; +import org.jooq.SQLDialect; +import org.jooq.generated.tables.records.LogsRecord; +import org.jooq.impl.DSL; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Set; + +import io.aiven.inkless.control_plane.InitLogDisklessStartOffsetRequest; +import io.aiven.inkless.test_utils.InklessPostgreSQLContainer; +import io.aiven.inkless.test_utils.PostgreSQLTestContainer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.jooq.generated.Tables.LOGS; + +@Testcontainers +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) +class InitLogDisklessStartOffsetJobTest { + @Container + static final InklessPostgreSQLContainer pgContainer = PostgreSQLTestContainer.container(); + + static final String TOPIC_1 = "topic1"; + static final String TOPIC_2 = "topic2"; + static final Uuid TOPIC_ID1 = new Uuid(10, 12); + static final Uuid TOPIC_ID2 = new Uuid(555, 333); + + @BeforeEach + void setUp(final TestInfo testInfo) { + pgContainer.createDatabase(testInfo); + pgContainer.migrate(); + } + + @AfterEach + void tearDown() { + pgContainer.tearDown(); + } + + @Test + void empty() { + final InitLogDisklessStartOffsetJob job = new InitLogDisklessStartOffsetJob( + Time.SYSTEM, pgContainer.getJooqCtx(), Set.of(), 
+        job.run();
+        assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).isEmpty();
+    }
+
+    @Test
+    void createLogsWithDisklessStartOffset() {
+        final Set<InitLogDisklessStartOffsetRequest> requests = Set.of(
+            // logStartOffset, disklessStartOffset (highWatermark)
+            new InitLogDisklessStartOffsetRequest(TOPIC_ID1, TOPIC_1, 0, 50L, 100L),
+            new InitLogDisklessStartOffsetRequest(TOPIC_ID1, TOPIC_1, 1, 150L, 200L),
+            new InitLogDisklessStartOffsetRequest(TOPIC_ID2, TOPIC_2, 0, 25L, 50L)
+        );
+        final InitLogDisklessStartOffsetJob job = new InitLogDisklessStartOffsetJob(
+            Time.SYSTEM, pgContainer.getJooqCtx(), requests, durationMs -> {});
+        job.run();
+
+        assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
+            // LogsRecord: topicId, partition, topicName, logStartOffset, highWatermark, byteSize, disklessStartOffset, disklessEndOffset
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 50L, 100L, 0L, 100L, null),
+            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 150L, 200L, 0L, 200L, null),
+            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 25L, 50L, 0L, 50L, null)
+        );
+    }
+
+    @Test
+    void doesNotOverwriteExistingLog() throws SQLException {
+        // Create log that already exists
+        try (final Connection connection = pgContainer.getDataSource().getConnection()) {
+            final DSLContext ctx = DSL.using(connection, SQLDialect.POSTGRES);
+            ctx.insertInto(LOGS,
+                LOGS.TOPIC_ID, LOGS.PARTITION, LOGS.TOPIC_NAME, LOGS.LOG_START_OFFSET, LOGS.HIGH_WATERMARK, LOGS.BYTE_SIZE, LOGS.DISKLESS_START_OFFSET
+            ).values(
+                TOPIC_ID1, 0, TOPIC_1, 0L, 100L, 999L, 50L
+            ).execute();
+            connection.commit();
+        }
+
+        // Try to create logs - existing one should not be overwritten
+        final Set<InitLogDisklessStartOffsetRequest> requests = Set.of(
+            new InitLogDisklessStartOffsetRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 100L), // Should not overwrite existing
+            new InitLogDisklessStartOffsetRequest(TOPIC_ID1, TOPIC_1, 1, 200L, 200L) // Should be created
+        );
+        final InitLogDisklessStartOffsetJob job = new InitLogDisklessStartOffsetJob(
+            Time.SYSTEM, pgContainer.getJooqCtx(), requests, durationMs -> {});
+        job.run();
+
+        assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 100L, 999L, 50L, null), // Unchanged
+            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 200L, 200L, 0L, 200L, null) // Created
+        );
+    }
+
+    @Test
+    void idempotentExecution() {
+        final Set<InitLogDisklessStartOffsetRequest> requests = Set.of(
+            new InitLogDisklessStartOffsetRequest(TOPIC_ID1, TOPIC_1, 0, 100L, 100L)
+        );
+
+        // First execution
+        final InitLogDisklessStartOffsetJob job1 = new InitLogDisklessStartOffsetJob(
+            Time.SYSTEM, pgContainer.getJooqCtx(), requests, durationMs -> {});
+        job1.run();
+
+        assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 100L, 100L, 0L, 100L, null)
+        );
+
+        // Second execution with same value - should not change anything
+        final InitLogDisklessStartOffsetJob job2 = new InitLogDisklessStartOffsetJob(
+            Time.SYSTEM, pgContainer.getJooqCtx(), requests, durationMs -> {});
+        job2.run();
+
+        assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 100L, 100L, 0L, 100L, null)
+        );
+
+        // Third execution with different value - should not overwrite
+        final Set<InitLogDisklessStartOffsetRequest> differentRequests = Set.of(
+            new InitLogDisklessStartOffsetRequest(TOPIC_ID1, TOPIC_1, 0, 999L, 999L)
+        );
+        final InitLogDisklessStartOffsetJob job3 = new InitLogDisklessStartOffsetJob(
+            Time.SYSTEM, pgContainer.getJooqCtx(), differentRequests, durationMs -> {});
+        job3.run();
+
+        assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 100L, 100L, 0L, 100L, null) // Still 100L, not 999L
+        );
+    }
+}
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJobTest.java
index 7e818812c9..1dfaaac5e9 100644
--- a/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJobTest.java
+++ b/storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/TopicsAndPartitionsCreateJobTest.java
@@ -96,18 +96,18 @@ void createTopicsAndPartition() {
         final TopicsAndPartitionsCreateJob job1 = new TopicsAndPartitionsCreateJob(Time.SYSTEM, pgContainer.getJooqCtx(), createTopicAndPartitionsRequests, durationMs -> {});
         job1.run();
         assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
-            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L)
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L, null, null),
+            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L, null, null),
+            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L, null, null)
         );
 
         // Repetition doesn't affect anything.
         final TopicsAndPartitionsCreateJob job2 = new TopicsAndPartitionsCreateJob(Time.SYSTEM, pgContainer.getJooqCtx(), createTopicAndPartitionsRequests, durationMs -> {});
         job2.run();
         assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
-            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L)
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L, null, null),
+            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L, null, null),
+            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L, null, null)
         );
     }
 
@@ -128,10 +128,10 @@ void createPartitionAfterTopic() {
         job2.run();
 
         assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
-            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 1, TOPIC_2, 0L, 0L, 0L)
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 0L, 0L, 0L, null, null),
+            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L, null, null),
+            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 0L, 0L, 0L, null, null),
+            new LogsRecord(TOPIC_ID2, 1, TOPIC_2, 0L, 0L, 0L, null, null)
         );
     }
 
@@ -157,9 +157,9 @@ void existingRecordsNotAffected() throws SQLException {
         job1.run();
 
         assertThat(DBUtils.getAllLogs(pgContainer.getDataSource())).containsExactlyInAnyOrder(
-            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 101L, 201L, 999L), // unaffected
-            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L),
-            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 102L, 202L, 1999L) // unaffected
+            new LogsRecord(TOPIC_ID1, 0, TOPIC_1, 101L, 201L, 999L, null, null), // unaffected
+            new LogsRecord(TOPIC_ID1, 1, TOPIC_1, 0L, 0L, 0L, null, null),
+            new LogsRecord(TOPIC_ID2, 0, TOPIC_2, 102L, 202L, 1999L, null, null) // unaffected
         );
     }
 }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 838ab49929..5ba7999c81 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -396,6 +396,10 @@ public int initFileSize() {
         else
             return 0;
     }
+
+    public boolean disklessEnable() {
+        return getBoolean(TopicConfig.DISKLESS_ENABLE_CONFIG);
+    }
 
     public boolean remoteStorageEnable() {
         return remoteLogConfig.remoteStorageEnable;
@@ -512,11 +516,11 @@ private static void validateDiskless(Map existingConfigs,
         Optional.ofNullable((Boolean) newConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG))
             .ifPresent(isBeingEnabled -> {
                 if (isBeingEnabled) {
-                    // diskless.enable=true -> diskless.enable must be already set to true
-                    if (wasDiskless.isPresent() && !wasDiskless.get()) {
-                        // cannot change from diskless.enable = false to diskless.enable = true
-                        throw new InvalidConfigurationException("It is invalid to enable diskless");
-                    }
+//                    // diskless.enable=true -> diskless.enable must be already set to true
+//                    if (wasDiskless.isPresent() && !wasDiskless.get()) {
+//                        // cannot change from diskless.enable = false to diskless.enable = true
+//                        throw new InvalidConfigurationException("It is invalid to enable diskless");
+//                    }
                     if (isRemoteLogStorageEnabled) {
                         throw new InvalidConfigurationException("Diskless and remote storage cannot be enabled simultaneously");