diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 199ab7b084da..ac433f2d015c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -23,10 +23,12 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StringUtils; @@ -38,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -132,7 +135,7 @@ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedExc @Override protected boolean databaseExistsImpl(String databaseName) { - return JdbcUtils.databaseExists(catalogName, connections, databaseName); + return JdbcUtils.databaseExists(connections, catalogName, databaseName); } @Override @@ -234,10 +237,80 @@ protected void dropTableImpl(Identifier identifier) { } @Override - protected void createTableImpl(Identifier identifier, Schema schema) {} + protected void createTableImpl(Identifier identifier, Schema schema) { + try { + // create table file + getSchemaManager(identifier).createTable(schema); + // Update schema metadata + Path path = getDataTableLocation(identifier); + int insertRecord = + connections.run( + conn -> { + try (PreparedStatement sql = + conn.prepareStatement( + JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) { + sql.setString(1, catalogName); + sql.setString(2, identifier.getDatabaseName()); + sql.setString(3, identifier.getObjectName()); + sql.setString(4, path.toString()); + return sql.executeUpdate(); + } + }); + if (insertRecord == 1) { + LOG.debug("Successfully committed to new table: {}", identifier); + } else { + try { + fileIO.deleteDirectoryQuietly(path); + } catch (Exception ee) { + LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee); + } + throw new RuntimeException( + String.format( + "Failed to create table %s in catalog %s", + identifier.getFullName(), catalogName)); + } + } catch (Exception e) { + throw new RuntimeException("Failed to create table " + identifier.getFullName(), e); + } + } + + private SchemaManager getSchemaManager(Identifier identifier) { + return new SchemaManager(fileIO, getDataTableLocation(identifier)) + .withLock(lock(identifier)); + } + + private Lock lock(Identifier identifier) { + return null; + } @Override protected void renameTableImpl(Identifier fromTable, Identifier toTable) { + try { + updateCatalogMetadata(fromTable, toTable); + + Path fromPath = getDataTableLocation(fromTable); + if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) { + // Rename the file system's table directory. Maintain consistency between tables in + // the file system and tables in the Hive Metastore. + Path toPath = getDataTableLocation(toTable); + try { + fileIO.rename(fromPath, toPath); + } catch (IOException e) { + throw new RuntimeException( + "Failed to rename changes of table " + + toTable.getFullName() + + " to underlying files.", + e); + } + // Update table metadata + updateTable(fromTable, toPath.toString(), fromPath.toString()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to rename table " + fromTable.getFullName(), e); + } + } + + private void updateCatalogMetadata(Identifier fromTable, Identifier toTable) { int updatedRecords = execute( err -> { @@ -270,7 +343,59 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { @Override protected void alterTableImpl(Identifier identifier, List changes) - throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {} + throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { + if (!tableExists(identifier)) { + throw new RuntimeException( + String.format("Table is not exists {}", identifier.getFullName())); + } + final SchemaManager schemaManager = getSchemaManager(identifier); + // first commit changes to underlying files + TableSchema schema = schemaManager.commitChanges(changes); + try { + String newMetadataLocation = getDataTableLocation(identifier).toString(); + String oldMetadataLocation = + JdbcUtils.getTableMetadataLocation( + connections, + catalogName, + identifier.getDatabaseName(), + identifier.getObjectName()); + if (!newMetadataLocation.equals(oldMetadataLocation)) { + updateTable(identifier, newMetadataLocation, oldMetadataLocation); + } + } catch (Exception te) { + schemaManager.deleteSchema(schema.id()); + throw new RuntimeException(te); + } + } + + private void updateTable( + Identifier identifier, String newMetadataLocation, String oldMetadataLocation) + throws SQLException, InterruptedException { + int updatedRecords = + connections.run( + conn -> { + try (PreparedStatement sql = + conn.prepareStatement(JdbcUtils.DO_COMMIT_SQL)) { + // UPDATE + sql.setString(1, newMetadataLocation); + sql.setString(2, oldMetadataLocation); + // WHERE + sql.setString(3, catalogName); + sql.setString(4, identifier.getDatabaseName()); + sql.setString(5, identifier.getObjectName()); + sql.setString(6, oldMetadataLocation); + return sql.executeUpdate(); + } + }); + if (updatedRecords == 1) { + LOG.debug("Successfully committed to existing table: {}", identifier.getFullName()); + } else { + throw new RuntimeException( + String.format( + "Failed to update table %s from catalog %s", + identifier.getFullName(), catalogName)); + } + } @Override public String warehouse() { @@ -279,7 +404,23 @@ public String warehouse() { @Override protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { - return null; + if (!tableExists(identifier)) { + throw new TableNotExistException(identifier); + } + Path tableLocation = getDataTableLocation(identifier); + return new SchemaManager(fileIO, tableLocation) + .latest() + .orElseThrow( + () -> new RuntimeException("There is no paimon table in " + tableLocation)); + } + + @Override + public boolean tableExists(Identifier identifier) { + if (isSystemTable(identifier)) { + return super.tableExists(identifier); + } + return JdbcUtils.tableExists( + connections, catalogName, identifier.getDatabaseName(), identifier.getObjectName()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java index a45356b4ab93..cc6c667c708f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java @@ -33,8 +33,5 @@ public final class JdbcCatalogOptions { public static final ConfigOption CLIENT_POOL_SIZE = ConfigOptions.key("client-pool-size").intType().defaultValue(2).withDescription("."); - public static final String METADATA_LOCATION_PROP = "metadata_location"; - public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location"; - private JdbcCatalogOptions() {} } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java index 4b2604f01a21..e9c825bcc202 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -27,7 +27,9 @@ import java.util.Set; public final class JdbcUtils { - public static final String STRICT_MODE_PROPERTY = JdbcCatalog.PROPERTY_PREFIX + "strict-mode"; + + public static final String METADATA_LOCATION_PROP = "metadata_location"; + public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location"; public static final String CATALOG_TABLE_NAME = "paimon_tables"; public static final String CATALOG_NAME = "catalog_name"; public static final String TABLE_DATABASE = "table_database"; @@ -37,9 +39,9 @@ public final class JdbcUtils { "UPDATE " + CATALOG_TABLE_NAME + " SET " - + JdbcCatalogOptions.METADATA_LOCATION_PROP + + METADATA_LOCATION_PROP + " = ? , " - + JdbcCatalogOptions.PREVIOUS_METADATA_LOCATION_PROP + + PREVIOUS_METADATA_LOCATION_PROP + " = ? " + " WHERE " + CATALOG_NAME @@ -48,7 +50,7 @@ public final class JdbcUtils { + " = ? AND " + TABLE_NAME + " = ? AND " - + JdbcCatalogOptions.METADATA_LOCATION_PROP + + METADATA_LOCATION_PROP + " = ?"; static final String CREATE_CATALOG_TABLE = "CREATE TABLE " @@ -60,9 +62,9 @@ public final class JdbcUtils { + " VARCHAR(255) NOT NULL," + TABLE_NAME + " VARCHAR(255) NOT NULL," - + JdbcCatalogOptions.METADATA_LOCATION_PROP + + METADATA_LOCATION_PROP + " VARCHAR(1000)," - + JdbcCatalogOptions.PREVIOUS_METADATA_LOCATION_PROP + + PREVIOUS_METADATA_LOCATION_PROP + " VARCHAR(1000)," + "PRIMARY KEY (" + CATALOG_NAME @@ -72,7 +74,7 @@ public final class JdbcUtils { + TABLE_NAME + ")" + ")"; - static final String SELECT_TABLE_SQL = + static final String GET_TABLE_SQL = "SELECT * FROM " + CATALOG_TABLE_NAME + " WHERE " @@ -153,9 +155,9 @@ public final class JdbcUtils { + ", " + TABLE_NAME + ", " - + JdbcCatalogOptions.METADATA_LOCATION_PROP + + METADATA_LOCATION_PROP + ", " - + JdbcCatalogOptions.PREVIOUS_METADATA_LOCATION_PROP + + PREVIOUS_METADATA_LOCATION_PROP + ") " + " VALUES (?,?,?,?,null)"; @@ -266,7 +268,7 @@ public static Properties filterAndRemovePrefix(Map properties, S } public static boolean databaseExists( - String catalogName, JdbcClientPool connections, String databaseName) { + JdbcClientPool connections, String catalogName, String databaseName) { if (exists(connections, JdbcUtils.GET_DATABASE_SQL, catalogName, databaseName)) { return true; @@ -278,6 +280,46 @@ public static boolean databaseExists( return false; } + /** + * Get table metadata location + * + * @param catalogName + * @param connections + * @param databaseName + * @param tableName + * @return + */ + public static String getTableMetadataLocation( + JdbcClientPool connections, String catalogName, String databaseName, String tableName) { + try { + return connections.run( + conn -> { + try (PreparedStatement preparedStatement = + conn.prepareStatement(GET_TABLE_SQL)) { + preparedStatement.setString(1, catalogName); + preparedStatement.setString(2, databaseName); + preparedStatement.setString(3, tableName); + try (ResultSet rs = preparedStatement.executeQuery()) { + return rs.getString(METADATA_LOCATION_PROP); + } + } + }); + } catch (SQLException e) { + throw new UncheckedSQLException(e, "Failed to execute query: %s", GET_TABLE_SQL); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedInterruptedException(e, "Interrupted in SQL query"); + } + } + + public static boolean tableExists( + JdbcClientPool connections, String catalogName, String databaseName, String tableName) { + if (exists(connections, JdbcUtils.GET_TABLE_SQL, catalogName, databaseName, tableName)) { + return true; + } + return false; + } + @SuppressWarnings("checkstyle:NestedTryDepth") private static boolean exists(JdbcClientPool connections, String sql, String... args) { try {