Skip to content

Commit

Permalink
[CELEBORN-1052][FOLLOWUP] Introduce dynamic ConfigService at SystemLe…
Browse files Browse the repository at this point in the history
…vel and TenantLevel
  • Loading branch information
SteNicholas committed Dec 1, 2023
1 parent 8a15396 commit d344966
Show file tree
Hide file tree
Showing 12 changed files with 242 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4166,17 +4166,18 @@ object CelebornConf extends Logging {
val DYNAMIC_CONFIG_STORE_BACKEND: ConfigEntry[String] =
buildConf("celeborn.dynamicConfig.store.backend")
.categories("master", "worker")
.doc("Store backend for dynamic config, NONE means disabling dynamic config store")
.doc("Store backend for dynamic config. Available options: NONE, FS. Note: NONE means disabling dynamic config store.")
.version("0.4.0")
.stringConf
.checkValues(Set("FS", "NONE"))
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set("NONE", "FS"))
.createWithDefault("NONE")

val DYNAMIC_CONFIG_REFRESH_TIME: ConfigEntry[Long] =
buildConf("celeborn.dynamicConfig.refresh.time")
.categories("master", "worker")
.version("0.4.0")
.doc("The time interval for refreshing the corresponding dynamic config periodically")
.doc("The time interval for refreshing the corresponding dynamic config periodically.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")
}
4 changes: 2 additions & 2 deletions docs/configuration/master.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ license: |
<!--begin-include-->
| Key | Default | Description | Since |
| --- | ------- | ----------- | ----- |
| celeborn.dynamicConfig.refresh.time | 120s | The time interval for refreshing the corresponding dynamic config periodically | 0.4.0 |
| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config, NONE means disabling dynamic config store | 0.4.0 |
| celeborn.dynamicConfig.refresh.time | 120s | The time interval for refreshing the corresponding dynamic config periodically. | 0.4.0 |
| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config. Available options: NONE, FS. Note: NONE means disabling dynamic config store. | 0.4.0 |
| celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 |
| celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial delay time before start updating partition size for estimation. | 0.3.0 |
| celeborn.master.estimatedPartitionSize.update.interval | 10min | Interval of updating partition size for estimation. | 0.3.0 |
Expand Down
4 changes: 2 additions & 2 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ license: |
<!--begin-include-->
| Key | Default | Description | Since |
| --- | ------- | ----------- | ----- |
| celeborn.dynamicConfig.refresh.time | 120s | The time interval for refreshing the corresponding dynamic config periodically | 0.4.0 |
| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config, NONE means disabling dynamic config store | 0.4.0 |
| celeborn.dynamicConfig.refresh.time | 120s | The time interval for refreshing the corresponding dynamic config periodically. | 0.4.0 |
| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config. Available options: NONE, FS. Note: NONE means disabling dynamic config store. | 0.4.0 |
| celeborn.master.endpoints | &lt;localhost&gt;:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 |
| celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M, the data will need 16 fetch chunk requests to fetch. | 0.2.0 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.celeborn.server.common.service.config;

