Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
elefeint committed Jul 1, 2024
1 parent 4f73a6a commit 38009ad
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 62 deletions.
27 changes: 13 additions & 14 deletions includes/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,20 @@
#include <string>
#include <vector>


/// Per-file settings handed to the CSV ingestion routines.
///
/// Every member is stored by value. Earlier the members were `const T &`
/// bound directly to the constructor arguments, which left them dangling as
/// soon as a caller passed a temporary (for example a literal block size or a
/// string produced by an implicit conversion). Owning copies make the struct
/// safe to build from any argument and safe to outlive its inputs.
///
/// The constructor signature and the member names are unchanged, so existing
/// call sites and `props.member` reads keep working as before.
struct IngestProperties {

  /// @param _filename          path of the CSV file to ingest
  /// @param _decryption_key    key for the file; an empty key selects the
  ///                           unencrypted read path
  /// @param _utf8_columns      column names forwarded to the Arrow convert
  ///                           options
  /// @param _null_value        string forwarded to the Arrow convert options
  ///                           as the null marker
  /// @param _csv_block_size_mb Arrow CSV read block size, in megabytes
  IngestProperties(const std::string &_filename,
                   const std::string &_decryption_key,
                   const std::vector<std::string> &_utf8_columns,
                   const std::string &_null_value,
                   const int &_csv_block_size_mb)
      : filename(_filename), decryption_key(_decryption_key),
        utf8_columns(_utf8_columns), null_value(_null_value),
        csv_block_size_mb(_csv_block_size_mb) {}

  // Members stay const so the struct remains read-only after construction,
  // exactly as it was with reference members.
  const std::string filename;
  const std::string decryption_key;
  const std::vector<std::string> utf8_columns;
  const std::string null_value;
  const int csv_block_size_mb; // shifted left by 20 by the reader to get bytes
};
5 changes: 3 additions & 2 deletions includes/csv_arrow_ingest.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <arrow/csv/api.h>
#include "common.hpp"
#include <arrow/csv/api.h>

std::shared_ptr<arrow::Table> read_encrypted_csv(const IngestProperties &props);

std::shared_ptr<arrow::Table> read_unencrypted_csv(const IngestProperties &props);
std::shared_ptr<arrow::Table>
read_unencrypted_csv(const IngestProperties &props);
3 changes: 2 additions & 1 deletion includes/motherduck_destination_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ static constexpr const char *const MD_PROP_DATABASE = "motherduck_database";

static constexpr const char *const MD_PROP_TOKEN = "motherduck_token";

static constexpr const char *const MD_PROP_CSV_BLOCK_SIZE = "motherduck_csv_block_size";
static constexpr const char *const MD_PROP_CSV_BLOCK_SIZE =
"motherduck_csv_block_size";

static constexpr const char *const CONFIG_TEST_NAME_AUTHENTICATE =
"test_authentication";
Expand Down
16 changes: 10 additions & 6 deletions src/csv_arrow_ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ get_arrow_convert_options(const std::vector<std::string> &utf8_columns,
}

