Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1264] ConfigService supports TENANT_USER config level #2285

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d47f8ec
[CELEBORN-1264][IMPROVEMENT] ConfigService support user level config
AngersZhuuuu Feb 4, 2024
57c5609
Update FsConfigServiceImpl.java
AngersZhuuuu Feb 4, 2024
ad83968
Merge branch 'main' into CELEBORN-1264
AngersZhuuuu Feb 5, 2024
1008c8d
update
AngersZhuuuu Feb 5, 2024
73eabcd
Update ConfigService.java
AngersZhuuuu Feb 5, 2024
bfffdd2
update
AngersZhuuuu Feb 5, 2024
5138b81
update
AngersZhuuuu Feb 5, 2024
41d840c
Update
AngersZhuuuu Feb 5, 2024
36f4e3f
Update celeborn-0.5.0-h2-ut-data.sql
AngersZhuuuu Feb 5, 2024
dd5f6a9
Update celeborn-0.5.0-h2-ut-data.sql
AngersZhuuuu Feb 5, 2024
cad1be3
update
AngersZhuuuu Feb 5, 2024
692f81d
Update TenantConfig.java
AngersZhuuuu Feb 5, 2024
2673b41
follow comment
AngersZhuuuu Feb 6, 2024
519bd05
update
AngersZhuuuu Feb 6, 2024
9d9120a
follow comment
AngersZhuuuu Feb 6, 2024
483e0b0
update
AngersZhuuuu Feb 6, 2024
a60d4ff
Update DbServiceManagerImpl.java
AngersZhuuuu Feb 6, 2024
88607d6
update
AngersZhuuuu Feb 6, 2024
6670a63
update
AngersZhuuuu Feb 7, 2024
042fe26
update
AngersZhuuuu Feb 7, 2024
4079232
update
AngersZhuuuu Feb 7, 2024
6c71db6
update
AngersZhuuuu Feb 7, 2024
bbd9963
Merge branch 'main' into CELEBORN-1264
AngersZhuuuu Feb 18, 2024
e7fd282
update
AngersZhuuuu Feb 18, 2024
368b79f
Update FsConfigServiceImpl.java
AngersZhuuuu Feb 18, 2024
728b6d7
Update DbServiceManagerImpl.java
AngersZhuuuu Feb 18, 2024
0219de5
Update FsConfigServiceImpl.java
AngersZhuuuu Feb 18, 2024
a6a01b3
Update FsConfigServiceImpl.java
AngersZhuuuu Feb 18, 2024
435e61c
update
AngersZhuuuu Feb 18, 2024
0c761a6
update
AngersZhuuuu Feb 18, 2024
69e5ece
Update FsConfigServiceImpl.java
AngersZhuuuu Feb 18, 2024
a119e91
Update FsConfigServiceImpl.java
AngersZhuuuu Feb 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,6 +40,9 @@ public abstract class BaseConfigServiceImpl implements ConfigService {
protected final AtomicReference<Map<String, TenantConfig>> tenantConfigAtomicReference =
new AtomicReference<>(new HashMap<>());

protected final AtomicReference<Map<Pair<String, String>, TenantConfig>>
tenantUserConfigAtomicReference = new AtomicReference<>(new HashMap<>());

private final ScheduledExecutorService configRefreshService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-config-refresher");

Expand Down Expand Up @@ -81,6 +85,11 @@ public TenantConfig getRawTenantConfigFromCache(String tenantId) {
return tenantConfigAtomicReference.get().get(tenantId);
}

@Override
public TenantConfig getRawTenantUserConfig(String tenantId, String userId) {
return tenantUserConfigAtomicReference.get().get(Pair.of(tenantId, userId));
}

@Override
public void shutdown() {
ThreadUtils.shutdown(configRefreshService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ default DynamicConfig getTenantConfigFromCache(String tenantId) {
}
}

TenantConfig getRawTenantUserConfig(String tenantId, String userId);
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved

default DynamicConfig getTenantUserConfig(String tenantId, String userId) {
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved
TenantConfig tenantConfig = getRawTenantUserConfig(tenantId, userId);
if (tenantConfig == null) {
return getTenantConfigFromCache(tenantId);
} else {
return tenantConfig;
}
}

void refreshAllCache() throws IOException;

void shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;

import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.server.common.service.store.IServiceManager;
import org.apache.celeborn.server.common.service.store.db.DbServiceManagerImpl;
Expand All @@ -48,7 +50,16 @@ public void refreshAllCache() throws IOException {
List<TenantConfig> allTenantConfigs = iServiceManager.getAllTenantConfigs();
Map<String, TenantConfig> tenantConfigMap =
allTenantConfigs.stream()
.filter(tenantConfig -> tenantConfig.getName().isEmpty())
.collect(Collectors.toMap(TenantConfig::getTenantId, Function.identity()));
tenantConfigAtomicReference.set(tenantConfigMap);
Map<Pair<String, String>, TenantConfig> tenantUserConfigMap =
allTenantConfigs.stream()
.filter(tenantConfig -> !tenantConfig.getName().isEmpty())
.collect(
Collectors.toMap(
tenantConfig -> Pair.of(tenantConfig.getTenantId(), tenantConfig.getName()),
Function.identity()));
tenantUserConfigAtomicReference.set(tenantUserConfigMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
Expand All @@ -35,6 +36,8 @@
public class FsConfigServiceImpl extends BaseConfigServiceImpl implements ConfigService {
private static final Logger LOG = LoggerFactory.getLogger(FsConfigServiceImpl.class);
private static final String CONF_TENANT_ID = "tenantId";
private static final String CONF_TENANT_USERS_ID = "users";
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved
private static final String CONF_TENANT_NAME_ID = "name";
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved
private static final String CONF_LEVEL = "level";
private static final String CONF_CONFIG = "config";

Expand All @@ -51,6 +54,7 @@ public synchronized void refreshAllCache() {

SystemConfig systemConfig = null;
Map<String, TenantConfig> tenantConfs = new HashMap<>();
Map<Pair<String, String>, TenantConfig> tenantUserConfs = new HashMap<>();
try (FileInputStream fileInputStream = new FileInputStream(configurationFile)) {
Yaml yaml = new Yaml();
List<Map<String, Object>> dynamicConfigs = yaml.load(fileInputStream);
Expand All @@ -61,9 +65,21 @@ public synchronized void refreshAllCache() {
((Map<String, Object>) settings.get(CONF_CONFIG))
.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, a -> a.getValue().toString()));

if (ConfigLevel.TENANT.name().equals(level)) {
TenantConfig tenantConfig = new TenantConfig(this, tenantId, null, config);
TenantConfig tenantConfig = new TenantConfig(this, tenantId, "", config);
tenantConfs.put(tenantId, tenantConfig);
List<Map<String, Object>> users =
(List<Map<String, Object>>) settings.get(CONF_TENANT_USERS_ID);
for (Map<String, Object> userSetting : users) {
String name = (String) userSetting.get(CONF_TENANT_NAME_ID);
Map<String, String> userConfig =
((Map<String, Object>) userSetting.get(CONF_CONFIG))
.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, a -> a.getValue().toString()));
TenantConfig tenantUserConfig = new TenantConfig(this, tenantId, name, userConfig);
tenantUserConfs.put(Pair.of(tenantId, name), tenantUserConfig);
}
} else {
systemConfig = new SystemConfig(celebornConf, config);
}
Expand All @@ -73,6 +89,7 @@ public synchronized void refreshAllCache() {
return;
}

tenantUserConfigAtomicReference.set(tenantUserConfs);
tenantConfigAtomicReference.set(tenantConfs);
if (systemConfig != null) {
systemConfigAtomicReference.set(systemConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public String getName() {

@Override
public DynamicConfig getParentLevelConfig() {
return configService.getSystemConfigFromCache();
if (name.isEmpty()) {
return configService.getSystemConfigFromCache();
} else {
return configService.getTenantConfigFromCache(tenantId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ClusterTenantConfig {
private Integer clusterId;
private String tenantId;
private String level;
private String user;
private String name;
private String configKey;
private String configValue;
private String type;
Expand Down Expand Up @@ -67,12 +67,12 @@ public void setLevel(String level) {
this.level = level;
}

public String getUser() {
return StringUtils.isBlank(user) ? null : user;
public String getName() {
return StringUtils.isBlank(name) ? null : name;
}

public void setUser(String user) {
this.user = user;
public void setName(String name) {
this.name = name;
}

public String getConfigKey() {
Expand Down Expand Up @@ -116,7 +116,7 @@ public void setGmtModify(Date gmtModify) {
}

public Pair getTenantInfo() {
return Pair.of(tenantId, user);
return Pair.of(tenantId, name);
}

@Override
Expand All @@ -126,7 +126,7 @@ public String toString() {
sb.append(", clusterId=").append(clusterId);
sb.append(", tenantId='").append(tenantId).append('\'');
sb.append(", level='").append(level).append('\'');
sb.append(", user='").append(user).append('\'');
sb.append(", user='").append(name).append('\'');
sb.append(", configKey='").append(configKey).append('\'');
sb.append(", configValue='").append(configValue).append('\'');
sb.append(", type='").append(type).append('\'');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -103,12 +104,14 @@ public List<TenantConfig> getAllTenantConfigs() {
offset = offset + pageSize;
}

Map<String, List<ClusterTenantConfig>> tenantConfigMaps =
Map<Pair<String, String>, List<ClusterTenantConfig>> tenantConfigMaps =
clusterAllTenantConfigs.stream()
.collect(
Collectors.groupingBy(clusterTenantConfig -> clusterTenantConfig.getTenantId()));
.collect(Collectors.groupingBy(ClusterTenantConfig::getTenantInfo));
return tenantConfigMaps.entrySet().stream()
.map(t -> new TenantConfig(configService, t.getKey(), null, t.getValue()))
.map(
t ->
new TenantConfig(
configService, t.getKey().getKey(), t.getKey().getValue(), t.getValue()))
.collect(Collectors.toList());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public interface ClusterTenantConfigMapper {

@Select(
"SELECT id, cluster_id, tenant_id, level, user, config_key, config_value, type, gmt_create, gmt_modify "
"SELECT id, cluster_id, tenant_id, level, name, config_key, config_value, type, gmt_create, gmt_modify "
+ "FROM celeborn_cluster_tenant_config WHERE cluster_id = #{clusterId} AND level=#{level} LIMIT #{offset}, #{pageSize}")
List<ClusterTenantConfig> getClusterTenantConfigs(
@Param("clusterId") int clusterId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ public void testDbConfig() throws IOException {
CelebornConf.DYNAMIC_CONFIG_STORE_DB_HIKARI_DRIVER_CLASS_NAME(), "org.h2.Driver");
celebornConf.set(CelebornConf.DYNAMIC_CONFIG_STORE_DB_HIKARI_MAXIMUM_POOL_SIZE(), "1");
configService = new DbConfigServiceImpl(celebornConf);
verifyConfig(configService);
verifySystemConfig(configService);
verifyTenantConfig(configService);
verifyTenantUserConfig(configService);

SqlSessionFactory sqlSessionFactory = DBSessionFactory.get(celebornConf);
try (SqlSession sqlSession = sqlSessionFactory.openSession(true)) {
Expand All @@ -68,7 +70,9 @@ public void testFsConfig() throws IOException {
celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file);
celebornConf.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_INTERVAL(), 5L);
configService = new FsConfigServiceImpl(celebornConf);
verifyConfig(configService);
verifySystemConfig(configService);
verifyTenantConfig(configService);
verifyTenantUserConfig(configService);
// change -> refresh config
file = getClass().getResource("/dynamicConfig_2.yaml").getFile();
celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file);
Expand All @@ -84,7 +88,7 @@ public void teardown() {
}
}

public void verifyConfig(ConfigService configService) {
public void verifySystemConfig(ConfigService configService) {
// ------------- Verify SystemConfig ----------------- //
SystemConfig systemConfig = configService.getSystemConfigFromCache();
// verify systemConfig's bytesConf -- use systemConfig
Expand Down Expand Up @@ -135,11 +139,13 @@ public void verifyConfig(ConfigService configService) {
Integer intConfValue =
systemConfig.getValue("celeborn.test.int.only", null, Integer.TYPE, ConfigType.STRING);
Assert.assertEquals(intConfValue.intValue(), 10);
}

public void verifyTenantConfig(ConfigService configService) {
// ------------- Verify TenantConfig ----------------- //
DynamicConfig tenantConfig = configService.getTenantConfigFromCache("tenant_id");
// verify tenantConfig's bytesConf -- use tenantConf
value =
Long value =
tenantConfig.getValue(
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(),
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(),
Expand Down Expand Up @@ -201,6 +207,69 @@ public void verifyConfig(ConfigService configService) {
Assert.assertEquals(withDefaultValue.longValue(), 10);
}

public void verifyTenantUserConfig(ConfigService configService) {
// ------------- Verify UserConfig ----------------- //
DynamicConfig userConfig = configService.getTenantUserConfig("tenant_id", "Jerry");
// verify userConfig's bytesConf -- use userConf
Long value =
userConfig.getValue(
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(),
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1024);

// verify userConfig's bytesConf -- defer to tenantConf
value =
userConfig.getValue(
CelebornConf.CLIENT_PUSH_QUEUE_CAPACITY().key(),
CelebornConf.CLIENT_PUSH_QUEUE_CAPACITY(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1024);

// verify userConfig's bytesConf -- defer to systemConf
value =
userConfig.getValue(
CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(),
CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1024000);

// verify userConfig's bytesConf -- defer to celebornConf
value =
userConfig.getValue(
CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(),
CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1073741824);

// verify userConfig's bytesConf with none
value =
userConfig.getValue(
"celeborn.client.push.buffer.initial.size.only.none",
null,
Long.TYPE,
ConfigType.BYTES);
Assert.assertNull(value);

DynamicConfig userConfigNone = configService.getTenantUserConfig("tenant_id", "non_exist");
// verify userConfig's bytesConf -- defer to tenantConf
value =
userConfigNone.getValue(
CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(),
CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1024000);

Long withDefaultValue =
userConfigNone.getWithDefaultValue("none", 10L, Long.TYPE, ConfigType.STRING);
Assert.assertEquals(withDefaultValue.longValue(), 10);
}

public void verifyConfigChanged(ConfigService configService) {

SystemConfig systemConfig = configService.getSystemConfigFromCache();
Expand Down
7 changes: 5 additions & 2 deletions service/src/test/resources/celeborn-0.5.0-h2-ut-data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ VALUES
( 6, 1, 'celeborn.test.timeoutMs.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 7, 1, 'celeborn.test.enabled.only', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 8, 1, 'celeborn.test.int.only', '10', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' );
INSERT INTO `celeborn_cluster_tenant_config` ( `id`, `cluster_id`, `tenant_id`, `level`, `user`, `config_key`, `config_value`, `type`, `gmt_create`, `gmt_modify` )
INSERT INTO `celeborn_cluster_tenant_config` ( `id`, `cluster_id`, `tenant_id`, `level`, `name`, `config_key`, `config_value`, `type`, `gmt_create`, `gmt_modify` )
VALUES
( 1, 1, 'tenant_id', 'TENANT', '', 'celeborn.client.push.buffer.initial.size', '10240', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 2, 1, 'tenant_id', 'TENANT', '', 'celeborn.client.push.buffer.initial.size.only', '102400', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 3, 1, 'tenant_id', 'TENANT', '', 'celeborn.worker.fetch.heartbeat.enabled', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 4, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.timeoutMs.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 5, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.enabled.only', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 6, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.int.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' );
( 6, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.int.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 7, 1, 'tenant_id', 'TENANT', '', 'celeborn.client.push.queue.capacity', '1024', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 8, 1, 'tenant_id', 'TENANT', 'Jerry', 'celeborn.client.push.buffer.initial.size', '1k', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 9, 1, 'tenant_id', 'TENANT', 'Jerry', 'celeborn.client.push.buffer.initial.size.user.only', '512k', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' );
4 changes: 2 additions & 2 deletions service/src/test/resources/celeborn-0.5.0-h2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ CREATE TABLE IF NOT EXISTS celeborn_cluster_tenant_config
cluster_id int NOT NULL,
tenant_id varchar(255) NOT NULL,
level varchar(255) NOT NULL COMMENT 'config level, valid level is TENANT,USER',
`user` varchar(255) DEFAULT NULL COMMENT 'tenant sub user',
name varchar(255) NOT NULL COMMENT 'tenant sub user',
config_key varchar(255) NOT NULL,
config_value varchar(255) NOT NULL,
type varchar(255) DEFAULT NULL COMMENT 'conf categories, such as quota',
gmt_create timestamp NOT NULL,
gmt_modify timestamp NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY `index_unique_tenant_config_key` (`cluster_id`, `tenant_id`, `user`, `config_key`)
UNIQUE KEY `index_unique_tenant_config_key` (`cluster_id`, `tenant_id`, `name`, `config_key`)
);
7 changes: 7 additions & 0 deletions service/src/test/resources/dynamicConfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,11 @@
celeborn.test.tenant.timeoutMs.only: 100s
celeborn.test.tenant.enabled.only: false
celeborn.test.tenant.int.only: 10
celeborn.client.push.queue.capacity: 1024
users:
- name: Jerry
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved
config:
celeborn.client.push.buffer.initial.size: 1k
celeborn.client.push.buffer.initial.size.user.only: 512k


Loading