Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Feb 6, 2024
1 parent be015a3 commit a0f2b2b
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 17 deletions.
149 changes: 145 additions & 4 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
Expand All @@ -38,6 +40,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand Down Expand Up @@ -132,7 +135,7 @@ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedExc

@Override
protected boolean databaseExistsImpl(String databaseName) {
return JdbcUtils.databaseExists(catalogName, connections, databaseName);
return JdbcUtils.databaseExists(connections, catalogName, databaseName);
}

@Override
Expand Down Expand Up @@ -234,10 +237,80 @@ protected void dropTableImpl(Identifier identifier) {
}

@Override
protected void createTableImpl(Identifier identifier, Schema schema) {}
protected void createTableImpl(Identifier identifier, Schema schema) {
try {
// create table file
getSchemaManager(identifier).createTable(schema);
// Update schema metadata
Path path = getDataTableLocation(identifier);
int insertRecord =
connections.run(
conn -> {
try (PreparedStatement sql =
conn.prepareStatement(
JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) {
sql.setString(1, catalogName);
sql.setString(2, identifier.getDatabaseName());
sql.setString(3, identifier.getObjectName());
sql.setString(4, path.toString());
return sql.executeUpdate();
}
});
if (insertRecord == 1) {
LOG.debug("Successfully committed to new table: {}", identifier);
} else {
try {
fileIO.deleteDirectoryQuietly(path);
} catch (Exception ee) {
LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee);
}
throw new RuntimeException(
String.format(
"Failed to create table %s in catalog %s",
identifier.getFullName(), catalogName));
}
} catch (Exception e) {
throw new RuntimeException("Failed to create table " + identifier.getFullName(), e);
}
}

private SchemaManager getSchemaManager(Identifier identifier) {
return new SchemaManager(fileIO, getDataTableLocation(identifier))
.withLock(lock(identifier));
}

private Lock lock(Identifier identifier) {
return null;
}

@Override
protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
try {
updateCatalogMetadata(fromTable, toTable);

Path fromPath = getDataTableLocation(fromTable);
if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) {
// Rename the file system's table directory. Maintain consistency between tables in
// the file system and tables in the Hive Metastore.
Path toPath = getDataTableLocation(toTable);
try {
fileIO.rename(fromPath, toPath);
} catch (IOException e) {
throw new RuntimeException(
"Failed to rename changes of table "
+ toTable.getFullName()
+ " to underlying files.",
e);
}
// Update table metadata
updateTable(fromTable, toPath.toString(), fromPath.toString());
}
} catch (Exception e) {
throw new RuntimeException("Failed to rename table " + fromTable.getFullName(), e);
}
}

