Skip to content

Commit

Permalink
More accurate bind errors (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
elefeint authored Aug 28, 2024
1 parent 0dad35f commit 9aa84bb
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 16 deletions.
13 changes: 8 additions & 5 deletions src/motherduck_destination_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ std::vector<column_def> get_duckdb_columns(
DataType_Name(col.type()) + "> for column <" +
col.name() + "> to a DuckDB type");
}
auto precision = col.has_decimal() ? col.decimal().precision() : DUCKDB_DEFAULT_PRECISION;
auto scale = col.has_decimal() ? col.decimal().scale() : DUCKDB_DEFAULT_SCALE;
duckdb_columns.push_back(column_def{col.name(), ddbtype, col.primary_key(),
precision, scale});
auto precision = col.has_decimal() ? col.decimal().precision()
: DUCKDB_DEFAULT_PRECISION;
auto scale =
col.has_decimal() ? col.decimal().scale() : DUCKDB_DEFAULT_SCALE;
duckdb_columns.push_back(
column_def{col.name(), ddbtype, col.primary_key(), precision, scale});
}
return duckdb_columns;
}
Expand Down Expand Up @@ -311,7 +313,8 @@ DestinationSdkImpl::Truncate(::grpc::ServerContext *context,
std::chrono::nanoseconds delete_before_ts =
std::chrono::seconds(request->utc_delete_before().seconds()) +
std::chrono::nanoseconds(request->utc_delete_before().nanos());
const std::string deleted_column = request->has_soft() ? request->soft().deleted_column() : "";
const std::string deleted_column =
request->has_soft() ? request->soft().deleted_column() : "";
truncate_table(*con, table_name, request->synced_column(),
delete_before_ts, deleted_column);
} else {
Expand Down
37 changes: 26 additions & 11 deletions src/sql_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,17 @@ bool schema_exists(duckdb::Connection &con, const std::string &db_name,
const std::string query =
"SELECT schema_name FROM information_schema.schemata "
"WHERE catalog_name=? AND schema_name=?";
const std::string err = "Could not find whether schema <" + schema_name +
"> exists in database <" + db_name + ">";
auto statement = con.Prepare(query);
if (statement->HasError()) {
throw std::runtime_error(err + " (at bind step): " + statement->GetError());
}
duckdb::vector<duckdb::Value> params = {duckdb::Value(db_name),
duckdb::Value(schema_name)};
auto result = statement->Execute(params, false);
if (result->HasError()) {
throw std::runtime_error("Could not find whether schema <" + schema_name +
"> exists in database <" + db_name +
">: " + result->GetError());
throw std::runtime_error(err + ": " + result->GetError());
}
auto materialized_result = duckdb::unique_ptr_cast<
duckdb::QueryResult, duckdb::MaterializedQueryResult>(std::move(result));
Expand All @@ -96,16 +99,19 @@ bool table_exists(duckdb::Connection &con, const table_def &table) {
const std::string query =
"SELECT table_name FROM information_schema.tables WHERE "
"table_catalog=? AND table_schema=? AND table_name=?";
const std::string err =
"Could not find whether table <" + table.to_escaped_string() + "> exists";
auto statement = con.Prepare(query);
if (statement->HasError()) {
throw std::runtime_error(err + " (at bind step): " + statement->GetError());
}
duckdb::vector<duckdb::Value> params = {duckdb::Value(table.db_name),
duckdb::Value(table.schema_name),
duckdb::Value(table.table_name)};
auto result = statement->Execute(params, false);

if (result->HasError()) {
throw std::runtime_error("Could not find whether table <" +
table.to_escaped_string() +
"> exists: " + result->GetError());
throw std::runtime_error(err + ": " + result->GetError());
}
auto materialized_result = duckdb::unique_ptr_cast<
duckdb::QueryResult, duckdb::MaterializedQueryResult>(std::move(result));
Expand Down Expand Up @@ -193,17 +199,20 @@ std::vector<column_def> describe_table(duckdb::Connection &con,
"WHERE database_name=? "
"AND schema_name=? "
"AND table_name=?";
const std::string err =
"Could not describe table <" + table.to_escaped_string() + ">";
mdlog::info("describe_table: " + std::string(query));
auto statement = con.Prepare(query);
if (statement->HasError()) {
throw std::runtime_error(err + " (at bind step): " + statement->GetError());
}
duckdb::vector<duckdb::Value> params = {duckdb::Value(table.db_name),
duckdb::Value(table.schema_name),
duckdb::Value(table.table_name)};
auto result = statement->Execute(params, false);

if (result->HasError()) {
throw std::runtime_error("Could not describe table <" +
table.to_escaped_string() +
">:" + result->GetError());
throw std::runtime_error(err + ": " + result->GetError());
}
auto materialized_result = duckdb::unique_ptr_cast<
duckdb::QueryResult, duckdb::MaterializedQueryResult>(std::move(result));
Expand Down Expand Up @@ -487,6 +496,7 @@ void truncate_table(duckdb::Connection &con, const table_def &table,
const std::string absolute_table_name = table.to_escaped_string();
std::ostringstream sql;

mdlog::info("truncate_table request: deleted column = " + deleted_column);
if (deleted_column.empty()) {
// hard delete
sql << "DELETE FROM " << absolute_table_name;
Expand All @@ -495,11 +505,17 @@ void truncate_table(duckdb::Connection &con, const table_def &table,
sql << "UPDATE " << absolute_table_name << " SET "
<< KeywordHelper::WriteQuoted(deleted_column, '"') << " = true";
}
mdlog::info("truncate_table request: synced column = " + synced_column);
sql << " WHERE " << KeywordHelper::WriteQuoted(synced_column, '"')
<< " < make_timestamp(?)";
auto query = sql.str();
const std::string err =
"Error truncating table at bind step <" + absolute_table_name + ">";
mdlog::info("truncate_table: " + query);
auto statement = con.Prepare(query);
if (statement->HasError()) {
throw std::runtime_error(err + " (at bind step):" + statement->GetError());
}

// DuckDB make_timestamp takes microseconds; Fivetran sends millisecond
// precision -- safe to divide with truncation
Expand All @@ -510,8 +526,7 @@ void truncate_table(duckdb::Connection &con, const table_def &table,
std::to_string(cutoff_microseconds) + ">");
auto result = statement->Execute(params, false);
if (result->HasError()) {
throw std::runtime_error("Error truncating table <" + absolute_table_name +
">:" + result->GetError());
throw std::runtime_error(err + ": " + result->GetError());
}
}

Expand Down
51 changes: 51 additions & 0 deletions test/integration/test_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1519,3 +1519,54 @@ TEST_CASE("AlterTable with constraints", "[integration]") {
REQUIRE(res->GetValue(5, 0) == 0.0);
}
}

TEST_CASE("Invalid truncate with nonexisting delete column",
"[integration][current]") {
DestinationSdkImpl service;

const std::string table_name =
"empty_table" + std::to_string(Catch::rngSeed());
auto token = std::getenv("motherduck_token");
REQUIRE(token);

{
// Create Table that is missing the _fivetran_deleted column
::fivetran_sdk::CreateTableRequest request;
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())["motherduck_database"] =
TEST_DATABASE_NAME;
request.mutable_table()->set_name(table_name);
auto col1 = request.mutable_table()->add_columns();
col1->set_name("something");
col1->set_type(::fivetran_sdk::DataType::STRING);

::fivetran_sdk::CreateTableResponse response;
auto status = service.CreateTable(nullptr, &request, &response);
REQUIRE_NO_FAIL(status);
}

{
// Attempt to truncate the table using a nonexisting _fivetran_deleted
// column
::fivetran_sdk::TruncateRequest request;
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())["motherduck_database"] =
TEST_DATABASE_NAME;
request.set_table_name(table_name);
request.set_synced_column(
"_fivetran_synced"); // also does not exist although that does not
// matter
request.mutable_soft()->set_deleted_column("_fivetran_deleted");

const auto cutoff_datetime = 1707436800; // 2024-02-09 0:0:0 GMT, trust me
request.mutable_utc_delete_before()->set_seconds(cutoff_datetime);
request.mutable_utc_delete_before()->set_nanos(0);
::fivetran_sdk::TruncateResponse response;
auto status = service.Truncate(nullptr, &request, &response);
REQUIRE_FALSE(status.ok());
CHECK_THAT(
status.error_message(),
Catch::Matchers::ContainsSubstring(
"Referenced column \"_fivetran_synced\" not found in FROM clause"));
}
}

0 comments on commit 9aa84bb

Please sign in to comment.