From 61efde51fad3ffa7ec5996964d38b462b13620db Mon Sep 17 00:00:00 2001 From: Guilherme Caminha Date: Thu, 8 Jan 2026 16:17:46 +0100 Subject: [PATCH] KIP 935 --- .../server/policy/AlterConfigV2Policy.java | 113 ++++++++++++++++++ .../policy/AlterConfigV2PolicyTest.java | 60 ++++++++++ .../scala/kafka/server/ControllerServer.scala | 9 +- .../main/scala/kafka/server/KafkaConfig.scala | 1 + .../ConfigurationControlManager.java | 18 ++- .../kafka/controller/QuorumController.java | 10 ++ .../ConfigurationControlManagerTest.java | 58 +++++++++ .../kafka/server/config/ServerLogConfigs.java | 4 + .../storage/internals/log/LogConfig.java | 1 + 9 files changed, 271 insertions(+), 3 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/server/policy/AlterConfigV2Policy.java create mode 100644 clients/src/test/java/org/apache/kafka/server/policy/AlterConfigV2PolicyTest.java diff --git a/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigV2Policy.java b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigV2Policy.java new file mode 100644 index 0000000000..cfa58f7cf7 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigV2Policy.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.policy; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.PolicyViolationException; + +import java.util.Map; +import java.util.Objects; + +/** + * An interface for enforcing a policy on alter configs requests with access to both + * the existing and resulting configuration state. + * + *

If alter.config.v2.policy.class.name is defined, Kafka will create an instance + * of the specified class using the default constructor and will then pass the broker configs to + * its configure() method. During broker shutdown, the close() method will + * be invoked so that resources can be released (if necessary). + */ +public interface AlterConfigV2Policy extends Configurable, AutoCloseable { + + /** + * Class containing the alter config request parameters with before/after state. + */ + class RequestMetadata { + + private final ConfigResource resource; + private final Map configsBefore; + private final Map configsAfter; + + /** + * Create an instance of this class with the provided parameters. + * + * This constructor is public to make testing of AlterConfigV2Policy + * implementations easier. + */ + public RequestMetadata(ConfigResource resource, + Map configsBefore, + Map configsAfter) { + this.resource = resource; + this.configsBefore = configsBefore; + this.configsAfter = configsAfter; + } + + /** + * Return the configs before the alteration. + */ + public Map configsBefore() { + return configsBefore; + } + + /** + * Return the configs after the alteration is applied. + */ + public Map configsAfter() { + return configsAfter; + } + + /** + * Return the resource being altered. + */ + public ConfigResource resource() { + return resource; + } + + @Override + public int hashCode() { + return Objects.hash(resource, configsBefore, configsAfter); + } + + @Override + public boolean equals(Object o) { + if ((o == null) || (!o.getClass().equals(getClass()))) return false; + RequestMetadata other = (RequestMetadata) o; + return resource.equals(other.resource) && + configsBefore.equals(other.configsBefore) && + configsAfter.equals(other.configsAfter); + } + + @Override + public String toString() { + return "AlterConfigV2Policy.RequestMetadata(resource=" + resource + + ", configsBefore=" + configsBefore + + ", configsAfter=" + configsAfter + ")"; + } + } + + /** + * Validate the request parameters and throw a PolicyViolationException + * with a suitable error message if the alter configs request parameters for the + * provided resource do not satisfy this policy. + * + * @param requestMetadata the alter configs request parameters including before/after state + * for the provided resource. + * @throws PolicyViolationException if the request parameters do not satisfy this policy. + */ + void validate(RequestMetadata requestMetadata) throws PolicyViolationException; +} diff --git a/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigV2PolicyTest.java b/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigV2PolicyTest.java new file mode 100644 index 0000000000..d85e56b548 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigV2PolicyTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.policy; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.ConfigResource.Type; +import org.apache.kafka.server.policy.AlterConfigV2Policy.RequestMetadata; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class AlterConfigV2PolicyTest { + + @Test + public void testRequestMetadataEquals() { + RequestMetadata requestMetadata = new RequestMetadata( + new ConfigResource(Type.BROKER, "0"), + Map.of("foo", "bar"), + Map.of("foo", "baz") + ); + + assertEquals(requestMetadata, requestMetadata); + + assertNotEquals(requestMetadata, null); + assertNotEquals(requestMetadata, new Object()); + assertNotEquals(requestMetadata, new RequestMetadata( + new ConfigResource(Type.BROKER, "1"), + Map.of("foo", "bar"), + Map.of("foo", "baz") + )); + assertNotEquals(requestMetadata, new RequestMetadata( + new ConfigResource(Type.BROKER, "0"), + Map.of("foo", "bar"), + Map.of() + )); + assertNotEquals(requestMetadata, new RequestMetadata( + new ConfigResource(Type.BROKER, "0"), + Map.of(), + Map.of("foo", "baz") + )); + } +} diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 7f7d45d15f..fbb1184a06 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -43,12 +43,12 @@ import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.CredentialProvider import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, SimpleApiVersionManager} import org.apache.kafka.server.authorizer.Authorizer -import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG} +import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, ALTER_CONFIG_V2_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG} import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, NodeToControllerChannelManager} import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs} import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} -import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy} +import org.apache.kafka.server.policy.{AlterConfigPolicy, AlterConfigV2Policy, CreateTopicPolicy} import org.apache.kafka.server.util.{Deadline, FutureUtils} import java.util @@ -90,6 +90,7 @@ class ControllerServer( val socketServerFirstBoundPortFuture = new CompletableFuture[Integer]() var createTopicPolicy: Option[CreateTopicPolicy] = None var alterConfigPolicy: Option[AlterConfigPolicy] = None + var alterConfigV2Policy: Option[AlterConfigV2Policy] = None @volatile var quorumControllerMetrics: QuorumControllerMetrics = _ var controller: Controller = _ var quotaManagers: QuotaManagers = _ @@ -194,6 +195,8 @@ class ControllerServer( createTopicPolicy = Option(config. getConfiguredInstance(CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, classOf[CreateTopicPolicy])) + alterConfigV2Policy = Option(config. + getConfiguredInstance(ALTER_CONFIG_V2_POLICY_CLASS_NAME_CONFIG, classOf[AlterConfigV2Policy])) alterConfigPolicy = Option(config. getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[AlterConfigPolicy])) @@ -243,6 +246,7 @@ class ControllerServer( setMetrics(quorumControllerMetrics). setCreateTopicPolicy(createTopicPolicy.toJava). setAlterConfigPolicy(alterConfigPolicy.toJava). + setAlterConfigV2Policy(alterConfigV2Policy.toJava). setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)). setStaticConfig(config.originals). setBootstrapMetadata(bootstrapMetadata). @@ -478,6 +482,7 @@ class ControllerServer( authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer plugin")) createTopicPolicy.foreach(policy => Utils.closeQuietly(policy, "create topic policy")) alterConfigPolicy.foreach(policy => Utils.closeQuietly(policy, "alter config policy")) + alterConfigV2Policy.foreach(policy => Utils.closeQuietly(policy, "alter config v2 policy")) socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down")) CoreUtils.swallow(config.dynamicConfig.clear(), this) sharedServer.stopForController() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 0cec30c06a..d3efbbfb9b 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -588,6 +588,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) // warn if create.topic.policy.class.name or alter.config.policy.class.name is defined in the broker role warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG) warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG) + warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.ALTER_CONFIG_V2_POLICY_CLASS_NAME_CONFIG) } else if (processRoles == Set(ProcessRole.ControllerRole)) { // KRaft controller-only validateQuorumVotersAndQuorumBootstrapServerForKRaft() diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 2b88d47431..5dd918002e 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -37,6 +37,7 @@ import org.apache.kafka.server.mutable.BoundedList; import org.apache.kafka.server.policy.AlterConfigPolicy; import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata; +import org.apache.kafka.server.policy.AlterConfigV2Policy; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; import org.apache.kafka.timeline.TimelineHashSet; @@ -71,6 +72,7 @@ public class ConfigurationControlManager { private final KafkaConfigSchema configSchema; private final Consumer existenceChecker; private final Optional alterConfigPolicy; + private final Optional alterConfigV2Policy; private final ConfigurationValidator validator; private final TimelineHashMap> configData; private final TimelineHashSet brokersWithConfigs; @@ -84,6 +86,7 @@ static class Builder { private KafkaConfigSchema configSchema = null; private Consumer existenceChecker = __ -> { }; private Optional alterConfigPolicy = Optional.empty(); + private Optional alterConfigV2Policy = Optional.empty(); private ConfigurationValidator validator = ConfigurationValidator.NO_OP; private Map staticConfig = Map.of(); private int nodeId = 0; @@ -114,6 +117,11 @@ Builder setAlterConfigPolicy(Optional alterConfigPolicy) { return this; } + Builder setAlterConfigV2Policy(Optional alterConfigV2Policy) { + this.alterConfigV2Policy = alterConfigV2Policy; + return this; + } + Builder setValidator(ConfigurationValidator validator) { this.validator = validator; return this; @@ -149,6 +157,7 @@ ConfigurationControlManager build() { configSchema, existenceChecker, alterConfigPolicy, + alterConfigV2Policy, validator, staticConfig, nodeId, @@ -161,6 +170,7 @@ private ConfigurationControlManager(LogContext logContext, KafkaConfigSchema configSchema, Consumer existenceChecker, Optional alterConfigPolicy, + Optional alterConfigV2Policy, ConfigurationValidator validator, Map staticConfig, int nodeId, @@ -171,6 +181,7 @@ private ConfigurationControlManager(LogContext logContext, this.configSchema = configSchema; this.existenceChecker = existenceChecker; this.alterConfigPolicy = alterConfigPolicy; + this.alterConfigV2Policy = alterConfigV2Policy; this.validator = validator; this.configData = new TimelineHashMap<>(snapshotRegistry, 0); this.brokersWithConfigs = new TimelineHashSet<>(snapshotRegistry, 0); @@ -369,7 +380,12 @@ private ApiError validateAlterConfig( if (!newlyCreatedResource) { existenceChecker.accept(configResource); } - if (alterConfigPolicy.isPresent()) { + if (alterConfigV2Policy.isPresent()) { + alterConfigV2Policy.get().validate(new AlterConfigV2Policy.RequestMetadata( + configResource, + Map.copyOf(existingConfigsMap), + Map.copyOf(allConfigs))); + } else if (alterConfigPolicy.isPresent()) { alterConfigPolicy.get().validate(new RequestMetadata(configResource, alteredConfigsForAlterConfigPolicyCheck)); } } catch (ConfigException e) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 77639ba9a6..d941f1c7b0 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -120,6 +120,7 @@ import org.apache.kafka.server.fault.FaultHandler; import org.apache.kafka.server.fault.FaultHandlerException; import org.apache.kafka.server.policy.AlterConfigPolicy; +import org.apache.kafka.server.policy.AlterConfigV2Policy; import org.apache.kafka.server.policy.CreateTopicPolicy; import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.snapshot.Snapshots; @@ -208,6 +209,7 @@ public static class Builder { private QuorumControllerMetrics controllerMetrics = null; private Optional createTopicPolicy = Optional.empty(); private Optional alterConfigPolicy = Optional.empty(); + private Optional alterConfigV2Policy = Optional.empty(); private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP; private Map staticConfig = Map.of(); private BootstrapMetadata bootstrapMetadata = null; @@ -353,6 +355,11 @@ public Builder setAlterConfigPolicy(Optional alterConfigPolic return this; } + public Builder setAlterConfigV2Policy(Optional alterConfigV2Policy) { + this.alterConfigV2Policy = alterConfigV2Policy; + return this; + } + public Builder setConfigurationValidator(ConfigurationValidator configurationValidator) { this.configurationValidator = configurationValidator; return this; @@ -443,6 +450,7 @@ public QuorumController build() throws Exception { controllerMetrics, createTopicPolicy, alterConfigPolicy, + alterConfigV2Policy, configurationValidator, staticConfig, bootstrapMetadata, @@ -1489,6 +1497,7 @@ private QuorumController( QuorumControllerMetrics controllerMetrics, Optional createTopicPolicy, Optional alterConfigPolicy, + Optional alterConfigV2Policy, ConfigurationValidator configurationValidator, Map staticConfig, BootstrapMetadata bootstrapMetadata, @@ -1548,6 +1557,7 @@ private QuorumController( setKafkaConfigSchema(configSchema). setExistenceChecker(resourceExists). setAlterConfigPolicy(alterConfigPolicy). + setAlterConfigV2Policy(alterConfigV2Policy). setValidator(configurationValidator). setStaticConfig(staticConfig). setNodeId(nodeId). diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 2c93d1100e..4393c63ae9 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.server.config.ConfigSynonym; import org.apache.kafka.server.policy.AlterConfigPolicy; import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata; +import org.apache.kafka.server.policy.AlterConfigV2Policy; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -366,6 +367,40 @@ public void testIncrementalAlterConfigsWithPolicy() { true)); } + @Test + public void testAlterConfigV2PolicyReceivesBeforeAndAfter() { + CaptureAlterConfigV2Policy policy = new CaptureAlterConfigV2Policy(); + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setFeatureControl(createFeatureControlManager()). + setKafkaConfigSchema(SCHEMA). + setAlterConfigV2Policy(Optional.of(policy)). + build(); + + manager.replay(new ConfigRecord().setResourceType(TOPIC.id()).setResourceName(MYTOPIC.name()). + setName("cleanup.policy").setValue("delete")); + manager.replay(new ConfigRecord().setResourceType(TOPIC.id()).setResourceName(MYTOPIC.name()). + setName("retention.ms").setValue("10000")); + + ControllerResult incrementalResult = manager.incrementalAlterConfig(MYTOPIC, + toMap(entry("retention.ms", entry(SET, "20000")), + entry("cleanup.policy", entry(SET, "compact"))), + true); + RecordTestUtils.replayAll(manager, incrementalResult.records()); + + AlterConfigV2Policy.RequestMetadata incremental = policy.invocations().get(0); + assertEquals(Map.of("cleanup.policy", "delete", "retention.ms", "10000"), incremental.configsBefore()); + assertEquals(Map.of("cleanup.policy", "compact", "retention.ms", "20000"), incremental.configsAfter()); + + ControllerResult> legacyResult = manager.legacyAlterConfigs( + toMap(entry(MYTOPIC, toMap(entry("retention.ms", "30000")))), + true); + RecordTestUtils.replayAll(manager, legacyResult.records()); + + AlterConfigV2Policy.RequestMetadata legacy = policy.invocations().get(1); + assertEquals(Map.of("cleanup.policy", "compact", "retention.ms", "20000"), legacy.configsBefore()); + assertEquals(Map.of("retention.ms", "30000"), legacy.configsAfter()); + } + private static class CheckForNullValuesPolicy implements AlterConfigPolicy { @Override public void validate(RequestMetadata actual) throws PolicyViolationException { @@ -387,6 +422,29 @@ public void configure(Map configs) { } } + private static class CaptureAlterConfigV2Policy implements AlterConfigV2Policy { + private final List invocations = new ArrayList<>(); + + @Override + public void validate(AlterConfigV2Policy.RequestMetadata requestMetadata) { + invocations.add(requestMetadata); + } + + @Override + public void close() { + // empty + } + + @Override + public void configure(Map configs) { + // empty + } + + List invocations() { + return invocations; + } + } + @Test public void testLegacyAlterConfigs() { ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java index 945e0b5cf2..bf8bcc3e97 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java @@ -147,6 +147,10 @@ public class ServerLogConfigs { public static final String ALTER_CONFIG_POLICY_CLASS_NAME_DOC = "The alter configs policy class that should be used for validation. The class should " + "implement the org.apache.kafka.server.policy.AlterConfigPolicy interface. " + "

