From 34b225453d2dee7e11c28d84bc18de114323e5a6 Mon Sep 17 00:00:00 2001 From: Shuang Date: Sun, 4 Feb 2024 22:35:32 +0800 Subject: [PATCH] Add a config to disable/enable dynamic config refresher --- .../apache/celeborn/common/CelebornConf.scala | 18 +++++++++++++----- .../service/config/BaseConfigServiceImpl.java | 13 +++++++------ 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 0be9db50ab..10532eee94 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -367,7 +367,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se } def dynamicConfigStoreBackend: String = get(DYNAMIC_CONFIG_STORE_BACKEND) - def dynamicConfigRefreshInterval: Long = get(DYNAMIC_CONFIG_REFRESH_INTERVAL).getOrElse(-1L) + def dynamicConfigRefreshEnabled: Boolean = get(DYNAMIC_CONFIG_REFRESH_ENABLED) + def dynamicConfigRefreshInterval: Long = get(DYNAMIC_CONFIG_REFRESH_INTERVAL) def dynamicConfigStoreDbFetchPageSize: Int = get(DYNAMIC_CONFIG_STORE_DB_FETCH_PAGE_SIZE) def dynamicConfigStoreDbHikariDriverClassName: String = get(DYNAMIC_CONFIG_STORE_DB_HIKARI_DRIVER_CLASS_NAME) @@ -4392,14 +4393,21 @@ object CelebornConf extends Logging { .checkValues(Set("FS", "DB")) .createWithDefault("FS") - val DYNAMIC_CONFIG_REFRESH_INTERVAL: OptionalConfigEntry[Long] = + val DYNAMIC_CONFIG_REFRESH_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.dynamicConfig.refresh.enabled") + .categories("master", "worker") + .version("0.5.0") + .doc("Whether to enable configuration refresher.") + .booleanConf + .createWithDefault(false) + + val DYNAMIC_CONFIG_REFRESH_INTERVAL: ConfigEntry[Long] = buildConf("celeborn.dynamicConfig.refresh.interval") .categories("master", "worker") .version("0.4.0") - .doc("Interval for refreshing the corresponding dynamic config periodically, None means disable the " + - "corresponding config refresh") + .doc("Interval for refreshing the corresponding dynamic config periodically.") .timeConf(TimeUnit.MILLISECONDS) - .createOptional + .createWithDefaultString("120s") val DYNAMIC_CONFIG_STORE_DB_FETCH_PAGE_SIZE: ConfigEntry[Int] = buildConf("celeborn.dynamicConfig.store.db.fetch.pageSize") diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java index 0b12e0be14..b61bc367e5 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java @@ -38,7 +38,6 @@ public abstract class BaseConfigServiceImpl implements ConfigService { new AtomicReference<>(); protected final AtomicReference> tenantConfigAtomicReference = new AtomicReference<>(new HashMap<>()); - protected final long dynamicConfigRefreshTime; private final ScheduledExecutorService configRefreshService = ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-config-refresher"); @@ -47,18 +46,20 @@ public BaseConfigServiceImpl(CelebornConf celebornConf) throws IOException { this.celebornConf = celebornConf; this.systemConfigAtomicReference.set(new SystemConfig(celebornConf)); this.refreshAllCache(); - this.dynamicConfigRefreshTime = celebornConf.dynamicConfigRefreshInterval(); - if (dynamicConfigRefreshTime > 0) { + boolean dynamicConfigRefreshEnabled = celebornConf.dynamicConfigRefreshEnabled(); + if (dynamicConfigRefreshEnabled) { + LOG.info("Celeborn config refresher is enabled."); + long dynamicConfigRefreshInterval = celebornConf.dynamicConfigRefreshInterval(); this.configRefreshService.scheduleWithFixedDelay( () -> { try { refreshAllCache(); } catch (Throwable e) { - LOG.error("Refresh configuration encounter exception: {}", e.getMessage(), e); + LOG.error("Refresh config encounter exception: {}", e.getMessage(), e); } }, - dynamicConfigRefreshTime, - dynamicConfigRefreshTime, + dynamicConfigRefreshInterval, + dynamicConfigRefreshInterval, TimeUnit.MILLISECONDS); } else { LOG.info(