From 995b5fa817c4031178c3ae6583f9a3dc7ba5b13a Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Mon, 19 Jan 2026 15:47:35 +0100 Subject: [PATCH 1/2] Allow switching diskless.enable from false to true `diskless.enable` for a topic can be switched from false to true, only if `diskless.migration.enable` is enabled. --- .../ControllerConfigurationValidator.scala | 3 +- .../main/scala/kafka/server/KafkaConfig.scala | 1 + .../java/kafka/server/InklessConfigsTest.java | 34 ++++++++++++++++++- .../scala/unit/kafka/log/LogConfigTest.scala | 31 +++++++++++++++++ .../kafka/server/config/ServerConfigs.java | 8 +++++ .../storage/internals/log/LogConfig.java | 30 +++++++++++----- 6 files changed, 97 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index f163a2739a..d5e944685d 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -118,7 +118,8 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu nullTopicConfigs.mkString(",")) } LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap, - kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), + kafkaConfig.disklessMigrationEnabled) case BROKER => validateBrokerName(resource.name()) case CLIENT_METRICS => val properties = new Properties() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 0cec30c06a..81e91aa731 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -428,6 +428,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) /** Diskless Configuration */ val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG) + val disklessMigrationEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG) def addReconfigurable(reconfigurable: Reconfigurable): Unit = { dynamicConfig.addReconfigurable(reconfigurable) diff --git a/core/src/test/java/kafka/server/InklessConfigsTest.java b/core/src/test/java/kafka/server/InklessConfigsTest.java index 21548e98fd..5c87cf4912 100644 --- a/core/src/test/java/kafka/server/InklessConfigsTest.java +++ b/core/src/test/java/kafka/server/InklessConfigsTest.java @@ -68,7 +68,11 @@ public class InklessConfigsTest { @Container protected static MinioContainer s3Container = S3TestContainer.minio(); - private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig) throws Exception { + private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig) throws Exception { + return init(defaultDisklessEnableConfig, disklessStorageEnableConfig, false); + } + + private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig, boolean disklessMigrationEnableConfig) throws Exception { final TestKitNodes nodes = new TestKitNodes.Builder() .setCombined(true) .setNumBrokerNodes(1) @@ -78,6 +82,7 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di .setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") .setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, 
String.valueOf(defaultDisklessEnableConfig)) .setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, String.valueOf(disklessStorageEnableConfig)) + .setConfigProp(ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG, String.valueOf(disklessMigrationEnableConfig)) // PG control plane config .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName()) .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl()) @@ -186,6 +191,33 @@ public void classicTopicWithDisklessDefaultTrueConfigs() throws Exception { cluster.close(); } + @Test + public void disklessMigrationEnabled() throws Exception { + // Initialize cluster with diskless migration enabled + var cluster = init(false, true, true); + Map clientConfigs = new HashMap<>(); + clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + + try (Admin admin = AdminClient.create(clientConfigs)) { + // When creating a new topic with diskless.enable=false + final String classicTopic = "classicTopic"; + createTopic(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false")); + // Then diskless.enable is set to false in the topic config + var classicTopicConfig = getTopicConfig(admin, classicTopic); + assertEquals("false", classicTopicConfig.get(DISKLESS_ENABLE_CONFIG)); + + // When migration is enabled, it SHOULD be possible to turn on diskless after the topic is created + alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true")); + // Verify the config was updated + var updatedTopicConfig = getTopicConfig(admin, classicTopic); + assertEquals("true", updatedTopicConfig.get(DISKLESS_ENABLE_CONFIG)); + + // But it should still NOT be possible to turn off diskless after enabling it + assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false"))); + } + cluster.close(); + } + public void createTopic(Admin admin, String topic, Map configs) throws Exception { admin.createTopics(Collections.singletonList( diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index cbe7ffeed4..2a7a51ead7 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -478,6 +478,37 @@ class LogConfigTest { LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false) } + @Test + def testDisklessMigrationEnabled(): Unit = { + val kafkaProps = TestUtils.createDummyBrokerConfig() + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) + val isDisklessMigrationEnabled = true + + val disklessAlreadyEnabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") + val disklessAlreadyDisabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") + + val setDisklessTrue = new Properties() + setDisklessTrue.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") + val setDisklessFalse = new Properties() + setDisklessFalse.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") + + // When migration is enabled: + // 1. Should be possible to switch from diskless.enable=false to diskless.enable=true + LogConfig.validate(disklessAlreadyDisabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + + // 2. 
Should still NOT be possible to switch from diskless.enable=true to diskless.enable=false (even with migration enabled) + assertThrows( + classOf[InvalidConfigurationException], + () => LogConfig.validate(disklessAlreadyEnabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + ) + + // 3. Should still be possible to keep diskless.enable=true + LogConfig.validate(disklessAlreadyEnabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + + // 4. Should still be possible to keep diskless.enable=false + LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + } + @Test def testValidDisklessAndRemoteStorageEnable(): Unit = { diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index 2e53274549..60db3015d4 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -131,6 +131,12 @@ public class ServerConfigs { public static final String DISKLESS_STORAGE_SYSTEM_ENABLE_DOC = "Enable the diskless storage system. " + "This enables diskless topics alongside classic topics."; + public static final String DISKLESS_MIGRATION_ENABLE_CONFIG = "diskless.migration.enable"; + public static final boolean DISKLESS_MIGRATION_ENABLE_DEFAULT = false; + public static final String DISKLESS_MIGRATION_ENABLE_DOC = "Allow migrating existing topics from classic (diskless.enable=false) to diskless (diskless.enable=true). " + + "This should only be enabled in non-production environments for testing or migration purposes. " + + "When enabled, topics can have their diskless.enable config changed from false to true."; + /************* Authorizer Configuration ***********/ public static final String AUTHORIZER_CLASS_NAME_CONFIG = "authorizer.class.name"; @@ -178,6 +184,8 @@ public class ServerConfigs { /** Diskless Configurations **/ .define(DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, BOOLEAN, DISKLESS_STORAGE_SYSTEM_ENABLE_DEFAULT, HIGH, DISKLESS_STORAGE_SYSTEM_ENABLE_DOC) + .define(DISKLESS_MIGRATION_ENABLE_CONFIG, BOOLEAN, DISKLESS_MIGRATION_ENABLE_DEFAULT, LOW, + DISKLESS_MIGRATION_ENABLE_DOC) /** Internal Configurations **/ // This indicates whether unreleased APIs should be advertised by this node. 
.defineInternal(UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 838ab49929..56502942e6 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -506,16 +506,20 @@ public static void validateBrokerLogConfigValues(Map props, private static void validateDiskless(Map existingConfigs, Map newConfigs, - boolean isRemoteLogStorageEnabled) { + boolean isRemoteLogStorageEnabled, + boolean isDisklessMigrationEnabled) { Optional wasDiskless = Optional.ofNullable(existingConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)).map(Boolean::parseBoolean); Optional.ofNullable((Boolean) newConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)) .ifPresent(isBeingEnabled -> { if (isBeingEnabled) { - // diskless.enable=true -> diskless.enable must be already set to true + // diskless.enable=true -> diskless.enable must be already set to true (unless migration is enabled) if (wasDiskless.isPresent() && !wasDiskless.get()) { - // cannot change from diskless.enable = false to diskless.enable = true - throw new InvalidConfigurationException("It is invalid to enable diskless"); + // cannot change from diskless.enable = false to diskless.enable = true (unless migration is enabled) + if (!isDisklessMigrationEnabled) { + throw new InvalidConfigurationException("It is invalid to enable diskless"); + } + // Migration is enabled, allow the change from false to true } if (isRemoteLogStorageEnabled) { @@ -538,10 +542,12 @@ private static void validateDiskless(Map existingConfigs, * @param existingConfigs The existing properties * @param newConfigs The new properties to be validated * @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled + * @param isDisklessMigrationEnabled true if diskless migration is enabled (allows switching diskless.enable from false to true) */ private static void validateTopicLogConfigValues(Map existingConfigs, Map newConfigs, - boolean isRemoteLogStorageSystemEnabled) { + boolean isRemoteLogStorageSystemEnabled, + boolean isDisklessMigrationEnabled) { validateValues(newConfigs); boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); @@ -557,7 +563,7 @@ private static void validateTopicLogConfigValues(Map existingCon validateTurningOffRemoteStorageWithDelete(newConfigs, wasRemoteLogEnabled, isRemoteLogStorageEnabled); } - validateDiskless(existingConfigs, newConfigs, isRemoteLogStorageEnabled); + validateDiskless(existingConfigs, newConfigs, isRemoteLogStorageEnabled, isDisklessMigrationEnabled); } public static void validateTurningOffRemoteStorageWithDelete(Map newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) { @@ -646,13 +652,21 @@ private static void validateRemoteStorageRetentionTime(Map props) { * Check that the given properties contain only valid log config names and that all values can be parsed and are valid */ public static void validate(Properties props) { - validate(Map.of(), props, Map.of(), false); + validate(Map.of(), props, Map.of(), false, false); } public static void validate(Map existingConfigs, Properties props, Map configuredProps, boolean isRemoteLogStorageSystemEnabled) { + validate(existingConfigs, props, configuredProps, isRemoteLogStorageSystemEnabled, false); + } + + public 
static void validate(Map existingConfigs, + Properties props, + Map configuredProps, + boolean isRemoteLogStorageSystemEnabled, + boolean isDisklessMigrationEnabled) { validateNames(props); if (configuredProps == null || configuredProps.isEmpty()) { Map valueMaps = CONFIG.parse(props); @@ -661,7 +675,7 @@ public static void validate(Map existingConfigs, Map combinedConfigs = new HashMap<>(configuredProps); combinedConfigs.putAll(props); Map valueMaps = CONFIG.parse(combinedConfigs); - validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled); + validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, isDisklessMigrationEnabled); } } From 095d0ea425f20b601202cfb3fe8a6205a13e49f6 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Wed, 21 Jan 2026 18:10:46 +0100 Subject: [PATCH 2/2] fixup! Allow migration only for tiered classic topics and rename to `diskless.allow.from.classic.enable` --- .../ControllerConfigurationValidator.scala | 2 +- .../main/scala/kafka/server/KafkaConfig.scala | 2 +- .../java/kafka/server/InklessConfigsTest.java | 59 +++++++++++---- .../scala/unit/kafka/log/LogConfigTest.scala | 75 +++++++++++++++---- .../kafka/server/config/ServerConfigs.java | 10 +-- .../storage/internals/log/LogConfig.java | 24 +++--- 6 files changed, 123 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index d5e944685d..07c544838b 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -119,7 +119,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu } LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), - kafkaConfig.disklessMigrationEnabled) + kafkaConfig.disklessAllowFromClassicEnabled) case BROKER => validateBrokerName(resource.name()) case CLIENT_METRICS => val properties = new Properties() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 81e91aa731..8478f2f0de 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -428,7 +428,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) /** Diskless Configuration */ val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG) - val disklessMigrationEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG) + val disklessAllowFromClassicEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG) def addReconfigurable(reconfigurable: Reconfigurable): Unit = { dynamicConfig.addReconfigurable(reconfigurable) diff --git a/core/src/test/java/kafka/server/InklessConfigsTest.java b/core/src/test/java/kafka/server/InklessConfigsTest.java index 5c87cf4912..f5153b79d5 100644 --- a/core/src/test/java/kafka/server/InklessConfigsTest.java +++ b/core/src/test/java/kafka/server/InklessConfigsTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; import 
org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -58,6 +59,7 @@ import io.aiven.inkless.test_utils.S3TestContainer; import static org.apache.kafka.common.config.TopicConfig.DISKLESS_ENABLE_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -72,17 +74,20 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di return init(defaultDisklessEnableConfig, disklessStorageEnableConfig, false); } - private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig, boolean disklessMigrationEnableConfig) throws Exception { + private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig, boolean isDisklessAllowFromClassicEnabled) throws Exception { final TestKitNodes nodes = new TestKitNodes.Builder() .setCombined(true) .setNumBrokerNodes(1) .setNumControllerNodes(1) .build(); - var cluster = new KafkaClusterTestKit.Builder(nodes) + var builder = new KafkaClusterTestKit.Builder(nodes) .setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") .setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, String.valueOf(defaultDisklessEnableConfig)) .setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, String.valueOf(disklessStorageEnableConfig)) - .setConfigProp(ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG, String.valueOf(disklessMigrationEnableConfig)) + .setConfigProp(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, String.valueOf(isDisklessAllowFromClassicEnabled)) + .setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") + .setConfigProp(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") // PG control plane config .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName()) .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl()) @@ -96,8 +101,9 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_PATH_STYLE_ENABLED_CONFIG, "true") .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_ACCESS_KEY_ID_CONFIG, s3Container.getAccessKey()) .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey()) - .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey()) - .build(); + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey()); + + var cluster = builder.build(); cluster.format(); cluster.startup(); cluster.waitForReadyBrokers(); @@ -193,31 +199,54 @@ public void classicTopicWithDisklessDefaultTrueConfigs() throws Exception { @Test public void disklessMigrationEnabled() throws Exception { - // Initialize cluster with diskless migration enabled var cluster = init(false, true, true); Map clientConfigs = new HashMap<>(); clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers()); try (Admin admin = AdminClient.create(clientConfigs)) { - // When creating a new topic with diskless.enable=false - final String classicTopic = "classicTopic"; - createTopic(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false")); + // When creating a new topic with diskless.enable=false AND remote.log.storage.enable=true + final String tieredTopic = "tieredTopic"; + createTopic(admin, tieredTopic, Map.of( + DISKLESS_ENABLE_CONFIG, "false", + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true" + )); // Then diskless.enable is set to false in the topic config - var classicTopicConfig = getTopicConfig(admin, classicTopic); - assertEquals("false", classicTopicConfig.get(DISKLESS_ENABLE_CONFIG)); + var tieredTopicConfig = getTopicConfig(admin, tieredTopic); + assertEquals("false", tieredTopicConfig.get(DISKLESS_ENABLE_CONFIG)); + assertEquals("true", tieredTopicConfig.get("remote.storage.enable")); - // When migration is enabled, it SHOULD be possible to turn on diskless after the topic is created - alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true")); + // When migration is enabled AND remote storage is enabled, it SHOULD be possible to turn on diskless + alterTopicConfig(admin, tieredTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true")); // Verify the config was updated - var updatedTopicConfig = getTopicConfig(admin, classicTopic); + var updatedTopicConfig = getTopicConfig(admin, tieredTopic); assertEquals("true", updatedTopicConfig.get(DISKLESS_ENABLE_CONFIG)); // But it should still NOT be possible to turn off diskless after enabling it - assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false"))); + assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, tieredTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false"))); } cluster.close(); } + @Test + public void disklessMigrationRequiresRemoteStorage() throws Exception { + var cluster = init(false, true, true); + Map clientConfigs = new HashMap<>(); + clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + + try (Admin admin = AdminClient.create(clientConfigs)) { + // When creating a new topic with diskless.enable=false WITHOUT remote storage + final String classicTopic = "classicTopic"; + createTopic(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false")); + // Then diskless.enable is set to false in the topic config + var classicTopicConfig = getTopicConfig(admin, classicTopic); + assertEquals("false", classicTopicConfig.get(DISKLESS_ENABLE_CONFIG)); + + // Even with migration enabled, it should NOT be possible to turn on diskless + // because remote storage is not enabled on this topic + assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true"))); + } + cluster.close(); + } public void createTopic(Admin admin, String topic, Map configs) throws Exception { admin.createTopics(Collections.singletonList( diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 2a7a51ead7..02e7f15496 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -482,33 +482,80 @@ class LogConfigTest { def testDisklessMigrationEnabled(): Unit = { val kafkaProps = TestUtils.createDummyBrokerConfig() val kafkaConfig = KafkaConfig.fromProps(kafkaProps) - val isDisklessMigrationEnabled = true + val 
isDisklessAllowFromClassicEnabled = true + val isRemoteStorageSystemEnabled = true - val disklessAlreadyEnabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") - val disklessAlreadyDisabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") + // Tiered storage topic (remote storage enabled at topic level) - migration candidate + val tieredTopicWithDisklessDisabled = util.Map.of( + TopicConfig.DISKLESS_ENABLE_CONFIG, "false", + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true" + ) + + // Diskless topic (no remote storage) + val disklessTopic = util.Map.of( + TopicConfig.DISKLESS_ENABLE_CONFIG, "true" + ) + + val migrateToDiskless = new Properties() + migrateToDiskless.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") + migrateToDiskless.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - val setDisklessTrue = new Properties() - setDisklessTrue.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") val setDisklessFalse = new Properties() setDisklessFalse.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") - // When migration is enabled: - // 1. Should be possible to switch from diskless.enable=false to diskless.enable=true - LogConfig.validate(disklessAlreadyDisabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + // 1. Should be possible to switch from diskless.enable=false to diskless.enable=true for a tiered topic + LogConfig.validate(tieredTopicWithDisklessDisabled, migrateToDiskless, kafkaConfig.extractLogConfigMap, isRemoteStorageSystemEnabled, isDisklessAllowFromClassicEnabled) // 2. Should still NOT be possible to switch from diskless.enable=true to diskless.enable=false (even with migration enabled) assertThrows( classOf[InvalidConfigurationException], - () => LogConfig.validate(disklessAlreadyEnabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + () => LogConfig.validate(disklessTopic, setDisklessFalse, kafkaConfig.extractLogConfigMap, isRemoteStorageSystemEnabled, isDisklessAllowFromClassicEnabled) ) + } - // 3. Should still be possible to keep diskless.enable=true - LogConfig.validate(disklessAlreadyEnabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + @Test + def testDisklessMigrationRequiresBothMigrationAndRemoteStorage(): Unit = { + val kafkaProps = TestUtils.createDummyBrokerConfig() + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) - // 4. 
Should still be possible to keep diskless.enable=false - LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) - } + val disklessAlreadyDisabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") + val setDisklessTrue = new Properties() + setDisklessTrue.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") + + // Case 1: Migration enabled but remote storage NOT enabled + assertThrows( + classOf[InvalidConfigurationException], + () => LogConfig.validate( + disklessAlreadyDisabled, + setDisklessTrue, + kafkaConfig.extractLogConfigMap, + false, // isRemoteLogStorageSystemEnabled + true) // isDisklessAllowFromClassicEnabled + ) + + // Case 2: Remote storage enabled but migration NOT enabled + assertThrows( + classOf[InvalidConfigurationException], + () => LogConfig.validate( + disklessAlreadyDisabled, + setDisklessTrue, + kafkaConfig.extractLogConfigMap, + true, // isRemoteLogStorageSystemEnabled + false) // isDisklessAllowFromClassicEnabled + ) + + // Case 3: Neither migration nor remote storage enabled + assertThrows( + classOf[InvalidConfigurationException], + () => LogConfig.validate( + disklessAlreadyDisabled, + setDisklessTrue, + kafkaConfig.extractLogConfigMap, + false, // isRemoteLogStorageSystemEnabled + false) // isDisklessAllowFromClassicEnabled + ) + } @Test def testValidDisklessAndRemoteStorageEnable(): Unit = { diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index 60db3015d4..ebf173d457 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -131,9 +131,9 @@ public class ServerConfigs { public static final String DISKLESS_STORAGE_SYSTEM_ENABLE_DOC = "Enable the diskless storage system. " + "This enables diskless topics alongside classic topics."; - public static final String DISKLESS_MIGRATION_ENABLE_CONFIG = "diskless.migration.enable"; - public static final boolean DISKLESS_MIGRATION_ENABLE_DEFAULT = false; - public static final String DISKLESS_MIGRATION_ENABLE_DOC = "Allow migrating existing topics from classic (diskless.enable=false) to diskless (diskless.enable=true). " + + public static final String DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG = "diskless.allow.from.classic.enable"; + public static final boolean DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DEFAULT = false; + public static final String DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DOC = "Allow migrating existing topics with remote.storage.enable=true from classic (diskless.enable=false) to diskless (diskless.enable=true). " + "This should only be enabled in non-production environments for testing or migration purposes. " + "When enabled, topics can have their diskless.enable config changed from false to true."; @@ -184,8 +184,8 @@ public class ServerConfigs { /** Diskless Configurations **/ .define(DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, BOOLEAN, DISKLESS_STORAGE_SYSTEM_ENABLE_DEFAULT, HIGH, DISKLESS_STORAGE_SYSTEM_ENABLE_DOC) - .define(DISKLESS_MIGRATION_ENABLE_CONFIG, BOOLEAN, DISKLESS_MIGRATION_ENABLE_DEFAULT, LOW, - DISKLESS_MIGRATION_ENABLE_DOC) + .define(DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, BOOLEAN, DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DEFAULT, LOW, + DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DOC) /** Internal Configurations **/ // This indicates whether unreleased APIs should be advertised by this node. 
.defineInternal(UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 56502942e6..d3d92296ed 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -507,22 +507,20 @@ public static void validateBrokerLogConfigValues(Map props, private static void validateDiskless(Map existingConfigs, Map newConfigs, boolean isRemoteLogStorageEnabled, - boolean isDisklessMigrationEnabled) { + boolean isDisklessAllowFromClassicEnabled) { Optional wasDiskless = Optional.ofNullable(existingConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)).map(Boolean::parseBoolean); Optional.ofNullable((Boolean) newConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)) .ifPresent(isBeingEnabled -> { if (isBeingEnabled) { - // diskless.enable=true -> diskless.enable must be already set to true (unless migration is enabled) + // diskless.enable=true -> diskless.enable must be already set to true (unless migration is enabled and remote storage is enabled) if (wasDiskless.isPresent() && !wasDiskless.get()) { - // cannot change from diskless.enable = false to diskless.enable = true (unless migration is enabled) - if (!isDisklessMigrationEnabled) { + // cannot change from diskless.enable = false to diskless.enable = true (unless migration is enabled and remote storage is enabled) + if (!isDisklessAllowFromClassicEnabled || !isRemoteLogStorageEnabled) { throw new InvalidConfigurationException("It is invalid to enable diskless"); } - // Migration is enabled, allow the change from false to true - } - - if (isRemoteLogStorageEnabled) { + // Migration is enabled and remote storage is enabled, allow the change from false to true + } else if (isRemoteLogStorageEnabled) { throw new InvalidConfigurationException("Diskless and remote storage cannot be enabled simultaneously"); } } else { @@ -542,12 +540,12 @@ private static void validateDiskless(Map existingConfigs, * @param existingConfigs The existing properties * @param newConfigs The new properties to be validated * @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled - * @param isDisklessMigrationEnabled true if diskless migration is enabled (allows switching diskless.enable from false to true) + * @param isDisklessAllowFromClassicEnabled true if diskless migration is enabled (allows switching diskless.enable from false to true) */ private static void validateTopicLogConfigValues(Map existingConfigs, Map newConfigs, boolean isRemoteLogStorageSystemEnabled, - boolean isDisklessMigrationEnabled) { + boolean isDisklessAllowFromClassicEnabled) { validateValues(newConfigs); boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); @@ -563,7 +561,7 @@ private static void validateTopicLogConfigValues(Map existingCon validateTurningOffRemoteStorageWithDelete(newConfigs, wasRemoteLogEnabled, isRemoteLogStorageEnabled); } - validateDiskless(existingConfigs, newConfigs, isRemoteLogStorageEnabled, isDisklessMigrationEnabled); + validateDiskless(existingConfigs, newConfigs, isRemoteLogStorageEnabled, isDisklessAllowFromClassicEnabled); } public static void validateTurningOffRemoteStorageWithDelete(Map newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) { @@ -666,7 +664,7 @@ public static void validate(Map 
existingConfigs, Properties props, Map configuredProps, boolean isRemoteLogStorageSystemEnabled, - boolean isDisklessMigrationEnabled) { + boolean isDisklessAllowFromClassicEnabled) { validateNames(props); if (configuredProps == null || configuredProps.isEmpty()) { Map valueMaps = CONFIG.parse(props); @@ -675,7 +673,7 @@ public static void validate(Map existingConfigs, Map combinedConfigs = new HashMap<>(configuredProps); combinedConfigs.putAll(props); Map valueMaps = CONFIG.parse(combinedConfigs); - validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, isDisklessMigrationEnabled); + validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, isDisklessAllowFromClassicEnabled); } }
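
For context, the migration flow that the new validation permits looks roughly like the AdminClient sketch below. It mirrors the disklessMigrationEnabled test above: the broker is assumed to be running with diskless.allow.from.classic.enable=true and remote log storage enabled, the topic "tieredTopic" is assumed to already exist with remote.storage.enable=true and diskless.enable=false, and the class name and bootstrap address are placeholders rather than anything introduced by this patch.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical usage sketch, not part of the patch: flips diskless.enable from
// false to true on an existing tiered topic via incrementalAlterConfigs.
public class DisklessMigrationExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address

        try (Admin admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "tieredTopic");

            // The controller accepts this change only when the broker has
            // diskless.allow.from.classic.enable=true and the topic already has
            // remote.storage.enable=true; otherwise validateDiskless rejects it.
            Collection<AlterConfigOp> ops = List.of(
                new AlterConfigOp(new ConfigEntry("diskless.enable", "true"), AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();

            // Switching back to diskless.enable=false is still rejected after the
            // migration, as asserted in the tests above.
        }
    }
}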