Skip to content

Commit

Permalink
Make CSV block size configurable. (#42)
Browse files Browse the repository at this point in the history
The default block size in Arrow CSV reader is 1MB (or so?). This PR makes it configurable in multiples of a megabyte.

Fixes ECO-142
  • Loading branch information
elefeint authored Jul 2, 2024
1 parent 79ef005 commit 996ec23
Show file tree
Hide file tree
Showing 8 changed files with 41,628 additions and 71 deletions.
22 changes: 22 additions & 0 deletions includes/common.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once

#include <string>
#include <vector>

struct IngestProperties {

IngestProperties(const std::string &_filename,
const std::string &_decryption_key,
const std::vector<std::string> &_utf8_columns,
const std::string &_null_value,
const int &_csv_block_size_mb)
: filename(_filename), decryption_key(_decryption_key),
utf8_columns(_utf8_columns), null_value(_null_value),
csv_block_size_mb(_csv_block_size_mb) {}

const std::string &filename;
const std::string &decryption_key;
const std::vector<std::string> &utf8_columns;
const std::string &null_value;
const int &csv_block_size_mb;
};
9 changes: 3 additions & 6 deletions includes/csv_arrow_ingest.hpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
#include "common.hpp"
#include <arrow/csv/api.h>

std::shared_ptr<arrow::Table> read_encrypted_csv(
const std::string &filename, const std::string &decryption_key,
std::vector<std::string> &utf8_columns, const std::string &null_value);
std::shared_ptr<arrow::Table> read_encrypted_csv(const IngestProperties &props);

std::shared_ptr<arrow::Table>
read_unencrypted_csv(const std::string &filename,
std::vector<std::string> &utf8_columns,
const std::string &null_value);
read_unencrypted_csv(const IngestProperties &props);
6 changes: 6 additions & 0 deletions includes/motherduck_destination_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ static constexpr const char *const MD_PROP_DATABASE = "motherduck_database";

static constexpr const char *const MD_PROP_TOKEN = "motherduck_token";

static constexpr const char *const MD_PROP_CSV_BLOCK_SIZE =
"motherduck_csv_block_size";

static constexpr const char *const CONFIG_TEST_NAME_AUTHENTICATE =
"test_authentication";

static constexpr const char *const CONFIG_TEST_NAME_CSV_BLOCK_SIZE =
"test_csv_block_size";

class DestinationSdkImpl final : public fivetran_sdk::Destination::Service {
public:
DestinationSdkImpl() = default;
Expand Down
40 changes: 19 additions & 21 deletions src/csv_arrow_ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <decryption.hpp>

arrow::csv::ConvertOptions
get_arrow_convert_options(std::vector<std::string> &utf8_columns,
get_arrow_convert_options(const std::vector<std::string> &utf8_columns,
const std::string &null_value) {
auto convert_options = arrow::csv::ConvertOptions::Defaults();
convert_options.null_values
Expand All @@ -21,13 +21,15 @@ get_arrow_convert_options(std::vector<std::string> &utf8_columns,
}

template <typename T>
std::shared_ptr<arrow::Table> read_csv_stream_to_arrow_table(
T &input_stream, std::vector<std::string> &utf8_columns,
const std::string &null_value, const std::string &filename) {
std::shared_ptr<arrow::Table>
read_csv_stream_to_arrow_table(T &input_stream, const IngestProperties &props) {

auto read_options = arrow::csv::ReadOptions::Defaults();
read_options.block_size = props.csv_block_size_mb << 20;
auto parse_options = arrow::csv::ParseOptions::Defaults();
auto convert_options = get_arrow_convert_options(utf8_columns, null_value);
parse_options.newlines_in_values = true;
auto convert_options =
get_arrow_convert_options(props.utf8_columns, props.null_value);

auto maybe_table_reader = arrow::csv::TableReader::Make(
arrow::io::default_io_context(), std::move(input_stream), read_options,
Expand All @@ -42,21 +44,20 @@ std::shared_ptr<arrow::Table> read_csv_stream_to_arrow_table(

auto maybe_table = table_reader->Read();
if (!maybe_table.ok()) {
throw std::runtime_error("Could not read CSV <" + filename +
throw std::runtime_error("Could not read CSV <" + props.filename +
">: " + maybe_table.status().message());
}
auto table = std::move(maybe_table.ValueOrDie());

return table;
}

std::shared_ptr<arrow::Table> read_encrypted_csv(
const std::string &filename, const std::string &decryption_key,
std::vector<std::string> &utf8_columns, const std::string &null_value) {
std::shared_ptr<arrow::Table>
read_encrypted_csv(const IngestProperties &props) {

std::vector<unsigned char> plaintext = decrypt_file(
filename,
reinterpret_cast<const unsigned char *>(decryption_key.c_str()));
props.filename,
reinterpret_cast<const unsigned char *>(props.decryption_key.c_str()));
auto buffer = std::make_shared<arrow::Buffer>(
reinterpret_cast<const uint8_t *>(plaintext.data()), plaintext.size());
auto buffer_reader = std::make_shared<arrow::io::BufferReader>(buffer);
Expand All @@ -78,23 +79,20 @@ std::shared_ptr<arrow::Table> read_encrypted_csv(
auto compressed_input_stream =
std::move(maybe_compressed_input_stream.ValueOrDie());

return read_csv_stream_to_arrow_table(compressed_input_stream, utf8_columns,
null_value, filename);
return read_csv_stream_to_arrow_table(compressed_input_stream, props);
}

std::shared_ptr<arrow::Table>
read_unencrypted_csv(const std::string &filename,
std::vector<std::string> &utf8_columns,
const std::string &null_value) {
read_unencrypted_csv(const IngestProperties &props) {

auto maybe_file =
arrow::io::ReadableFile::Open(filename, arrow::default_memory_pool());
auto maybe_file = arrow::io::ReadableFile::Open(props.filename,
arrow::default_memory_pool());
if (!maybe_file.ok()) {
throw std::runtime_error("Could not open uncompressed file <" + filename +
throw std::runtime_error("Could not open uncompressed file <" +
props.filename +
">: " + maybe_file.status().message());
}
auto plaintext_input_stream = std::move(maybe_file.ValueOrDie());

return read_csv_stream_to_arrow_table(plaintext_input_stream, utf8_columns,
null_value, filename);
return read_csv_stream_to_arrow_table(plaintext_input_stream, props);
}
107 changes: 75 additions & 32 deletions src/motherduck_destination_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <arrow/c/bridge.h>
#include <grpcpp/grpcpp.h>

#include <common.hpp>
#include <csv_arrow_ingest.hpp>
#include <destination_sdk.grpc.pb.h>
#include <fivetran_duckdb_interop.hpp>
Expand All @@ -23,6 +24,14 @@ find_property(const google::protobuf::Map<std::string, std::string> &config,
return token_it->second;
}

int find_optional_property(
const google::protobuf::Map<std::string, std::string> &config,
const std::string &property_name, int default_value,
const std::function<int(const std::string &)> &parse) {
auto token_it = config.find(property_name);
return token_it == config.end() ? default_value : parse(token_it->second);
}

template <typename T> std::string get_schema_name(const T *request) {
std::string schema_name = request->schema_name();
if (schema_name.empty()) {
Expand Down Expand Up @@ -99,16 +108,12 @@ void validate_file(const std::string &file_path) {
}

void process_file(
duckdb::Connection &con, const std::string &filename,
const std::string &decryption_key, std::vector<std::string> &utf8_columns,
const std::string &null_value,
duckdb::Connection &con, IngestProperties &props,
const std::function<void(const std::string &view_name)> &process_view) {

validate_file(filename);
auto table = decryption_key.empty()
? read_unencrypted_csv(filename, utf8_columns, null_value)
: read_encrypted_csv(filename, decryption_key, utf8_columns,
null_value);
validate_file(props.filename);
auto table = props.decryption_key.empty() ? read_unencrypted_csv(props)
: read_encrypted_csv(props);

auto batch_reader = std::make_shared<arrow::TableBatchReader>(*table);
ArrowArrayStream arrow_array_stream;
Expand All @@ -117,7 +122,7 @@ void process_file(
if (!status.ok()) {
throw std::runtime_error(
"Could not convert Arrow batch reader to an array stream for file <" +
filename + ">: " + status.message());
props.filename + ">: " + status.message());
}

duckdb_connection c_con = reinterpret_cast<duckdb_connection>(&con);
Expand Down Expand Up @@ -158,19 +163,37 @@ grpc::Status DestinationSdkImpl::ConfigurationForm(
token_field.set_text_field(fivetran_sdk::Password);
token_field.set_required(true);

response->add_fields()->CopyFrom(token_field);

fivetran_sdk::FormField db_field;
db_field.set_name(MD_PROP_DATABASE);
db_field.set_label("Database Name");
db_field.set_description("The database to work in");
db_field.set_text_field(fivetran_sdk::PlainText);
db_field.set_required(true);

response->add_fields()->CopyFrom(token_field);
response->add_fields()->CopyFrom(db_field);

auto test = response->add_tests();
test->set_name(CONFIG_TEST_NAME_AUTHENTICATE);
test->set_label("Test Authentication");
fivetran_sdk::FormField block_size_field;
block_size_field.set_name(MD_PROP_CSV_BLOCK_SIZE);
block_size_field.set_label(
"Maximum individual value size, in megabytes (default 1 MB)");
block_size_field.set_description(
"This field limits the maximum length of a single field value coming "
"from the input source."
"Must be a valid numeric value");
block_size_field.set_text_field(fivetran_sdk::PlainText);
block_size_field.set_required(false);
response->add_fields()->CopyFrom(block_size_field);

auto block_size_test = response->add_tests();
block_size_test->set_name(CONFIG_TEST_NAME_CSV_BLOCK_SIZE);
block_size_test->set_label("Maximum value size is a valid number");

auto connection_test = response->add_tests();
connection_test->set_name(CONFIG_TEST_NAME_AUTHENTICATE);
connection_test->set_label("Test Authentication");

return ::grpc::Status(::grpc::StatusCode::OK, "");
}

Expand Down Expand Up @@ -330,6 +353,11 @@ DestinationSdkImpl::WriteBatch(::grpc::ServerContext *context,

const std::string db_name =
find_property(request->configuration(), MD_PROP_DATABASE);
const int csv_block_size = find_optional_property(
request->configuration(), MD_PROP_CSV_BLOCK_SIZE, 1,
[&](const std::string &val) -> int { return std::stoi(val); });
mdlog::info("CSV BLOCK SIZE = " + std::to_string(csv_block_size));

table_def table_name{db_name, get_schema_name(request),
request->table().name()};
std::unique_ptr<duckdb::Connection> con =
Expand Down Expand Up @@ -360,33 +388,35 @@ DestinationSdkImpl::WriteBatch(::grpc::ServerContext *context,
const auto decryption_key = get_encryption_key(
filename, request->keys(), request->csv().encryption());

process_file(
*con, filename, decryption_key, column_names,
request->csv().null_string(), [&](const std::string &view_name) {
upsert(*con, table_name, view_name, columns_pk, columns_regular);
});
IngestProperties props(filename, decryption_key, column_names,
request->csv().null_string(), csv_block_size);

process_file(*con, props, [&](const std::string &view_name) {
upsert(*con, table_name, view_name, columns_pk, columns_regular);
});
}
for (auto &filename : request->update_files()) {

auto decryption_key = get_encryption_key(filename, request->keys(),
request->csv().encryption());
IngestProperties props(filename, decryption_key, column_names,
request->csv().null_string(), csv_block_size);

process_file(
*con, filename, decryption_key, column_names,
request->csv().null_string(), [&](const std::string &view_name) {
update_values(*con, table_name, view_name, columns_pk,
columns_regular, request->csv().unmodified_string());
});
process_file(*con, props, [&](const std::string &view_name) {
update_values(*con, table_name, view_name, columns_pk, columns_regular,
request->csv().unmodified_string());
});
}
for (auto &filename : request->delete_files()) {
auto decryption_key = get_encryption_key(filename, request->keys(),
request->csv().encryption());
std::vector<std::string> empty;
process_file(*con, filename, decryption_key, empty,
request->csv().null_string(),
[&](const std::string &view_name) {
delete_rows(*con, table_name, view_name, columns_pk);
});
IngestProperties props(filename, decryption_key, empty,
request->csv().null_string(), csv_block_size);

process_file(*con, props, [&](const std::string &view_name) {
delete_rows(*con, table_name, view_name, columns_pk);
});
}

} catch (const std::exception &e) {
Expand All @@ -403,6 +433,18 @@ DestinationSdkImpl::WriteBatch(::grpc::ServerContext *context,
return ::grpc::Status(::grpc::StatusCode::OK, "");
}

void check_csv_block_size_is_numeric(
const google::protobuf::Map<std::string, std::string> &config) {
auto token_it = config.find(MD_PROP_CSV_BLOCK_SIZE);

// missing token is fine but non-numeric token isn't
if (token_it != config.end() &&
token_it->second.find_first_not_of("0123456789") != std::string::npos) {
throw std::runtime_error(
"Maximum individual value size must be numeric if present");
}
}

grpc::Status
DestinationSdkImpl::Test(::grpc::ServerContext *context,
const ::fivetran_sdk::TestRequest *request,
Expand All @@ -426,23 +468,24 @@ DestinationSdkImpl::Test(::grpc::ServerContext *context,

if (request->name() == CONFIG_TEST_NAME_AUTHENTICATE) {
check_connection(*con);
response->set_success(true);
} else if (request->name() == CONFIG_TEST_NAME_CSV_BLOCK_SIZE) {
check_csv_block_size_is_numeric(request->configuration());
} else {
auto const msg = "Unknown test requested: <" + request->name() + ">";
mdlog::severe(msg);
response->set_success(false);
response->set_failure(msg);
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, msg);
}
response->set_success(true);
} catch (const std::exception &e) {
auto msg = "Authentication test for database <" + db_name +
auto msg = "Test <" + request->name() + "> for database <" + db_name +
"> failed: " + std::string(e.what());
response->set_success(false);
response->set_failure(msg);
// grpc call succeeded; the response reflects config test failure
return ::grpc::Status(::grpc::StatusCode::OK, msg);
}

response->set_success(true);
return ::grpc::Status(::grpc::StatusCode::OK, "");
}
2 changes: 1 addition & 1 deletion src/sql_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,4 +400,4 @@ void check_connection(duckdb::Connection &con) {
throw std::runtime_error("Error checking connection: " +
result->GetError());
}
}
}
Loading

0 comments on commit 996ec23

Please sign in to comment.