diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java index 5e7c270090..93e0132200 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,9 @@ public abstract class BaseConfigServiceImpl implements ConfigService { protected final AtomicReference> tenantConfigAtomicReference = new AtomicReference<>(new HashMap<>()); + protected final AtomicReference, TenantConfig>> + tenantUserConfigAtomicReference = new AtomicReference<>(new HashMap<>()); + private final ScheduledExecutorService configRefreshService = ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-config-refresher"); @@ -81,6 +85,11 @@ public TenantConfig getRawTenantConfigFromCache(String tenantId) { return tenantConfigAtomicReference.get().get(tenantId); } + @Override + public TenantConfig getRawTenantUserConfig(String tenantId, String userId) { + return tenantUserConfigAtomicReference.get().get(Pair.of(tenantId, userId)); + } + @Override public void shutdown() { ThreadUtils.shutdown(configRefreshService); diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigLevel.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigLevel.java index 1218214618..7fcd510e64 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigLevel.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigLevel.java @@ -20,4 +20,5 @@ public enum ConfigLevel { SYSTEM, TENANT, + TENANT_USER } diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java index fc918b0a5e..1f141f551e 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java @@ -38,6 +38,17 @@ default DynamicConfig getTenantConfigFromCache(String tenantId) { } } + TenantConfig getRawTenantUserConfig(String tenantId, String userId); + + default DynamicConfig getTenantUserConfig(String tenantId, String userId) { + TenantConfig tenantConfig = getRawTenantUserConfig(tenantId, userId); + if (tenantConfig == null) { + return getTenantConfigFromCache(tenantId); + } else { + return tenantConfig; + } + } + void refreshAllCache() throws IOException; void shutdown(); diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/DbConfigServiceImpl.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/DbConfigServiceImpl.java index 6217545b55..22bf79532f 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/DbConfigServiceImpl.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/DbConfigServiceImpl.java @@ -23,6 +23,8 @@ import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; + import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.server.common.service.store.IServiceManager; import org.apache.celeborn.server.common.service.store.db.DbServiceManagerImpl; @@ -50,5 +52,13 @@ public void refreshAllCache() throws IOException { allTenantConfigs.stream() .collect(Collectors.toMap(TenantConfig::getTenantId, Function.identity())); tenantConfigAtomicReference.set(tenantConfigMap); + List allTenantUserConfigs = iServiceManager.getAllTenantUserConfigs(); + Map, TenantConfig> tenantUserConfigMap = + allTenantUserConfigs.stream() + .collect( + Collectors.toMap( + tenantConfig -> Pair.of(tenantConfig.getTenantId(), tenantConfig.getName()), + Function.identity())); + tenantUserConfigAtomicReference.set(tenantUserConfigMap); } } diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java index decf061af7..df3e807094 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java @@ -26,6 +26,7 @@ import java.util.Optional; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; @@ -35,6 +36,8 @@ public class FsConfigServiceImpl extends BaseConfigServiceImpl implements ConfigService { private static final Logger LOG = LoggerFactory.getLogger(FsConfigServiceImpl.class); private static final String CONF_TENANT_ID = "tenantId"; + private static final String CONF_TENANT_USERS = "users"; + private static final String CONF_TENANT_NAME = "name"; private static final String CONF_LEVEL = "level"; private static final String CONF_CONFIG = "config"; @@ -51,21 +54,40 @@ public synchronized void refreshAllCache() { SystemConfig systemConfig = null; Map tenantConfs = new HashMap<>(); + Map, TenantConfig> tenantUserConfs = new HashMap<>(); try (FileInputStream fileInputStream = new FileInputStream(configurationFile)) { Yaml yaml = new Yaml(); List> dynamicConfigs = yaml.load(fileInputStream); for (Map settings : dynamicConfigs) { - String tenantId = (String) settings.get(CONF_TENANT_ID); String level = (String) settings.get(CONF_LEVEL); - Map config = - ((Map) settings.get(CONF_CONFIG)) - .entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, a -> a.getValue().toString())); if (ConfigLevel.TENANT.name().equals(level)) { - TenantConfig tenantConfig = new TenantConfig(this, tenantId, null, config); - tenantConfs.put(tenantId, tenantConfig); + if (settings.containsKey(CONF_TENANT_ID)) { + String tenantId = (String) settings.get(CONF_TENANT_ID); + if (settings.containsKey(CONF_CONFIG)) { + Map config = extractConfig(settings); + TenantConfig tenantConfig = new TenantConfig(this, tenantId, null, config); + tenantConfs.put(tenantId, tenantConfig); + } + if (settings.containsKey(CONF_TENANT_USERS)) { + List> users = + (List>) settings.get(CONF_TENANT_USERS); + for (Map userSetting : users) { + if (userSetting.containsKey(CONF_TENANT_NAME) + && userSetting.containsKey(CONF_CONFIG)) { + String name = (String) userSetting.get(CONF_TENANT_NAME); + Map userConfig = extractConfig(userSetting); + TenantConfig tenantUserConfig = + new TenantConfig(this, tenantId, name, userConfig); + tenantUserConfs.put(Pair.of(tenantId, name), tenantUserConfig); + } + } + } + } } else { - systemConfig = new SystemConfig(celebornConf, config); + if (settings.containsKey(CONF_CONFIG)) { + Map config = extractConfig(settings); + systemConfig = new SystemConfig(celebornConf, config); + } } } } catch (Exception e) { @@ -73,12 +95,21 @@ public synchronized void refreshAllCache() { return; } + tenantUserConfigAtomicReference.set(tenantUserConfs); tenantConfigAtomicReference.set(tenantConfs); if (systemConfig != null) { systemConfigAtomicReference.set(systemConfig); } } + private Map extractConfig(Map setting) { + Map config = + ((Map) setting.get(CONF_CONFIG)) + .entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, a -> a.getValue().toString())); + return config; + } + private File getConfigurationFile(Map env) { if (!this.celebornConf.quotaConfigurationPath().isEmpty()) { return new File(this.celebornConf.quotaConfigurationPath().get()); diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/TenantConfig.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/TenantConfig.java index bea2f5960a..070c7e4d56 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/TenantConfig.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/TenantConfig.java @@ -56,6 +56,10 @@ public String getName() { @Override public DynamicConfig getParentLevelConfig() { - return configService.getSystemConfigFromCache(); + if (name == null) { + return configService.getSystemConfigFromCache(); + } else { + return configService.getTenantConfigFromCache(tenantId); + } } } diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/model/ClusterTenantConfig.java b/service/src/main/java/org/apache/celeborn/server/common/service/model/ClusterTenantConfig.java index b3c7d068d1..ebb2e99772 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/model/ClusterTenantConfig.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/model/ClusterTenantConfig.java @@ -28,7 +28,7 @@ public class ClusterTenantConfig { private Integer clusterId; private String tenantId; private String level; - private String user; + private String name; private String configKey; private String configValue; private String type; @@ -67,12 +67,12 @@ public void setLevel(String level) { this.level = level; } - public String getUser() { - return StringUtils.isBlank(user) ? null : user; + public String getName() { + return StringUtils.isBlank(name) ? null : name; } - public void setUser(String user) { - this.user = user; + public void setName(String name) { + this.name = name; } public String getConfigKey() { @@ -116,7 +116,7 @@ public void setGmtModify(Date gmtModify) { } public Pair getTenantInfo() { - return Pair.of(tenantId, user); + return Pair.of(tenantId, name); } @Override @@ -126,7 +126,7 @@ public String toString() { sb.append(", clusterId=").append(clusterId); sb.append(", tenantId='").append(tenantId).append('\''); sb.append(", level='").append(level).append('\''); - sb.append(", user='").append(user).append('\''); + sb.append(", user='").append(name).append('\''); sb.append(", configKey='").append(configKey).append('\''); sb.append(", configValue='").append(configValue).append('\''); sb.append(", type='").append(type).append('\''); diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/store/IServiceManager.java b/service/src/main/java/org/apache/celeborn/server/common/service/store/IServiceManager.java index c2a8a9d0e7..7889297d99 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/store/IServiceManager.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/store/IServiceManager.java @@ -31,5 +31,7 @@ public interface IServiceManager { List getAllTenantConfigs(); + List getAllTenantUserConfigs(); + SystemConfig getSystemConfig(); } diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/store/db/DbServiceManagerImpl.java b/service/src/main/java/org/apache/celeborn/server/common/service/store/db/DbServiceManagerImpl.java index e957cfed52..49a65ef7a8 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/store/db/DbServiceManagerImpl.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/store/db/DbServiceManagerImpl.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.slf4j.Logger; @@ -113,6 +114,33 @@ public List getAllTenantConfigs() { } } + @Override + public List getAllTenantUserConfigs() { + try (SqlSession sqlSession = sqlSessionFactory.openSession()) { + ClusterTenantConfigMapper mapper = sqlSession.getMapper(ClusterTenantConfigMapper.class); + int totalNum = mapper.getClusterTenantConfigsNum(clusterId, ConfigLevel.TENANT_USER.name()); + int offset = 0; + List clusterAllTenantConfigs = new ArrayList<>(); + while (offset < totalNum) { + List clusterTenantConfigs = + mapper.getClusterTenantConfigs( + clusterId, ConfigLevel.TENANT_USER.name(), offset, pageSize); + clusterAllTenantConfigs.addAll(clusterTenantConfigs); + offset = offset + pageSize; + } + + Map, List> tenantConfigMaps = + clusterAllTenantConfigs.stream() + .collect(Collectors.groupingBy(ClusterTenantConfig::getTenantInfo)); + return tenantConfigMaps.entrySet().stream() + .map( + t -> + new TenantConfig( + configService, t.getKey().getKey(), t.getKey().getValue(), t.getValue())) + .collect(Collectors.toList()); + } + } + @Override public SystemConfig getSystemConfig() { try (SqlSession sqlSession = sqlSessionFactory.openSession()) { diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/store/db/mapper/ClusterTenantConfigMapper.java b/service/src/main/java/org/apache/celeborn/server/common/service/store/db/mapper/ClusterTenantConfigMapper.java index af4539f661..b67c2c233b 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/store/db/mapper/ClusterTenantConfigMapper.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/store/db/mapper/ClusterTenantConfigMapper.java @@ -27,7 +27,7 @@ public interface ClusterTenantConfigMapper { @Select( - "SELECT id, cluster_id, tenant_id, level, user, config_key, config_value, type, gmt_create, gmt_modify " + "SELECT id, cluster_id, tenant_id, level, name, config_key, config_value, type, gmt_create, gmt_modify " + "FROM celeborn_cluster_tenant_config WHERE cluster_id = #{clusterId} AND level=#{level} LIMIT #{offset}, #{pageSize}") List getClusterTenantConfigs( @Param("clusterId") int clusterId, diff --git a/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java b/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java index 03cdc9c306..089c70514c 100644 --- a/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java +++ b/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java @@ -44,7 +44,9 @@ public void testDbConfig() throws IOException { CelebornConf.DYNAMIC_CONFIG_STORE_DB_HIKARI_DRIVER_CLASS_NAME(), "org.h2.Driver"); celebornConf.set(CelebornConf.DYNAMIC_CONFIG_STORE_DB_HIKARI_MAXIMUM_POOL_SIZE(), "1"); configService = new DbConfigServiceImpl(celebornConf); - verifyConfig(configService); + verifySystemConfig(configService); + verifyTenantConfig(configService); + verifyTenantUserConfig(configService); SqlSessionFactory sqlSessionFactory = DBSessionFactory.get(celebornConf); try (SqlSession sqlSession = sqlSessionFactory.openSession(true)) { @@ -68,7 +70,9 @@ public void testFsConfig() throws IOException { celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file); celebornConf.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_INTERVAL(), 5L); configService = new FsConfigServiceImpl(celebornConf); - verifyConfig(configService); + verifySystemConfig(configService); + verifyTenantConfig(configService); + verifyTenantUserConfig(configService); // change -> refresh config file = getClass().getResource("/dynamicConfig_2.yaml").getFile(); celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file); @@ -84,7 +88,7 @@ public void teardown() { } } - public void verifyConfig(ConfigService configService) { + public void verifySystemConfig(ConfigService configService) { // ------------- Verify SystemConfig ----------------- // SystemConfig systemConfig = configService.getSystemConfigFromCache(); // verify systemConfig's bytesConf -- use systemConfig @@ -135,11 +139,13 @@ public void verifyConfig(ConfigService configService) { Integer intConfValue = systemConfig.getValue("celeborn.test.int.only", null, Integer.TYPE, ConfigType.STRING); Assert.assertEquals(intConfValue.intValue(), 10); + } + public void verifyTenantConfig(ConfigService configService) { // ------------- Verify TenantConfig ----------------- // DynamicConfig tenantConfig = configService.getTenantConfigFromCache("tenant_id"); // verify tenantConfig's bytesConf -- use tenantConf - value = + Long value = tenantConfig.getValue( CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(), CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(), @@ -201,6 +207,69 @@ public void verifyConfig(ConfigService configService) { Assert.assertEquals(withDefaultValue.longValue(), 10); } + public void verifyTenantUserConfig(ConfigService configService) { + // ------------- Verify UserConfig ----------------- // + DynamicConfig userConfig = configService.getTenantUserConfig("tenant_id1", "Jerry"); + // verify userConfig's bytesConf -- use userConf + Long value = + userConfig.getValue( + CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(), + CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(), + Long.TYPE, + ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1024); + + // verify userConfig's bytesConf -- defer to tenantConf + value = + userConfig.getValue( + CelebornConf.CLIENT_PUSH_QUEUE_CAPACITY().key(), + CelebornConf.CLIENT_PUSH_QUEUE_CAPACITY(), + Long.TYPE, + ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1024); + + // verify userConfig's bytesConf -- defer to systemConf + value = + userConfig.getValue( + CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(), + CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(), + Long.TYPE, + ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1024000); + + // verify userConfig's bytesConf -- defer to celebornConf + value = + userConfig.getValue( + CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(), + CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(), + Long.TYPE, + ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1073741824); + + // verify userConfig's bytesConf with none + value = + userConfig.getValue( + "celeborn.client.push.buffer.initial.size.only.none", + null, + Long.TYPE, + ConfigType.BYTES); + Assert.assertNull(value); + + DynamicConfig userConfigNone = configService.getTenantUserConfig("tenant_id", "non_exist"); + // verify userConfig's bytesConf -- defer to tenantConf + value = + userConfigNone.getValue( + CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(), + CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(), + Long.TYPE, + ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1024000); + + Long withDefaultValue = + userConfigNone.getWithDefaultValue("none", 10L, Long.TYPE, ConfigType.STRING); + Assert.assertEquals(withDefaultValue.longValue(), 10); + } + public void verifyConfigChanged(ConfigService configService) { SystemConfig systemConfig = configService.getSystemConfigFromCache(); diff --git a/service/src/test/resources/celeborn-0.5.0-h2-ut-data.sql b/service/src/test/resources/celeborn-0.5.0-h2-ut-data.sql index 7e0b9b813f..d6dc77f872 100644 --- a/service/src/test/resources/celeborn-0.5.0-h2-ut-data.sql +++ b/service/src/test/resources/celeborn-0.5.0-h2-ut-data.sql @@ -27,11 +27,21 @@ VALUES ( 6, 1, 'celeborn.test.timeoutMs.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), ( 7, 1, 'celeborn.test.enabled.only', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), ( 8, 1, 'celeborn.test.int.only', '10', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ); -INSERT INTO `celeborn_cluster_tenant_config` ( `id`, `cluster_id`, `tenant_id`, `level`, `user`, `config_key`, `config_value`, `type`, `gmt_create`, `gmt_modify` ) +INSERT INTO `celeborn_cluster_tenant_config` ( `id`, `cluster_id`, `tenant_id`, `level`, `name`, `config_key`, `config_value`, `type`, `gmt_create`, `gmt_modify` ) VALUES ( 1, 1, 'tenant_id', 'TENANT', '', 'celeborn.client.push.buffer.initial.size', '10240', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), ( 2, 1, 'tenant_id', 'TENANT', '', 'celeborn.client.push.buffer.initial.size.only', '102400', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), ( 3, 1, 'tenant_id', 'TENANT', '', 'celeborn.worker.fetch.heartbeat.enabled', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), ( 4, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.timeoutMs.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), ( 5, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.enabled.only', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), - ( 6, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.int.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ); + ( 6, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.int.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), + ( 7, 1, 'tenant_id', 'TENANT', '', 'celeborn.client.push.queue.capacity', '1024', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), + ( 8, 1, 'tenant_id1', 'TENANT', '', 'celeborn.client.push.buffer.initial.size', '10240', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), + ( 9, 1, 'tenant_id1', 'TENANT', '', 'celeborn.client.push.buffer.initial.size.only', '102400', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), + ( 10, 1, 'tenant_id1', 'TENANT', '', 'celeborn.worker.fetch.heartbeat.enabled', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), + ( 11, 1, 'tenant_id1', 'TENANT', '', 'celeborn.test.tenant.timeoutMs.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), + ( 12, 1, 'tenant_id1', 'TENANT', '', 'celeborn.test.tenant.enabled.only', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), + ( 13, 1, 'tenant_id1', 'TENANT', '', 'celeborn.test.tenant.int.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), + ( 14, 1, 'tenant_id1', 'TENANT', '', 'celeborn.client.push.queue.capacity', '1024', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), + ( 15, 1, 'tenant_id1', 'TENANT_USER', 'Jerry', 'celeborn.client.push.buffer.initial.size', '1k', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ), + ( 16, 1, 'tenant_id1', 'TENANT_USER', 'Jerry', 'celeborn.client.push.buffer.initial.size.user.only', '512k', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ); diff --git a/service/src/test/resources/celeborn-0.5.0-h2.sql b/service/src/test/resources/celeborn-0.5.0-h2.sql index b57de8028f..0a912eae7f 100644 --- a/service/src/test/resources/celeborn-0.5.0-h2.sql +++ b/service/src/test/resources/celeborn-0.5.0-h2.sql @@ -53,12 +53,12 @@ CREATE TABLE IF NOT EXISTS celeborn_cluster_tenant_config cluster_id int NOT NULL, tenant_id varchar(255) NOT NULL, level varchar(255) NOT NULL COMMENT 'config level, valid level is TENANT,USER', - `user` varchar(255) DEFAULT NULL COMMENT 'tenant sub user', + name varchar(255) NOT NULL COMMENT 'tenant sub user', config_key varchar(255) NOT NULL, config_value varchar(255) NOT NULL, type varchar(255) DEFAULT NULL COMMENT 'conf categories, such as quota', gmt_create timestamp NOT NULL, gmt_modify timestamp NOT NULL, PRIMARY KEY (id), - UNIQUE KEY `index_unique_tenant_config_key` (`cluster_id`, `tenant_id`, `user`, `config_key`) + UNIQUE KEY `index_unique_tenant_config_key` (`cluster_id`, `tenant_id`, `name`, `config_key`) ); diff --git a/service/src/test/resources/dynamicConfig.yaml b/service/src/test/resources/dynamicConfig.yaml index 22d17ee624..ea759a4f3d 100644 --- a/service/src/test/resources/dynamicConfig.yaml +++ b/service/src/test/resources/dynamicConfig.yaml @@ -34,3 +34,20 @@ celeborn.test.tenant.enabled.only: false celeborn.test.tenant.int.only: 10 +- tenantId: tenant_id1 + level: TENANT + config: + celeborn.client.push.buffer.initial.size: 10k + celeborn.client.push.buffer.initial.size.only: 100k + celeborn.worker.fetch.heartbeat.enabled: false + celeborn.test.tenant.timeoutMs.only: 100s + celeborn.test.tenant.enabled.only: false + celeborn.test.tenant.int.only: 10 + celeborn.client.push.queue.capacity: 1024 + users: + - name: Jerry + config: + celeborn.client.push.buffer.initial.size: 1k + celeborn.client.push.buffer.initial.size.user.only: 512k + +