Note: This policy runs on the controller instead of the broker.

"; + public static final String ALTER_CONFIG_V2_POLICY_CLASS_NAME_CONFIG = "alter.config.v2.policy.class.name"; + public static final String ALTER_CONFIG_V2_POLICY_CLASS_NAME_DOC = "The alter configs v2 policy class that should be used for validation. The class should " + + "implement the org.apache.kafka.server.policy.AlterConfigV2Policy interface. " + + "

Note: This policy runs on the controller instead of the broker.

"; public static final String LOG_INITIAL_TASK_DELAY_MS_CONFIG = LOG_PREFIX + "initial.task.delay.ms"; public static final long LOG_INITIAL_TASK_DELAY_MS_DEFAULT = 30 * 1000L; diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 838ab49929..be2c773658 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -179,6 +179,7 @@ public Optional serverConfigName(String configName) { .define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC) .define(ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_DOC) .define(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_DOC) + .define(ServerLogConfigs.ALTER_CONFIG_V2_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.ALTER_CONFIG_V2_POLICY_CLASS_NAME_DOC) .define(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG, LONG, ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DEFAULT, atLeast(1), LOW, ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DOC) .defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC) .defineInternal(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, BOOLEAN, ServerLogConfigs.DISKLESS_ENABLE_DEFAULT, null, LOW, ServerLogConfigs.DISKLESS_ENABLE_DOC);