diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 995f00ae2248..2c1d1e8df4dc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -18,6 +18,7 @@ package org.apache.paimon.catalog; +import org.apache.paimon.client.ClientPool; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; @@ -47,6 +48,8 @@ public class FileSystemCatalog extends AbstractCatalog { private final Path warehouse; + private ClientPool.ClientPoolImpl clientPool; + public FileSystemCatalog(FileIO fileIO, Path warehouse) { super(fileIO); this.warehouse = warehouse; @@ -159,7 +162,10 @@ private SchemaManager schemaManager(Identifier identifier) { @Override public Optional lockContext() { - return LockContextUtils.lockContext(catalogOptions, "filesystem"); + if (clientPool == null) { + this.clientPool = LockContextUtils.tryInitializeClientPool(catalogOptions); + } + return LockContextUtils.lockContext(this.clientPool, catalogOptions, "filesystem"); } @Override @@ -194,7 +200,7 @@ private static String database(Path path) { @Override public void close() throws Exception { - LockContextUtils.close(); + LockContextUtils.close(clientPool); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java index 99716946e9d4..699c54de4474 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/LockContextUtils.java @@ -18,6 +18,7 @@ package org.apache.paimon.catalog; +import org.apache.paimon.client.ClientPool; import org.apache.paimon.jdbc.JdbcCatalogFactory; import org.apache.paimon.jdbc.JdbcCatalogLock; import org.apache.paimon.jdbc.JdbcClientPool; @@ -36,18 +37,15 @@ public class LockContextUtils { private static final Logger LOG = LoggerFactory.getLogger(FileSystemCatalog.class); - private static JdbcClientPool connections; - public static Optional lockContext( - Options catalogOptions, String catalogKey) { - String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE); - if (lockType == null) { + ClientPool.ClientPoolImpl clientPool, Options catalogOptions, String catalogKey) { + if (clientPool == null) { return Optional.of(new AbstractCatalog.OptionLockContext(catalogOptions)); } + String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE); switch (lockType) { case JdbcCatalogFactory.IDENTIFIER: - // Try init jdbc connections. - tryInitializeJdbcConnections(catalogOptions); + JdbcClientPool connections = (JdbcClientPool) clientPool; return Optional.of( new JdbcCatalogLock.JdbcLockContext( connections, catalogKey, catalogOptions)); @@ -57,27 +55,44 @@ public static Optional lockContext( } } - private static void tryInitializeJdbcConnections(Options catalogOptions) { - if (connections == null) { - connections = - new JdbcClientPool( - catalogOptions.get(CatalogOptions.CLIENT_POOL_SIZE), - catalogOptions.get(CatalogOptions.URI.key()), - catalogOptions.toMap()); - try { - JdbcUtils.createDistributedLockTable(connections, catalogOptions); - } catch (SQLException e) { - throw new RuntimeException("Cannot initialize JDBC distributed lock.", e); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted in call to initialize", e); - } + public static ClientPool.ClientPoolImpl tryInitializeClientPool(Options catalogOptions) { + String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE); + if (lockType == null) { + return null; + } + switch (lockType) { + case JdbcCatalogFactory.IDENTIFIER: + JdbcClientPool connections = + new JdbcClientPool( + catalogOptions.get(CatalogOptions.CLIENT_POOL_SIZE), + catalogOptions.get(CatalogOptions.URI.key()), + catalogOptions.toMap()); + try { + JdbcUtils.createDistributedLockTable(connections, catalogOptions); + } catch (SQLException e) { + throw new RuntimeException("Cannot initialize JDBC distributed lock.", e); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted in call to initialize", e); + } + return connections; + default: + LOG.warn("Unsupported lock type:" + lockType); + return null; } } - public static void close() { - if (connections != null && !connections.isClosed()) { - connections.close(); - connections = null; + public static void close(ClientPool.ClientPoolImpl clientPool) { + if (clientPool == null) { + return; + } + if (clientPool instanceof JdbcClientPool) { + JdbcClientPool connections = (JdbcClientPool) clientPool; + if (!connections.isClosed()) { + connections.close(); + } + } else { + clientPool.close(); } + clientPool = null; } }