diff --git a/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigV2Policy.java b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigV2Policy.java
new file mode 100644
index 0000000000..cfa58f7cf7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigV2Policy.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.policy;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.PolicyViolationException;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * An interface for enforcing a policy on alter configs requests with access to both
+ * the existing and resulting configuration state.
+ *
+ *
If alter.config.v2.policy.class.name is defined, Kafka will create an instance
+ * of the specified class using the default constructor and will then pass the broker configs to
+ * its configure() method. During broker shutdown, the close() method will
+ * be invoked so that resources can be released (if necessary).
+ */
+public interface AlterConfigV2Policy extends Configurable, AutoCloseable {
+
+ /**
+ * Class containing the alter config request parameters with before/after state.
+ */
+ class RequestMetadata {
+
+ private final ConfigResource resource;
+ private final Map configsBefore;
+ private final Map configsAfter;
+
+ /**
+ * Create an instance of this class with the provided parameters.
+ *
+ * This constructor is public to make testing of AlterConfigV2Policy
+ * implementations easier.
+ */
+ public RequestMetadata(ConfigResource resource,
+ Map configsBefore,
+ Map configsAfter) {
+ this.resource = resource;
+ this.configsBefore = configsBefore;
+ this.configsAfter = configsAfter;
+ }
+
+ /**
+ * Return the configs before the alteration.
+ */
+ public Map configsBefore() {
+ return configsBefore;
+ }
+
+ /**
+ * Return the configs after the alteration is applied.
+ */
+ public Map configsAfter() {
+ return configsAfter;
+ }
+
+ /**
+ * Return the resource being altered.
+ */
+ public ConfigResource resource() {
+ return resource;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(resource, configsBefore, configsAfter);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if ((o == null) || (!o.getClass().equals(getClass()))) return false;
+ RequestMetadata other = (RequestMetadata) o;
+ return resource.equals(other.resource) &&
+ configsBefore.equals(other.configsBefore) &&
+ configsAfter.equals(other.configsAfter);
+ }
+
+ @Override
+ public String toString() {
+ return "AlterConfigV2Policy.RequestMetadata(resource=" + resource +
+ ", configsBefore=" + configsBefore +
+ ", configsAfter=" + configsAfter + ")";
+ }
+ }
+
+ /**
+ * Validate the request parameters and throw a PolicyViolationException
+ * with a suitable error message if the alter configs request parameters for the
+ * provided resource do not satisfy this policy.
+ *
+ * @param requestMetadata the alter configs request parameters including before/after state
+ * for the provided resource.
+ * @throws PolicyViolationException if the request parameters do not satisfy this policy.
+ */
+ void validate(RequestMetadata requestMetadata) throws PolicyViolationException;
+}
diff --git a/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigV2PolicyTest.java b/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigV2PolicyTest.java
new file mode 100644
index 0000000000..d85e56b548
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/server/policy/AlterConfigV2PolicyTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.policy;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.ConfigResource.Type;
+import org.apache.kafka.server.policy.AlterConfigV2Policy.RequestMetadata;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class AlterConfigV2PolicyTest {
+
+ @Test
+ public void testRequestMetadataEquals() {
+ RequestMetadata requestMetadata = new RequestMetadata(
+ new ConfigResource(Type.BROKER, "0"),
+ Map.of("foo", "bar"),
+ Map.of("foo", "baz")
+ );
+
+ assertEquals(requestMetadata, requestMetadata);
+
+ assertNotEquals(requestMetadata, null);
+ assertNotEquals(requestMetadata, new Object());
+ assertNotEquals(requestMetadata, new RequestMetadata(
+ new ConfigResource(Type.BROKER, "1"),
+ Map.of("foo", "bar"),
+ Map.of("foo", "baz")
+ ));
+ assertNotEquals(requestMetadata, new RequestMetadata(
+ new ConfigResource(Type.BROKER, "0"),
+ Map.of("foo", "bar"),
+ Map.of()
+ ));
+ assertNotEquals(requestMetadata, new RequestMetadata(
+ new ConfigResource(Type.BROKER, "0"),
+ Map.of(),
+ Map.of("foo", "baz")
+ ));
+ }
+}
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 7f7d45d15f..fbb1184a06 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -43,12 +43,12 @@ import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, SimpleApiVersionManager}
import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
+import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, ALTER_CONFIG_V2_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, NodeToControllerChannelManager}
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
-import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
+import org.apache.kafka.server.policy.{AlterConfigPolicy, AlterConfigV2Policy, CreateTopicPolicy}
import org.apache.kafka.server.util.{Deadline, FutureUtils}
import java.util
@@ -90,6 +90,7 @@ class ControllerServer(
val socketServerFirstBoundPortFuture = new CompletableFuture[Integer]()
var createTopicPolicy: Option[CreateTopicPolicy] = None
var alterConfigPolicy: Option[AlterConfigPolicy] = None
+ var alterConfigV2Policy: Option[AlterConfigV2Policy] = None
@volatile var quorumControllerMetrics: QuorumControllerMetrics = _
var controller: Controller = _
var quotaManagers: QuotaManagers = _
@@ -194,6 +195,8 @@ class ControllerServer(
createTopicPolicy = Option(config.
getConfiguredInstance(CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, classOf[CreateTopicPolicy]))
+ alterConfigV2Policy = Option(config.
+ getConfiguredInstance(ALTER_CONFIG_V2_POLICY_CLASS_NAME_CONFIG, classOf[AlterConfigV2Policy]))
alterConfigPolicy = Option(config.
getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[AlterConfigPolicy]))
@@ -243,6 +246,7 @@ class ControllerServer(
setMetrics(quorumControllerMetrics).
setCreateTopicPolicy(createTopicPolicy.toJava).
setAlterConfigPolicy(alterConfigPolicy.toJava).
+ setAlterConfigV2Policy(alterConfigV2Policy.toJava).
setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).
setStaticConfig(config.originals).
setBootstrapMetadata(bootstrapMetadata).
@@ -478,6 +482,7 @@ class ControllerServer(
authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer plugin"))
createTopicPolicy.foreach(policy => Utils.closeQuietly(policy, "create topic policy"))
alterConfigPolicy.foreach(policy => Utils.closeQuietly(policy, "alter config policy"))
+ alterConfigV2Policy.foreach(policy => Utils.closeQuietly(policy, "alter config v2 policy"))
socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down"))
CoreUtils.swallow(config.dynamicConfig.clear(), this)
sharedServer.stopForController()
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 0cec30c06a..d3efbbfb9b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -588,6 +588,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
// warn if create.topic.policy.class.name or alter.config.policy.class.name is defined in the broker role
warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG)
warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG)
+ warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.ALTER_CONFIG_V2_POLICY_CLASS_NAME_CONFIG)
} else if (processRoles == Set(ProcessRole.ControllerRole)) {
// KRaft controller-only
validateQuorumVotersAndQuorumBootstrapServerForKRaft()
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 2b88d47431..5dd918002e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -37,6 +37,7 @@
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
+import org.apache.kafka.server.policy.AlterConfigV2Policy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
@@ -71,6 +72,7 @@ public class ConfigurationControlManager {
private final KafkaConfigSchema configSchema;
private final Consumer existenceChecker;
private final Optional alterConfigPolicy;
+ private final Optional alterConfigV2Policy;
private final ConfigurationValidator validator;
private final TimelineHashMap> configData;
private final TimelineHashSet brokersWithConfigs;
@@ -84,6 +86,7 @@ static class Builder {
private KafkaConfigSchema configSchema = null;
private Consumer existenceChecker = __ -> { };
private Optional alterConfigPolicy = Optional.empty();
+ private Optional alterConfigV2Policy = Optional.empty();
private ConfigurationValidator validator = ConfigurationValidator.NO_OP;
private Map staticConfig = Map.of();
private int nodeId = 0;
@@ -114,6 +117,11 @@ Builder setAlterConfigPolicy(Optional alterConfigPolicy) {
return this;
}
+ Builder setAlterConfigV2Policy(Optional alterConfigV2Policy) {
+ this.alterConfigV2Policy = alterConfigV2Policy;
+ return this;
+ }
+
Builder setValidator(ConfigurationValidator validator) {
this.validator = validator;
return this;
@@ -149,6 +157,7 @@ ConfigurationControlManager build() {
configSchema,
existenceChecker,
alterConfigPolicy,
+ alterConfigV2Policy,
validator,
staticConfig,
nodeId,
@@ -161,6 +170,7 @@ private ConfigurationControlManager(LogContext logContext,
KafkaConfigSchema configSchema,
Consumer existenceChecker,
Optional alterConfigPolicy,
+ Optional alterConfigV2Policy,
ConfigurationValidator validator,
Map staticConfig,
int nodeId,
@@ -171,6 +181,7 @@ private ConfigurationControlManager(LogContext logContext,
this.configSchema = configSchema;
this.existenceChecker = existenceChecker;
this.alterConfigPolicy = alterConfigPolicy;
+ this.alterConfigV2Policy = alterConfigV2Policy;
this.validator = validator;
this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokersWithConfigs = new TimelineHashSet<>(snapshotRegistry, 0);
@@ -369,7 +380,12 @@ private ApiError validateAlterConfig(
if (!newlyCreatedResource) {
existenceChecker.accept(configResource);
}
- if (alterConfigPolicy.isPresent()) {
+ if (alterConfigV2Policy.isPresent()) {
+ alterConfigV2Policy.get().validate(new AlterConfigV2Policy.RequestMetadata(
+ configResource,
+ Map.copyOf(existingConfigsMap),
+ Map.copyOf(allConfigs)));
+ } else if (alterConfigPolicy.isPresent()) {
alterConfigPolicy.get().validate(new RequestMetadata(configResource, alteredConfigsForAlterConfigPolicyCheck));
}
} catch (ConfigException e) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 77639ba9a6..d941f1c7b0 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -120,6 +120,7 @@
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.server.policy.AlterConfigPolicy;
+import org.apache.kafka.server.policy.AlterConfigV2Policy;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
@@ -208,6 +209,7 @@ public static class Builder {
private QuorumControllerMetrics controllerMetrics = null;
private Optional createTopicPolicy = Optional.empty();
private Optional alterConfigPolicy = Optional.empty();
+ private Optional alterConfigV2Policy = Optional.empty();
private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
private Map staticConfig = Map.of();
private BootstrapMetadata bootstrapMetadata = null;
@@ -353,6 +355,11 @@ public Builder setAlterConfigPolicy(Optional alterConfigPolic
return this;
}
+ public Builder setAlterConfigV2Policy(Optional alterConfigV2Policy) {
+ this.alterConfigV2Policy = alterConfigV2Policy;
+ return this;
+ }
+
public Builder setConfigurationValidator(ConfigurationValidator configurationValidator) {
this.configurationValidator = configurationValidator;
return this;
@@ -443,6 +450,7 @@ public QuorumController build() throws Exception {
controllerMetrics,
createTopicPolicy,
alterConfigPolicy,
+ alterConfigV2Policy,
configurationValidator,
staticConfig,
bootstrapMetadata,
@@ -1489,6 +1497,7 @@ private QuorumController(
QuorumControllerMetrics controllerMetrics,
Optional createTopicPolicy,
Optional alterConfigPolicy,
+ Optional alterConfigV2Policy,
ConfigurationValidator configurationValidator,
Map staticConfig,
BootstrapMetadata bootstrapMetadata,
@@ -1548,6 +1557,7 @@ private QuorumController(
setKafkaConfigSchema(configSchema).
setExistenceChecker(resourceExists).
setAlterConfigPolicy(alterConfigPolicy).
+ setAlterConfigV2Policy(alterConfigV2Policy).
setValidator(configurationValidator).
setStaticConfig(staticConfig).
setNodeId(nodeId).
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 2c93d1100e..4393c63ae9 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -36,6 +36,7 @@
import org.apache.kafka.server.config.ConfigSynonym;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
+import org.apache.kafka.server.policy.AlterConfigV2Policy;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -366,6 +367,40 @@ public void testIncrementalAlterConfigsWithPolicy() {
true));
}
+ @Test
+ public void testAlterConfigV2PolicyReceivesBeforeAndAfter() {
+ CaptureAlterConfigV2Policy policy = new CaptureAlterConfigV2Policy();
+ ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
+ setFeatureControl(createFeatureControlManager()).
+ setKafkaConfigSchema(SCHEMA).
+ setAlterConfigV2Policy(Optional.of(policy)).
+ build();
+
+ manager.replay(new ConfigRecord().setResourceType(TOPIC.id()).setResourceName(MYTOPIC.name()).
+ setName("cleanup.policy").setValue("delete"));
+ manager.replay(new ConfigRecord().setResourceType(TOPIC.id()).setResourceName(MYTOPIC.name()).
+ setName("retention.ms").setValue("10000"));
+
+ ControllerResult incrementalResult = manager.incrementalAlterConfig(MYTOPIC,
+ toMap(entry("retention.ms", entry(SET, "20000")),
+ entry("cleanup.policy", entry(SET, "compact"))),
+ true);
+ RecordTestUtils.replayAll(manager, incrementalResult.records());
+
+ AlterConfigV2Policy.RequestMetadata incremental = policy.invocations().get(0);
+ assertEquals(Map.of("cleanup.policy", "delete", "retention.ms", "10000"), incremental.configsBefore());
+ assertEquals(Map.of("cleanup.policy", "compact", "retention.ms", "20000"), incremental.configsAfter());
+
+ ControllerResult