@@ -118,7 +118,8 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
nullTopicConfigs.mkString(","))
}
LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
kafkaConfig.disklessMigrationEnabled)
case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
val properties = new Properties()
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -428,6 +428,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])

/** Diskless Configuration */
val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG)
val disklessMigrationEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG)

def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable)
34 changes: 33 additions & 1 deletion core/src/test/java/kafka/server/InklessConfigsTest.java
@@ -68,7 +68,11 @@ public class InklessConfigsTest {
@Container
protected static MinioContainer s3Container = S3TestContainer.minio();

private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig) throws Exception {
return init(defaultDisklessEnableConfig, disklessStorageEnableConfig, false);
}

private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig, boolean disklessMigrationEnableConfig) throws Exception {
final TestKitNodes nodes = new TestKitNodes.Builder()
.setCombined(true)
.setNumBrokerNodes(1)
@@ -78,6 +82,7 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di
.setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
.setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, String.valueOf(defaultDisklessEnableConfig))
.setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, String.valueOf(disklessStorageEnableConfig))
.setConfigProp(ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG, String.valueOf(disklessMigrationEnableConfig))
// PG control plane config
.setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName())
.setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl())
@@ -186,6 +191,33 @@ public void classicTopicWithDisklessDefaultTrueConfigs() throws Exception {
cluster.close();
}

@Test
public void disklessMigrationEnabled() throws Exception {
// Initialize cluster with diskless migration enabled
var cluster = init(false, true, true);
Map<String, Object> clientConfigs = new HashMap<>();
clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());

try (Admin admin = AdminClient.create(clientConfigs)) {
// When creating a new topic with diskless.enable=false
final String classicTopic = "classicTopic";
createTopic(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false"));
// Then diskless.enable is set to false in the topic config
var classicTopicConfig = getTopicConfig(admin, classicTopic);
assertEquals("false", classicTopicConfig.get(DISKLESS_ENABLE_CONFIG));

// When migration is enabled, it SHOULD be possible to turn on diskless after the topic is created
alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true"));
// Verify the config was updated
var updatedTopicConfig = getTopicConfig(admin, classicTopic);
assertEquals("true", updatedTopicConfig.get(DISKLESS_ENABLE_CONFIG));

// But it should still NOT be possible to turn off diskless after enabling it
assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false")));
}
cluster.close();
}


public void createTopic(Admin admin, String topic, Map<String, String> configs) throws Exception {
admin.createTopics(Collections.singletonList(
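Note: the new test relies on helpers (getTopicConfig, alterTopicConfig) whose bodies fall outside this diff. As a point of reference, a minimal sketch of what such helpers could look like using only standard Admin API calls (describeConfigs, incrementalAlterConfigs); the method names, return types, and assumed imports (org.apache.kafka.clients.admin.*, org.apache.kafka.common.config.ConfigResource, java.util.*) are illustrative, not the PR's actual implementation.

// Hypothetical helper sketches; the real implementations live outside this hunk.
private Map<String, String> getTopicConfig(Admin admin, String topic) throws Exception {
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
    // describeConfigs returns the full topic config, including diskless.enable
    Config config = admin.describeConfigs(List.of(resource)).all().get().get(resource);
    Map<String, String> result = new HashMap<>();
    config.entries().forEach(entry -> result.put(entry.name(), entry.value()));
    return result;
}

private void alterTopicConfig(Admin admin, String topic, Map<String, String> configs) throws Exception {
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
    // SET each supplied config; the broker-side validation in LogConfig decides whether it is allowed
    Collection<AlterConfigOp> ops = configs.entrySet().stream()
            .map(e -> new AlterConfigOp(new ConfigEntry(e.getKey(), e.getValue()), AlterConfigOp.OpType.SET))
            .collect(Collectors.toList());
    admin.incrementalAlterConfigs(Map.of(resource, ops)).all().get();
}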
31 changes: 31 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -478,6 +478,37 @@ class LogConfigTest {
LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false)
}

@Test
def testDisklessMigrationEnabled(): Unit = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
val isDisklessMigrationEnabled = true

val disklessAlreadyEnabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")
val disklessAlreadyDisabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")

val setDisklessTrue = new Properties()
setDisklessTrue.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")
val setDisklessFalse = new Properties()
setDisklessFalse.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")

// When migration is enabled:
// 1. Should be possible to switch from diskless.enable=false to diskless.enable=true
LogConfig.validate(disklessAlreadyDisabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled)

// 2. Should still NOT be possible to switch from diskless.enable=true to diskless.enable=false (even with migration enabled)
assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(disklessAlreadyEnabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled)
)

// 3. Should still be possible to keep diskless.enable=true
LogConfig.validate(disklessAlreadyEnabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled)

// 4. Should still be possible to keep diskless.enable=false
LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled)
}


@Test
def testValidDisklessAndRemoteStorageEnable(): Unit = {
@@ -131,6 +131,12 @@ public class ServerConfigs {
public static final String DISKLESS_STORAGE_SYSTEM_ENABLE_DOC = "Enable the diskless storage system. " +
"This enables diskless topics alongside classic topics.";

public static final String DISKLESS_MIGRATION_ENABLE_CONFIG = "diskless.migration.enable";
public static final boolean DISKLESS_MIGRATION_ENABLE_DEFAULT = false;
public static final String DISKLESS_MIGRATION_ENABLE_DOC = "Allow migrating existing topics from classic (diskless.enable=false) to diskless (diskless.enable=true). " +
"This should only be enabled in non-production environments for testing or migration purposes. " +
"When enabled, topics can have their diskless.enable config changed from false to true.";


/************* Authorizer Configuration ***********/
public static final String AUTHORIZER_CLASS_NAME_CONFIG = "authorizer.class.name";
@@ -178,6 +184,8 @@ public class ServerConfigs {
/** Diskless Configurations **/
.define(DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, BOOLEAN, DISKLESS_STORAGE_SYSTEM_ENABLE_DEFAULT, HIGH,
DISKLESS_STORAGE_SYSTEM_ENABLE_DOC)
.define(DISKLESS_MIGRATION_ENABLE_CONFIG, BOOLEAN, DISKLESS_MIGRATION_ENABLE_DEFAULT, LOW,
DISKLESS_MIGRATION_ENABLE_DOC)
/** Internal Configurations **/
// This indicates whether unreleased APIs should be advertised by this node.
.defineInternal(UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH)
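Note: the new test above enables both broker flags before attempting a classic-to-diskless switch (init(false, true, true)). A minimal sketch of that pairing using the constants defined in this file; the map itself is illustrative and not part of the PR.

// Illustrative broker settings for exercising migration; DISKLESS_MIGRATION_ENABLE_CONFIG
// ("diskless.migration.enable") defaults to false and is documented as non-production only.
Map<String, String> disklessMigrationTestProps = Map.of(
        ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, "true", // diskless topics enabled at all
        ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG, "true"       // allow diskless.enable false -> true
);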
@@ -506,16 +506,20 @@ public static void validateBrokerLogConfigValues(Map<?, ?> props,

private static void validateDiskless(Map<String, String> existingConfigs,
Map<?, ?> newConfigs,
boolean isRemoteLogStorageEnabled) {
boolean isRemoteLogStorageEnabled,
boolean isDisklessMigrationEnabled) {
Optional<Boolean> wasDiskless = Optional.ofNullable(existingConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)).map(Boolean::parseBoolean);

Optional.ofNullable((Boolean) newConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG))
.ifPresent(isBeingEnabled -> {
if (isBeingEnabled) {
// diskless.enable=true -> diskless.enable must be already set to true
// diskless.enable=true -> diskless.enable must be already set to true (unless migration is enabled)
if (wasDiskless.isPresent() && !wasDiskless.get()) {
// cannot change from diskless.enable = false to diskless.enable = true
throw new InvalidConfigurationException("It is invalid to enable diskless");
// cannot change from diskless.enable = false to diskless.enable = true (unless migration is enabled)
if (!isDisklessMigrationEnabled) {
throw new InvalidConfigurationException("It is invalid to enable diskless");
}
// Migration is enabled, allow the change from false to true
}

if (isRemoteLogStorageEnabled) {
@@ -538,10 +542,12 @@ private static void validateDiskless(Map<String, String> existingConfigs,
* @param existingConfigs The existing properties
* @param newConfigs The new properties to be validated
* @param isRemoteLogStorageSystemEnabled true if system-wide remote log storage is enabled
* @param isDisklessMigrationEnabled true if diskless migration is enabled (allows switching diskless.enable from false to true)
*/
private static void validateTopicLogConfigValues(Map<String, String> existingConfigs,
Map<?, ?> newConfigs,
boolean isRemoteLogStorageSystemEnabled) {
boolean isRemoteLogStorageSystemEnabled,
boolean isDisklessMigrationEnabled) {
validateValues(newConfigs);

boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
@@ -557,7 +563,7 @@ private static void validateTopicLogConfigValues(Map<String, String> existingCon
validateTurningOffRemoteStorageWithDelete(newConfigs, wasRemoteLogEnabled, isRemoteLogStorageEnabled);
}

validateDiskless(existingConfigs, newConfigs, isRemoteLogStorageEnabled);
validateDiskless(existingConfigs, newConfigs, isRemoteLogStorageEnabled, isDisklessMigrationEnabled);
}

public static void validateTurningOffRemoteStorageWithDelete(Map<?, ?> newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) {
@@ -646,13 +652,21 @@ private static void validateRemoteStorageRetentionTime(Map<?, ?> props) {
* Check that the given properties contain only valid log config names and that all values can be parsed and are valid
*/
public static void validate(Properties props) {
validate(Map.of(), props, Map.of(), false);
validate(Map.of(), props, Map.of(), false, false);
}

public static void validate(Map<String, String> existingConfigs,
Properties props,
Map<?, ?> configuredProps,
boolean isRemoteLogStorageSystemEnabled) {
validate(existingConfigs, props, configuredProps, isRemoteLogStorageSystemEnabled, false);
}

public static void validate(Map<String, String> existingConfigs,
Properties props,
Map<?, ?> configuredProps,
boolean isRemoteLogStorageSystemEnabled,
boolean isDisklessMigrationEnabled) {
validateNames(props);
if (configuredProps == null || configuredProps.isEmpty()) {
Map<?, ?> valueMaps = CONFIG.parse(props);
@@ -661,7 +675,7 @@ public static void validate(Map<String, String> existingConfigs,
Map<Object, Object> combinedConfigs = new HashMap<>(configuredProps);
combinedConfigs.putAll(props);
Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled);
validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, isDisklessMigrationEnabled);
}
}

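Note: for completeness, a sketch of how the widened validate() overload behaves with the migration flag, mirroring the Scala test above in Java. brokerLogConfigDefaults stands in for the broker's log config map (kafkaConfig.extractLogConfigMap in the test) and is assumed to be non-empty so the topic-level validation path is taken.

// Classic -> diskless passes validation once isDisklessMigrationEnabled is true.
Map<String, String> wasClassic = Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false");
Properties enableDiskless = new Properties();
enableDiskless.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true");
LogConfig.validate(wasClassic, enableDiskless, brokerLogConfigDefaults, false, true);

// The reverse direction is still rejected, with or without the migration flag:
Map<String, String> wasDiskless = Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true");
Properties disableDiskless = new Properties();
disableDiskless.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "false");
// throws InvalidConfigurationException
// LogConfig.validate(wasDiskless, disableDiskless, brokerLogConfigDefaults, false, true);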