template <typename T>
std::shared_ptr<arrow::Table> read_csv_stream_to_arrow_table(T &input_stream, const IngestProperties &props) {
std::shared_ptr<arrow::Table>
read_csv_stream_to_arrow_table(T &input_stream, const IngestProperties &props) {

auto read_options = arrow::csv::ReadOptions::Defaults();
read_options.block_size = props.csv_block_size_mb << 20;
auto parse_options = arrow::csv::ParseOptions::Defaults();
parse_options.newlines_in_values = true;
auto convert_options = get_arrow_convert_options(props.utf8_columns, props.null_value);
auto convert_options =
get_arrow_convert_options(props.utf8_columns, props.null_value);

auto maybe_table_reader = arrow::csv::TableReader::Make(
arrow::io::default_io_context(), std::move(input_stream), read_options,
Expand All @@ -50,7 +52,8 @@ std::shared_ptr<arrow::Table> read_csv_stream_to_arrow_table(T &input_stream, co
return table;
}

std::shared_ptr<arrow::Table> read_encrypted_csv(const IngestProperties &props) {
std::shared_ptr<arrow::Table>
read_encrypted_csv(const IngestProperties &props) {

std::vector<unsigned char> plaintext = decrypt_file(
props.filename,
Expand Down Expand Up @@ -82,10 +85,11 @@ std::shared_ptr<arrow::Table> read_encrypted_csv(const IngestProperties &props)
std::shared_ptr<arrow::Table>
read_unencrypted_csv(const IngestProperties &props) {

auto maybe_file =
arrow::io::ReadableFile::Open(props.filename, arrow::default_memory_pool());
auto maybe_file = arrow::io::ReadableFile::Open(props.filename,
arrow::default_memory_pool());
if (!maybe_file.ok()) {
throw std::runtime_error("Could not open uncompressed file <" + props.filename +
throw std::runtime_error("Could not open uncompressed file <" +
props.filename +
">: " + maybe_file.status().message());
}
auto plaintext_input_stream = std::move(maybe_file.ValueOrDie());
Expand Down
60 changes: 35 additions & 25 deletions src/motherduck_destination_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
#include <arrow/c/bridge.h>
#include <grpcpp/grpcpp.h>

#include <common.hpp>
#include <csv_arrow_ingest.hpp>
#include <destination_sdk.grpc.pb.h>
#include <fivetran_duckdb_interop.hpp>
#include <md_logging.hpp>
#include <motherduck_destination_server.hpp>
#include <sql_generator.hpp>
#include <common.hpp>

std::string
find_property(const google::protobuf::Map<std::string, std::string> &config,
Expand All @@ -24,9 +24,10 @@ find_property(const google::protobuf::Map<std::string, std::string> &config,
return token_it->second;
}


/// Reads an optional integer setting from the connector configuration.
///
/// @param config        key/value configuration map
/// @param property_name key to look up
/// @param default_value value returned when the key is absent
/// @param parse         converts the stored string to an int; invoked only
///                      when the key is present, and anything it throws
///                      propagates to the caller
/// @return `default_value` if the key is missing, otherwise `parse(value)`
int find_optional_property(
    const google::protobuf::Map<std::string, std::string> &config,
    const std::string &property_name, int default_value,
    const std::function<int(const std::string &)> &parse) {
  const auto entry = config.find(property_name);
  if (entry == config.end()) {
    return default_value;
  }
  return parse(entry->second);
}
Expand Down Expand Up @@ -111,7 +112,8 @@ void process_file(
const std::function<void(const std::string &view_name)> &process_view) {

validate_file(props.filename);
auto table = props.decryption_key.empty() ? read_unencrypted_csv(props) : read_encrypted_csv(props);
auto table = props.decryption_key.empty() ? read_unencrypted_csv(props)
: read_encrypted_csv(props);

auto batch_reader = std::make_shared<arrow::TableBatchReader>(*table);
ArrowArrayStream arrow_array_stream;
Expand Down Expand Up @@ -174,9 +176,12 @@ grpc::Status DestinationSdkImpl::ConfigurationForm(

// Optional plain-text form field that lets the user override the CSV block
// size; it is appended to the configuration form response.
fivetran_sdk::FormField block_size_field;
block_size_field.set_name(MD_PROP_CSV_BLOCK_SIZE);
block_size_field.set_label(
    "Maximum individual value size, in megabytes (default 1 MB)");
// Adjacent string literals are concatenated verbatim, so the space between
// the two sentences has to live inside one of the literals.
block_size_field.set_description(
    "This field limits the maximum length of a single field value coming "
    "from the input source. "
    "Must be a valid numeric value");
block_size_field.set_text_field(fivetran_sdk::PlainText);
block_size_field.set_required(false);
response->add_fields()->CopyFrom(block_size_field);
Expand Down Expand Up @@ -379,38 +384,39 @@ DestinationSdkImpl::WriteBatch(::grpc::ServerContext *context,
std::transform(cols.begin(), cols.end(), column_names.begin(),
[](const column_def &col) { return col.name; });


for (auto &filename : request->replace_files()) {
const auto decryption_key = get_encryption_key(
filename, request->keys(), request->csv().encryption());

IngestProperties props(filename, decryption_key, column_names, request->csv().null_string(), csv_block_size);
IngestProperties props(filename, decryption_key, column_names,
request->csv().null_string(), csv_block_size);

process_file(*con, props, [&](const std::string &view_name) {
upsert(*con, table_name, view_name, columns_pk, columns_regular);
});
upsert(*con, table_name, view_name, columns_pk, columns_regular);
});
}
for (auto &filename : request->update_files()) {

auto decryption_key = get_encryption_key(filename, request->keys(),
request->csv().encryption());
IngestProperties props(filename, decryption_key, column_names, request->csv().null_string(), csv_block_size);
IngestProperties props(filename, decryption_key, column_names,
request->csv().null_string(), csv_block_size);

process_file(
*con, props, [&](const std::string &view_name) {
update_values(*con, table_name, view_name, columns_pk,
columns_regular, request->csv().unmodified_string());
});
process_file(*con, props, [&](const std::string &view_name) {
update_values(*con, table_name, view_name, columns_pk, columns_regular,
request->csv().unmodified_string());
});
}
for (auto &filename : request->delete_files()) {
auto decryption_key = get_encryption_key(filename, request->keys(),
request->csv().encryption());
std::vector<std::string> empty;
IngestProperties props(filename, decryption_key, empty, request->csv().null_string(), csv_block_size);
IngestProperties props(filename, decryption_key, empty,
request->csv().null_string(), csv_block_size);

process_file(*con, props, [&](const std::string &view_name) {
delete_rows(*con, table_name, view_name, columns_pk);
});
delete_rows(*con, table_name, view_name, columns_pk);
});
}

} catch (const std::exception &e) {
Expand All @@ -427,12 +433,15 @@ DestinationSdkImpl::WriteBatch(::grpc::ServerContext *context,
return ::grpc::Status(::grpc::StatusCode::OK, "");
}

/// Validates the optional CSV block size setting.
///
/// A missing entry is accepted. A present entry must consist solely of the
/// ASCII digits 0-9.
///
/// NOTE(review): an empty string contains no non-digit character, so it also
/// passes this check; confirm that is intended.
///
/// @param config key/value configuration map
/// @throws std::runtime_error if the entry is present and contains any
///         character other than 0-9
void check_csv_block_size_is_numeric(
    const google::protobuf::Map<std::string, std::string> &config) {
  const auto entry = config.find(MD_PROP_CSV_BLOCK_SIZE);
  if (entry == config.end()) {
    // The setting is optional; nothing to validate.
    return;
  }

  const std::string &raw_value = entry->second;
  const bool only_digits =
      raw_value.find_first_not_of("0123456789") == std::string::npos;
  if (!only_digits) {
    throw std::runtime_error(
        "Maximum individual value size must be numeric if present");
  }
}

Expand Down Expand Up @@ -469,7 +478,8 @@ DestinationSdkImpl::Test(::grpc::ServerContext *context,
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, msg);
}
} catch (const std::exception &e) {
auto msg = "Test for database <" + db_name + "> failed: " + std::string(e.what());
auto msg =
"Test for database <" + db_name + "> failed: " + std::string(e.what());
response->set_success(false);
response->set_failure(msg);
// grpc call succeeded; the response reflects config test failure
Expand Down
33 changes: 19 additions & 14 deletions test/integration/test_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ TEST_CASE("ConfigurationForm", "[integration][config]") {
REQUIRE(response.fields(0).name() == "motherduck_token");
REQUIRE(response.fields(1).name() == "motherduck_database");
REQUIRE(response.fields(2).name() == "motherduck_csv_block_size");
REQUIRE(response.fields(2).label() == "Maximum individual value size, in megabytes (default 1 MB)");
REQUIRE(response.fields(2).label() ==
"Maximum individual value size, in megabytes (default 1 MB)");

REQUIRE(response.tests_size() == 2);
REQUIRE(response.tests(0).name() == CONFIG_TEST_NAME_CSV_BLOCK_SIZE);
Expand Down Expand Up @@ -213,15 +214,15 @@ TEST_CASE("Test fails when token is missing", "[integration][configtest]") {

auto status = service.Test(nullptr, &request, &response);
REQUIRE_NO_FAIL(status);
auto expected_message = "Test for database <" +
TEST_DATABASE_NAME +
auto expected_message = "Test for database <" + TEST_DATABASE_NAME +
"> failed: Missing "
"property motherduck_token";
REQUIRE(status.error_message() == expected_message);
REQUIRE(response.failure() == expected_message);
}

TEST_CASE("Test endpoint fails when token is bad", "[integration][configtest]") {
TEST_CASE("Test endpoint fails when token is bad",
"[integration][configtest]") {
DestinationSdkImpl service;

::fivetran_sdk::TestRequest request;
Expand All @@ -240,8 +241,9 @@ TEST_CASE("Test endpoint fails when token is bad", "[integration][configtest]")
Catch::Matchers::ContainsSubstring("UNAUTHENTICATED"));
}

TEST_CASE("Test endpoint authentication test succeeds when everything is in order",
"[integration][configtest]") {
TEST_CASE(
"Test endpoint authentication test succeeds when everything is in order",
"[integration][configtest]") {
DestinationSdkImpl service;

::fivetran_sdk::TestRequest request;
Expand All @@ -258,7 +260,8 @@ TEST_CASE("Test endpoint authentication test succeeds when everything is in orde
REQUIRE_NO_FAIL(status);
}

TEST_CASE("Test endpoint block size validation succeeds when optional block size is missing",
TEST_CASE("Test endpoint block size validation succeeds when optional block "
"size is missing",
"[integration][configtest]") {
DestinationSdkImpl service;

Expand All @@ -276,7 +279,8 @@ TEST_CASE("Test endpoint block size validation succeeds when optional block size
REQUIRE_NO_FAIL(status);
}

TEST_CASE("Test endpoint block size validation succeeds when optional block size is a valid number",
TEST_CASE("Test endpoint block size validation succeeds when optional block "
"size is a valid number",
"[integration][configtest]") {
DestinationSdkImpl service;

Expand All @@ -289,14 +293,14 @@ TEST_CASE("Test endpoint block size validation succeeds when optional block size
(*request.mutable_configuration())["motherduck_token"] = token;
(*request.mutable_configuration())[MD_PROP_CSV_BLOCK_SIZE] = "5";


::fivetran_sdk::TestResponse response;

auto status = service.Test(nullptr, &request, &response);
REQUIRE_NO_FAIL(status);
}

TEST_CASE("Test endpoint block size validation fails when optional block size is not a valid number",
TEST_CASE("Test endpoint block size validation fails when optional block size "
"is not a valid number",
"[integration][configtest]") {
DestinationSdkImpl service;

Expand All @@ -315,7 +319,9 @@ TEST_CASE("Test endpoint block size validation fails when optional block size is
REQUIRE_NO_FAIL(status);
REQUIRE_FALSE(response.success());

auto expected_message = "Test for database <" + TEST_DATABASE_NAME + "> failed: Maximum individual value size must be numeric if present";
auto expected_message =
"Test for database <" + TEST_DATABASE_NAME +
"> failed: Maximum individual value size must be numeric if present";
REQUIRE(response.failure() == expected_message);
}

Expand Down Expand Up @@ -875,8 +881,6 @@ void make_book_table(T &request, const std::string &table_name) {
auto col1 = request.mutable_table()->add_columns();
col1->set_name("text");
col1->set_type(::fivetran_sdk::DataType::STRING);


}

TEST_CASE("Table with large json row", "[integration][write-batch]") {
Expand Down Expand Up @@ -918,7 +922,8 @@ TEST_CASE("Table with large json row", "[integration][write-batch]") {
auto status = service.WriteBatch(nullptr, &request, &response);
REQUIRE_FALSE(status.ok());
CHECK_THAT(status.error_message(),
Catch::Matchers::ContainsSubstring("straddling object straddles two block boundaries"));
Catch::Matchers::ContainsSubstring(
"straddling object straddles two block boundaries"));
}

{
Expand Down

0 comments on commit 38009ad

Please sign in to comment.