Skip to content

Commit

Permalink
unify the datasource configuration to celebornConf
Browse files Browse the repository at this point in the history
  • Loading branch information
RexXiong committed Jan 31, 2024
1 parent b1654c4 commit 9ee2121
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,14 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se

def dynamicConfigStoreBackend: String = get(DYNAMIC_CONFIG_STORE_BACKEND)
def dynamicConfigRefreshInterval: Long = get(DYNAMIC_CONFIG_REFRESH_INTERVAL)
def dynamicConfigFetchPageSize: Int = get(DYNAMIC_CONFIG_FETCH_PAGE_SIZE)
def dynamicConfigStoreDbJdbcUrl: String = get(DYNAMIC_CONFIG_STORE_DB_JDBC_URL)
def dynamicConfigStoreDbUsername: String = get(DYNAMIC_CONFIG_STORE_DB_USERNAME)
def dynamicConfigStoreDbPassword: String = get(DYNAMIC_CONFIG_STORE_DB_PASSWORD)
def dynamicConfigStoreDbConnectionTimeout: Long = get(DYNAMIC_CONFIG_STORE_DB_CONNECTION_TIMEOUT)
def dynamicConfigStoreDbIdleTimeout: Long = get(DYNAMIC_CONFIG_STORE_DB_IDLE_TIMEOUT)
def dynamicConfigStoreDbMaxLifetime: Long = get(DYNAMIC_CONFIG_STORE_DB_MAX_LIFETIME)
def dynamicConfigStoreDbMaximumPoolSize: Int = get(DYNAMIC_CONFIG_STORE_DB_MAXIMUM_POOL_SIZE)
def dynamicConfigStoreDbFetchPageSize: Int = get(DYNAMIC_CONFIG_STORE_DB_FETCH_PAGE_SIZE)

// //////////////////////////////////////////////////////
// Network //
Expand Down Expand Up @@ -4382,11 +4389,67 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")

val DYNAMIC_CONFIG_FETCH_PAGE_SIZE: ConfigEntry[Int] =
buildConf("celeborn.dynamicConfig.fetch.pageSize")
val DYNAMIC_CONFIG_STORE_DB_JDBC_URL: ConfigEntry[String] =
buildConf("celeborn.dynamicConfig.store.db.jdbcUrl")
.categories("master", "worker")
.version("0.5.0")
.doc("Default page size for config server get configurations.")
.doc("The jdbc url of db store backend.")
.stringConf
.createWithDefaultString("")

val DYNAMIC_CONFIG_STORE_DB_USERNAME: ConfigEntry[String] =
buildConf("celeborn.dynamicConfig.store.db.username")
.categories("master", "worker")
.version("0.5.0")
.doc("The username of db store backend.")
.stringConf
.createWithDefaultString("")

val DYNAMIC_CONFIG_STORE_DB_PASSWORD: ConfigEntry[String] =
buildConf("celeborn.dynamicConfig.store.db.password")
.categories("master", "worker")
.version("0.5.0")
.doc("The password of db store backend.")
.stringConf
.createWithDefaultString("")

val DYNAMIC_CONFIG_STORE_DB_CONNECTION_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.dynamicConfig.store.db.connectionTimeout")
.categories("master", "worker")
.version("0.5.0")
.doc("The connection timeout that a client will wait for a connection from the pool for db store backend.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")

