Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manually recreate a table if any column needs to add/remove constraint #45

Merged
merged 7 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions includes/sql_generator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ struct table_def {
std::string to_escaped_string() const;
};

void find_primary_keys(
const std::vector<column_def> &cols,
std::vector<const column_def *> &columns_pk,
std::vector<const column_def *> *columns_regular = nullptr);

bool schema_exists(duckdb::Connection &con, const std::string &db_name,
const std::string &schema_name);

Expand All @@ -27,8 +32,8 @@ void create_schema(duckdb::Connection &con, const std::string &db_name,
bool table_exists(duckdb::Connection &con, const table_def &table);

void create_table(duckdb::Connection &con, const table_def &table,
const std::vector<const column_def *> &columns_pk,
const std::vector<column_def> &all_columns);
const std::vector<column_def> &all_columns,
const std::set<std::string> &columns_with_default_value);

std::vector<column_def> describe_table(duckdb::Connection &con,
const table_def &table);
Expand Down
21 changes: 4 additions & 17 deletions src/motherduck_destination_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ int find_optional_property(
const std::string &property_name, int default_value,
const std::function<int(const std::string &)> &parse) {
auto token_it = config.find(property_name);
return token_it == config.end() || token_it->second.empty() ? default_value : parse(token_it->second);
return token_it == config.end() || token_it->second.empty()
? default_value
: parse(token_it->second);
}

