Skip to content

Commit

Permalink
Fix decimal handling (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
elefeint authored Mar 27, 2024
1 parent 1113e7c commit afb4a2b
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 11 deletions.
6 changes: 4 additions & 2 deletions src/fivetran_duckdb_interop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ fivetran_sdk::DataType get_fivetran_type(const LogicalTypeId &duckdb_type) {
case LogicalTypeId::DATE:
return fivetran_sdk::NAIVE_DATE;
case LogicalTypeId::TIMESTAMP:
return fivetran_sdk::NAIVE_DATETIME;
case LogicalTypeId::TIMESTAMP_TZ:
return fivetran_sdk::UTC_DATETIME;
case LogicalTypeId::DECIMAL:
return fivetran_sdk::DECIMAL;
Expand Down Expand Up @@ -50,8 +52,8 @@ LogicalTypeId get_duckdb_type(const fivetran_sdk::DataType &fivetranType) {
case fivetran_sdk::NAIVE_DATETIME:
return LogicalTypeId::TIMESTAMP;
case fivetran_sdk::UTC_DATETIME:
return LogicalTypeId::TIMESTAMP; // TBD: this is pretty definitely wrong;
// needs to be with timezone
return LogicalTypeId::TIMESTAMP_TZ; // TODO: find format Fivetran sends;
// make sure UTC included
case fivetran_sdk::DECIMAL:
return LogicalTypeId::DECIMAL;
case fivetran_sdk::BINARY:
Expand Down
2 changes: 2 additions & 0 deletions src/motherduck_destination_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ grpc::Status DestinationSdkImpl::DescribeTable(
ft_col->set_name(col.name);
ft_col->set_type(get_fivetran_type(col.type));
ft_col->set_primary_key(col.primary_key);
ft_col->mutable_decimal()->set_precision(col.width);
ft_col->mutable_decimal()->set_scale(col.scale);
}

} catch (const std::exception &e) {
Expand Down
30 changes: 22 additions & 8 deletions src/sql_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ void create_table(duckdb::Connection &con, const table_def &table,
for (const auto &col : all_columns) {
ddl << KeywordHelper::WriteQuoted(col.name, '"') << " "
<< duckdb::EnumUtil::ToChars(col.type);
if (col.type == duckdb::LogicalTypeId::DECIMAL) {
ddl << " (" << col.width << "," << col.scale << ")";
}
ddl << ", "; // DuckDB allows trailing commas
}

Expand Down Expand Up @@ -122,9 +125,16 @@ std::vector<column_def> describe_table(duckdb::Connection &con,
// TBD scale/precision
std::vector<column_def> columns;

auto query = "SELECT column_name, data_type, is_nullable = 'NO' FROM "
"information_schema.columns WHERE table_catalog=? AND "
"table_schema=? AND table_name=?";
auto query = "SELECT "
"column_name, "
"data_type_id, "
"NOT is_nullable, "
"numeric_precision, "
"numeric_scale "
"FROM duckdb_columns() "
"WHERE database_name=? "
"AND schema_name=? "
"AND table_name=?";
mdlog::info("describe_table: " + std::string(query));
auto statement = con.Prepare(query);
duckdb::vector<duckdb::Value> params = {duckdb::Value(table.db_name),
Expand All @@ -141,11 +151,15 @@ std::vector<column_def> describe_table(duckdb::Connection &con,
duckdb::QueryResult, duckdb::MaterializedQueryResult>(std::move(result));

for (const auto &row : materialized_result->Collection().GetRows()) {
columns.push_back(
column_def{row.GetValue(0).GetValue<duckdb::string>(),
duckdb::EnumUtil::FromString<duckdb::LogicalTypeId>(
row.GetValue(1).GetValue<duckdb::string>()),
row.GetValue(2).GetValue<bool>()});
duckdb::LogicalTypeId column_type =
static_cast<duckdb::LogicalTypeId>(row.GetValue(1).GetValue<int8_t>());
column_def col{row.GetValue(0).GetValue<duckdb::string>(), column_type,
row.GetValue(2).GetValue<bool>()};
if (column_type == duckdb::LogicalTypeId::DECIMAL) {
col.width = row.GetValue(3).GetValue<uint32_t>();
col.scale = row.GetValue(4).GetValue<uint32_t>();
}
columns.push_back(col);
}
return columns;
}
Expand Down
137 changes: 136 additions & 1 deletion test/integration/test_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -842,4 +842,139 @@ TEST_CASE("reading inaccessible or nonexistent files fails") {
"WriteBatch endpoint failed for schema <>, table <unused_table>:File <" +
bad_file_name + "> is missing or inaccessible";
REQUIRE_FAIL(status, expected);
}
}

// Round-trips a table holding one column of each Fivetran data type through
// CreateTable and DescribeTable, then checks that every column comes back
// with the same name, type and primary-key flag, and that the DECIMAL column
// keeps the precision/scale it was created with.
TEST_CASE("Test all types with create and describe table") {

  DestinationSdkImpl service;

  auto token = std::getenv("motherduck_token");
  REQUIRE(token);

  const std::string table_name =
      "all_types_table" + std::to_string(Catch::rngSeed());

  // Single source of truth for the columns: the same table drives both the
  // CreateTable request and the DescribeTable expectations.
  struct column_spec {
    const char *name;
    ::fivetran_sdk::DataType type;
    bool primary_key;
  };
  constexpr int kColumnCount = 12;
  const column_spec specs[kColumnCount] = {
      {"col_string", ::fivetran_sdk::DataType::STRING, true},
      {"col_int", ::fivetran_sdk::DataType::INT, true},
      {"col_decimal", ::fivetran_sdk::DataType::DECIMAL, false},
      {"col_utc_datetime", ::fivetran_sdk::DataType::UTC_DATETIME, false},
      {"col_naive_datetime", ::fivetran_sdk::DataType::NAIVE_DATETIME, false},
      {"col_naive_date", ::fivetran_sdk::DataType::NAIVE_DATE, false},
      {"col_boolean", ::fivetran_sdk::DataType::BOOLEAN, false},
      {"col_short", ::fivetran_sdk::DataType::SHORT, false},
      {"col_long", ::fivetran_sdk::DataType::LONG, false},
      {"col_float", ::fivetran_sdk::DataType::FLOAT, false},
      {"col_double", ::fivetran_sdk::DataType::DOUBLE, false},
      {"col_binary", ::fivetran_sdk::DataType::BINARY, false},
  };

  // Precision/scale given to the DECIMAL column and expected back.
  constexpr int kDecimalPrecision = 20;
  constexpr int kDecimalScale = 11;

  {
    // Create Table
    ::fivetran_sdk::CreateTableRequest request;
    auto &config = *request.mutable_configuration();
    config["motherduck_token"] = token;
    config["motherduck_database"] = "fivetran_test";

    auto *table = request.mutable_table();
    table->set_name(table_name);
    for (const auto &spec : specs) {
      auto *column = table->add_columns();
      column->set_name(spec.name);
      column->set_type(spec.type);
      // Only primary-key columns set the flag; the rest keep the default.
      if (spec.primary_key) {
        column->set_primary_key(true);
      }
      if (spec.type == ::fivetran_sdk::DataType::DECIMAL) {
        column->mutable_decimal()->set_precision(kDecimalPrecision);
        column->mutable_decimal()->set_scale(kDecimalScale);
      }
    }

    ::fivetran_sdk::CreateTableResponse response;
    auto status = service.CreateTable(nullptr, &request, &response);
    REQUIRE_NO_FAIL(status);
  }

  {
    // Describe table
    ::fivetran_sdk::DescribeTableRequest request;
    auto &config = *request.mutable_configuration();
    config["motherduck_token"] = token;
    config["motherduck_database"] = "fivetran_test";
    request.set_table_name(table_name);

    ::fivetran_sdk::DescribeTableResponse response;
    auto status = service.DescribeTable(nullptr, &request, &response);
    REQUIRE_NO_FAIL(status);
    REQUIRE(!response.not_found());

    REQUIRE(response.table().name() == table_name);
    REQUIRE(response.table().columns_size() == kColumnCount);

    // Columns must come back in creation order.
    for (int idx = 0; idx < kColumnCount; ++idx) {
      const column_spec &want = specs[idx];
      const auto &got = response.table().columns(idx);

      REQUIRE(got.name() == want.name);
      REQUIRE(got.type() == want.type);
      if (want.type == ::fivetran_sdk::DataType::DECIMAL) {
        REQUIRE(got.decimal().scale() == kDecimalScale);
        REQUIRE(got.decimal().precision() == kDecimalPrecision);
      }
      if (want.primary_key) {
        REQUIRE(got.primary_key());
      } else {
        REQUIRE_FALSE(got.primary_key());
      }
    }
  }
}

0 comments on commit afb4a2b

Please sign in to comment.