Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1264] ConfigService supports TENANT_USER config level #2285

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d47f8ec
[CELEBORN-1264][IMPROVEMENT] ConfigService support user level config
AngersZhuuuu Feb 4, 2024
57c5609
Update FsConfigServiceImpl.java
AngersZhuuuu Feb 4, 2024
ad83968
Merge branch 'main' into CELEBORN-1264
AngersZhuuuu Feb 5, 2024
1008c8d
update
AngersZhuuuu Feb 5, 2024
73eabcd
Update ConfigService.java
AngersZhuuuu Feb 5, 2024
bfffdd2
update
AngersZhuuuu Feb 5, 2024
5138b81
update
AngersZhuuuu Feb 5, 2024
41d840c
Update
AngersZhuuuu Feb 5, 2024
36f4e3f
Update celeborn-0.5.0-h2-ut-data.sql
AngersZhuuuu Feb 5, 2024
dd5f6a9
Update celeborn-0.5.0-h2-ut-data.sql
AngersZhuuuu Feb 5, 2024
cad1be3
update
AngersZhuuuu Feb 5, 2024
692f81d
Update TenantConfig.java
AngersZhuuuu Feb 5, 2024
2673b41
follow comment
AngersZhuuuu Feb 6, 2024
519bd05
update
AngersZhuuuu Feb 6, 2024
9d9120a
follow comment
AngersZhuuuu Feb 6, 2024
483e0b0
update
AngersZhuuuu Feb 6, 2024
a60d4ff
Update DbServiceManagerImpl.java
AngersZhuuuu Feb 6, 2024
88607d6
update
AngersZhuuuu Feb 6, 2024
6670a63
update
AngersZhuuuu Feb 7, 2024
042fe26
update
AngersZhuuuu Feb 7, 2024
4079232
update
AngersZhuuuu Feb 7, 2024
6c71db6
update
AngersZhuuuu Feb 7, 2024
bbd9963
Merge branch 'main' into CELEBORN-1264
AngersZhuuuu Feb 18, 2024
e7fd282
update
AngersZhuuuu Feb 18, 2024
368b79f
Update FsConfigServiceImpl.java
AngersZhuuuu Feb 18, 2024
728b6d7
Update DbServiceManagerImpl.java
AngersZhuuuu Feb 18, 2024
0219de5
Update FsConfigServiceImpl.java
AngersZhuuuu Feb 18, 2024
a6a01b3
Update FsConfigServiceImpl.java
AngersZhuuuu Feb 18, 2024
435e61c
update
AngersZhuuuu Feb 18, 2024
0c761a6
update
AngersZhuuuu Feb 18, 2024
69e5ece
Update FsConfigServiceImpl.java
AngersZhuuuu Feb 18, 2024
a119e91
Update FsConfigServiceImpl.java
AngersZhuuuu Feb 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,6 +40,9 @@ public abstract class BaseConfigServiceImpl implements ConfigService {
protected final AtomicReference<Map<String, TenantConfig>> tenantConfigAtomicReference =
new AtomicReference<>(new HashMap<>());

protected final AtomicReference<Map<Pair<String, String>, TenantConfig>>
tenantUserConfigAtomicReference = new AtomicReference<>(new HashMap<>());

private final ScheduledExecutorService configRefreshService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-config-refresher");

Expand Down Expand Up @@ -81,6 +85,11 @@ public TenantConfig getRawTenantConfigFromCache(String tenantId) {
return tenantConfigAtomicReference.get().get(tenantId);
}

@Override
public TenantConfig getRawTenantUserConfig(String tenantId, String userId) {
return tenantUserConfigAtomicReference.get().get(Pair.of(tenantId, userId));
}

@Override
public void shutdown() {
ThreadUtils.shutdown(configRefreshService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@
public enum ConfigLevel {
SYSTEM,
TENANT,
TENANT_USER
AngersZhuuuu marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ default DynamicConfig getTenantConfigFromCache(String tenantId) {
}
}

TenantConfig getRawTenantUserConfig(String tenantId, String userId);
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved

default DynamicConfig getTenantUserConfig(String tenantId, String userId) {
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved
TenantConfig tenantConfig = getRawTenantUserConfig(tenantId, userId);
if (tenantConfig == null) {
return getTenantConfigFromCache(tenantId);
} else {
return tenantConfig;
}
}

void refreshAllCache() throws IOException;

void shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;

import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.server.common.service.store.IServiceManager;
import org.apache.celeborn.server.common.service.store.db.DbServiceManagerImpl;
Expand Down Expand Up @@ -50,5 +52,13 @@ public void refreshAllCache() throws IOException {
allTenantConfigs.stream()
.collect(Collectors.toMap(TenantConfig::getTenantId, Function.identity()));
tenantConfigAtomicReference.set(tenantConfigMap);
List<TenantConfig> allTenantUserConfigs = iServiceManager.getAllTenantUserConfigs();
Map<Pair<String, String>, TenantConfig> tenantUserConfigMap =
allTenantUserConfigs.stream()
.collect(
Collectors.toMap(
tenantConfig -> Pair.of(tenantConfig.getTenantId(), tenantConfig.getName()),
Function.identity()));
tenantUserConfigAtomicReference.set(tenantUserConfigMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
Expand All @@ -35,6 +36,8 @@
public class FsConfigServiceImpl extends BaseConfigServiceImpl implements ConfigService {
private static final Logger LOG = LoggerFactory.getLogger(FsConfigServiceImpl.class);
private static final String CONF_TENANT_ID = "tenantId";
private static final String CONF_TENANT_USERS = "users";
private static final String CONF_TENANT_NAME = "name";
private static final String CONF_LEVEL = "level";
private static final String CONF_CONFIG = "config";

Expand All @@ -51,34 +54,62 @@ public synchronized void refreshAllCache() {

SystemConfig systemConfig = null;
Map<String, TenantConfig> tenantConfs = new HashMap<>();
Map<Pair<String, String>, TenantConfig> tenantUserConfs = new HashMap<>();
try (FileInputStream fileInputStream = new FileInputStream(configurationFile)) {
Yaml yaml = new Yaml();
List<Map<String, Object>> dynamicConfigs = yaml.load(fileInputStream);
for (Map<String, Object> settings : dynamicConfigs) {
String tenantId = (String) settings.get(CONF_TENANT_ID);
String level = (String) settings.get(CONF_LEVEL);
Map<String, String> config =
((Map<String, Object>) settings.get(CONF_CONFIG))
.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, a -> a.getValue().toString()));
if (ConfigLevel.TENANT.name().equals(level)) {
TenantConfig tenantConfig = new TenantConfig(this, tenantId, null, config);
tenantConfs.put(tenantId, tenantConfig);
if (settings.containsKey(CONF_TENANT_ID)) {
String tenantId = (String) settings.get(CONF_TENANT_ID);
if (settings.containsKey(CONF_CONFIG)) {
Map<String, String> config = extractConfig(settings);
TenantConfig tenantConfig = new TenantConfig(this, tenantId, null, config);
tenantConfs.put(tenantId, tenantConfig);
}
if (settings.containsKey(CONF_TENANT_USERS)) {
List<Map<String, Object>> users =
(List<Map<String, Object>>) settings.get(CONF_TENANT_USERS);
for (Map<String, Object> userSetting : users) {
if (userSetting.containsKey(CONF_TENANT_NAME)
&& userSetting.containsKey(CONF_CONFIG)) {
String name = (String) userSetting.get(CONF_TENANT_NAME);
Map<String, String> userConfig = extractConfig(userSetting);
TenantConfig tenantUserConfig =
new TenantConfig(this, tenantId, name, userConfig);
tenantUserConfs.put(Pair.of(tenantId, name), tenantUserConfig);
}
}
}
}
} else {
systemConfig = new SystemConfig(celebornConf, config);
if (settings.containsKey(CONF_CONFIG)) {
Map<String, String> config = extractConfig(settings);
systemConfig = new SystemConfig(celebornConf, config);
}
}
}
} catch (Exception e) {
LOG.warn("Refresh dynamic config error: {}", e.getMessage(), e);
return;
}

tenantUserConfigAtomicReference.set(tenantUserConfs);
tenantConfigAtomicReference.set(tenantConfs);
if (systemConfig != null) {
systemConfigAtomicReference.set(systemConfig);
}
}

private Map<String, String> extractConfig(Map<String, Object> setting) {
Map<String, String> config =
((Map<String, Object>) setting.get(CONF_CONFIG))
.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, a -> a.getValue().toString()));
return config;
}

private File getConfigurationFile(Map<String, String> env) {
if (!this.celebornConf.quotaConfigurationPath().isEmpty()) {
return new File(this.celebornConf.quotaConfigurationPath().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public String getName() {

@Override
public DynamicConfig getParentLevelConfig() {
return configService.getSystemConfigFromCache();
if (name == null) {
return configService.getSystemConfigFromCache();
} else {
return configService.getTenantConfigFromCache(tenantId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ClusterTenantConfig {
private Integer clusterId;
private String tenantId;
private String level;
private String user;
private String name;
private String configKey;
private String configValue;
private String type;
Expand Down Expand Up @@ -67,12 +67,12 @@ public void setLevel(String level) {
this.level = level;
}

public String getUser() {
return StringUtils.isBlank(user) ? null : user;
public String getName() {
return StringUtils.isBlank(name) ? null : name;
}

public void setUser(String user) {
this.user = user;
public void setName(String name) {
this.name = name;
}

public String getConfigKey() {
Expand Down Expand Up @@ -116,7 +116,7 @@ public void setGmtModify(Date gmtModify) {
}

public Pair getTenantInfo() {
return Pair.of(tenantId, user);
return Pair.of(tenantId, name);
}

@Override
Expand All @@ -126,7 +126,7 @@ public String toString() {
sb.append(", clusterId=").append(clusterId);
sb.append(", tenantId='").append(tenantId).append('\'');
sb.append(", level='").append(level).append('\'');
sb.append(", user='").append(user).append('\'');
sb.append(", user='").append(name).append('\'');
sb.append(", configKey='").append(configKey).append('\'');
sb.append(", configValue='").append(configValue).append('\'');
sb.append(", type='").append(type).append('\'');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ public interface IServiceManager {

List<TenantConfig> getAllTenantConfigs();

List<TenantConfig> getAllTenantUserConfigs();
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved

SystemConfig getSystemConfig();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -113,6 +114,33 @@ public List<TenantConfig> getAllTenantConfigs() {
}
}

@Override
public List<TenantConfig> getAllTenantUserConfigs() {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
ClusterTenantConfigMapper mapper = sqlSession.getMapper(ClusterTenantConfigMapper.class);
int totalNum = mapper.getClusterTenantConfigsNum(clusterId, ConfigLevel.TENANT_USER.name());
int offset = 0;
List<ClusterTenantConfig> clusterAllTenantConfigs = new ArrayList<>();
while (offset < totalNum) {
List<ClusterTenantConfig> clusterTenantConfigs =
mapper.getClusterTenantConfigs(
clusterId, ConfigLevel.TENANT_USER.name(), offset, pageSize);
clusterAllTenantConfigs.addAll(clusterTenantConfigs);
offset = offset + pageSize;
}

Map<Pair<String, String>, List<ClusterTenantConfig>> tenantConfigMaps =
clusterAllTenantConfigs.stream()
.collect(Collectors.groupingBy(ClusterTenantConfig::getTenantInfo));
return tenantConfigMaps.entrySet().stream()
.map(
t ->
new TenantConfig(
configService, t.getKey().getKey(), t.getKey().getValue(), t.getValue()))
.collect(Collectors.toList());
}
}

@Override
public SystemConfig getSystemConfig() {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public interface ClusterTenantConfigMapper {

@Select(
"SELECT id, cluster_id, tenant_id, level, user, config_key, config_value, type, gmt_create, gmt_modify "
"SELECT id, cluster_id, tenant_id, level, name, config_key, config_value, type, gmt_create, gmt_modify "
+ "FROM celeborn_cluster_tenant_config WHERE cluster_id = #{clusterId} AND level=#{level} LIMIT #{offset}, #{pageSize}")
List<ClusterTenantConfig> getClusterTenantConfigs(
@Param("clusterId") int clusterId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ public void testDbConfig() throws IOException {
CelebornConf.DYNAMIC_CONFIG_STORE_DB_HIKARI_DRIVER_CLASS_NAME(), "org.h2.Driver");
celebornConf.set(CelebornConf.DYNAMIC_CONFIG_STORE_DB_HIKARI_MAXIMUM_POOL_SIZE(), "1");
configService = new DbConfigServiceImpl(celebornConf);
verifyConfig(configService);
verifySystemConfig(configService);
verifyTenantConfig(configService);
verifyTenantUserConfig(configService);

SqlSessionFactory sqlSessionFactory = DBSessionFactory.get(celebornConf);
try (SqlSession sqlSession = sqlSessionFactory.openSession(true)) {
Expand All @@ -68,7 +70,9 @@ public void testFsConfig() throws IOException {
celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file);
celebornConf.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_INTERVAL(), 5L);
configService = new FsConfigServiceImpl(celebornConf);
verifyConfig(configService);
verifySystemConfig(configService);
verifyTenantConfig(configService);
verifyTenantUserConfig(configService);
// change -> refresh config
file = getClass().getResource("/dynamicConfig_2.yaml").getFile();
celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file);
Expand All @@ -84,7 +88,7 @@ public void teardown() {
}
}

public void verifyConfig(ConfigService configService) {
public void verifySystemConfig(ConfigService configService) {
// ------------- Verify SystemConfig ----------------- //
SystemConfig systemConfig = configService.getSystemConfigFromCache();
// verify systemConfig's bytesConf -- use systemConfig
Expand Down Expand Up @@ -135,11 +139,13 @@ public void verifyConfig(ConfigService configService) {
Integer intConfValue =
systemConfig.getValue("celeborn.test.int.only", null, Integer.TYPE, ConfigType.STRING);
Assert.assertEquals(intConfValue.intValue(), 10);
}

public void verifyTenantConfig(ConfigService configService) {
// ------------- Verify TenantConfig ----------------- //
DynamicConfig tenantConfig = configService.getTenantConfigFromCache("tenant_id");
// verify tenantConfig's bytesConf -- use tenantConf
value =
Long value =
tenantConfig.getValue(
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(),
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(),
Expand Down Expand Up @@ -201,6 +207,69 @@ public void verifyConfig(ConfigService configService) {
Assert.assertEquals(withDefaultValue.longValue(), 10);
}

public void verifyTenantUserConfig(ConfigService configService) {
// ------------- Verify UserConfig ----------------- //
DynamicConfig userConfig = configService.getTenantUserConfig("tenant_id1", "Jerry");
// verify userConfig's bytesConf -- use userConf
Long value =
userConfig.getValue(
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(),
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1024);

// verify userConfig's bytesConf -- defer to tenantConf
value =
userConfig.getValue(
CelebornConf.CLIENT_PUSH_QUEUE_CAPACITY().key(),
CelebornConf.CLIENT_PUSH_QUEUE_CAPACITY(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1024);

// verify userConfig's bytesConf -- defer to systemConf
value =
userConfig.getValue(
CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(),
CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1024000);

// verify userConfig's bytesConf -- defer to celebornConf
value =
userConfig.getValue(
CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(),
CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1073741824);

// verify userConfig's bytesConf with none
value =
userConfig.getValue(
"celeborn.client.push.buffer.initial.size.only.none",
null,
Long.TYPE,
ConfigType.BYTES);
Assert.assertNull(value);

DynamicConfig userConfigNone = configService.getTenantUserConfig("tenant_id", "non_exist");
// verify userConfig's bytesConf -- defer to tenantConf
value =
userConfigNone.getValue(
CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(),
CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1024000);

Long withDefaultValue =
userConfigNone.getWithDefaultValue("none", 10L, Long.TYPE, ConfigType.STRING);
Assert.assertEquals(withDefaultValue.longValue(), 10);
}

public void verifyConfigChanged(ConfigService configService) {

SystemConfig systemConfig = configService.getSystemConfigFromCache();
Expand Down
Loading
Loading