diff --git a/src/motherduck_destination_server.cpp b/src/motherduck_destination_server.cpp index 41eb054..d9511cb 100644 --- a/src/motherduck_destination_server.cpp +++ b/src/motherduck_destination_server.cpp @@ -62,10 +62,12 @@ std::vector get_duckdb_columns( DataType_Name(col.type()) + "> for column <" + col.name() + "> to a DuckDB type"); } - auto precision = col.has_decimal() ? col.decimal().precision() : DUCKDB_DEFAULT_PRECISION; - auto scale = col.has_decimal() ? col.decimal().scale() : DUCKDB_DEFAULT_SCALE; - duckdb_columns.push_back(column_def{col.name(), ddbtype, col.primary_key(), - precision, scale}); + auto precision = col.has_decimal() ? col.decimal().precision() + : DUCKDB_DEFAULT_PRECISION; + auto scale = + col.has_decimal() ? col.decimal().scale() : DUCKDB_DEFAULT_SCALE; + duckdb_columns.push_back( + column_def{col.name(), ddbtype, col.primary_key(), precision, scale}); } return duckdb_columns; } @@ -311,7 +313,8 @@ DestinationSdkImpl::Truncate(::grpc::ServerContext *context, std::chrono::nanoseconds delete_before_ts = std::chrono::seconds(request->utc_delete_before().seconds()) + std::chrono::nanoseconds(request->utc_delete_before().nanos()); - const std::string deleted_column = request->has_soft() ? request->soft().deleted_column() : ""; + const std::string deleted_column = + request->has_soft() ? request->soft().deleted_column() : ""; truncate_table(*con, table_name, request->synced_column(), delete_before_ts, deleted_column); } else { diff --git a/src/sql_generator.cpp b/src/sql_generator.cpp index 0eb3fc7..8c91106 100644 --- a/src/sql_generator.cpp +++ b/src/sql_generator.cpp @@ -77,14 +77,17 @@ bool schema_exists(duckdb::Connection &con, const std::string &db_name, const std::string query = "SELECT schema_name FROM information_schema.schemata " "WHERE catalog_name=? AND schema_name=?"; + const std::string err = "Could not find whether schema <" + schema_name + + "> exists in database <" + db_name + ">"; auto statement = con.Prepare(query); + if (statement->HasError()) { + throw std::runtime_error(err + " (at bind step): " + statement->GetError()); + } duckdb::vector params = {duckdb::Value(db_name), duckdb::Value(schema_name)}; auto result = statement->Execute(params, false); if (result->HasError()) { - throw std::runtime_error("Could not find whether schema <" + schema_name + - "> exists in database <" + db_name + - ">: " + result->GetError()); + throw std::runtime_error(err + ": " + result->GetError()); } auto materialized_result = duckdb::unique_ptr_cast< duckdb::QueryResult, duckdb::MaterializedQueryResult>(std::move(result)); @@ -96,16 +99,19 @@ bool table_exists(duckdb::Connection &con, const table_def &table) { const std::string query = "SELECT table_name FROM information_schema.tables WHERE " "table_catalog=? AND table_schema=? AND table_name=?"; + const std::string err = + "Could not find whether table <" + table.to_escaped_string() + "> exists"; auto statement = con.Prepare(query); + if (statement->HasError()) { + throw std::runtime_error(err + " (at bind step): " + statement->GetError()); + } duckdb::vector params = {duckdb::Value(table.db_name), duckdb::Value(table.schema_name), duckdb::Value(table.table_name)}; auto result = statement->Execute(params, false); if (result->HasError()) { - throw std::runtime_error("Could not find whether table <" + - table.to_escaped_string() + - "> exists: " + result->GetError()); + throw std::runtime_error(err + ": " + result->GetError()); } auto materialized_result = duckdb::unique_ptr_cast< duckdb::QueryResult, duckdb::MaterializedQueryResult>(std::move(result)); @@ -193,17 +199,20 @@ std::vector describe_table(duckdb::Connection &con, "WHERE database_name=? " "AND schema_name=? " "AND table_name=?"; + const std::string err = + "Could not describe table <" + table.to_escaped_string() + ">"; mdlog::info("describe_table: " + std::string(query)); auto statement = con.Prepare(query); + if (statement->HasError()) { + throw std::runtime_error(err + " (at bind step): " + statement->GetError()); + } duckdb::vector params = {duckdb::Value(table.db_name), duckdb::Value(table.schema_name), duckdb::Value(table.table_name)}; auto result = statement->Execute(params, false); if (result->HasError()) { - throw std::runtime_error("Could not describe table <" + - table.to_escaped_string() + - ">:" + result->GetError()); + throw std::runtime_error(err + ": " + result->GetError()); } auto materialized_result = duckdb::unique_ptr_cast< duckdb::QueryResult, duckdb::MaterializedQueryResult>(std::move(result)); @@ -487,6 +496,7 @@ void truncate_table(duckdb::Connection &con, const table_def &table, const std::string absolute_table_name = table.to_escaped_string(); std::ostringstream sql; + mdlog::info("truncate_table request: deleted column = " + deleted_column); if (deleted_column.empty()) { // hard delete sql << "DELETE FROM " << absolute_table_name; @@ -495,11 +505,17 @@ void truncate_table(duckdb::Connection &con, const table_def &table, sql << "UPDATE " << absolute_table_name << " SET " << KeywordHelper::WriteQuoted(deleted_column, '"') << " = true"; } + mdlog::info("truncate_table request: synced column = " + synced_column); sql << " WHERE " << KeywordHelper::WriteQuoted(synced_column, '"') << " < make_timestamp(?)"; auto query = sql.str(); + const std::string err = + "Error truncating table at bind step <" + absolute_table_name + ">"; mdlog::info("truncate_table: " + query); auto statement = con.Prepare(query); + if (statement->HasError()) { + throw std::runtime_error(err + " (at bind step):" + statement->GetError()); + } // DuckDB make_timestamp takes microseconds; Fivetran sends millisecond // precision -- safe to divide with truncation @@ -510,8 +526,7 @@ void truncate_table(duckdb::Connection &con, const table_def &table, std::to_string(cutoff_microseconds) + ">"); auto result = statement->Execute(params, false); if (result->HasError()) { - throw std::runtime_error("Error truncating table <" + absolute_table_name + - ">:" + result->GetError()); + throw std::runtime_error(err + ": " + result->GetError()); } } diff --git a/test/integration/test_server.cpp b/test/integration/test_server.cpp index ceacfdb..acc322e 100644 --- a/test/integration/test_server.cpp +++ b/test/integration/test_server.cpp @@ -1519,3 +1519,54 @@ TEST_CASE("AlterTable with constraints", "[integration]") { REQUIRE(res->GetValue(5, 0) == 0.0); } } + +TEST_CASE("Invalid truncate with nonexisting delete column", + "[integration][current]") { + DestinationSdkImpl service; + + const std::string table_name = + "empty_table" + std::to_string(Catch::rngSeed()); + auto token = std::getenv("motherduck_token"); + REQUIRE(token); + + { + // Create Table that is missing the _fivetran_deleted column + ::fivetran_sdk::CreateTableRequest request; + (*request.mutable_configuration())["motherduck_token"] = token; + (*request.mutable_configuration())["motherduck_database"] = + TEST_DATABASE_NAME; + request.mutable_table()->set_name(table_name); + auto col1 = request.mutable_table()->add_columns(); + col1->set_name("something"); + col1->set_type(::fivetran_sdk::DataType::STRING); + + ::fivetran_sdk::CreateTableResponse response; + auto status = service.CreateTable(nullptr, &request, &response); + REQUIRE_NO_FAIL(status); + } + + { + // Attempt to truncate the table using a nonexisting _fivetran_deleted + // column + ::fivetran_sdk::TruncateRequest request; + (*request.mutable_configuration())["motherduck_token"] = token; + (*request.mutable_configuration())["motherduck_database"] = + TEST_DATABASE_NAME; + request.set_table_name(table_name); + request.set_synced_column( + "_fivetran_synced"); // also does not exist although that does not + // matter + request.mutable_soft()->set_deleted_column("_fivetran_deleted"); + + const auto cutoff_datetime = 1707436800; // 2024-02-09 0:0:0 GMT, trust me + request.mutable_utc_delete_before()->set_seconds(cutoff_datetime); + request.mutable_utc_delete_before()->set_nanos(0); + ::fivetran_sdk::TruncateResponse response; + auto status = service.Truncate(nullptr, &request, &response); + REQUIRE_FALSE(status.ok()); + CHECK_THAT( + status.error_message(), + Catch::Matchers::ContainsSubstring( + "Referenced column \"_fivetran_synced\" not found in FROM clause")); + } +}