diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index f163a2739a..d5e944685d 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -118,7 +118,8 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu nullTopicConfigs.mkString(",")) } LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap, - kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), + kafkaConfig.disklessMigrationEnabled) case BROKER => validateBrokerName(resource.name()) case CLIENT_METRICS => val properties = new Properties() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 0cec30c06a..81e91aa731 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -428,6 +428,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) /** Diskless Configuration */ val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG) + val disklessMigrationEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG) def addReconfigurable(reconfigurable: Reconfigurable): Unit = { dynamicConfig.addReconfigurable(reconfigurable) diff --git a/core/src/test/java/kafka/server/InklessConfigsTest.java b/core/src/test/java/kafka/server/InklessConfigsTest.java index 21548e98fd..5c87cf4912 100644 --- a/core/src/test/java/kafka/server/InklessConfigsTest.java +++ b/core/src/test/java/kafka/server/InklessConfigsTest.java @@ -68,7 +68,11 @@ public class InklessConfigsTest { @Container protected static MinioContainer s3Container = S3TestContainer.minio(); - private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig) throws Exception { + private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig) throws Exception { + return init(defaultDisklessEnableConfig, disklessStorageEnableConfig, false); + } + + private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig, boolean disklessMigrationEnableConfig) throws Exception { final TestKitNodes nodes = new TestKitNodes.Builder() .setCombined(true) .setNumBrokerNodes(1) @@ -78,6 +82,7 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di .setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") .setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, String.valueOf(defaultDisklessEnableConfig)) .setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, String.valueOf(disklessStorageEnableConfig)) + .setConfigProp(ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG, String.valueOf(disklessMigrationEnableConfig)) // PG control plane config .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName()) .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl()) @@ -186,6 +191,33 @@ public void classicTopicWithDisklessDefaultTrueConfigs() throws Exception { cluster.close(); } + @Test + public void disklessMigrationEnabled() throws Exception { + // Initialize cluster with diskless migration enabled + var cluster = init(false, true, true); + Map clientConfigs = new HashMap<>(); + clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + + try (Admin admin = AdminClient.create(clientConfigs)) { + // When creating a new topic with diskless.enable=false + final String classicTopic = "classicTopic"; + createTopic(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false")); + // Then diskless.enable is set to false in the topic config + var classicTopicConfig = getTopicConfig(admin, classicTopic); + assertEquals("false", classicTopicConfig.get(DISKLESS_ENABLE_CONFIG)); + + // When migration is enabled, it SHOULD be possible to turn on diskless after the topic is created + alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true")); + // Verify the config was updated + var updatedTopicConfig = getTopicConfig(admin, classicTopic); + assertEquals("true", updatedTopicConfig.get(DISKLESS_ENABLE_CONFIG)); + + // But it should still NOT be possible to turn off diskless after enabling it + assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false"))); + } + cluster.close(); + } + public void createTopic(Admin admin, String topic, Map configs) throws Exception { admin.createTopics(Collections.singletonList( diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index cbe7ffeed4..2a7a51ead7 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -478,6 +478,37 @@ class LogConfigTest { LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false) } + @Test + def testDisklessMigrationEnabled(): Unit = { + val kafkaProps = TestUtils.createDummyBrokerConfig() + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) + val isDisklessMigrationEnabled = true + + val disklessAlreadyEnabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") + val disklessAlreadyDisabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") + + val setDisklessTrue = new Properties() + setDisklessTrue.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") + val setDisklessFalse = new Properties() + setDisklessFalse.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") + + // When migration is enabled: + // 1. Should be possible to switch from diskless.enable=false to diskless.enable=true + LogConfig.validate(disklessAlreadyDisabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + + // 2. Should still NOT be possible to switch from diskless.enable=true to diskless.enable=false (even with migration enabled) + assertThrows( + classOf[InvalidConfigurationException], + () => LogConfig.validate(disklessAlreadyEnabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + ) + + // 3. Should still be possible to keep diskless.enable=true + LogConfig.validate(disklessAlreadyEnabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + + // 4. Should still be possible to keep diskless.enable=false + LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + } + @Test def testValidDisklessAndRemoteStorageEnable(): Unit = { diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index 2e53274549..60db3015d4 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -131,6 +131,12 @@ public class ServerConfigs { public static final String DISKLESS_STORAGE_SYSTEM_ENABLE_DOC = "Enable the diskless storage system. " + "This enables diskless topics alongside classic topics."; + public static final String DISKLESS_MIGRATION_ENABLE_CONFIG = "diskless.migration.enable"; + public static final boolean DISKLESS_MIGRATION_ENABLE_DEFAULT = false; + public static final String DISKLESS_MIGRATION_ENABLE_DOC = "Allow migrating existing topics from classic (diskless.enable=false) to diskless (diskless.enable=true). " + + "This should only be enabled in non-production environments for testing or migration purposes. " + + "When enabled, topics can have their diskless.enable config changed from false to true."; + /************* Authorizer Configuration ***********/ public static final String AUTHORIZER_CLASS_NAME_CONFIG = "authorizer.class.name"; @@ -178,6 +184,8 @@ public class ServerConfigs { /** Diskless Configurations **/ .define(DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, BOOLEAN, DISKLESS_STORAGE_SYSTEM_ENABLE_DEFAULT, HIGH, DISKLESS_STORAGE_SYSTEM_ENABLE_DOC) + .define(DISKLESS_MIGRATION_ENABLE_CONFIG, BOOLEAN, DISKLESS_MIGRATION_ENABLE_DEFAULT, LOW, + DISKLESS_MIGRATION_ENABLE_DOC) /** Internal Configurations **/ // This indicates whether unreleased APIs should be advertised by this node. .defineInternal(UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 838ab49929..56502942e6 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -506,16 +506,20 @@ public static void validateBrokerLogConfigValues(Map props, private static void validateDiskless(Map existingConfigs, Map newConfigs, - boolean isRemoteLogStorageEnabled) { + boolean isRemoteLogStorageEnabled, + boolean isDisklessMigrationEnabled) { Optional wasDiskless = Optional.ofNullable(existingConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)).map(Boolean::parseBoolean); Optional.ofNullable((Boolean) newConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)) .ifPresent(isBeingEnabled -> { if (isBeingEnabled) { - // diskless.enable=true -> diskless.enable must be already set to true + // diskless.enable=true -> diskless.enable must be already set to true (unless migration is enabled) if (wasDiskless.isPresent() && !wasDiskless.get()) { - // cannot change from diskless.enable = false to diskless.enable = true - throw new InvalidConfigurationException("It is invalid to enable diskless"); + // cannot change from diskless.enable = false to diskless.enable = true (unless migration is enabled) + if (!isDisklessMigrationEnabled) { + throw new InvalidConfigurationException("It is invalid to enable diskless"); + } + // Migration is enabled, allow the change from false to true } if (isRemoteLogStorageEnabled) { @@ -538,10 +542,12 @@ private static void validateDiskless(Map existingConfigs, * @param existingConfigs The existing properties * @param newConfigs The new properties to be validated * @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled + * @param isDisklessMigrationEnabled true if diskless migration is enabled (allows switching diskless.enable from false to true) */ private static void validateTopicLogConfigValues(Map existingConfigs, Map newConfigs, - boolean isRemoteLogStorageSystemEnabled) { + boolean isRemoteLogStorageSystemEnabled, + boolean isDisklessMigrationEnabled) { validateValues(newConfigs); boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); @@ -557,7 +563,7 @@ private static void validateTopicLogConfigValues(Map existingCon validateTurningOffRemoteStorageWithDelete(newConfigs, wasRemoteLogEnabled, isRemoteLogStorageEnabled); } - validateDiskless(existingConfigs, newConfigs, isRemoteLogStorageEnabled); + validateDiskless(existingConfigs, newConfigs, isRemoteLogStorageEnabled, isDisklessMigrationEnabled); } public static void validateTurningOffRemoteStorageWithDelete(Map newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) { @@ -646,13 +652,21 @@ private static void validateRemoteStorageRetentionTime(Map props) { * Check that the given properties contain only valid log config names and that all values can be parsed and are valid */ public static void validate(Properties props) { - validate(Map.of(), props, Map.of(), false); + validate(Map.of(), props, Map.of(), false, false); } public static void validate(Map existingConfigs, Properties props, Map configuredProps, boolean isRemoteLogStorageSystemEnabled) { + validate(existingConfigs, props, configuredProps, isRemoteLogStorageSystemEnabled, false); + } + + public static void validate(Map existingConfigs, + Properties props, + Map configuredProps, + boolean isRemoteLogStorageSystemEnabled, + boolean isDisklessMigrationEnabled) { validateNames(props); if (configuredProps == null || configuredProps.isEmpty()) { Map valueMaps = CONFIG.parse(props); @@ -661,7 +675,7 @@ public static void validate(Map existingConfigs, Map combinedConfigs = new HashMap<>(configuredProps); combinedConfigs.putAll(props); Map valueMaps = CONFIG.parse(combinedConfigs); - validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled); + validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, isDisklessMigrationEnabled); } }