Skip to content

Commit

Permalink
fix copyright header
Browse files Browse the repository at this point in the history
support quoting identifiers for various temp table flows
  • Loading branch information
gs-rpant1729 committed Dec 4, 2024
1 parent e920eba commit 6010bf2
Show file tree
Hide file tree
Showing 39 changed files with 245 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public boolean process(String line) throws Exception
String tableName = specifiedTableName != null ? specifiedTableName : "test" + (getTables(connection).size() + 1);
try (Statement statement = connection.createStatement())
{
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tempFile.getTemporaryPathForFile(), relationalResultColumns));
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tempFile.getTemporaryPathForFile(), relationalResultColumns, databaseConnection.quoteIdentifiers));
this.client.println("Cached into table: '" + tableName + "'. Launching DataCube...");

String functionBodyCode = "#>{" + DataCube.getLocalDatabasePath() + "." + tableName + "}#->from(" + DataCube.getLocalRuntimePath() + ")";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public boolean process(String line) throws Exception
Connection connection = ConnectionHelper.getConnection(databaseConnection, client.getPlanExecutor());
Statement statement = connection.createStatement())
{
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tokens[2]));
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tokens[2], databaseConnection.quoteIdentifiers));
this.client.println("Loaded into table: '" + tableName + "'");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void beforeStep()
Path tempFile = Files.createTempFile("walkthrough-sample-data", ".csv");
FileOutputStream fos = new FileOutputStream(tempFile.toFile());
IOUtils.copy(Objects.requireNonNull(inputStream, "Can't extract sample data for walkthrough"), fos);
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tempFile.toString()));
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tempFile.toString(), databaseConnection.quoteIdentifiers));
}
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ public void load(Client client)
MutableList<Table> tables = getTables(connection);
if (tables.anySatisfy(t -> t.name.equals(this.tableName)))
{
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().dropTable(tableName));
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().dropTable(tableName, databaseConnection.quoteIdentifiers));
}
Path tempFile = Files.createTempFile("sample-data" + this.name, ".csv");
FileOutputStream outputStream = new FileOutputStream(tempFile.toFile());
IOUtils.copy(Objects.requireNonNull(inputStream, "Can't extract sample data '" + this.name + "' from " + this.csvFilePath), outputStream);
outputStream.close(); // explicitly close output stream to let database access this file, else this would throw `java.sql.SQLException: IO Error: File is already open in` error on Windows
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tempFile.toString()));
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tempFile.toString(), databaseConnection.quoteIdentifiers));

// post check
tables = getTables(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public boolean process(String line) throws Exception
String tableName = specifiedTableName != null ? specifiedTableName : "test" + (getTables(connection).size() + 1);
try (Statement statement = connection.createStatement())
{
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tempFile.getTemporaryPathForFile(), relationalResultColumns));
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tempFile.getTemporaryPathForFile(), relationalResultColumns, databaseConnection.quoteIdentifiers));
this.client.println("Cached into table: '" + tableName + "'");
this.client.printDebug(printExecutionTime(startTime));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public boolean process(String line) throws Exception
String tableName = tokens[2];
try (Statement statement = connection.createStatement())
{
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().dropTable(tableName));
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().dropTable(tableName, databaseConnection.quoteIdentifiers));
this.client.println("Dropped table: '" + tableName + "'");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public boolean process(String line) throws Exception

