Skip to content

Commit

Permalink
Add a config to disable/enable dynamic config refresher
Browse files Browse the repository at this point in the history
  • Loading branch information
RexXiong committed Feb 4, 2024
1 parent 641ff15 commit 34b2254
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
}

def dynamicConfigStoreBackend: String = get(DYNAMIC_CONFIG_STORE_BACKEND)
def dynamicConfigRefreshInterval: Long = get(DYNAMIC_CONFIG_REFRESH_INTERVAL).getOrElse(-1L)
def dynamicConfigRefreshEnabled: Boolean = get(DYNAMIC_CONFIG_REFRESH_ENABLED)
def dynamicConfigRefreshInterval: Long = get(DYNAMIC_CONFIG_REFRESH_INTERVAL)
def dynamicConfigStoreDbFetchPageSize: Int = get(DYNAMIC_CONFIG_STORE_DB_FETCH_PAGE_SIZE)
def dynamicConfigStoreDbHikariDriverClassName: String =
get(DYNAMIC_CONFIG_STORE_DB_HIKARI_DRIVER_CLASS_NAME)
Expand Down Expand Up @@ -4392,14 +4393,21 @@ object CelebornConf extends Logging {
.checkValues(Set("FS", "DB"))
.createWithDefault("FS")

val DYNAMIC_CONFIG_REFRESH_INTERVAL: OptionalConfigEntry[Long] =
val DYNAMIC_CONFIG_REFRESH_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.dynamicConfig.refresh.enabled")
.categories("master", "worker")
.version("0.5.0")
.doc("Whether to enable configuration refresher.")
.booleanConf
.createWithDefault(false)

val DYNAMIC_CONFIG_REFRESH_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.dynamicConfig.refresh.interval")
.categories("master", "worker")
.version("0.4.0")
.doc("Interval for refreshing the corresponding dynamic config periodically, None means disable the " +
"corresponding config refresh")
.doc("Interval for refreshing the corresponding dynamic config periodically.")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
.createWithDefaultString("120s")

val DYNAMIC_CONFIG_STORE_DB_FETCH_PAGE_SIZE: ConfigEntry[Int] =
buildConf("celeborn.dynamicConfig.store.db.fetch.pageSize")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public abstract class BaseConfigServiceImpl implements ConfigService {
new AtomicReference<>();
protected final AtomicReference<Map<String, TenantConfig>> tenantConfigAtomicReference =
new AtomicReference<>(new HashMap<>());
protected final long dynamicConfigRefreshTime;

private final ScheduledExecutorService configRefreshService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-config-refresher");
Expand All @@ -47,18 +46,20 @@ public BaseConfigServiceImpl(CelebornConf celebornConf) throws IOException {
this.celebornConf = celebornConf;
this.systemConfigAtomicReference.set(new SystemConfig(celebornConf));
this.refreshAllCache();
this.dynamicConfigRefreshTime = celebornConf.dynamicConfigRefreshInterval();
if (dynamicConfigRefreshTime > 0) {
boolean dynamicConfigRefreshEnabled = celebornConf.dynamicConfigRefreshEnabled();
if (dynamicConfigRefreshEnabled) {
LOG.info("Celeborn config refresher is enabled.");
long dynamicConfigRefreshInterval = celebornConf.dynamicConfigRefreshInterval();
this.configRefreshService.scheduleWithFixedDelay(
() -> {
try {
refreshAllCache();
} catch (Throwable e) {
LOG.error("Refresh configuration encounter exception: {}", e.getMessage(), e);
LOG.error("Refresh config encounter exception: {}", e.getMessage(), e);
}
},
dynamicConfigRefreshTime,
dynamicConfigRefreshTime,
dynamicConfigRefreshInterval,
dynamicConfigRefreshInterval,
TimeUnit.MILLISECONDS);
} else {
LOG.info(
Expand Down

0 comments on commit 34b2254

Please sign in to comment.