Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.policy;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;

import java.util.Map;
import java.util.Objects;

/**
* An interface for enforcing a policy on alter configs requests with access to both
* the existing and resulting configuration state.
*
* <p>If <code>alter.config.v2.policy.class.name</code> is defined, Kafka will create an instance
* of the specified class using the default constructor and will then pass the broker configs to
* its <code>configure()</code> method. During broker shutdown, the <code>close()</code> method will
* be invoked so that resources can be released (if necessary).
*/
public interface AlterConfigV2Policy extends Configurable, AutoCloseable {

/**
* Class containing the alter config request parameters with before/after state.
*/
class RequestMetadata {

private final ConfigResource resource;
private final Map<String, String> configsBefore;
private final Map<String, String> configsAfter;

/**
* Create an instance of this class with the provided parameters.
*
* This constructor is public to make testing of <code>AlterConfigV2Policy</code>
* implementations easier.
*/
public RequestMetadata(ConfigResource resource,
Map<String, String> configsBefore,
Map<String, String> configsAfter) {
this.resource = resource;
this.configsBefore = configsBefore;
this.configsAfter = configsAfter;
}

/**
* Return the configs before the alteration.
*/
public Map<String, String> configsBefore() {
return configsBefore;
}

/**
* Return the configs after the alteration is applied.
*/
public Map<String, String> configsAfter() {
return configsAfter;
}

/**
* Return the resource being altered.
*/
public ConfigResource resource() {
return resource;
}

@Override
public int hashCode() {
return Objects.hash(resource, configsBefore, configsAfter);
}

@Override
public boolean equals(Object o) {
if ((o == null) || (!o.getClass().equals(getClass()))) return false;
RequestMetadata other = (RequestMetadata) o;
return resource.equals(other.resource) &&
configsBefore.equals(other.configsBefore) &&
configsAfter.equals(other.configsAfter);
}

@Override
public String toString() {
return "AlterConfigV2Policy.RequestMetadata(resource=" + resource +
", configsBefore=" + configsBefore +
", configsAfter=" + configsAfter + ")";
}
}

/**
* Validate the request parameters and throw a <code>PolicyViolationException</code>
* with a suitable error message if the alter configs request parameters for the
* provided resource do not satisfy this policy.
*
* @param requestMetadata the alter configs request parameters including before/after state
* for the provided resource.
* @throws PolicyViolationException if the request parameters do not satisfy this policy.
*/
void validate(RequestMetadata requestMetadata) throws PolicyViolationException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.policy;

import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.server.policy.AlterConfigV2Policy.RequestMetadata;

import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

public class AlterConfigV2PolicyTest {

@Test
public void testRequestMetadataEquals() {
RequestMetadata requestMetadata = new RequestMetadata(
new ConfigResource(Type.BROKER, "0"),
Map.of("foo", "bar"),
Map.of("foo", "baz")
);

assertEquals(requestMetadata, requestMetadata);

assertNotEquals(requestMetadata, null);
assertNotEquals(requestMetadata, new Object());
assertNotEquals(requestMetadata, new RequestMetadata(
new ConfigResource(Type.BROKER, "1"),
Map.of("foo", "bar"),
Map.of("foo", "baz")
));
assertNotEquals(requestMetadata, new RequestMetadata(
new ConfigResource(Type.BROKER, "0"),
Map.of("foo", "bar"),
Map.of()
));
assertNotEquals(requestMetadata, new RequestMetadata(
new ConfigResource(Type.BROKER, "0"),
Map.of(),
Map.of("foo", "baz")
));
}
}
9 changes: 7 additions & 2 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, SimpleApiVersionManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, ALTER_CONFIG_V2_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, NodeToControllerChannelManager}
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
import org.apache.kafka.server.policy.{AlterConfigPolicy, AlterConfigV2Policy, CreateTopicPolicy}
import org.apache.kafka.server.util.{Deadline, FutureUtils}

