Skip to content

Commit

Permalink
[FLINK-37126] Add Validator for Autoscaler
Browse files Browse the repository at this point in the history
  • Loading branch information
huyuanfeng committed Jan 20, 2025
1 parent 4c2c90c commit 439229b
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.validation.AutoscalerValidator;
import org.apache.flink.autoscaler.validation.DefaultAutoscalerValidator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
Expand All @@ -41,6 +43,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -51,6 +54,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_PARALLELISM;

Expand All @@ -69,6 +73,7 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorService scalingThreadPool;
private final UnmodifiableConfiguration baseConf;
private final AutoscalerValidator autoscalerValidator;

/**
* Maintain a set of job keys that during scaling, it should be accessed at {@link
Expand Down Expand Up @@ -104,6 +109,7 @@ public StandaloneAutoscalerExecutor(
parallelism, new ExecutorThreadFactory("autoscaler-standalone-scaling"));
this.scalingJobKeys = new HashSet<>();
this.baseConf = new UnmodifiableConfiguration(conf);
this.autoscalerValidator = new DefaultAutoscalerValidator();
}

public void start() {
Expand Down Expand Up @@ -189,7 +195,19 @@ private void cleanupStoppedJob(Collection<Context> jobList) {
protected void scalingSingleJob(Context jobContext) {
try {
MDC.put("job.key", jobContext.getJobKey().toString());
autoScaler.scale(jobContext);
Optional<String> validationError =
autoscalerValidator.validateAutoscalerOptions(jobContext.getConfiguration());
if (validationError.isPresent()) {
eventHandler.handleEvent(
jobContext,
AutoScalerEventHandler.Type.Warning,
"AutoScaler Options Validation",
validationError.get(),
null,
baseConf.get(SCALING_EVENT_INTERVAL));
} else {
autoScaler.scale(jobContext);
}
} catch (Throwable e) {
LOG.error("Error while scaling job", e);
eventHandler.handleException(jobContext, AUTOSCALER_ERROR, e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler.standalone;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.configuration.Configuration;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Verifies that {@link StandaloneAutoscalerExecutor} validates the autoscaler options of each job
 * before handing the job to the {@link JobAutoScaler}.
 */
class StandaloneAutoscalerValidatorTest {

    /**
     * Submits one job with a valid and one job with an invalid autoscaler configuration and checks
     * that only the valid job is scaled while the invalid one produces exactly one event.
     */
    @Test
    public void testAutoScalerWithInvalidConfig() {
        var jobList = new ArrayList<JobAutoScalerContext<JobID>>();
        var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();

        Configuration correctConfiguration = new Configuration();
        correctConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);

