From 05999055ae7b556a650b86886c2bd391019c3022 Mon Sep 17 00:00:00 2001 From: Elena Felder <41136058+elefeint@users.noreply.github.com> Date: Mon, 18 Mar 2024 12:36:17 -0700 Subject: [PATCH] join WHERE columns on AND, not on comma --- src/sql_generator.cpp | 29 ++--- test/files/multikey_table_delete.csv | 3 + test/files/multikey_table_update.csv | 3 + test/files/multikey_table_upsert.csv | 4 + test/integration/test_server.cpp | 153 +++++++++++++++++++++++++-- 5 files changed, 169 insertions(+), 23 deletions(-) create mode 100644 test/files/multikey_table_delete.csv create mode 100644 test/files/multikey_table_update.csv create mode 100644 test/files/multikey_table_upsert.csv diff --git a/src/sql_generator.cpp b/src/sql_generator.cpp index d424590..7685028 100644 --- a/src/sql_generator.cpp +++ b/src/sql_generator.cpp @@ -21,13 +21,14 @@ const auto print_column = [](const std::string &quoted_col, void write_joined( std::ostringstream &sql, const std::vector &columns, - std::function print_str) { + std::function print_str, + const std::string &separator = ", ") { bool first = true; for (const auto &col : columns) { if (first) { first = false; } else { - sql << ", "; + sql << separator; } print_str(KeywordHelper::WriteQuoted(col->name, '"'), sql); } @@ -301,11 +302,13 @@ void update_values(duckdb::Connection &con, const table_def &table, }); sql << " FROM " << staging_table_name << " WHERE "; - write_joined(sql, columns_pk, - [&](const std::string &quoted_col, std::ostringstream &out) { - out << table.table_name << "." << quoted_col << " = " - << staging_table_name << "." << quoted_col; - }); + write_joined( + sql, columns_pk, + [&](const std::string &quoted_col, std::ostringstream &out) { + out << table.table_name << "." << quoted_col << " = " + << staging_table_name << "." 
<< quoted_col; + }, + " AND "); auto query = sql.str(); mdlog::info("update: " + query); @@ -325,11 +328,13 @@ void delete_rows(duckdb::Connection &con, const table_def &table, sql << "DELETE FROM " + absolute_table_name << " USING " << staging_table_name << " WHERE "; - write_joined(sql, columns_pk, - [&](const std::string &quoted_col, std::ostringstream &out) { - out << table.table_name << "." << quoted_col << " = " - << staging_table_name << "." << quoted_col; - }); + write_joined( + sql, columns_pk, + [&](const std::string &quoted_col, std::ostringstream &out) { + out << table.table_name << "." << quoted_col << " = " + << staging_table_name << "." << quoted_col; + }, + " AND "); auto query = sql.str(); mdlog::info("delete_rows: " + query); diff --git a/test/files/multikey_table_delete.csv b/test/files/multikey_table_delete.csv new file mode 100644 index 0000000..51ea4bf --- /dev/null +++ b/test/files/multikey_table_delete.csv @@ -0,0 +1,3 @@ +id1,id2,text,_fivetran_deleted,_fivetran_synced +1,100,"does not matter",true,"2024-01-09T04:23:41.165531936Z" +3,300,"this value does not matter, and neither does _fivetran_deleted -- the row will still be hard deleted because it's in this file",false,"2024-01-09T04:23:41.165531936Z" \ No newline at end of file diff --git a/test/files/multikey_table_update.csv b/test/files/multikey_table_update.csv new file mode 100644 index 0000000..4a7b19d --- /dev/null +++ b/test/files/multikey_table_update.csv @@ -0,0 +1,3 @@ +id1,id2,text,_fivetran_deleted,_fivetran_synced +2,200,"second row updated",false,"2024-02-09T00:00:00.000000000Z" +3,300,"third row soft deleted - but also this value updated",true,"2024-02-09T00:00:00.000000000Z" diff --git a/test/files/multikey_table_upsert.csv b/test/files/multikey_table_upsert.csv new file mode 100644 index 0000000..3195af0 --- /dev/null +++ b/test/files/multikey_table_upsert.csv @@ -0,0 +1,4 @@ +id1,id2,text,_fivetran_deleted,_fivetran_synced +1,100,"first 
row",false,"2024-01-09T04:10:19.156057706Z" +2,200,"second row",false,"2024-01-09T04:10:19.156057706Z" +3,300,"third row",false,"2024-01-09T04:10:19.156057706Z" \ No newline at end of file diff --git a/test/integration/test_server.cpp b/test/integration/test_server.cpp index b8c6acb..41c099a 100644 --- a/test/integration/test_server.cpp +++ b/test/integration/test_server.cpp @@ -266,6 +266,32 @@ void define_test_table(T &request, const std::string &table_name) { col5->set_type(::fivetran_sdk::DataType::UTC_DATETIME); } +template +void define_test_multikey_table(T &request, const std::string &table_name) { + request.mutable_table()->set_name(table_name); + auto col1 = request.mutable_table()->add_columns(); + col1->set_name("id1"); + col1->set_type(::fivetran_sdk::DataType::INT); + col1->set_primary_key(true); + + auto col2 = request.mutable_table()->add_columns(); + col2->set_name("id2"); + col2->set_type(::fivetran_sdk::DataType::INT); + col2->set_primary_key(true); + + auto col3 = request.mutable_table()->add_columns(); + col3->set_name("text"); + col3->set_type(::fivetran_sdk::DataType::STRING); + + auto col4 = request.mutable_table()->add_columns(); + col4->set_name("_fivetran_deleted"); + col4->set_type(::fivetran_sdk::DataType::BOOLEAN); + + auto col5 = request.mutable_table()->add_columns(); + col5->set_name("_fivetran_synced"); + col5->set_type(::fivetran_sdk::DataType::UTC_DATETIME); +} + std::unique_ptr get_test_connection(char *token) { std::unordered_map props{ {"motherduck_token", token}, @@ -555,7 +581,7 @@ TEST_CASE("WriteBatch", "[integration][current]") { } } -TEST_CASE("CreateTable with multiple primary keys", "[integration]") { +TEST_CASE("Table with multiple primary keys", "[integration]") { DestinationSdkImpl service; const std::string table_name = @@ -568,15 +594,7 @@ TEST_CASE("CreateTable with multiple primary keys", "[integration]") { ::fivetran_sdk::CreateTableRequest request; (*request.mutable_configuration())["motherduck_token"] = token; 
(*request.mutable_configuration())["motherduck_database"] = "fivetran_test"; - request.mutable_table()->set_name(table_name); - auto col1 = request.mutable_table()->add_columns(); - col1->set_name("id1"); - col1->set_type(::fivetran_sdk::DataType::INT); - col1->set_primary_key(true); - auto col2 = request.mutable_table()->add_columns(); - col2->set_name("id2"); - col2->set_type(::fivetran_sdk::DataType::INT); - col2->set_primary_key(true); + define_test_multikey_table(request, table_name); ::fivetran_sdk::CreateTableResponse response; auto status = service.CreateTable(nullptr, &request, &response); @@ -594,9 +612,122 @@ TEST_CASE("CreateTable with multiple primary keys", "[integration]") { ::fivetran_sdk::DescribeTableResponse response; auto status = service.DescribeTable(nullptr, &request, &response); REQUIRE_NO_FAIL(status); - REQUIRE(response.table().columns().size() == 2); + REQUIRE(response.table().columns().size() == 5); + + REQUIRE(response.table().columns(0).name() == "id1"); + REQUIRE(response.table().columns(1).name() == "id2"); + REQUIRE(response.table().columns(2).name() == "text"); + REQUIRE(response.table().columns(3).name() == "_fivetran_deleted"); + REQUIRE(response.table().columns(4).name() == "_fivetran_synced"); } } + + // test connection needs to be created after table creation to avoid stale + // catalog + auto con = get_test_connection(token); + { + // insert rows + ::fivetran_sdk::WriteBatchRequest request; + (*request.mutable_configuration())["motherduck_token"] = token; + (*request.mutable_configuration())["motherduck_database"] = "fivetran_test"; + define_test_multikey_table(request, table_name); + const std::string filename = "multikey_table_upsert.csv"; + const std::string filepath = TEST_RESOURCES_DIR + filename; + + request.add_replace_files(filepath); + + ::fivetran_sdk::WriteBatchResponse response; + auto status = service.WriteBatch(nullptr, &request, &response); + REQUIRE_NO_FAIL(status); + } + + { + // check inserted rows + auto 
res = con->Query("SELECT id1, id2, text FROM " + table_name + + " ORDER BY id1, id2"); + REQUIRE_NO_FAIL(res); + REQUIRE(res->RowCount() == 3); + REQUIRE(res->GetValue(0, 0) == 1); + REQUIRE(res->GetValue(1, 0) == 100); + REQUIRE(res->GetValue(2, 0) == "first row"); + + REQUIRE(res->GetValue(0, 1) == 2); + REQUIRE(res->GetValue(1, 1) == 200); + REQUIRE(res->GetValue(2, 1) == "second row"); + + REQUIRE(res->GetValue(0, 2) == 3); + REQUIRE(res->GetValue(1, 2) == 300); + REQUIRE(res->GetValue(2, 2) == "third row"); + } + + { + // update + ::fivetran_sdk::WriteBatchRequest request; + (*request.mutable_configuration())["motherduck_token"] = token; + (*request.mutable_configuration())["motherduck_database"] = "fivetran_test"; + request.mutable_csv()->set_unmodified_string("magic-unmodified-value"); + request.mutable_csv()->set_null_string("magic-nullvalue"); + define_test_multikey_table(request, table_name); + const std::string filename = "multikey_table_update.csv"; + const std::string filepath = TEST_RESOURCES_DIR + filename; + + request.add_update_files(filepath); + + ::fivetran_sdk::WriteBatchResponse response; + auto status = service.WriteBatch(nullptr, &request, &response); + REQUIRE_NO_FAIL(status); + } + + { + // check after update, including a soft delete + auto res = con->Query("SELECT id1, id2, text, _fivetran_deleted FROM " + + table_name + " ORDER BY id1, id2"); + REQUIRE_NO_FAIL(res); + REQUIRE(res->RowCount() == 3); + REQUIRE(res->GetValue(0, 0) == 1); + REQUIRE(res->GetValue(1, 0) == 100); + REQUIRE(res->GetValue(2, 0) == "first row"); + REQUIRE(res->GetValue(3, 0) == false); + + REQUIRE(res->GetValue(0, 1) == 2); + REQUIRE(res->GetValue(1, 1) == 200); + REQUIRE(res->GetValue(2, 1) == "second row updated"); + REQUIRE(res->GetValue(3, 1) == false); + + REQUIRE(res->GetValue(0, 2) == 3); + REQUIRE(res->GetValue(1, 2) == 300); + REQUIRE(res->GetValue(2, 2) == + "third row soft deleted - but also this value updated"); + REQUIRE(res->GetValue(3, 2) == true); + 
} + + { + // delete + ::fivetran_sdk::WriteBatchRequest request; + (*request.mutable_configuration())["motherduck_token"] = token; + (*request.mutable_configuration())["motherduck_database"] = "fivetran_test"; + define_test_multikey_table(request, table_name); + const std::string filename = "multikey_table_delete.csv"; + const std::string filepath = TEST_RESOURCES_DIR + filename; + + request.add_delete_files(filepath); + + ::fivetran_sdk::WriteBatchResponse response; + auto status = service.WriteBatch(nullptr, &request, &response); + REQUIRE_NO_FAIL(status); + } + + { + // check after hard delete + auto res = con->Query("SELECT id1, id2, text, _fivetran_deleted FROM " + + table_name + " ORDER BY id1, id2"); + REQUIRE_NO_FAIL(res); + REQUIRE(res->RowCount() == 1); + REQUIRE(res->GetValue(0, 0) == 2); + REQUIRE(res->GetValue(1, 0) == 200); + REQUIRE(res->GetValue(2, 0) == "second row updated"); + REQUIRE(res->GetValue(3, 0) == false); + } } TEST_CASE("CreateTable with JSON column", "[integration]") {