try (Statement statement = connection.createStatement())
{
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tokens[1]));
statement.executeUpdate(DatabaseManager.fromString(databaseConnection.type.name()).relationalDatabaseSupport().load(tableName, tokens[1], databaseConnection.quoteIdentifiers));
this.client.println("Loaded into table: '" + tableName + "'");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
public class AthenaCommands extends RelationalDatabaseCommands
{
@Override
public String dropTempTable(String tableName)
public String dropTempTable(String tableName, Boolean quoteIdentifiers)
{
throw new UnsupportedOperationException("not yet implemented");
}

@Override
public List<String> createAndLoadTempTable(String tableName, List<Column> columns, String optionalCSVFileLocation)
public List<String> createAndLoadTempTable(String tableName, List<Column> columns, String optionalCSVFileLocation, Boolean quoteIdentifiers)
{
throw new UnsupportedOperationException("not yet implemented");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@
public class BigQueryCommands extends RelationalDatabaseCommands
{
@Override
public String dropTempTable(String tableName)
public String getQuoteCharacter()
{
return "Drop table if exists " + tableName;
return "`";
}

@Override
public String dropTempTable(String tableName, Boolean quoteIdentifiers)
{
return "Drop table if exists " + mayQuoteIdentifier(tableName, quoteIdentifiers);
}

@Override
public List<String> createAndLoadTempTable(String tableName, List<Column> columns, String optionalCSVFileLocation)
public List<String> createAndLoadTempTable(String tableName, List<Column> columns, String optionalCSVFileLocation, Boolean quoteIdentifiers)
{
throw new UnsupportedOperationException("not yet implemented");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@
public class DatabricksCommands extends RelationalDatabaseCommands
{
@Override
public String dropTempTable(String tableName)
public String getQuoteCharacter()
{
return "Drop table if exists " + tableName;
return "`";
}

@Override
public String dropTempTable(String tableName, Boolean quoteIdentifiers)
{
return "Drop table if exists " + mayQuoteIdentifier(tableName, quoteIdentifiers);
}

@Override
public List<String> createAndLoadTempTable(String tableName, List<Column> columns, String optionalCSVFileLocation)
public List<String> createAndLoadTempTable(String tableName, List<Column> columns, String optionalCSVFileLocation, Boolean quoteIdentifiers)
{
throw new UnsupportedOperationException("not yet implemented");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
public class DuckDBCommands extends RelationalDatabaseCommands
{
@Override
public String dropTempTable(String tableName)
public String dropTempTable(String tableName, Boolean quoteIdentifiers)
{
throw new UnsupportedOperationException("not yet implemented");
}

@Override
public List<String> createAndLoadTempTable(String tableName, List<Column> columns, String optionalCSVFileLocation)
public List<String> createAndLoadTempTable(String tableName, List<Column> columns, String optionalCSVFileLocation, Boolean quoteIdentifiers)
{
throw new UnsupportedOperationException("not yet implemented");
}
Expand All @@ -43,22 +43,22 @@ public IngestionMethod getDefaultIngestionMethod()
}

@Override
public String load(String tableName, String location)
public String load(String tableName, String location, Boolean quoteIdentifiers)
{
return "CREATE TABLE " + tableName + " AS SELECT * FROM read_csv('" + location + "', header=true);";
return "CREATE TABLE " + mayQuoteIdentifier(tableName, quoteIdentifiers) + " AS SELECT * FROM read_csv('" + location + "', header=true);";
}

@Override
public String load(String tableName, String location, List<Column> columns)
public String load(String tableName, String location, List<Column> columns, Boolean quoteIdentifiers)
{
String columnTypesString = columns.stream().map(c -> String.format("'%s': '%s'", c.name, c.type)).collect(Collectors.joining(", ", "{", "}"));
return "CREATE TABLE " + tableName + " AS SELECT * FROM read_csv('" + location + "', header = true, columns = " + columnTypesString + ");";
String columnTypesString = columns.stream().map(c -> String.format("'%s': '%s'", mayQuoteIdentifier(c.name, quoteIdentifiers), c.type)).collect(Collectors.joining(", ", "{", "}"));
return "CREATE TABLE " + mayQuoteIdentifier(tableName, quoteIdentifiers) + " AS SELECT * FROM read_csv('" + location + "', header = true, columns = " + columnTypesString + ");";
}

@Override
public String dropTable(String tableName)
public String dropTable(String tableName, Boolean quoteIdentifiers)
{
return "DROP TABLE " + tableName + ";";
return "DROP TABLE " + mayQuoteIdentifier(tableName, quoteIdentifiers) + ";";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testLoadCommand() throws Exception
Statement statement = connection.createStatement()
)
{
String loadSql = DUCK_DB_COMMANDS.load("load_command_test_table", Objects.requireNonNull(TestDuckDBCommands.class.getClassLoader().getResource("personTable.csv")).getFile());
String loadSql = DUCK_DB_COMMANDS.load("load_command_test_table", Objects.requireNonNull(TestDuckDBCommands.class.getClassLoader().getResource("personTable.csv")).getFile(), false);
statement.execute(loadSql);
try (ResultSet rs = statement.executeQuery("select * from load_command_test_table"))
{
Expand All @@ -85,6 +85,33 @@ public void testLoadCommand() throws Exception
}
}

@Test
public void testLoadCommandWithQuoteIdentifiers() throws Exception
{
try (
Connection connection = CONNECTION_MANAGER_SELECTOR.getDatabaseConnection((Subject) null, this.testDuckDBConnection());
Statement statement = connection.createStatement()
)
{
String loadSql = DUCK_DB_COMMANDS.load("load_command_test_table", Objects.requireNonNull(TestDuckDBCommands.class.getClassLoader().getResource("personTable.csv")).getFile(), true);
statement.execute(loadSql);
try (ResultSet rs = statement.executeQuery("select * from \"load_command_test_table\""))
{
assertOnColumnCountAndColumnTypes(rs.getMetaData(), 4, "(id:BIGINT)|(firstName:VARCHAR)|(lastName:VARCHAR)|(age:BIGINT)");
assertOnResultSetCSV(
rs,
"1,Peter,Smith,23\r\n" +
"2,John,Johnson,22\r\n" +
"3,John,Hill,12\r\n" +
"4,Anthony,Allen,22\r\n" +
"5,Fabrice,Roberts,34\r\n" +
"6,Oliver,Hill,32\r\n" +
"7,David,Harris,35\r\n"
);
}
}
}

@Test
public void testLoadCommandWithProvidedTypes() throws Exception
{
Expand All @@ -99,7 +126,7 @@ public void testLoadCommandWithProvidedTypes() throws Exception
new Column("lastName", "VARCHAR(32)"),
new Column("age", "DOUBLE")
);
String loadSql = DUCK_DB_COMMANDS.load("load_command_with_types_test_table", Objects.requireNonNull(TestDuckDBCommands.class.getClassLoader().getResource("personTable.csv")).getFile(), columns);
String loadSql = DUCK_DB_COMMANDS.load("load_command_with_types_test_table", Objects.requireNonNull(TestDuckDBCommands.class.getClassLoader().getResource("personTable.csv")).getFile(), columns, false);
statement.execute(loadSql);
try (ResultSet rs = statement.executeQuery("select * from load_command_with_types_test_table"))
{
Expand Down Expand Up @@ -130,7 +157,7 @@ public void testLoadCommandWithProvidedTypesFromRelationalResult() throws Except
Statement statement = connection.createStatement()
)
{
statement.execute(DUCK_DB_COMMANDS.load("load_command_with_types_test_table_1", Objects.requireNonNull(TestDuckDBCommands.class.getClassLoader().getResource("personTable.csv")).getFile()));
statement.execute(DUCK_DB_COMMANDS.load("load_command_with_types_test_table_1", Objects.requireNonNull(TestDuckDBCommands.class.getClassLoader().getResource("personTable.csv")).getFile(), false));
SQLExecutionNode sqlExecutionNode = new SQLExecutionNode();
sqlExecutionNode.isResultColumnsDynamic = true;
sqlExecutionNode.connection = duckDbConnection;
Expand All @@ -157,7 +184,7 @@ public void testLoadCommandWithProvidedTypesFromRelationalResult() throws Except
Statement statement = connection.createStatement()
)
{
String loadSql = DUCK_DB_COMMANDS.load("load_command_with_types_test_table_2", tempFile.path.toString(), relationalResult.getResultSetColumns());
String loadSql = DUCK_DB_COMMANDS.load("load_command_with_types_test_table_2", tempFile.path.toString(), relationalResult.getResultSetColumns(), false);
statement.execute(loadSql);
try (ResultSet rs = statement.executeQuery("select * from load_command_with_types_test_table_2 order by \"Average Age\""))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,15 @@
public class MemSQLCommands extends RelationalDatabaseCommands
{
@Override
public String dropTempTable(String tableName)
public String getQuoteCharacter()
{
return "DROP TEMPORARY TABLE IF EXISTS " + tableName;
return "`";
}

@Override
public String dropTempTable(String tableName, Boolean quoteIdentifiers)
{
return "DROP TEMPORARY TABLE IF EXISTS " + mayQuoteIdentifier(tableName, quoteIdentifiers);
}

@Override
Expand All @@ -39,12 +45,12 @@ public String dropTempTable(String tableName)
https://www.mysqltutorial.org/import-csv-file-mysql-table/
*/
public List<String> createAndLoadTempTable(String tableName, List<Column> columns, String clientFileName)
public List<String> createAndLoadTempTable(String tableName, List<Column> columns, String clientFileName, Boolean quoteIdentifiers)
{
return Lists.mutable.with(
"CREATE TEMPORARY TABLE " + tableName + " (" + columns.stream().map(c -> c.name + " " + c.type).collect(Collectors.joining(", ")) + ")",
"CREATE TEMPORARY TABLE " + mayQuoteIdentifier(tableName, quoteIdentifiers) + " (" + columns.stream().map(c -> mayQuoteIdentifier(c.name, quoteIdentifiers) + " " + c.type).collect(Collectors.joining(", ")) + ")",
"LOAD DATA LOCAL INFILE '" + clientFileName.replace("\\", "/") + "' \n" +
"INTO TABLE `" + tableName + "` \n" +
"INTO TABLE " + mayQuoteIdentifier(tableName, quoteIdentifiers) + " \n" +
"IGNORE 1 LINES;"
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
public class PostgresCommands extends RelationalDatabaseCommands
{
@Override
public String dropTempTable(String tableName)
public String dropTempTable(String tableName, Boolean quoteIdentifiers)
{
return "Drop table if exists " + tableName;
return "Drop table if exists " + mayQuoteIdentifier(tableName, quoteIdentifiers);
}

@Override
public List<String> createAndLoadTempTable(String tableName, List<Column> columns, String optionalCSVFileLocation)
public List<String> createAndLoadTempTable(String tableName, List<Column> columns, String optionalCSVFileLocation, Boolean quoteIdentifiers)
{
throw new UnsupportedOperationException("not yet implemented");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
public class RedshiftCommands extends RelationalDatabaseCommands
{
@Override
public String dropTempTable(String tableName)
public String dropTempTable(String tableName, Boolean quoteIdentifiers)
{
return "drop table if exists " + tableName;
return "drop table if exists " + mayQuoteIdentifier(tableName, quoteIdentifiers);
}

@Override
public List<String> createAndLoadTempTable(String tableName, List<Column> columns, String optionalCSVFileLocation)
public List<String> createAndLoadTempTable(String tableName, List<Column> columns, String optionalCSVFileLocation, Boolean quoteIdentifiers)
{
throw new UnsupportedOperationException("not yet implemented");
}
Expand Down
Loading

0 comments on commit 6010bf2

Please sign in to comment.