private void updateCatalogMetadata(Identifier fromTable, Identifier toTable) {
int updatedRecords =
execute(
err -> {
Expand Down Expand Up @@ -270,7 +343,59 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {

@Override
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {}
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
if (!tableExists(identifier)) {
throw new RuntimeException(
String.format("Table is not exists {}", identifier.getFullName()));
}
final SchemaManager schemaManager = getSchemaManager(identifier);
// first commit changes to underlying files
TableSchema schema = schemaManager.commitChanges(changes);
try {
String newMetadataLocation = getDataTableLocation(identifier).toString();
String oldMetadataLocation =
JdbcUtils.getTableMetadataLocation(
connections,
catalogName,
identifier.getDatabaseName(),
identifier.getObjectName());
if (!newMetadataLocation.equals(oldMetadataLocation)) {
updateTable(identifier, newMetadataLocation, oldMetadataLocation);
}
} catch (Exception te) {
schemaManager.deleteSchema(schema.id());
throw new RuntimeException(te);
}
}

private void updateTable(
Identifier identifier, String newMetadataLocation, String oldMetadataLocation)
throws SQLException, InterruptedException {
int updatedRecords =
connections.run(
conn -> {
try (PreparedStatement sql =
conn.prepareStatement(JdbcUtils.DO_COMMIT_SQL)) {
// UPDATE
sql.setString(1, newMetadataLocation);
sql.setString(2, oldMetadataLocation);
// WHERE
sql.setString(3, catalogName);
sql.setString(4, identifier.getDatabaseName());
sql.setString(5, identifier.getObjectName());
sql.setString(6, oldMetadataLocation);
return sql.executeUpdate();
}
});
if (updatedRecords == 1) {
LOG.debug("Successfully committed to existing table: {}", identifier.getFullName());
} else {
throw new RuntimeException(
String.format(
"Failed to update table %s from catalog %s",
identifier.getFullName(), catalogName));
}
}

@Override
public String warehouse() {
Expand All @@ -279,7 +404,23 @@ public String warehouse() {

@Override
protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
return null;
if (!tableExists(identifier)) {
throw new TableNotExistException(identifier);
}
Path tableLocation = getDataTableLocation(identifier);
return new SchemaManager(fileIO, tableLocation)
.latest()
.orElseThrow(
() -> new RuntimeException("There is no paimon table in " + tableLocation));
}

@Override
public boolean tableExists(Identifier identifier) {
if (isSystemTable(identifier)) {
return super.tableExists(identifier);
}
return JdbcUtils.tableExists(
connections, catalogName, identifier.getDatabaseName(), identifier.getObjectName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,5 @@ public final class JdbcCatalogOptions {
public static final ConfigOption<Integer> CLIENT_POOL_SIZE =
ConfigOptions.key("client-pool-size").intType().defaultValue(2).withDescription(".");

public static final String METADATA_LOCATION_PROP = "metadata_location";
public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location";

private JdbcCatalogOptions() {}
}
62 changes: 52 additions & 10 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.util.Set;

public final class JdbcUtils {
public static final String STRICT_MODE_PROPERTY = JdbcCatalog.PROPERTY_PREFIX + "strict-mode";

public static final String METADATA_LOCATION_PROP = "metadata_location";
public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location";
public static final String CATALOG_TABLE_NAME = "paimon_tables";
public static final String CATALOG_NAME = "catalog_name";
public static final String TABLE_DATABASE = "table_database";
Expand All @@ -37,9 +39,9 @@ public final class JdbcUtils {
"UPDATE "
+ CATALOG_TABLE_NAME
+ " SET "
+ JdbcCatalogOptions.METADATA_LOCATION_PROP
+ METADATA_LOCATION_PROP
+ " = ? , "
+ JdbcCatalogOptions.PREVIOUS_METADATA_LOCATION_PROP
+ PREVIOUS_METADATA_LOCATION_PROP
+ " = ? "
+ " WHERE "
+ CATALOG_NAME
Expand All @@ -48,7 +50,7 @@ public final class JdbcUtils {
+ " = ? AND "
+ TABLE_NAME
+ " = ? AND "
+ JdbcCatalogOptions.METADATA_LOCATION_PROP
+ METADATA_LOCATION_PROP
+ " = ?";
static final String CREATE_CATALOG_TABLE =
"CREATE TABLE "
Expand All @@ -60,9 +62,9 @@ public final class JdbcUtils {
+ " VARCHAR(255) NOT NULL,"
+ TABLE_NAME
+ " VARCHAR(255) NOT NULL,"
+ JdbcCatalogOptions.METADATA_LOCATION_PROP
+ METADATA_LOCATION_PROP
+ " VARCHAR(1000),"
+ JdbcCatalogOptions.PREVIOUS_METADATA_LOCATION_PROP
+ PREVIOUS_METADATA_LOCATION_PROP
+ " VARCHAR(1000),"
+ "PRIMARY KEY ("
+ CATALOG_NAME
Expand All @@ -72,7 +74,7 @@ public final class JdbcUtils {
+ TABLE_NAME
+ ")"
+ ")";
static final String SELECT_TABLE_SQL =
static final String GET_TABLE_SQL =
"SELECT * FROM "
+ CATALOG_TABLE_NAME
+ " WHERE "
Expand Down Expand Up @@ -153,9 +155,9 @@ public final class JdbcUtils {
+ ", "
+ TABLE_NAME
+ ", "
+ JdbcCatalogOptions.METADATA_LOCATION_PROP
+ METADATA_LOCATION_PROP
+ ", "
+ JdbcCatalogOptions.PREVIOUS_METADATA_LOCATION_PROP
+ PREVIOUS_METADATA_LOCATION_PROP
+ ") "
+ " VALUES (?,?,?,?,null)";

Expand Down Expand Up @@ -266,7 +268,7 @@ public static Properties filterAndRemovePrefix(Map<String, String> properties, S
}

public static boolean databaseExists(
String catalogName, JdbcClientPool connections, String databaseName) {
JdbcClientPool connections, String catalogName, String databaseName) {

if (exists(connections, JdbcUtils.GET_DATABASE_SQL, catalogName, databaseName)) {
return true;
Expand All @@ -278,6 +280,46 @@ public static boolean databaseExists(
return false;
}

/**
* Get table metadata location
*
* @param catalogName
* @param connections
* @param databaseName
* @param tableName
* @return
*/
public static String getTableMetadataLocation(
JdbcClientPool connections, String catalogName, String databaseName, String tableName) {
try {
return connections.run(
conn -> {
try (PreparedStatement preparedStatement =
conn.prepareStatement(GET_TABLE_SQL)) {
preparedStatement.setString(1, catalogName);
preparedStatement.setString(2, databaseName);
preparedStatement.setString(3, tableName);
try (ResultSet rs = preparedStatement.executeQuery()) {
return rs.getString(METADATA_LOCATION_PROP);
}
}
});
} catch (SQLException e) {
throw new UncheckedSQLException(e, "Failed to execute query: %s", GET_TABLE_SQL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e, "Interrupted in SQL query");
}
}

public static boolean tableExists(
JdbcClientPool connections, String catalogName, String databaseName, String tableName) {
if (exists(connections, JdbcUtils.GET_TABLE_SQL, catalogName, databaseName, tableName)) {
return true;
}
return false;
}

@SuppressWarnings("checkstyle:NestedTryDepth")
private static boolean exists(JdbcClientPool connections, String sql, String... args) {
try {
Expand Down

0 comments on commit a0f2b2b

Please sign in to comment.