Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More accurate bind errors #48

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/motherduck_destination_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ std::vector<column_def> get_duckdb_columns(
DataType_Name(col.type()) + "> for column <" +
col.name() + "> to a DuckDB type");
}
auto precision = col.has_decimal() ? col.decimal().precision() : DUCKDB_DEFAULT_PRECISION;
auto scale = col.has_decimal() ? col.decimal().scale() : DUCKDB_DEFAULT_SCALE;
duckdb_columns.push_back(column_def{col.name(), ddbtype, col.primary_key(),
precision, scale});
auto precision = col.has_decimal() ? col.decimal().precision()
: DUCKDB_DEFAULT_PRECISION;
auto scale =
col.has_decimal() ? col.decimal().scale() : DUCKDB_DEFAULT_SCALE;
duckdb_columns.push_back(
column_def{col.name(), ddbtype, col.primary_key(), precision, scale});
}
return duckdb_columns;
}
Expand Down Expand Up @@ -311,7 +313,8 @@ DestinationSdkImpl::Truncate(::grpc::ServerContext *context,
std::chrono::nanoseconds delete_before_ts =
std::chrono::seconds(request->utc_delete_before().seconds()) +
std::chrono::nanoseconds(request->utc_delete_before().nanos());
const std::string deleted_column = request->has_soft() ? request->soft().deleted_column() : "";
const std::string deleted_column =
request->has_soft() ? request->soft().deleted_column() : "";
truncate_table(*con, table_name, request->synced_column(),
delete_before_ts, deleted_column);
} else {
Expand Down
37 changes: 26 additions & 11 deletions src/sql_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,17 @@ bool schema_exists(duckdb::Connection &con, const std::string &db_name,
const std::string query =
"SELECT schema_name FROM information_schema.schemata "
"WHERE catalog_name=? AND schema_name=?";
const std::string err = "Could not find whether schema <" + schema_name +
"> exists in database <" + db_name + ">";
auto statement = con.Prepare(query);
if (statement->HasError()) {
throw std::runtime_error(err + " (at bind step): " + statement->GetError());
}
duckdb::vector<duckdb::Value> params = {duckdb::Value(db_name),
duckdb::Value(schema_name)};
auto result = statement->Execute(params, false);
if (result->HasError()) {
throw std::runtime_error("Could not find whether schema <" + schema_name +
"> exists in database <" + db_name +
">: " + result->GetError());
throw std::runtime_error(err + ": " + result->GetError());
}
auto materialized_result = duckdb::unique_ptr_cast<
duckdb::QueryResult, duckdb::MaterializedQueryResult>(std::move(result));
Expand All @@ -96,16 +99,19 @@ bool table_exists(duckdb::Connection &con, const table_def &table) {
const std::string query =
"SELECT table_name FROM information_schema.tables WHERE "
"table_catalog=? AND table_schema=? AND table_name=?";
const std::string err =
"Could not find whether table <" + table.to_escaped_string() + "> exists";
auto statement = con.Prepare(query);
if (statement->HasError()) {
throw std::runtime_error(err + " (at bind step): " + statement->GetError());
}
duckdb::vector<duckdb::Value> params = {duckdb::Value(table.db_name),
duckdb::Value(table.schema_name),
duckdb::Value(table.table_name)};
auto result = statement->Execute(params, false);

if (result->HasError()) {
throw std::runtime_error("Could not find whether table <" +
table.to_escaped_string() +
"> exists: " + result->GetError());
throw std::runtime_error(err + ": " + result->GetError());
}
auto materialized_result = duckdb::unique_ptr_cast<
duckdb::QueryResult, duckdb::MaterializedQueryResult>(std::move(result));
Expand Down Expand Up @@ -193,17 +199,20 @@ std::vector<column_def> describe_table(duckdb::Connection &con,
"WHERE database_name=? "
"AND schema_name=? "
"AND table_name=?";
const std::string err =
"Could not describe table <" + table.to_escaped_string() + ">";
mdlog::info("describe_table: " + std::string(query));
auto statement = con.Prepare(query);
if (statement->HasError()) {
throw std::runtime_error(err + " (at bind step): " + statement->GetError());
}
duckdb::vector<duckdb::Value> params = {duckdb::Value(table.db_name),
duckdb::Value(table.schema_name),
duckdb::Value(table.table_name)};
auto result = statement->Execute(params, false);

if (result->HasError()) {
throw std::runtime_error("Could not describe table <" +
table.to_escaped_string() +
">:" + result->GetError());
throw std::runtime_error(err + ": " + result->GetError());
}
auto materialized_result = duckdb::unique_ptr_cast<
duckdb::QueryResult, duckdb::MaterializedQueryResult>(std::move(result));
Expand Down Expand Up @@ -487,6 +496,7 @@ void truncate_table(duckdb::Connection &con, const table_def &table,
const std::string absolute_table_name = table.to_escaped_string();
std::ostringstream sql;

mdlog::info("truncate_table request: deleted column = " + deleted_column);
if (deleted_column.empty()) {
// hard delete
sql << "DELETE FROM " << absolute_table_name;
Expand All @@ -495,11 +505,17 @@ void truncate_table(duckdb::Connection &con, const table_def &table,
sql << "UPDATE " << absolute_table_name << " SET "
<< KeywordHelper::WriteQuoted(deleted_column, '"') << " = true";
}
mdlog::info("truncate_table request: synced column = " + synced_column);
sql << " WHERE " << KeywordHelper::WriteQuoted(synced_column, '"')
<< " < make_timestamp(?)";
auto query = sql.str();
const std::string err =
"Error truncating table at bind step <" + absolute_table_name + ">";
mdlog::info("truncate_table: " + query);
auto statement = con.Prepare(query);
if (statement->HasError()) {
throw std::runtime_error(err + " (at bind step):" + statement->GetError());
Copy link
Member

@guenp guenp Sep 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we say at bind step twice, effectively:
"Error truncating table at bind step <" + absolute_table_name + ">" + " (at bind step):"

}

// DuckDB make_timestamp takes microseconds; Fivetran sends millisecond
// precision -- safe to divide with truncation
Expand All @@ -510,8 +526,7 @@ void truncate_table(duckdb::Connection &con, const table_def &table,
std::to_string(cutoff_microseconds) + ">");
auto result = statement->Execute(params, false);
if (result->HasError()) {
throw std::runtime_error("Error truncating table <" + absolute_table_name +
">:" + result->GetError());
throw std::runtime_error(err + ": " + result->GetError());
}
}

Expand Down
51 changes: 51 additions & 0 deletions test/integration/test_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1519,3 +1519,54 @@ TEST_CASE("AlterTable with constraints", "[integration]") {
REQUIRE(res->GetValue(5, 0) == 0.0);
}
}

TEST_CASE("Invalid truncate with nonexisting delete column",
"[integration][current]") {
DestinationSdkImpl service;

const std::string table_name =
"empty_table" + std::to_string(Catch::rngSeed());
auto token = std::getenv("motherduck_token");
REQUIRE(token);

{
// Create Table that is missing the _fivetran_deleted column
::fivetran_sdk::CreateTableRequest request;
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())["motherduck_database"] =
TEST_DATABASE_NAME;
request.mutable_table()->set_name(table_name);
auto col1 = request.mutable_table()->add_columns();
col1->set_name("something");
col1->set_type(::fivetran_sdk::DataType::STRING);

::fivetran_sdk::CreateTableResponse response;
auto status = service.CreateTable(nullptr, &request, &response);
REQUIRE_NO_FAIL(status);
}

{
// Attempt to truncate the table using a nonexisting _fivetran_deleted
// column
::fivetran_sdk::TruncateRequest request;
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())["motherduck_database"] =
TEST_DATABASE_NAME;
request.set_table_name(table_name);
request.set_synced_column(
"_fivetran_synced"); // also does not exist although that does not
// matter
request.mutable_soft()->set_deleted_column("_fivetran_deleted");

const auto cutoff_datetime = 1707436800; // 2024-02-09 0:0:0 GMT, trust me
request.mutable_utc_delete_before()->set_seconds(cutoff_datetime);
request.mutable_utc_delete_before()->set_nanos(0);
::fivetran_sdk::TruncateResponse response;
auto status = service.Truncate(nullptr, &request, &response);
REQUIRE_FALSE(status.ok());
CHECK_THAT(
status.error_message(),
Catch::Matchers::ContainsSubstring(
"Referenced column \"_fivetran_synced\" not found in FROM clause"));
}
}
Loading