diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index 93d53fbe0..730e16847 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -50,13 +50,13 @@ import java.time.Duration; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -87,7 +87,7 @@ public class FlinkConfigManager { private final AtomicLong defaultConfigVersion = new AtomicLong(0); private final LoadingCache cache; private final Consumer> namespaceListener; - private volatile Map> relevantFlinkVersionPrefixes; + private volatile ConcurrentHashMap> relevantFlinkVersionPrefixes; protected static final Pattern FLINK_VERSION_PATTERN = Pattern.compile( @@ -114,7 +114,7 @@ public FlinkConfigManager( this.namespaceListener = namespaceListener; Duration cacheTimeout = defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT); - this.relevantFlinkVersionPrefixes = new HashMap<>(); + this.relevantFlinkVersionPrefixes = new ConcurrentHashMap<>(); this.cache = CacheBuilder.newBuilder() .maximumSize( @@ -189,7 +189,7 @@ public void updateDefaultConfig(Configuration newConf) { // We clear the cached relevant Flink version prefixes as the base config may include new // version overrides. // This will trigger a regeneration of the prefixes in the next call to getDefaultConfig. - relevantFlinkVersionPrefixes = new HashMap<>(); + relevantFlinkVersionPrefixes = new ConcurrentHashMap<>(); } /** diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java index fcd55157e..b6bca056d 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java @@ -49,6 +49,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_WATCHED_NAMESPACES; @@ -380,4 +381,44 @@ public void testVersionNamespaceDefaultConfs() { assertEquals("v3", observeConfig.get("conf3")); assertEquals("false", observeConfig.get("conf0")); } + + @Test + public void testConcurrentDefaultConfig() throws InterruptedException { + var opConf = new Configuration(); + var configManager = new FlinkConfigManager(opConf); + var completed1 = new AtomicBoolean(); + var completed2 = new AtomicBoolean(); + var completed3 = new AtomicBoolean(); + + var t1 = + new Thread( + () -> { + configManager.getDefaultConfig("ns1", FlinkVersion.v1_18); + completed1.set(true); + }); + var t2 = + new Thread( + () -> { + configManager.getDefaultConfig("ns1", FlinkVersion.v1_18); + completed2.set(true); + }); + var t3 = + new Thread( + () -> { + configManager.getDefaultConfig("ns1", FlinkVersion.v1_18); + completed3.set(true); + }); + + t1.start(); + t2.start(); + t3.start(); + + t1.join(); + t2.join(); + t3.join(); + + assertTrue(completed1.get()); + assertTrue(completed2.get()); + assertTrue(completed3.get()); + } }