Merge pull request #583 from XpressAI/pd/property_testing
Bugfixes: Calculate validity buffer steps based on processed elements & fix sort order issues
wmeddie authored May 17, 2022
2 parents a18b444 + 82593cb commit 2a8fc18
Showing 12 changed files with 564 additions and 145 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -21,7 +21,7 @@ lazy val root = Project(id = "spark-cyclone-sql-plugin", base = file("."))
.configs(AcceptanceTest)
.configs(VectorEngine)
.configs(TPC)
- .settings(version := "1.0.3")
+ .settings(version := "1.0.4")

lazy val tracing = project
.enablePlugins(JavaServerAppPackaging)
4 changes: 2 additions & 2 deletions rddbench/run_rddbench.sh
@@ -8,8 +8,8 @@ time $SPARK_HOME/bin/spark-submit \
--deploy-mode cluster \
--name RDDBench \
--conf spark.com.nec.spark.ncc.path=/opt/nec/ve/bin/ncc \
- --jars ../target/scala-2.12/spark-cyclone-sql-plugin-assembly-1.0.3.jar \
- --conf spark.executor.extraClassPath=../target/scala-2.12/spark-cyclone-sql-plugin-assembly-1.0.3.jar \
+ --jars ../target/scala-2.12/spark-cyclone-sql-plugin-assembly-1.0.4.jar \
+ --conf spark.executor.extraClassPath=../target/scala-2.12/spark-cyclone-sql-plugin-assembly-1.0.4.jar \
--conf spark.rpc.message.maxSize=1024 \
--conf spark.plugins=com.nec.spark.AuroraSqlPlugin \
--conf spark.sql.columnVector.offheap.enabled=true \
@@ -204,7 +204,6 @@ namespace cyclone {
return out_dangling;
}

// Calculate how many full elements are necessary to fit the second bitset into the given type
auto is_big_steps = sizeof(T) > 1;

// How many elements *can* be fit into the tail
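For context, the merge routines in the next file call `cyclone::append_bitsets` to concatenate per-batch validity bitmaps into one output bitmap. Below is a minimal bit-by-bit sketch of its contract as inferred from the call sites and the new unit test in this PR; the real implementation (partially visible above) is word-oriented and distinguishes the `is_big_steps` case, and the helper name here is ours:

```cpp
#include <cstddef>
#include <cstdint>

// Sketch only: append `count` bits from `in` onto an output bitmap whose
// first word already holds `dangling` occupied bits. Returns the new
// dangling-bit count, i.e. how many bits of the last output word are filled
// (mod 64). Assumes the destination was zero-initialized (callers calloc it),
// so it is enough to OR bits in.
inline size_t append_bitsets_sketch(uint64_t *out, size_t dangling,
                                    const uint64_t *in, size_t count) {
  for (size_t i = 0; i < count; i++) {
    const uint64_t bit = (in[i / 64] >> (i % 64)) & 1; // read bit i of input
    const size_t pos = dangling + i;                   // destination bit index
    out[pos / 64] |= bit << (pos % 64);                // set bit in output
  }
  return (dangling + count) % 64;
}
```

The return value is what the merge functions thread through as `dangling_bits`; the new test case in `cyclone_utils_spec.cc` at the bottom of this diff exercises exactly this contract.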
49 changes: 26 additions & 23 deletions src/main/resources/com/nec/cyclone/cpp/cyclone/packed_transfer.cc
@@ -30,15 +30,15 @@
#include <type_traits>
#include <iostream>

- void merge_varchar_transfer(size_t batch_count, size_t total_element_count, char* col_header, char* input_data, char* out_data, uint64_t* out_validity_buffer, char* out_lengths, char* out_offsets, uintptr_t* od, size_t &output_pos){
+ void merge_varchar_transfer(size_t batch_count, char* col_header, char* input_data, char* out_data, uint64_t* out_validity_buffer, char* out_lengths, char* out_offsets, uintptr_t* od, size_t &output_pos){
//std::cout << "merge_varchar_transfer" << std::endl;
size_t cur_col_pos = 0;
size_t cur_data_pos = 0;
size_t cur_out_data_pos = 0;
size_t cur_out_lengths_pos = 0;
size_t cur_out_offsets_pos = 0;
- size_t cur_out_validity_data_pos = 0;
size_t dangling_bits = 0;
+ size_t processed_elements = 0;

for(auto b = 0; b < batch_count; b++){
cur_col_pos += sizeof(column_type); // Don't care about column type anymore - it has been asserted to be correct previously
@@ -82,16 +82,16 @@ void merge_varchar_transfer(size_t batch_count, size_t total_element_count, char
cur_out_lengths_pos += col_in->lengths_size;

//std::cout << "merge_varchar_transfer: copy validity buffer" << std::endl;
+ size_t cur_out_validity_data_pos = processed_elements / 64;
//std::cout << "merge_varchar_transfer: cur_out_validity_data_pos=" << cur_out_validity_data_pos << std::endl;
//std::cout << "merge_varchar_transfer: dangling_bits=" << dangling_bits << std::endl;

uint64_t* batch_validity_buffer = reinterpret_cast<uint64_t *>(&input_data[cur_data_pos]);
dangling_bits = cyclone::append_bitsets(
&out_validity_buffer[cur_out_validity_data_pos], dangling_bits,
batch_validity_buffer, col_in->element_count);
- cur_out_validity_data_pos += frovedis::ceil_div(col_in->element_count, size_t(64));
- if(dangling_bits > 0){
- // last part is not entirely filled yet
- cur_out_validity_data_pos -= 1;
- }

+ processed_elements += col_in->element_count;
cur_data_pos += VECTOR_ALIGNED(col_in->validity_buffer_size);
}

@@ -102,7 +102,7 @@ void merge_varchar_transfer(size_t batch_count, size_t total_element_count, char
result->lengths = reinterpret_cast<int32_t *>(out_lengths);
result->validityBuffer = out_validity_buffer;
result->dataSize = cur_out_data_pos / sizeof(int32_t);
- result->count = total_element_count;
+ result->count = processed_elements;

//std::cout << "merge_varchar_transfer: set output pointers" << std::endl;
od[output_pos++] = reinterpret_cast<uintptr_t>(result);
@@ -114,15 +114,16 @@ void merge_varchar_transfer(size_t batch_count, size_t total_element_count, char*


template<typename T>
- void merge_scalar_transfer(size_t batch_count, size_t total_element_count, char* col_header, char* input_data, char* out_data, uint64_t* out_validity_buffer, uintptr_t* od, size_t &output_pos){
+ void merge_scalar_transfer(size_t batch_count, char* col_header, char* input_data, char* out_data, uint64_t* out_validity_buffer, uintptr_t* od, size_t &output_pos){
//std::cout << "merge_scalar_transfer" << std::endl;

size_t cur_col_pos = 0;
size_t cur_data_pos = 0;
size_t cur_out_data_pos = 0;
- size_t cur_out_validity_data_pos = 0;
size_t dangling_bits = 0;

+ size_t processed_elements = 0;

for(auto b = 0; b < batch_count; b++){
cur_col_pos += sizeof(column_type); // Don't care about column type anymore - it has been asserted to be correct previously
scalar_col_in* col_in = reinterpret_cast<scalar_col_in *>(&col_header[cur_col_pos]);
@@ -134,24 +135,26 @@ void merge_scalar_transfer(size_t batch_count, size_t total_element_count, char*
cur_out_data_pos += col_in->data_size;

//std::cout << "merge_scalar_transfer: copy validity" << std::endl;
//std::cout << "merge_scalar_transfer: cur_data_pos=" << cur_data_pos << std::endl;

+ size_t cur_out_validity_data_pos = processed_elements / 64;
//std::cout << "merge_scalar_transfer: cur_out_validity_data_pos=" << cur_out_validity_data_pos << std::endl;
//std::cout << "merge_scalar_transfer: dangling_bits=" << dangling_bits << std::endl;

uint64_t* batch_validity_buffer = reinterpret_cast<uint64_t *>(&input_data[cur_data_pos]);
dangling_bits = cyclone::append_bitsets(
&out_validity_buffer[cur_out_validity_data_pos], dangling_bits,
batch_validity_buffer, col_in->element_count);
- cur_out_validity_data_pos += frovedis::ceil_div(col_in->element_count, size_t(64));
- if(dangling_bits > 0){
- // last part is not entirely filled yet
- cur_out_validity_data_pos -= 1;
- }

+ processed_elements += col_in->element_count;
cur_data_pos += VECTOR_ALIGNED(col_in->validity_buffer_size);
}

//std::cout << "merge_scalar_transfer: allocate vector" << std::endl;
NullableScalarVec<T>* result = NullableScalarVec<T>::allocate();
result->data = reinterpret_cast<T* >(out_data);
result->validityBuffer = out_validity_buffer;
- result->count = total_element_count;
+ result->count = processed_elements;

//std::cout << "merge_scalar_transfer: set output pointers" << std::endl;
//std::cout << "merge_scalar_transfer: set output pointers: result; pos: " << output_pos << std::endl;
@@ -231,7 +234,7 @@ void merge_scalar_transfer(size_t batch_count, size_t total_element_count, char*
* # Output descriptor
* The output descriptor has the following format:
* ```
- * [column ouput descriptor]...
+ * [column output descriptor]...
* ```
* As we are merging the entire input into a single batch, there will be exactly
* `column count` output descriptors.
@@ -311,19 +314,19 @@ extern "C" int handle_transfer(

switch (col_type) {
case COL_TYPE_SHORT:
- merge_scalar_transfer<int32_t>(header->batch_count, total_element_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer, od, output_pos);
+ merge_scalar_transfer<int32_t>(header->batch_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer, od, output_pos);
break;
case COL_TYPE_INT:
- merge_scalar_transfer<int32_t>(header->batch_count, total_element_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer, od, output_pos);
+ merge_scalar_transfer<int32_t>(header->batch_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer, od, output_pos);
break;
case COL_TYPE_BIGINT:
- merge_scalar_transfer<int64_t>(header->batch_count, total_element_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer , od, output_pos);
+ merge_scalar_transfer<int64_t>(header->batch_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer , od, output_pos);
break;
case COL_TYPE_FLOAT:
- merge_scalar_transfer<float>(header->batch_count, total_element_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer , od, output_pos);
+ merge_scalar_transfer<float>(header->batch_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer , od, output_pos);
break;
case COL_TYPE_DOUBLE:
- merge_scalar_transfer<double>(header->batch_count, total_element_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer , od, output_pos);
+ merge_scalar_transfer<double>(header->batch_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer , od, output_pos);
break;
default:
// Should be impossible to reach at this point!
@@ -374,7 +377,7 @@ extern "C" int handle_transfer(
const auto vbytes = sizeof(uint64_t) * frovedis::ceil_div(total_element_count, size_t(64));
uint64_t* validity_buffer = static_cast<uint64_t *>(calloc(vbytes, 1));

- merge_varchar_transfer(header->batch_count, total_element_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer, lengths, offsets, od, output_pos);
+ merge_varchar_transfer(header->batch_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer, lengths, offsets, od, output_pos);
input_data_pos += aligned_data_size;
}
break;
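The heart of the validity-buffer fix above: the old code advanced the output word index by `ceil_div(element_count, 64)` per batch and stepped back one word whenever bits were left dangling, which under-counts once a batch straddles a word boundary because of a nonzero dangling offset. Deriving the index from the running `processed_elements` count sidesteps the bookkeeping entirely. A standalone sketch of the two schemes, using the batch sizes from the new unit test below (`ceil_div` here is a local stand-in for `frovedis::ceil_div`):

```cpp
#include <cstddef>
#include <cstdio>

static size_t ceil_div(size_t a, size_t b) { return (a + b - 1) / b; }

int main() {
  const size_t batches[3] = {16, 56, 59}; // element counts per batch

  size_t old_pos = 0, dangling = 0; // old scheme: incremental with correction
  size_t processed = 0;             // new scheme: derived from processed elements
  for (size_t b = 0; b < 3; b++) {
    printf("batch %zu: old scheme writes at word %zu, fixed scheme at word %zu\n",
           b, old_pos, processed / 64);
    // Old bookkeeping: advance by the batch's own word count, then back up one
    // word if the last word is not full yet. With 16 dangling bits already in
    // place, the 56-bit batch actually ends in word 1, but ceil_div(56, 64) == 1
    // followed by the -1 correction leaves the index stuck at word 0.
    dangling = (dangling + batches[b]) % 64;
    old_pos += ceil_div(batches[b], 64);
    if (dangling > 0) old_pos -= 1;
    processed += batches[b]; // fixed bookkeeping: just count elements
  }
  // Batch 2 is written at word 0 under the old scheme but belongs at
  // word 1 (72 / 64), so it would clobber bits of the first two batches.
}
```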
41 changes: 41 additions & 0 deletions src/main/resources/com/nec/cyclone/cpp/tests/cyclone_utils_spec.cc
@@ -375,4 +375,45 @@ namespace cyclone::tests {
CHECK(output[4] == expected[4]);
free(output);
}

TEST_CASE("Merging into tail works after several small steps landing on full byte"){
uint64_t inputs[3] = {
0b0000000000000000000000000000000000000000000000000110011000110010,
// ^ valid until here
0b0000000011001001011001001000100101001000011011101000001100101000,
// ^ valid until here
0b0000011010100101101110010010101111000011010000000001100100111001
// ^ valid until here
};

uint64_t lengths[3] = {16, 56, 59};

const auto vbytes = sizeof(uint64_t) * frovedis::ceil_div(lengths[0] + lengths[1] + lengths[2], size_t(64));
uint64_t* output = static_cast<uint64_t *>(calloc(vbytes, 1));

auto d1 = cyclone::append_bitsets(&output[0], 0, &inputs[0], lengths[0]);
auto d2 = cyclone::append_bitsets(&output[0], d1, &inputs[1], lengths[1]);
auto d3 = cyclone::append_bitsets(&output[1], d2, &inputs[2], lengths[2]);

auto expected_d1 = lengths[0] % 64;
auto expected_d2 = (lengths[0] + lengths[1]) % 64;
auto expected_d3 = (lengths[0] + lengths[1] + lengths[2]) % 64;

uint64_t expected[3] = {
0b0110010010001001010010000110111010000011001010000110011000110010,
// part of inputs[1] ^ inputs[0]
0b1010010110111001001010111100001101000000000110010011100111001001,
// part of inputs[2] ^ part of inputs[1]
0b0000000000000000000000000000000000000000000000000000000000000110
// empty ^ part of inputs[2]
};

CHECK(d1 == expected_d1);
CHECK(d2 == expected_d2);
CHECK(d3 == expected_d3);

CHECK(output[0] == expected[0]);
CHECK(output[1] == expected[1]);
CHECK(output[2] == expected[2]);
}
}
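As a cross-check of the expected values in this test: the three appends cover 16 + 56 + 59 = 131 bits, which occupy ceil(131 / 64) = 3 output words, and the dangling counts are 16 % 64 = 16, 72 % 64 = 8, and 131 % 64 = 3. The third call correctly starts at `&output[1]` because 72 / 64 = 1 full word is already settled — exactly the `processed_elements / 64` indexing introduced in `packed_transfer.cc`.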