template <typename T> std::string get_schema_name(const T *request) {
Expand Down Expand Up @@ -134,19 +136,6 @@ void process_file(
arrow_array_stream.release(&arrow_array_stream);
}

void find_primary_keys(
const std::vector<column_def> &cols,
std::vector<const column_def *> &columns_pk,
std::vector<const column_def *> *columns_regular = nullptr) {
for (auto &col : cols) {
if (col.primary_key) {
columns_pk.push_back(&col);
} else if (columns_regular != nullptr) {
columns_regular->push_back(&col);
}
}
}

grpc::Status DestinationSdkImpl::ConfigurationForm(
::grpc::ServerContext *context,
const ::fivetran_sdk::ConfigurationFormRequest *request,
Expand Down Expand Up @@ -259,10 +248,8 @@ grpc::Status DestinationSdkImpl::CreateTable(
create_schema(*con, db_name, schema_name);
}

std::vector<const column_def *> columns_pk;
const auto cols = get_duckdb_columns(request->table().columns());
find_primary_keys(cols, columns_pk);
create_table(*con, table, columns_pk, cols);
create_table(*con, table, cols, {});
response->set_success(true);
} catch (const std::exception &e) {
mdlog::severe("CreateTable endpoint failed for schema <" +
Expand Down
245 changes: 183 additions & 62 deletions src/sql_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,42 @@ void write_joined(
}
}

void find_primary_keys(const std::vector<column_def> &cols,
std::vector<const column_def *> &columns_pk,
std::vector<const column_def *> *columns_regular) {
for (auto &col : cols) {
if (col.primary_key) {
columns_pk.push_back(&col);
} else if (columns_regular != nullptr) {
columns_regular->push_back(&col);
}
}
}

std::string
make_full_column_list(const std::vector<const column_def *> &columns_pk,
const std::vector<const column_def *> &columns_regular) {
std::ostringstream full_column_list;
if (!columns_pk.empty()) {
write_joined(full_column_list, columns_pk, print_column);
// tiny troubleshooting assist; primary columns are separated from regular
// columns by 2 spaces
full_column_list << ", ";
}
write_joined(full_column_list, columns_regular, print_column);

return full_column_list.str();
}

void run_query(duckdb::Connection &con, const std::string &log_prefix,
const std::string &query, const std::string &error_message) {
mdlog::info(log_prefix + ": " + query);
auto result = con.Query(query);
if (result->HasError()) {
throw std::runtime_error(error_message + ": " + result->GetError());
}
}

// DuckDB querying
// TODO: add test for schema or remove the logic if it's unused
bool schema_exists(duckdb::Connection &con, const std::string &db_name,
Expand Down Expand Up @@ -83,10 +119,27 @@ void create_schema(duckdb::Connection &con, const std::string &db_name,
con.Query(query);
}

std::string get_default_value(duckdb::LogicalTypeId type) {
switch (type) {
case duckdb::LogicalTypeId::VARCHAR:
return "''";
case duckdb::LogicalTypeId::DATE:
case duckdb::LogicalTypeId::TIMESTAMP:
case duckdb::LogicalTypeId::TIMESTAMP_TZ:
return "'epoch'";
default:
return "0";
}
}

void create_table(duckdb::Connection &con, const table_def &table,
const std::vector<const column_def *> &columns_pk,
const std::vector<column_def> &all_columns) {
const std::vector<column_def> &all_columns,
const std::set<std::string> &columns_with_default_value) {
const std::string absolute_table_name = table.to_escaped_string();

std::vector<const column_def *> columns_pk;
find_primary_keys(all_columns, columns_pk);

std::ostringstream ddl;
ddl << "CREATE OR REPLACE TABLE " << absolute_table_name << " (";

Expand All @@ -96,6 +149,11 @@ void create_table(duckdb::Connection &con, const table_def &table,
if (col.type == duckdb::LogicalTypeId::DECIMAL) {
ddl << " (" << col.width << "," << col.scale << ")";
}
if (columns_with_default_value.find(col.name) !=
columns_with_default_value.end()) {
ddl << " DEFAULT " + get_default_value(col.type);
}

ddl << ", "; // DuckDB allows trailing commas
}

Expand Down Expand Up @@ -164,53 +222,74 @@ std::vector<column_def> describe_table(duckdb::Connection &con,
return columns;
}

void alter_table(duckdb::Connection &con, const table_def &table,
const std::vector<column_def> &columns) {

void alter_table_recreate(duckdb::Connection &con, const table_def &table,
const std::vector<column_def> &all_columns,
const std::set<std::string> &common_columns) {
long timestamp = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
auto temp_table =
table_def{table.db_name, table.schema_name,
"tmp_" + table.table_name + "_" + std::to_string(timestamp)};
auto absolute_table_name = table.to_escaped_string();
std::set<std::string> alter_types;
std::set<std::string> added_columns;
std::set<std::string> deleted_columns;
auto absolute_temp_table_name = temp_table.to_escaped_string();

const auto &existing_columns = describe_table(con, table);
std::map<std::string, column_def> new_column_map;
run_query(con, "renaming table",
"ALTER TABLE " + table.to_escaped_string() + " RENAME TO " +
KeywordHelper::WriteQuoted(temp_table.table_name, '"'),
"Could not rename table <" + absolute_table_name + ">");

for (const auto &col : columns) {
new_column_map.emplace(col.name, col);
added_columns.emplace(col.name);
std::set<std::string> new_primary_key_cols;
for (const auto &col : all_columns) {
if (col.primary_key &&
common_columns.find(col.name) == common_columns.end()) {
new_primary_key_cols.insert(col.name);
}
}

for (const auto &col : existing_columns) {
const auto &new_col_it = new_column_map.find(col.name);

added_columns.erase(col.name);
create_table(con, table, all_columns, new_primary_key_cols);

if (new_col_it == new_column_map.end()) {
deleted_columns.emplace(col.name);
} else if (new_col_it->second.type !=
col.type) { // altering primary key not supported in duckdb
alter_types.emplace(col.name);
std::ostringstream out_column_list;
bool first = true;
for (auto &col : common_columns) {
if (first) {
first = false;
} else {
out_column_list << ",";
}
out_column_list << KeywordHelper::WriteQuoted(col, '"');
}
std::string common_column_list = out_column_list.str();

std::ostringstream out;
out << "INSERT INTO " << absolute_table_name << "(" << common_column_list
<< ") SELECT " << common_column_list << " FROM "
<< absolute_temp_table_name;

run_query(con, "Reinserting data after changing primary keys", out.str(),
"Could not reinsert data into table <" + absolute_table_name + ">");
run_query(con, "Dropping temp table after recreating table with constraints",
"DROP TABLE " + absolute_temp_table_name,
"Could not drop table <" + absolute_temp_table_name + ">");
}

void alter_table_in_place(
duckdb::Connection &con, const std::string &absolute_table_name,
const std::set<std::string> &added_columns,
const std::set<std::string> &deleted_columns,
const std::set<std::string> &alter_types,
const std::map<std::string, column_def> &new_column_map) {
for (const auto &col_name : added_columns) {
std::ostringstream out;
out << "ALTER TABLE " << absolute_table_name << " ADD COLUMN ";
const auto &col = new_column_map[col_name];
const auto &col = new_column_map.at(col_name);

out << KeywordHelper::WriteQuoted(col_name, '"') << " "
<< duckdb::EnumUtil::ToChars(col.type);
if (col.primary_key) {
out << " PRIMARY KEY";
}
auto query = out.str();
mdlog::info("alter_table: " + query);
auto result = con.Query(query);
if (result->HasError()) {
throw std::runtime_error("Could not add column <" + col_name +
"> to table <" + absolute_table_name +
">:" + result->GetError());
}

run_query(con, "alter_table add", out.str(),
"Could not add column <" + col_name + "> to table <" +
absolute_table_name + ">");
}

for (const auto &col_name : deleted_columns) {
Expand All @@ -219,48 +298,90 @@ void alter_table(duckdb::Connection &con, const table_def &table,

out << KeywordHelper::WriteQuoted(col_name, '"');

auto query = out.str();
mdlog::info("alter_table: " + query);
auto result = con.Query(query);
if (result->HasError()) {
throw std::runtime_error("Could not drop column <" + col_name +
"> from table <" + absolute_table_name +
">:" + result->GetError());
}
run_query(con, "alter_table drop", out.str(),
"Could not drop column <" + col_name + "> from table <" +
absolute_table_name + ">");
}

for (const auto &col_name : alter_types) {
std::ostringstream out;
out << "ALTER TABLE " << absolute_table_name << " ALTER ";
const auto &col = new_column_map[col_name];
const auto &col = new_column_map.at(col_name);

out << KeywordHelper::WriteQuoted(col_name, '"') << " TYPE "
<< duckdb::EnumUtil::ToChars(col.type);

auto query = out.str();
mdlog::info("alter table: " + query);
auto result = con.Query(query);
if (result->HasError()) {
throw std::runtime_error("Could not alter type for column <" + col_name +
"> in table <" + absolute_table_name +
">:" + result->GetError());
}
run_query(con, "alter table change type", out.str(),
"Could not alter type for column <" + col_name + "> in table <" +
absolute_table_name + ">");
}
}

std::string
make_full_column_list(const std::vector<const column_def *> &columns_pk,
const std::vector<const column_def *> &columns_regular) {
std::ostringstream full_column_list;
if (!columns_pk.empty()) {
write_joined(full_column_list, columns_pk, print_column);
// tiny troubleshooting assist; primary columns are separated from regular
// columns by 2 spaces
full_column_list << ", ";
void alter_table(duckdb::Connection &con, const table_def &table,
const std::vector<column_def> &columns) {

bool recreate_table = false;

auto absolute_table_name = table.to_escaped_string();
std::set<std::string> alter_types;
std::set<std::string> added_columns;
std::set<std::string> deleted_columns;
std::set<std::string> common_columns;

const auto &existing_columns = describe_table(con, table);
std::map<std::string, column_def> new_column_map;

for (const auto &col : columns) {
new_column_map.emplace(col.name, col);
added_columns.emplace(col.name);
}
write_joined(full_column_list, columns_regular, print_column);

return full_column_list.str();
for (const auto &col : existing_columns) {
const auto &new_col_it = new_column_map.find(col.name);

if (added_columns.erase(col.name)) {
common_columns.emplace(col.name);
}

if (new_col_it == new_column_map.end()) {
deleted_columns.emplace(col.name);
if (col.primary_key) {
recreate_table = true;
}
} else if (new_col_it->second.primary_key != col.primary_key) {
mdlog::info("Altering primary key requested for column <" +
new_col_it->second.name + ">");
recreate_table = true;
} else if (new_col_it->second.type != col.type) {
alter_types.emplace(col.name);
}
}

auto primary_key_added_it =
std::find_if(added_columns.begin(), added_columns.end(),
[&new_column_map](const std::string &column_name) {
return new_column_map[column_name].primary_key;
});
if (primary_key_added_it != added_columns.end()) {
mdlog::info("Adding primary key requested for column <" +
*primary_key_added_it + ">");
recreate_table = true;
}

run_query(con, "begin alter table transaction", "BEGIN TRANSACTION",
"Could not begin transaction for altering table <" +
absolute_table_name + ">");

if (recreate_table) {
alter_table_recreate(con, table, columns, common_columns);
} else {
alter_table_in_place(con, absolute_table_name, added_columns,
deleted_columns, alter_types, new_column_map);
}

run_query(con, "commit alter table transaction", "END TRANSACTION",
"Could not commit transaction for altering table <" +
absolute_table_name + ">");
}

void upsert(duckdb::Connection &con, const table_def &table,
Expand Down
Loading
Loading