val DYNAMIC_CONFIG_STORE_DB_IDLE_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.dynamicConfig.store.db.idleTimeout")
.categories("master", "worker")
.version("0.5.0")
.doc("The idle timeout that a connection is allowed to sit idle in the pool for db store backend.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("600s")

val DYNAMIC_CONFIG_STORE_DB_MAX_LIFETIME: ConfigEntry[Long] =
buildConf("celeborn.dynamicConfig.store.db.maxLifetime")
.categories("master", "worker")
.version("0.5.0")
.doc("The maximum lifetime of a connection in the pool for db store backend.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1800s")

val DYNAMIC_CONFIG_STORE_DB_MAXIMUM_POOL_SIZE: ConfigEntry[Int] =
buildConf("celeborn.dynamicConfig.store.db.maximumPoolSize")
.categories("master", "worker")
.version("0.5.0")
.doc("The maximum pool size of db store backend.")
.intConf
.createWithDefaultString("2")

val DYNAMIC_CONFIG_STORE_DB_FETCH_PAGE_SIZE: ConfigEntry[Int] =
buildConf("celeborn.dynamicConfig.store.db.fetch.pageSize")
.categories("master", "worker")
.version("0.5.0")
.doc("The page size for db store to query configurations.")
.intConf
.createWithDefaultString("1000")

Expand Down
23 changes: 0 additions & 23 deletions conf/hikariPool.properties.template

This file was deleted.

9 changes: 8 additions & 1 deletion docs/configuration/master.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@ license: |
| Key | Default | Description | Since | Deprecated |
| --- | ------- | ----------- | ----- | ---------- |
| celeborn.cluster.name | default | celeborn cluster name | 0.5.0 | |
| celeborn.dynamicConfig.fetch.pageSize | 1000 | Default page size for config server get configurations. | 0.5.0 | |
| celeborn.dynamicConfig.refresh.interval | 120s | Interval for refreshing the corresponding dynamic config periodically. | 0.4.0 | |
| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config service. Available options: NONE, FS, DB. Note: NONE means disabling dynamic config store. | 0.4.0 | |
| celeborn.dynamicConfig.store.db.connectionTimeout | 30s | The connection timeout that a client will wait for a connection from the pool for db store backend. | 0.5.0 | |
| celeborn.dynamicConfig.store.db.fetch.pageSize | 1000 | The page size for db store to query configurations. | 0.5.0 | |
| celeborn.dynamicConfig.store.db.idleTimeout | 600s | The idle timeout that a connection is allowed to sit idle in the pool for db store backend. | 0.5.0 | |
| celeborn.dynamicConfig.store.db.jdbcUrl | | The jdbc url of db store backend. | 0.5.0 | |
| celeborn.dynamicConfig.store.db.maxLifetime | 1800s | The maximum lifetime of a connection in the pool for db store backend. | 0.5.0 | |
| celeborn.dynamicConfig.store.db.maximumPoolSize | 2 | The maximum pool size of db store backend. | 0.5.0 | |
| celeborn.dynamicConfig.store.db.password | | The password of db store backend. | 0.5.0 | |
| celeborn.dynamicConfig.store.db.username | | The username of db store backend. | 0.5.0 | |
| celeborn.internal.port.enabled | false | Whether to create a internal port on Masters/Workers for inter-Masters/Workers communication. This is beneficial when SASL authentication is enforced for all interactions between clients and Celeborn Services, but the services can exchange messages without being subject to SASL authentication. | 0.5.0 | |
| celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 | celeborn.shuffle.initialEstimatedPartitionSize |
| celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial delay time before start updating partition size for estimation. | 0.3.0 | celeborn.shuffle.estimatedPartitionSize.update.initialDelay |
Expand Down
9 changes: 8 additions & 1 deletion docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@ license: |
| Key | Default | Description | Since | Deprecated |
| --- | ------- | ----------- | ----- | ---------- |
| celeborn.cluster.name | default | celeborn cluster name | 0.5.0 | |
| celeborn.dynamicConfig.fetch.pageSize | 1000 | Default page size for config server get configurations. | 0.5.0 | |
| celeborn.dynamicConfig.refresh.interval | 120s | Interval for refreshing the corresponding dynamic config periodically. | 0.4.0 | |
| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config service. Available options: NONE, FS, DB. Note: NONE means disabling dynamic config store. | 0.4.0 | |
| celeborn.dynamicConfig.store.db.connectionTimeout | 30s | The connection timeout that a client will wait for a connection from the pool for db store backend. | 0.5.0 | |
| celeborn.dynamicConfig.store.db.fetch.pageSize | 1000 | The page size for db store to query configurations. | 0.5.0 | |
| celeborn.dynamicConfig.store.db.idleTimeout | 600s | The idle timeout that a connection is allowed to sit idle in the pool for db store backend. | 0.5.0 | |
| celeborn.dynamicConfig.store.db.jdbcUrl | | The jdbc url of db store backend. | 0.5.0 | |
| celeborn.dynamicConfig.store.db.maxLifetime | 1800s | The maximum lifetime of a connection in the pool for db store backend. | 0.5.0 | |
| celeborn.dynamicConfig.store.db.maximumPoolSize | 2 | The maximum pool size of db store backend. | 0.5.0 | |
| celeborn.dynamicConfig.store.db.password | | The password of db store backend. | 0.5.0 | |
| celeborn.dynamicConfig.store.db.username | | The username of db store backend. | 0.5.0 | |
| celeborn.internal.port.enabled | false | Whether to create a internal port on Masters/Workers for inter-Masters/Workers communication. This is beneficial when SASL authentication is enforced for all interactions between clients and Celeborn Services, but the services can exchange messages without being subject to SASL authentication. | 0.5.0 | |
| celeborn.master.endpoints | &lt;localhost&gt;:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 | |
| celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import org.apache.celeborn.common.quota.{QuotaManager, ResourceConsumption}
import org.apache.celeborn.common.rpc._
import org.apache.celeborn.common.util.{CelebornHadoopUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
import org.apache.celeborn.server.common.{HttpService, Service}
import org.apache.celeborn.server.common.service.config.{ConfigService, DynamicConfigServiceFactory}
import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager
import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper, HAMasterMetaManager, MetaHandler}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,43 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.celeborn.common.CelebornConf;

public class DBSessionFactory {
private static final Logger LOG = LoggerFactory.getLogger(DBSessionFactory.class);
private static final String MYBATIS_CONFIG_PATH = "mybatis-config.xml";
private static volatile SqlSessionFactory _instance;

public static SqlSessionFactory get() throws IOException {
public static SqlSessionFactory get(CelebornConf celebornConf) throws IOException {
if (_instance == null) {
synchronized (DBSessionFactory.class) {
if (_instance == null) {
try (InputStream inputStream = Resources.getResourceAsStream(MYBATIS_CONFIG_PATH)) {
Properties properties = new Properties();
properties.setProperty("jdbcUrl", celebornConf.dynamicConfigStoreDbJdbcUrl());
properties.setProperty("username", celebornConf.dynamicConfigStoreDbUsername());
properties.setProperty("password", celebornConf.dynamicConfigStoreDbPassword());
properties.setProperty(
"connectionTimeout",
String.valueOf(celebornConf.dynamicConfigStoreDbConnectionTimeout()));
properties.setProperty(
"idleTimeout", String.valueOf(celebornConf.dynamicConfigStoreDbIdleTimeout()));
properties.setProperty(
"maxLifetime", String.valueOf(celebornConf.dynamicConfigStoreDbMaxLifetime()));
properties.setProperty(
"maximumPoolSize",
String.valueOf(celebornConf.dynamicConfigStoreDbMaximumPoolSize()));

SqlSessionFactoryBuilder builder = new SqlSessionFactoryBuilder();
_instance = builder.build(inputStream);
_instance = builder.build(inputStream, null, properties);
LOG.info("Init sqlSessionFactory success");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public class DbServiceManagerImpl implements IServiceManager {
public DbServiceManagerImpl(CelebornConf celebornConf, ConfigService configServer)
throws IOException {
this.celebornConf = celebornConf;
this.sqlSessionFactory = DBSessionFactory.get();
this.sqlSessionFactory = DBSessionFactory.get(celebornConf);
this.configService = configServer;
this.pageSize = celebornConf.dynamicConfigFetchPageSize();
this.pageSize = celebornConf.dynamicConfigStoreDbFetchPageSize();
this.clusterId = createClusterIfNotExists(getClusterInfoFromConf());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,17 @@

package org.apache.celeborn.server.common.service.store.db;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.ibatis.datasource.unpooled.UnpooledDataSourceFactory;
import org.apache.ibatis.io.Resources;

public class HikariDataSourceFactory extends UnpooledDataSourceFactory {
private static final String CONFIG_PATH_KEY = "HIKARI_CONFIG_PATH";
private static final String HIKARI_PROPERTIES_KEY = "hikariPool.properties";

public HikariDataSourceFactory() throws IOException {
String path = System.getenv(CONFIG_PATH_KEY);
InputStream inputStream = null;
try {
if (path == null) {
inputStream = Resources.getResourceAsStream(HIKARI_PROPERTIES_KEY);
} else {
inputStream = new FileInputStream(path);
}

Properties properties = new Properties();
properties.load(inputStream);
HikariConfig config = new HikariConfig(properties);
this.dataSource = new HikariDataSource(config);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (inputStream != null) {
inputStream.close();
}
}
@Override
public void setProperties(Properties properties) {
HikariConfig config = new HikariConfig(properties);
this.dataSource = new HikariDataSource(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public interface ClusterInfoMapper {
+ "values (#{name}, #{namespace}, #{endpoint}, #{gmtCreate}, #{gmtModify})")
void insert(ClusterInfo clusterInfo);

@Select("SELECT * from celeborn_cluster_info where name = #{clusterName}")
@Select(
"SELECT id, name, namespace, endpoint, gmt_create, gmt_modify from celeborn_cluster_info where name = #{clusterName}")
ClusterInfo getClusterInfo(String clusterName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

public interface ClusterSystemConfigMapper {

@Select("SELECT * from celeborn_cluster_system_config where cluster_id = #{clusterId}")
@Select(
"SELECT id, cluster_id, config_key, config_value, type, gmt_create, gmt_modify "
+ "from celeborn_cluster_system_config where cluster_id = #{clusterId}")
List<ClusterSystemConfig> getClusterSystemConfig(int clusterId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
public interface ClusterTenantConfigMapper {

@Select(
"SELECT * from celeborn_cluster_tenant_config where cluster_id = #{clusterId} and level=#{level} limit #{offset}, #{pageSize}")
"SELECT id, cluster_id, tenant_id, level, user, config_key, config_value, type, gmt_create, gmt_modify "
+ "from celeborn_cluster_tenant_config where cluster_id = #{clusterId} and level=#{level} limit #{offset}, #{pageSize}")
List<ClusterTenantConfig> getClusterTenantConfigs(
@Param("clusterId") int clusterId,
@Param("level") String configLevel,
Expand Down
10 changes: 9 additions & 1 deletion service/src/main/resources/mybatis-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,15 @@
<environments default="development">
<environment id="development">
<transactionManager type="JDBC"/>
<dataSource type="org.apache.celeborn.server.common.service.store.db.HikariDataSourceFactory"/>
<dataSource type="org.apache.celeborn.server.common.service.store.db.HikariDataSourceFactory">
<property name="jdbcUrl" value="${jdbcUrl}"/>
<property name="username" value="${username}"/>
<property name="password" value="${password}"/>
<property name="connectionTimeout" value="${connectionTimeout}"/>
<property name="idleTimeout" value="${idleTimeout}"/>
<property name="maxLifetime" value="${maxLifetime}"/>
<property name="maximumPoolSize" value="${maximumPoolSize}"/>
</dataSource>
</environment>
</environments>
<mappers>
Expand Down

0 comments on commit 9ee2121

Please sign in to comment.