import java.util
Expand Down Expand Up @@ -90,6 +90,7 @@ class ControllerServer(
val socketServerFirstBoundPortFuture = new CompletableFuture[Integer]()
var createTopicPolicy: Option[CreateTopicPolicy] = None
var alterConfigPolicy: Option[AlterConfigPolicy] = None
var alterConfigV2Policy: Option[AlterConfigV2Policy] = None
@volatile var quorumControllerMetrics: QuorumControllerMetrics = _
var controller: Controller = _
var quotaManagers: QuotaManagers = _
Expand Down Expand Up @@ -194,6 +195,8 @@ class ControllerServer(

createTopicPolicy = Option(config.
getConfiguredInstance(CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, classOf[CreateTopicPolicy]))
alterConfigV2Policy = Option(config.
getConfiguredInstance(ALTER_CONFIG_V2_POLICY_CLASS_NAME_CONFIG, classOf[AlterConfigV2Policy]))
alterConfigPolicy = Option(config.
getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[AlterConfigPolicy]))

Expand Down Expand Up @@ -243,6 +246,7 @@ class ControllerServer(
setMetrics(quorumControllerMetrics).
setCreateTopicPolicy(createTopicPolicy.toJava).
setAlterConfigPolicy(alterConfigPolicy.toJava).
setAlterConfigV2Policy(alterConfigV2Policy.toJava).
setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).
setStaticConfig(config.originals).
setBootstrapMetadata(bootstrapMetadata).
Expand Down Expand Up @@ -478,6 +482,7 @@ class ControllerServer(
authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer plugin"))
createTopicPolicy.foreach(policy => Utils.closeQuietly(policy, "create topic policy"))
alterConfigPolicy.foreach(policy => Utils.closeQuietly(policy, "alter config policy"))
alterConfigV2Policy.foreach(policy => Utils.closeQuietly(policy, "alter config v2 policy"))
socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down"))
CoreUtils.swallow(config.dynamicConfig.clear(), this)
sharedServer.stopForController()
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
// warn if create.topic.policy.class.name or alter.config.policy.class.name is defined in the broker role
warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG)
warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG)
warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, ServerLogConfigs.ALTER_CONFIG_V2_POLICY_CLASS_NAME_CONFIG)
} else if (processRoles == Set(ProcessRole.ControllerRole)) {
// KRaft controller-only
validateQuorumVotersAndQuorumBootstrapServerForKRaft()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
import org.apache.kafka.server.policy.AlterConfigV2Policy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class ConfigurationControlManager {
private final KafkaConfigSchema configSchema;
private final Consumer<ConfigResource> existenceChecker;
private final Optional<AlterConfigPolicy> alterConfigPolicy;
private final Optional<AlterConfigV2Policy> alterConfigV2Policy;
private final ConfigurationValidator validator;
private final TimelineHashMap<ConfigResource, TimelineHashMap<String, String>> configData;
private final TimelineHashSet<Integer> brokersWithConfigs;
Expand All @@ -84,6 +86,7 @@ static class Builder {
private KafkaConfigSchema configSchema = null;
private Consumer<ConfigResource> existenceChecker = __ -> { };
private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
private Optional<AlterConfigV2Policy> alterConfigV2Policy = Optional.empty();
private ConfigurationValidator validator = ConfigurationValidator.NO_OP;
private Map<String, Object> staticConfig = Map.of();
private int nodeId = 0;
Expand Down Expand Up @@ -114,6 +117,11 @@ Builder setAlterConfigPolicy(Optional<AlterConfigPolicy> alterConfigPolicy) {
return this;
}

Builder setAlterConfigV2Policy(Optional<AlterConfigV2Policy> alterConfigV2Policy) {
this.alterConfigV2Policy = alterConfigV2Policy;
return this;
}

Builder setValidator(ConfigurationValidator validator) {
this.validator = validator;
return this;
Expand Down Expand Up @@ -149,6 +157,7 @@ ConfigurationControlManager build() {
configSchema,
existenceChecker,
alterConfigPolicy,
alterConfigV2Policy,
validator,
staticConfig,
nodeId,
Expand All @@ -161,6 +170,7 @@ private ConfigurationControlManager(LogContext logContext,
KafkaConfigSchema configSchema,
Consumer<ConfigResource> existenceChecker,
Optional<AlterConfigPolicy> alterConfigPolicy,
Optional<AlterConfigV2Policy> alterConfigV2Policy,
ConfigurationValidator validator,
Map<String, Object> staticConfig,
int nodeId,
Expand All @@ -171,6 +181,7 @@ private ConfigurationControlManager(LogContext logContext,
this.configSchema = configSchema;
this.existenceChecker = existenceChecker;
this.alterConfigPolicy = alterConfigPolicy;
this.alterConfigV2Policy = alterConfigV2Policy;
this.validator = validator;
this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokersWithConfigs = new TimelineHashSet<>(snapshotRegistry, 0);
Expand Down Expand Up @@ -369,7 +380,12 @@ private ApiError validateAlterConfig(
if (!newlyCreatedResource) {
existenceChecker.accept(configResource);
}
if (alterConfigPolicy.isPresent()) {
if (alterConfigV2Policy.isPresent()) {
alterConfigV2Policy.get().validate(new AlterConfigV2Policy.RequestMetadata(
configResource,
Map.copyOf(existingConfigsMap),
Map.copyOf(allConfigs)));
} else if (alterConfigPolicy.isPresent()) {
alterConfigPolicy.get().validate(new RequestMetadata(configResource, alteredConfigsForAlterConfigPolicyCheck));
}
} catch (ConfigException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.AlterConfigV2Policy;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
Expand Down Expand Up @@ -208,6 +209,7 @@ public static class Builder {
private QuorumControllerMetrics controllerMetrics = null;
private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
private Optional<AlterConfigV2Policy> alterConfigV2Policy = Optional.empty();
private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
private Map<String, Object> staticConfig = Map.of();
private BootstrapMetadata bootstrapMetadata = null;
Expand Down Expand Up @@ -353,6 +355,11 @@ public Builder setAlterConfigPolicy(Optional<AlterConfigPolicy> alterConfigPolic
return this;
}

public Builder setAlterConfigV2Policy(Optional<AlterConfigV2Policy> alterConfigV2Policy) {
this.alterConfigV2Policy = alterConfigV2Policy;
return this;
}

public Builder setConfigurationValidator(ConfigurationValidator configurationValidator) {
this.configurationValidator = configurationValidator;
return this;
Expand Down Expand Up @@ -443,6 +450,7 @@ public QuorumController build() throws Exception {
controllerMetrics,
createTopicPolicy,
alterConfigPolicy,
alterConfigV2Policy,
configurationValidator,
staticConfig,
bootstrapMetadata,
Expand Down Expand Up @@ -1489,6 +1497,7 @@ private QuorumController(
QuorumControllerMetrics controllerMetrics,
Optional<CreateTopicPolicy> createTopicPolicy,
Optional<AlterConfigPolicy> alterConfigPolicy,
Optional<AlterConfigV2Policy> alterConfigV2Policy,
ConfigurationValidator configurationValidator,
Map<String, Object> staticConfig,
BootstrapMetadata bootstrapMetadata,
Expand Down Expand Up @@ -1548,6 +1557,7 @@ private QuorumController(
setKafkaConfigSchema(configSchema).
setExistenceChecker(resourceExists).
setAlterConfigPolicy(alterConfigPolicy).
setAlterConfigV2Policy(alterConfigV2Policy).
setValidator(configurationValidator).
setStaticConfig(staticConfig).
setNodeId(nodeId).
Expand Down
Loading