        // A negative scale-down factor is what makes this configuration invalid.
        Configuration invalidConfiguration = new Configuration();
        invalidConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
        invalidConfiguration.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, -1.);

        var correctConfigurationJob = createJobAutoScalerContextWithConf(correctConfiguration);
        var illegalConfigurationJob = createJobAutoScalerContextWithConf(invalidConfiguration);

        // Number of scale() invocations per job key.
        var scaleCounter = new ConcurrentHashMap<JobID, Integer>();

        try (var autoscalerExecutor =
                new StandaloneAutoscalerExecutor<>(
                        new Configuration(),
                        baseConf -> jobList,
                        eventCollector,
                        new JobAutoScaler<>() {
                            @Override
                            public void scale(JobAutoScalerContext<JobID> context) {
                                // merge() is an atomic read-modify-write on the concurrent map.
                                scaleCounter.merge(context.getJobKey(), 1, Integer::sum);
                            }

                            @Override
                            public void cleanup(JobAutoScalerContext<JobID> context) {
                                // do nothing
                            }
                        })) {
            jobList.add(correctConfigurationJob);
            jobList.add(illegalConfigurationJob);

            List<CompletableFuture<Void>> scaledFutures = autoscalerExecutor.scaling();
            assertThat(scaledFutures).hasSize(2);

            // scaling() hands back futures, so the per-job work may still be in flight here.
            // Wait for all of them before inspecting the results to keep the test deterministic.
            scaledFutures.forEach(CompletableFuture::join);

            // Only the job with the valid configuration may reach the autoscaler.
            assertThat(scaleCounter).containsOnlyKeys(correctConfigurationJob.getJobKey());

            // The invalid job must have produced the single reported event.
            assertThat(eventCollector.events).hasSize(1);
            assertThat(eventCollector.events)
                    .allMatch(event -> event.getContext().equals(illegalConfigurationJob));
        }
    }

    /**
     * Creates a running job context with a fresh {@link JobID} that carries the given
     * configuration.
     *
     * @param configuration configuration to attach to the job context
     * @return a new job context using the same id as job id and job key
     */
    private static JobAutoScalerContext<JobID> createJobAutoScalerContextWithConf(
            Configuration configuration) {
        var jobID = new JobID();
        return new JobAutoScalerContext<>(
                jobID, jobID, JobStatus.RUNNING, configuration, null, null);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler.validation;

import org.apache.flink.configuration.Configuration;

import java.util.Optional;

/**
 * Contract for components that check an autoscaler configuration before it is used.
 *
 * <p>Implementations report problems through the return value instead of throwing.
 */
public interface AutoscalerValidator {

    /**
     * Checks the autoscaler options contained in the given configuration.
     *
     * @param flinkConf configuration holding the autoscaler options to check
     * @return an error description if, and only if, the validation found a problem; otherwise
     *     an empty {@link Optional}
     */
    Optional<String> validateAutoscalerOptions(Configuration flinkConf);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler.validation;

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.utils.CalendarUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;

import java.util.Optional;

import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;

/** Default implementation of {@link AutoscalerValidator}. */
public class DefaultAutoscalerValidator implements AutoscalerValidator {

public Optional<String> validateAutoscalerOptions(Configuration flinkConf) {

if (!flinkConf.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
return Optional.empty();
}
return firstPresent(
validateNumber(flinkConf, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
validateNumber(flinkConf, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
validateNumber(flinkConf, UTILIZATION_TARGET, 0.0d, 1.0d),
validateNumber(flinkConf, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
validateNumber(flinkConf, UTILIZATION_MAX, flinkConf.get(UTILIZATION_TARGET), 1.0d),
validateNumber(flinkConf, UTILIZATION_MIN, 0.0d, flinkConf.get(UTILIZATION_TARGET)),
CalendarUtils.validateExcludedPeriods(flinkConf));
}

@SafeVarargs
private static Optional<String> firstPresent(Optional<String>... errOpts) {
for (Optional<String> opt : errOpts) {
if (opt.isPresent()) {
return opt;
}
}
return Optional.empty();
}

private static <T extends Number> Optional<String> validateNumber(
Configuration flinkConfiguration,
ConfigOption<T> autoScalerConfig,
Double min,
Double max) {
try {
var configValue = flinkConfiguration.get(autoScalerConfig);
if (configValue != null) {
double value = configValue.doubleValue();
if ((min != null && value < min) || (max != null && value > max)) {
return Optional.of(
String.format(
"The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
autoScalerConfig.key(),
min != null ? min.toString() : "-Infinity",
max != null ? max.toString() : "+Infinity"));
}
}
return Optional.empty();
} catch (IllegalArgumentException e) {
return Optional.of(
String.format(
"Invalid value in the autoscaler config %s", autoScalerConfig.key()));
}
}

private static <T extends Number> Optional<String> validateNumber(
Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min) {
return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

package org.apache.flink.kubernetes.operator.validation;

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.utils.CalendarUtils;
import org.apache.flink.autoscaler.validation.AutoscalerValidator;
import org.apache.flink.autoscaler.validation.DefaultAutoscalerValidator;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
Expand Down Expand Up @@ -65,10 +64,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;

/** Default validator implementation for {@link FlinkDeployment}. */
public class DefaultValidator implements FlinkResourceValidator {

Expand All @@ -87,9 +82,11 @@ public class DefaultValidator implements FlinkResourceValidator {
Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME);

private final FlinkConfigManager configManager;
private final AutoscalerValidator autoscalerValidator;

public DefaultValidator(FlinkConfigManager configManager) {
this.configManager = configManager;
this.autoscalerValidator = new DefaultAutoscalerValidator();
}

@Override
Expand Down Expand Up @@ -597,62 +594,12 @@ private Optional<String> validateServiceAccount(String serviceAccount) {
return Optional.empty();
}

public static Optional<String> validateAutoScalerFlinkConfiguration(
public Optional<String> validateAutoScalerFlinkConfiguration(
Map<String, String> effectiveConfig) {
if (effectiveConfig == null) {
return Optional.empty();
}
Configuration flinkConfiguration = Configuration.fromMap(effectiveConfig);
if (!flinkConfiguration.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
return Optional.empty();
}
return firstPresent(
validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
validateNumber(flinkConfiguration, UTILIZATION_TARGET, 0.0d, 1.0d),
validateNumber(
flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
validateNumber(
flinkConfiguration,
UTILIZATION_MAX,
flinkConfiguration.get(UTILIZATION_TARGET),
1.0d),
validateNumber(
flinkConfiguration,
UTILIZATION_MIN,
0.0d,
flinkConfiguration.get(UTILIZATION_TARGET)),
CalendarUtils.validateExcludedPeriods(flinkConfiguration));
}

private static <T extends Number> Optional<String> validateNumber(
Configuration flinkConfiguration,
ConfigOption<T> autoScalerConfig,
Double min,
Double max) {
try {
var configValue = flinkConfiguration.get(autoScalerConfig);
if (configValue != null) {
double value = configValue.doubleValue();
if ((min != null && value < min) || (max != null && value > max)) {
return Optional.of(
String.format(
"The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
autoScalerConfig.key(),
min != null ? min.toString() : "-Infinity",
max != null ? max.toString() : "+Infinity"));
}
}
return Optional.empty();
} catch (IllegalArgumentException e) {
return Optional.of(
String.format(
"Invalid value in the autoscaler config %s", autoScalerConfig.key()));
}
}

private static <T extends Number> Optional<String> validateNumber(
Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min) {
return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
return autoscalerValidator.validateAutoscalerOptions(flinkConfiguration);
}
}

0 comments on commit 439229b

Please sign in to comment.