public interface ConfigService {
public interface ConfigService extends AutoCloseable {

SystemConfig getSystemConfig();

Expand All @@ -33,7 +33,4 @@ default DynamicConfig getTenantConfig(String tenantId) {
}

void refreshAllCache();

void shutdown();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,23 @@

package org.apache.celeborn.server.common.service.config;

import org.apache.celeborn.common.internal.config.ConfigEntry;
import org.apache.celeborn.common.util.Utils;
import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import org.apache.celeborn.common.internal.config.ConfigEntry;
import org.apache.celeborn.common.util.Utils;

/**
* Dynamic configuration is a type of configuration that can be changed at runtime as needed. It can be used at system level/tenant level.
* When applying dynamic configuration, the priority order is as follows: tenant level overrides system level,
* which in turn overrides static configuration(CelebornConf). This means that if a configuration is defined at the tenant level,
* it will be used instead of the system level or static configuration(CelebornConf). If the tenant-level configuration is missing,
* the system-level configuration will be used. If the system-level configuration is also missing, CelebornConf
* will be used as the default value.
* Dynamic configuration is a type of configuration that can be changed at runtime as needed. It can
* be used at system level/tenant level. When applying dynamic configuration, the priority order is
* as follows: tenant level overrides system level, which in turn overrides static
* configuration (CelebornConf). This means that if a configuration is defined at the tenant level,
* it will be used instead of the system level or static configuration (CelebornConf). If the
* tenant-level configuration is missing, the system-level configuration will be used. If the
* system-level configuration is also missing, CelebornConf will be used as the default value.
*/
public abstract class DynamicConfig {
private static final Logger LOG = LoggerFactory.getLogger(DynamicConfig.class);
Expand All @@ -42,26 +44,35 @@ public abstract class DynamicConfig {
public <T> T getWithDefaultValue(
String configKey, T defaultValue, Class<T> finalType, ConfigType configType) {
String configValue = configs.get(configKey);
T formatValue = configValue != null ? formatValue(configKey, configValue, finalType, configType) : null;
T formatValue =
configValue != null ? formatValue(configKey, configValue, finalType, configType) : null;
if (formatValue == null) {
return defaultValue;
} else {
return formatValue;
}
}

public <T> T getValue(String configKey, ConfigEntry<Object> configEntry, Class<T> finalType, ConfigType configType) {
public <T> T getValue(
String configKey,
ConfigEntry<Object> configEntry,
Class<T> finalType,
ConfigType configType) {
String configValue = configs.get(configKey);
T formatValue = configValue != null ? formatValue(configKey, configValue, finalType, configType) : null;
T formatValue =
configValue != null ? formatValue(configKey, configValue, finalType, configType) : null;
if (formatValue == null) {
DynamicConfig parentLevelConfig = getParentLevelConfig();
return parentLevelConfig != null? parentLevelConfig.getValue(configKey, configEntry, finalType, configType): null;
return parentLevelConfig != null
? parentLevelConfig.getValue(configKey, configEntry, finalType, configType)
: null;
} else {
return formatValue;
}
}

public <T> T formatValue(String configKey, String configValue, Class<T> finalType, ConfigType configType) {
public <T> T formatValue(
String configKey, String configValue, Class<T> finalType, ConfigType configType) {
try {
if (configValue != null) {
if (ConfigType.BYTES == configType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ public static ConfigService getConfigService(CelebornConf celebornConf) {

return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.celeborn.server.common.service.config;

import org.apache.celeborn.common.util.ThreadUtils;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
Expand All @@ -30,19 +28,21 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import scala.concurrent.duration.Duration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import org.apache.celeborn.common.CelebornConf;

import scala.concurrent.duration.Duration;
import org.apache.celeborn.common.util.ThreadUtils;

public class FsConfigServiceImpl implements ConfigService {
private static final Logger LOG = LoggerFactory.getLogger(FsConfigServiceImpl.class);
private CelebornConf celebornConf;
private final CelebornConf celebornConf;
private final AtomicReference<SystemConfig> systemConfigAtomicReference = new AtomicReference<>();
private final AtomicReference<Map<String, TenantConfig>> tenantConfigAtomicReference = new AtomicReference<>(new HashMap<>());
private final AtomicReference<Map<String, TenantConfig>> tenantConfigAtomicReference =
new AtomicReference<>(new HashMap<>());
private static final String CONF_TENANT_ID = "tenantId";
private static final String CONF_LEVEL = "level";
private static final String CONF_CONFIG = "config";
Expand All @@ -55,10 +55,7 @@ public FsConfigServiceImpl(CelebornConf celebornConf) {
this.refresh();
long dynamicConfigRefreshTime = celebornConf.dynamicConfigRefreshTime();
this.configRefreshService.scheduleWithFixedDelay(
() -> refresh(),
dynamicConfigRefreshTime,
dynamicConfigRefreshTime,
TimeUnit.MILLISECONDS);
this::refresh, dynamicConfigRefreshTime, dynamicConfigRefreshTime, TimeUnit.MILLISECONDS);
}

private synchronized void refresh() {
Expand All @@ -78,7 +75,7 @@ private synchronized void refresh() {
Map<String, String> config =
((Map<String, Object>) settings.get(CONF_CONFIG))
.entrySet().stream()
.collect(Collectors.toMap(a -> a.getKey(), a -> a.getValue().toString()));
.collect(Collectors.toMap(Map.Entry::getKey, a -> a.getValue().toString()));
if (ConfigLevel.TENANT.name().equals(level)) {
TenantConfig tenantConfig = new TenantConfig(this, tenantId, config);
tenantConfs.put(tenantId, tenantConfig);
Expand All @@ -88,10 +85,12 @@ private synchronized void refresh() {
}
} catch (Exception e) {
LOG.warn("Refresh dynamic config error: {}", e.getMessage(), e);
return;
}

tenantConfigAtomicReference.set(tenantConfs);
systemConfigAtomicReference.set(systemConfig == null ? new SystemConfig(celebornConf) : systemConfig);
systemConfigAtomicReference.set(
systemConfig == null ? new SystemConfig(celebornConf) : systemConfig);
}

@Override
Expand All @@ -110,7 +109,7 @@ public void refreshAllCache() {
}

@Override
public void shutdown() {
public void close() {
ThreadUtils.shutdown(configRefreshService, Duration.apply("800ms"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.celeborn.server.common.service.config;

import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.internal.config.ConfigEntry;

import java.util.HashMap;
import java.util.Map;

import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.internal.config.ConfigEntry;

public class SystemConfig extends DynamicConfig {
private CelebornConf celebornConf;
private final CelebornConf celebornConf;

public SystemConfig(CelebornConf celebornConf, Map<String, String> configs) {
this.celebornConf = celebornConf;
this.configs.putAll(configs);
Expand All @@ -40,9 +41,14 @@ public DynamicConfig getParentLevelConfig() {
return null;
}

public <T> T getValue(String configKey, ConfigEntry<Object> configEntry, Class<T> finalType, ConfigType configType) {
public <T> T getValue(
String configKey,
ConfigEntry<Object> configEntry,
Class<T> finalType,
ConfigType configType) {
String configValue = configs.get(configKey);
T formatValue = configValue != null ? formatValue(configKey, configValue, finalType, configType) : null;
T formatValue =
configValue != null ? formatValue(configKey, configValue, finalType, configType) : null;
if (formatValue == null && configEntry != null) {
return convert(finalType, celebornConf.get(configEntry).toString());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@
import java.util.Map;

public class TenantConfig extends DynamicConfig {
private String tenantId;
private ConfigService configService;
private final String tenantId;
private final ConfigService configService;

public TenantConfig(ConfigService configService, String tenantId, Map<String, String> configs) {
this.configService = configService;
this.configs.putAll(configs);
this.tenantId = tenantId;
}

public Map<String, String> getConfigs() {
return configs;
public String getTenantId() {
return tenantId;
}

@Override
Expand Down
Loading

0 comments on commit d344966

Please sign in to comment.