Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37126] Add Validator for Autoscaler #936

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.validation.AutoscalerValidator;
import org.apache.flink.autoscaler.validation.DefaultAutoscalerValidator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
Expand All @@ -41,6 +43,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -51,6 +54,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_PARALLELISM;

Expand All @@ -69,6 +73,7 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorService scalingThreadPool;
private final UnmodifiableConfiguration baseConf;
private final AutoscalerValidator autoscalerValidator;

/**
* Maintain a set of job keys that during scaling, it should be accessed at {@link
Expand Down Expand Up @@ -104,6 +109,7 @@ public StandaloneAutoscalerExecutor(
parallelism, new ExecutorThreadFactory("autoscaler-standalone-scaling"));
this.scalingJobKeys = new HashSet<>();
this.baseConf = new UnmodifiableConfiguration(conf);
this.autoscalerValidator = new DefaultAutoscalerValidator();
}

public void start() {
Expand Down Expand Up @@ -189,7 +195,19 @@ private void cleanupStoppedJob(Collection<Context> jobList) {
protected void scalingSingleJob(Context jobContext) {
try {
MDC.put("job.key", jobContext.getJobKey().toString());
autoScaler.scale(jobContext);
Optional<String> validationError =
autoscalerValidator.validateAutoscalerOptions(jobContext.getConfiguration());
if (validationError.isPresent()) {
eventHandler.handleEvent(
jobContext,
AutoScalerEventHandler.Type.Warning,
"AutoScaler Options Validation",
validationError.get(),
null,
baseConf.get(SCALING_EVENT_INTERVAL));
} else {
autoScaler.scale(jobContext);
}
} catch (Throwable e) {
LOG.error("Error while scaling job", e);
eventHandler.handleException(jobContext, AUTOSCALER_ERROR, e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler.standalone;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.configuration.Configuration;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import static org.assertj.core.api.Assertions.assertThat;

class StandaloneAutoscalerValidatorTest {
private List<JobAutoScalerContext<JobID>> jobList;
private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector;
private ConcurrentHashMap<JobID, Integer> scaleCounter;
private Configuration correctConfiguration;
private Configuration invalidConfiguration;

@BeforeEach
void setUp() {
jobList = new ArrayList<>();
eventCollector = new TestingEventCollector<>();
scaleCounter = new ConcurrentHashMap<>();

correctConfiguration = new Configuration();
correctConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);

invalidConfiguration = new Configuration();
invalidConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
invalidConfiguration.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, -1.0);
}

@Test
void testAutoScalerWithInvalidConfig() {
JobAutoScalerContext<JobID> validJob = createJobAutoScalerContext(correctConfiguration);
JobAutoScalerContext<JobID> invalidJob = createJobAutoScalerContext(invalidConfiguration);

jobList.add(validJob);
jobList.add(invalidJob);

final var jobAutoScaler =
new JobAutoScaler<JobID, JobAutoScalerContext<JobID>>() {
@Override
public void scale(JobAutoScalerContext<JobID> context) {
scaleCounter.merge(context.getJobKey(), 1, Integer::sum);
}

@Override
public void cleanup(JobAutoScalerContext<JobID> context) {
// No cleanup required for the test
}
};

try (var autoscalerExecutor =
new StandaloneAutoscalerExecutor<>(
new Configuration(), baseConf -> jobList, eventCollector, jobAutoScaler)) {

List<CompletableFuture<Void>> scaledFutures = autoscalerExecutor.scaling();

// Verification triggers two scaling tasks
assertThat(scaledFutures).hasSize(2);

// Only legally configured tasks are scaled
assertThat(scaleCounter).hasSize(1).containsKey(validJob.getJobKey());

// Verification Event Collector captures an event
assertThat(eventCollector.events).hasSize(1);
assertThat(eventCollector.events)
.allMatch(event -> event.getContext().equals(invalidJob));
}
}

private JobAutoScalerContext<JobID> createJobAutoScalerContext(Configuration configuration) {
JobID jobID = new JobID();
return new JobAutoScalerContext<>(
jobID, jobID, JobStatus.RUNNING, configuration, null, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler.validation;

import org.apache.flink.configuration.Configuration;

import java.util.Optional;

/** Validator for Autoscaler. */
public interface AutoscalerValidator {

/**
* Validate autoscaler config and return optional error.
*
* @param flinkConf autoscaler config
* @return Optional error string, should be present iff validation resulted in an error
*/
Optional<String> validateAutoscalerOptions(Configuration flinkConf);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler.validation;

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.utils.CalendarUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;

import java.util.Optional;

import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;

/** Default implementation of {@link AutoscalerValidator}. */
public class DefaultAutoscalerValidator implements AutoscalerValidator {

public Optional<String> validateAutoscalerOptions(Configuration flinkConf) {

if (!flinkConf.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
return Optional.empty();
}
return firstPresent(
validateNumber(flinkConf, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
validateNumber(flinkConf, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
validateNumber(flinkConf, UTILIZATION_TARGET, 0.0d, 1.0d),
validateNumber(flinkConf, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
validateNumber(flinkConf, UTILIZATION_MAX, flinkConf.get(UTILIZATION_TARGET), 1.0d),
validateNumber(flinkConf, UTILIZATION_MIN, 0.0d, flinkConf.get(UTILIZATION_TARGET)),
CalendarUtils.validateExcludedPeriods(flinkConf));
}

@SafeVarargs
private static Optional<String> firstPresent(Optional<String>... errOpts) {
for (Optional<String> opt : errOpts) {
if (opt.isPresent()) {
return opt;
}
}
return Optional.empty();
}

private static <T extends Number> Optional<String> validateNumber(
Configuration flinkConfiguration,
ConfigOption<T> autoScalerConfig,
Double min,
Double max) {
try {
var configValue = flinkConfiguration.get(autoScalerConfig);
if (configValue != null) {
double value = configValue.doubleValue();
if ((min != null && value < min) || (max != null && value > max)) {
return Optional.of(
String.format(
"The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
autoScalerConfig.key(),
min != null ? min.toString() : "-Infinity",
max != null ? max.toString() : "+Infinity"));
}
}
return Optional.empty();
} catch (IllegalArgumentException e) {
return Optional.of(
String.format(
"Invalid value in the autoscaler config %s", autoScalerConfig.key()));
}
}

private static <T extends Number> Optional<String> validateNumber(
Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min) {
return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

package org.apache.flink.kubernetes.operator.validation;

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.utils.CalendarUtils;
import org.apache.flink.autoscaler.validation.AutoscalerValidator;
import org.apache.flink.autoscaler.validation.DefaultAutoscalerValidator;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
Expand Down Expand Up @@ -65,10 +64,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;

/** Default validator implementation for {@link FlinkDeployment}. */
public class DefaultValidator implements FlinkResourceValidator {

Expand All @@ -87,9 +82,11 @@ public class DefaultValidator implements FlinkResourceValidator {
Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME);

private final FlinkConfigManager configManager;
private final AutoscalerValidator autoscalerValidator;

public DefaultValidator(FlinkConfigManager configManager) {
this.configManager = configManager;
this.autoscalerValidator = new DefaultAutoscalerValidator();
}

@Override
Expand Down Expand Up @@ -597,62 +594,12 @@ private Optional<String> validateServiceAccount(String serviceAccount) {
return Optional.empty();
}

public static Optional<String> validateAutoScalerFlinkConfiguration(
public Optional<String> validateAutoScalerFlinkConfiguration(
Map<String, String> effectiveConfig) {
if (effectiveConfig == null) {
return Optional.empty();
}
Configuration flinkConfiguration = Configuration.fromMap(effectiveConfig);
if (!flinkConfiguration.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
return Optional.empty();
}
return firstPresent(
validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
validateNumber(flinkConfiguration, UTILIZATION_TARGET, 0.0d, 1.0d),
validateNumber(
flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
validateNumber(
flinkConfiguration,
UTILIZATION_MAX,
flinkConfiguration.get(UTILIZATION_TARGET),
1.0d),
validateNumber(
flinkConfiguration,
UTILIZATION_MIN,
0.0d,
flinkConfiguration.get(UTILIZATION_TARGET)),
CalendarUtils.validateExcludedPeriods(flinkConfiguration));
}

private static <T extends Number> Optional<String> validateNumber(
Configuration flinkConfiguration,
ConfigOption<T> autoScalerConfig,
Double min,
Double max) {
try {
var configValue = flinkConfiguration.get(autoScalerConfig);
if (configValue != null) {
double value = configValue.doubleValue();
if ((min != null && value < min) || (max != null && value > max)) {
return Optional.of(
String.format(
"The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
autoScalerConfig.key(),
min != null ? min.toString() : "-Infinity",
max != null ? max.toString() : "+Infinity"));
}
}
return Optional.empty();
} catch (IllegalArgumentException e) {
return Optional.of(
String.format(
"Invalid value in the autoscaler config %s", autoScalerConfig.key()));
}
}

private static <T extends Number> Optional<String> validateNumber(
Configuration flinkConfiguration, ConfigOption<T> autoScalerConfig, Double min) {
return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
return autoscalerValidator.validateAutoscalerOptions(flinkConfiguration);
}
}
Loading