diff --git a/build.sbt b/build.sbt index 8f8fbbd5f..14cb25c13 100644 --- a/build.sbt +++ b/build.sbt @@ -21,7 +21,7 @@ lazy val root = Project(id = "spark-cyclone-sql-plugin", base = file(".")) .configs(AcceptanceTest) .configs(VectorEngine) .configs(TPC) - .settings(version := "1.0.3") + .settings(version := "1.0.4") lazy val tracing = project .enablePlugins(JavaServerAppPackaging) diff --git a/rddbench/run_rddbench.sh b/rddbench/run_rddbench.sh index a89926b54..f67ab8402 100755 --- a/rddbench/run_rddbench.sh +++ b/rddbench/run_rddbench.sh @@ -8,8 +8,8 @@ time $SPARK_HOME/bin/spark-submit \ --deploy-mode cluster \ --name RDDBench \ --conf spark.com.nec.spark.ncc.path=/opt/nec/ve/bin/ncc \ - --jars ../target/scala-2.12/spark-cyclone-sql-plugin-assembly-1.0.3.jar \ - --conf spark.executor.extraClassPath=../target/scala-2.12/spark-cyclone-sql-plugin-assembly-1.0.3.jar \ + --jars ../target/scala-2.12/spark-cyclone-sql-plugin-assembly-1.0.4.jar \ + --conf spark.executor.extraClassPath=../target/scala-2.12/spark-cyclone-sql-plugin-assembly-1.0.4.jar \ --conf spark.rpc.message.maxSize=1024 \ --conf spark.plugins=com.nec.spark.AuroraSqlPlugin \ --conf spark.sql.columnVector.offheap.enabled=true \ diff --git a/src/main/resources/com/nec/cyclone/cpp/cyclone/cyclone_utils.hpp b/src/main/resources/com/nec/cyclone/cpp/cyclone/cyclone_utils.hpp index 5a26dfda1..904936eb4 100644 --- a/src/main/resources/com/nec/cyclone/cpp/cyclone/cyclone_utils.hpp +++ b/src/main/resources/com/nec/cyclone/cpp/cyclone/cyclone_utils.hpp @@ -204,7 +204,6 @@ namespace cyclone { return out_dangling; } - // Calculate how many full elements are necessary to fit the second bitset into the given type auto is_big_steps = sizeof(T) > 1; // How many elements *can* be fit into the tail diff --git a/src/main/resources/com/nec/cyclone/cpp/cyclone/packed_transfer.cc b/src/main/resources/com/nec/cyclone/cpp/cyclone/packed_transfer.cc index a00ebc782..c4b1d3eaa 100644 --- 
a/src/main/resources/com/nec/cyclone/cpp/cyclone/packed_transfer.cc +++ b/src/main/resources/com/nec/cyclone/cpp/cyclone/packed_transfer.cc @@ -30,15 +30,15 @@ #include #include -void merge_varchar_transfer(size_t batch_count, size_t total_element_count, char* col_header, char* input_data, char* out_data, uint64_t* out_validity_buffer, char* out_lengths, char* out_offsets, uintptr_t* od, size_t &output_pos){ +void merge_varchar_transfer(size_t batch_count, char* col_header, char* input_data, char* out_data, uint64_t* out_validity_buffer, char* out_lengths, char* out_offsets, uintptr_t* od, size_t &output_pos){ //std::cout << "merge_varchar_transfer" << std::endl; size_t cur_col_pos = 0; size_t cur_data_pos = 0; size_t cur_out_data_pos = 0; size_t cur_out_lengths_pos = 0; size_t cur_out_offsets_pos = 0; - size_t cur_out_validity_data_pos = 0; size_t dangling_bits = 0; + size_t processed_elements = 0; for(auto b = 0; b < batch_count; b++){ cur_col_pos += sizeof(column_type); // Don't care about column type anymore - it has been asserted to be correct previously @@ -82,16 +82,16 @@ void merge_varchar_transfer(size_t batch_count, size_t total_element_count, char cur_out_lengths_pos += col_in->lengths_size; //std::cout << "merge_varchar_transfer: copy validity buffer" << std::endl; + size_t cur_out_validity_data_pos = processed_elements / 64; + //std::cout << "merge_varchar_transfer: cur_out_validity_data_pos=" << cur_out_validity_data_pos << std::endl; + //std::cout << "merge_varchar_transfer: dangling_bits=" << dangling_bits << std::endl; + uint64_t* batch_validity_buffer = reinterpret_cast(&input_data[cur_data_pos]); dangling_bits = cyclone::append_bitsets( &out_validity_buffer[cur_out_validity_data_pos], dangling_bits, batch_validity_buffer, col_in->element_count); - cur_out_validity_data_pos += frovedis::ceil_div(col_in->element_count, size_t(64)); - if(dangling_bits > 0){ - // last part is not entirely filled yet - cur_out_validity_data_pos -= 1; - } + 
processed_elements += col_in->element_count; cur_data_pos += VECTOR_ALIGNED(col_in->validity_buffer_size); } @@ -102,7 +102,7 @@ void merge_varchar_transfer(size_t batch_count, size_t total_element_count, char result->lengths = reinterpret_cast(out_lengths); result->validityBuffer = out_validity_buffer; result->dataSize = cur_out_data_pos / sizeof(int32_t); - result->count = total_element_count; + result->count = processed_elements; //std::cout << "merge_varchar_transfer: set output pointers" << std::endl; od[output_pos++] = reinterpret_cast(result); @@ -114,15 +114,16 @@ void merge_varchar_transfer(size_t batch_count, size_t total_element_count, char template -void merge_scalar_transfer(size_t batch_count, size_t total_element_count, char* col_header, char* input_data, char* out_data, uint64_t* out_validity_buffer, uintptr_t* od, size_t &output_pos){ +void merge_scalar_transfer(size_t batch_count, char* col_header, char* input_data, char* out_data, uint64_t* out_validity_buffer, uintptr_t* od, size_t &output_pos){ //std::cout << "merge_scalar_transfer" << std::endl; size_t cur_col_pos = 0; size_t cur_data_pos = 0; size_t cur_out_data_pos = 0; - size_t cur_out_validity_data_pos = 0; size_t dangling_bits = 0; + size_t processed_elements = 0; + for(auto b = 0; b < batch_count; b++){ cur_col_pos += sizeof(column_type); // Don't care about column type anymore - it has been asserted to be correct previously scalar_col_in* col_in = reinterpret_cast(&col_header[cur_col_pos]); @@ -134,16 +135,18 @@ void merge_scalar_transfer(size_t batch_count, size_t total_element_count, char* cur_out_data_pos += col_in->data_size; //std::cout << "merge_scalar_transfer: copy validity" << std::endl; + //std::cout << "merge_scalar_transfer: cur_data_pos=" << cur_data_pos << std::endl; + + size_t cur_out_validity_data_pos = processed_elements / 64; + //std::cout << "merge_scalar_transfer: cur_out_validity_data_pos=" << cur_out_validity_data_pos << std::endl; + //std::cout << 
"merge_scalar_transfer: dangling_bits=" << dangling_bits << std::endl; + uint64_t* batch_validity_buffer = reinterpret_cast(&input_data[cur_data_pos]); dangling_bits = cyclone::append_bitsets( &out_validity_buffer[cur_out_validity_data_pos], dangling_bits, batch_validity_buffer, col_in->element_count); - cur_out_validity_data_pos += frovedis::ceil_div(col_in->element_count, size_t(64)); - if(dangling_bits > 0){ - // last part is not entirely filled yet - cur_out_validity_data_pos -= 1; - } + processed_elements += col_in->element_count; cur_data_pos += VECTOR_ALIGNED(col_in->validity_buffer_size); } @@ -151,7 +154,7 @@ void merge_scalar_transfer(size_t batch_count, size_t total_element_count, char* NullableScalarVec* result = NullableScalarVec::allocate(); result->data = reinterpret_cast(out_data); result->validityBuffer = out_validity_buffer; - result->count = total_element_count; + result->count = processed_elements; //std::cout << "merge_scalar_transfer: set output pointers" << std::endl; //std::cout << "merge_scalar_transfer: set output pointers: result; pos: " << output_pos << std::endl; @@ -231,7 +234,7 @@ void merge_scalar_transfer(size_t batch_count, size_t total_element_count, char* * # Output descriptor * The output descriptor has the following format: * ``` - * [column ouput descriptor]... + * [column output descriptor]... * ``` * As we are merging the entire input into a single batch, there will be exactly * `column count` output descriptors. 
@@ -311,19 +314,19 @@ extern "C" int handle_transfer( switch (col_type) { case COL_TYPE_SHORT: - merge_scalar_transfer(header->batch_count, total_element_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer, od, output_pos); + merge_scalar_transfer(header->batch_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer, od, output_pos); break; case COL_TYPE_INT: - merge_scalar_transfer(header->batch_count, total_element_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer, od, output_pos); + merge_scalar_transfer(header->batch_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer, od, output_pos); break; case COL_TYPE_BIGINT: - merge_scalar_transfer(header->batch_count, total_element_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer , od, output_pos); + merge_scalar_transfer(header->batch_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer , od, output_pos); break; case COL_TYPE_FLOAT: - merge_scalar_transfer(header->batch_count, total_element_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer , od, output_pos); + merge_scalar_transfer(header->batch_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer , od, output_pos); break; case COL_TYPE_DOUBLE: - merge_scalar_transfer(header->batch_count, total_element_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer , od, output_pos); + merge_scalar_transfer(header->batch_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer , od, output_pos); break; default: // Should be impossible to reach at this point! 
@@ -374,7 +377,7 @@ extern "C" int handle_transfer( const auto vbytes = sizeof(uint64_t) * frovedis::ceil_div(total_element_count, size_t(64)); uint64_t* validity_buffer = static_cast(calloc(vbytes, 1)); - merge_varchar_transfer(header->batch_count, total_element_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer, lengths, offsets, od, output_pos); + merge_varchar_transfer(header->batch_count, &transfer[col_start], &input_data[input_data_pos], data, validity_buffer, lengths, offsets, od, output_pos); input_data_pos += aligned_data_size; } break; diff --git a/src/main/resources/com/nec/cyclone/cpp/tests/cyclone_utils_spec.cc b/src/main/resources/com/nec/cyclone/cpp/tests/cyclone_utils_spec.cc index db52dcd16..de5c65679 100644 --- a/src/main/resources/com/nec/cyclone/cpp/tests/cyclone_utils_spec.cc +++ b/src/main/resources/com/nec/cyclone/cpp/tests/cyclone_utils_spec.cc @@ -375,4 +375,45 @@ namespace cyclone::tests { CHECK(output[4] == expected[4]); free(output); } + + TEST_CASE("Merging into tail works after several small steps landing on full byte"){ + uint64_t inputs[3] = { + 0b0000000000000000000000000000000000000000000000000110011000110010, +// ^ valid until here + 0b0000000011001001011001001000100101001000011011101000001100101000, +// ^ valid until here + 0b0000011010100101101110010010101111000011010000000001100100111001 +// ^ valid until here + }; + + uint64_t lengths[3] = {16, 56, 59}; + + const auto vbytes = sizeof(uint64_t) * frovedis::ceil_div(lengths[0] + lengths[1] + lengths[2], size_t(64)); + uint64_t* output = static_cast(calloc(vbytes, 1)); + + auto d1 = cyclone::append_bitsets(&output[0], 0, &inputs[0], lengths[0]); + auto d2 = cyclone::append_bitsets(&output[0], d1, &inputs[1], lengths[1]); + auto d3 = cyclone::append_bitsets(&output[1], d2, &inputs[2], lengths[2]); + + auto expected_d1 = lengths[0] % 64; + auto expected_d2 = (lengths[0] + lengths[1]) % 64; + auto expected_d3 = (lengths[0] + lengths[1] + lengths[2]) % 
64; + + uint64_t expected[3] = { + 0b0110010010001001010010000110111010000011001010000110011000110010, +// part of inputs[1] ^ inputs[0] + 0b1010010110111001001010111100001101000000000110010011100111001001, +// part of inputs[2] ^ part of inputs[1] + 0b0000000000000000000000000000000000000000000000000000000000000110 +// empty ^ part of inputs[2] + }; + + CHECK(d1 == expected_d1); + CHECK(d2 == expected_d2); + CHECK(d3 == expected_d3); + + CHECK(output[0] == expected[0]); + CHECK(output[1] == expected[1]); + CHECK(output[2] == expected[2]); + } } diff --git a/src/main/resources/com/nec/cyclone/cpp/tests/packed_transfer_spec.cc b/src/main/resources/com/nec/cyclone/cpp/tests/packed_transfer_spec.cc index 98fd08eae..42c98a91f 100644 --- a/src/main/resources/com/nec/cyclone/cpp/tests/packed_transfer_spec.cc +++ b/src/main/resources/com/nec/cyclone/cpp/tests/packed_transfer_spec.cc @@ -172,6 +172,52 @@ namespace cyclone::tests { CHECK(transferred->equals(vec1)); } + TEST_CASE("Unpacking works for two varchar vectors of an empty string each"){ + std::vector raw { "" }; + auto *vec1 = new nullable_varchar_vector(raw); + auto *vec2 = new nullable_varchar_vector(raw); + + auto header_size = sizeof(transfer_header) + 2*(sizeof(size_t) + sizeof(varchar_col_in)); + + size_t element_count = static_cast(vec1->count); + size_t data_size = VECTOR_ALIGNED(vec1->dataSize * sizeof(int32_t)); + size_t offsets_size = VECTOR_ALIGNED(element_count * sizeof(int32_t)); + size_t lengths_size = VECTOR_ALIGNED(element_count * sizeof(int32_t)); + size_t validity_buffer_size = VECTOR_ALIGNED(frovedis::ceil_div(vec1->count, int32_t(64)) * sizeof(uint64_t)); + + char* transfer = static_cast(malloc(header_size + 2*(data_size + offsets_size + lengths_size + validity_buffer_size))); + + size_t pos = 0; + transfer_header* header = reinterpret_cast(&transfer[pos]); + header->header_size = header_size; + header->batch_count = 1; + header->column_count = 2; + pos += sizeof(transfer_header); + + size_t 
data_pos = 0; + size_t col_pos = 0; + copy_varchar_vec_to_transfer_buffer(vec1, &transfer[pos], &transfer[header_size], col_pos, data_pos); + copy_varchar_vec_to_transfer_buffer(vec2, &transfer[pos], &transfer[header_size], col_pos, data_pos); + + uintptr_t* od = static_cast(malloc(sizeof(uintptr_t) * 10)); + + char* target[1] = {transfer}; + + int res = handle_transfer(target, od); + CHECK(res == 0); + + nullable_varchar_vector* transferred1 = reinterpret_cast(od[0]); + nullable_varchar_vector* transferred2 = reinterpret_cast(od[5]); + + vec1->print(); + transferred1->print(); + vec2->print(); + transferred2->print(); + + CHECK(transferred1->equals(vec1)); + CHECK(transferred2->equals(vec2)); + } + TEST_CASE("Unpacking works for multiple vectors of different types"){ std::vector raw { "JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC" }; auto *vec1 = new nullable_varchar_vector(raw); @@ -322,94 +368,234 @@ namespace cyclone::tests { } TEST_CASE("Unpacking and merging three batches works for multiple vectors of different types"){ - std::vector raw1 { "JAN", "FEB", "MAR", "APR", "MAY", "JUN"}; - std::vector raw2 { "JUL", "AUG", "SEP", "OCT", "NOV", "DEC" }; - std::vector raw3 { "A", "b", "C"}; - auto *vc_vec1 = new nullable_varchar_vector(raw1); - vc_vec1->set_validity(1, 0); - vc_vec1->set_validity(3, 0); - vc_vec1->set_validity(5, 0); - - auto *vc_vec2 = new nullable_varchar_vector(raw2); - vc_vec2->set_validity(0, 0); - vc_vec2->set_validity(2, 0); - vc_vec2->set_validity(4, 0); - - auto *vc_vec3 = new nullable_varchar_vector(raw3); - vc_vec3->set_validity(1, 0); - - auto *sc_vec1 = new NullableScalarVec({586, 951, 106, 318, 538, 620}); - sc_vec1->set_validity(1, 0); - sc_vec1->set_validity(3, 0); - sc_vec1->set_validity(5, 0); - - auto *sc_vec2 = new NullableScalarVec({553, 605, 822, 941}); - sc_vec2->set_validity(2, 0); - sc_vec2->set_validity(3, 0); - - auto *sc_vec3 = new NullableScalarVec({53, 5, 22, 94}); - 
sc_vec2->set_validity(0, 0); - sc_vec2->set_validity(1, 0); - - auto header_size = sizeof(transfer_header) + (3 * (sizeof(size_t) + sizeof(scalar_col_in))) + (3 * (sizeof(size_t) + sizeof(varchar_col_in))); - - size_t data_size = (VECTOR_ALIGNED(vc_vec1->dataSize * sizeof(int32_t)) + VECTOR_ALIGNED(vc_vec2->dataSize * sizeof(int32_t)) + VECTOR_ALIGNED(vc_vec3->dataSize * sizeof(int32_t)) - + VECTOR_ALIGNED(sc_vec1->count * sizeof(int32_t)) + VECTOR_ALIGNED(sc_vec2->count * sizeof(int32_t)) + VECTOR_ALIGNED(sc_vec3->count * sizeof(int32_t))); - size_t offsets_size = VECTOR_ALIGNED(vc_vec1->count * sizeof(int32_t)) + VECTOR_ALIGNED(vc_vec2->count * sizeof(int32_t)) + VECTOR_ALIGNED(vc_vec3->count * sizeof(int32_t)); - size_t lengths_size = VECTOR_ALIGNED(vc_vec1->count * sizeof(int32_t)) + VECTOR_ALIGNED(vc_vec2->count * sizeof(int32_t)) + VECTOR_ALIGNED(vc_vec3->count * sizeof(int32_t)); - size_t validity_buffer_size = VECTOR_ALIGNED(sizeof(uint64_t) * ( frovedis::ceil_div(vc_vec1->count, int32_t(64)) - + frovedis::ceil_div(vc_vec2->count, int32_t(64)) - + frovedis::ceil_div(vc_vec3->count, int32_t(64)) - + frovedis::ceil_div(sc_vec1->count, int32_t(64)) - + frovedis::ceil_div(sc_vec2->count, int32_t(64)) - + frovedis::ceil_div(sc_vec3->count, int32_t(64)))); - - char* transfer = static_cast(malloc(header_size + data_size + offsets_size + lengths_size + validity_buffer_size)); - - size_t pos = 0; - transfer_header* header = reinterpret_cast(&transfer[pos]); - header->header_size = header_size; - header->batch_count = 3; - header->column_count = 2; - pos += sizeof(transfer_header); - - size_t data_pos = 0; - size_t col_pos = 0; - copy_varchar_vec_to_transfer_buffer(vc_vec1, &transfer[pos], &transfer[header_size], col_pos, data_pos); - copy_varchar_vec_to_transfer_buffer(vc_vec2, &transfer[pos], &transfer[header_size], col_pos, data_pos); - copy_varchar_vec_to_transfer_buffer(vc_vec3, &transfer[pos], &transfer[header_size], col_pos, data_pos); - 
copy_scalar_vec_to_transfer_buffer(sc_vec1, &transfer[pos], &transfer[header_size], col_pos, data_pos); - copy_scalar_vec_to_transfer_buffer(sc_vec2, &transfer[pos], &transfer[header_size], col_pos, data_pos); - copy_scalar_vec_to_transfer_buffer(sc_vec3, &transfer[pos], &transfer[header_size], col_pos, data_pos); - - uintptr_t* od = static_cast(malloc(sizeof(uintptr_t) * (5 + 3))); - - char* target[1] = {transfer}; - - int res = handle_transfer(target, od); - CHECK(res == 0); - - nullable_varchar_vector* varchars[3] = {vc_vec1, vc_vec2, vc_vec3}; - NullableScalarVec* scalars[3] = {sc_vec1, sc_vec2, sc_vec3}; - - nullable_varchar_vector* vc_merged = nullable_varchar_vector::merge(varchars, 3); - NullableScalarVec* sc_merged = NullableScalarVec::merge(scalars, 3); - - nullable_varchar_vector* transferred1 = reinterpret_cast(od[0]); - - std::cout << "vc_merged = " << std::endl; - vc_merged->print(); - std::cout << "transferred1 = " << std::endl; - transferred1->print(); - CHECK(transferred1->equals(vc_merged)); - - NullableScalarVec* transferred2 = reinterpret_cast*>(od[5]); - - std::cout << "sc_merged = " << std::endl; - sc_merged->print(); - std::cout << "transferred2 = " << std::endl; - transferred2->print(); - CHECK(transferred2->equals(sc_merged)); - } + std::vector raw1 { "JAN", "FEB", "MAR", "APR", "MAY", "JUN"}; + std::vector raw2 { "JUL", "AUG", "SEP", "OCT", "NOV", "DEC" }; + std::vector raw3 { "A", "b", "C"}; + auto *vc_vec1 = new nullable_varchar_vector(raw1); + vc_vec1->set_validity(1, 0); + vc_vec1->set_validity(3, 0); + vc_vec1->set_validity(5, 0); + + auto *vc_vec2 = new nullable_varchar_vector(raw2); + vc_vec2->set_validity(0, 0); + vc_vec2->set_validity(2, 0); + vc_vec2->set_validity(4, 0); + + auto *vc_vec3 = new nullable_varchar_vector(raw3); + vc_vec3->set_validity(1, 0); + + auto *sc_vec1 = new NullableScalarVec({586, 951, 106, 318, 538, 620}); + sc_vec1->set_validity(1, 0); + sc_vec1->set_validity(3, 0); + sc_vec1->set_validity(5, 0); + + auto 
*sc_vec2 = new NullableScalarVec({553, 605, 822, 941}); + sc_vec2->set_validity(2, 0); + sc_vec2->set_validity(3, 0); + + auto *sc_vec3 = new NullableScalarVec({53, 5, 22, 94}); + sc_vec2->set_validity(0, 0); + sc_vec2->set_validity(1, 0); + + auto header_size = sizeof(transfer_header) + (3 * (sizeof(size_t) + sizeof(scalar_col_in))) + (3 * (sizeof(size_t) + sizeof(varchar_col_in))); + + size_t data_size = (VECTOR_ALIGNED(vc_vec1->dataSize * sizeof(int32_t)) + VECTOR_ALIGNED(vc_vec2->dataSize * sizeof(int32_t)) + VECTOR_ALIGNED(vc_vec3->dataSize * sizeof(int32_t)) + + VECTOR_ALIGNED(sc_vec1->count * sizeof(int32_t)) + VECTOR_ALIGNED(sc_vec2->count * sizeof(int32_t)) + VECTOR_ALIGNED(sc_vec3->count * sizeof(int32_t))); + size_t offsets_size = VECTOR_ALIGNED(vc_vec1->count * sizeof(int32_t)) + VECTOR_ALIGNED(vc_vec2->count * sizeof(int32_t)) + VECTOR_ALIGNED(vc_vec3->count * sizeof(int32_t)); + size_t lengths_size = VECTOR_ALIGNED(vc_vec1->count * sizeof(int32_t)) + VECTOR_ALIGNED(vc_vec2->count * sizeof(int32_t)) + VECTOR_ALIGNED(vc_vec3->count * sizeof(int32_t)); + size_t validity_buffer_size = VECTOR_ALIGNED(sizeof(uint64_t) * ( frovedis::ceil_div(vc_vec1->count, int32_t(64)) + + frovedis::ceil_div(vc_vec2->count, int32_t(64)) + + frovedis::ceil_div(vc_vec3->count, int32_t(64)) + + frovedis::ceil_div(sc_vec1->count, int32_t(64)) + + frovedis::ceil_div(sc_vec2->count, int32_t(64)) + + frovedis::ceil_div(sc_vec3->count, int32_t(64)))); + + char* transfer = static_cast(malloc(header_size + data_size + offsets_size + lengths_size + validity_buffer_size)); + + size_t pos = 0; + transfer_header* header = reinterpret_cast(&transfer[pos]); + header->header_size = header_size; + header->batch_count = 3; + header->column_count = 2; + pos += sizeof(transfer_header); + + size_t data_pos = 0; + size_t col_pos = 0; + copy_varchar_vec_to_transfer_buffer(vc_vec1, &transfer[pos], &transfer[header_size], col_pos, data_pos); + copy_varchar_vec_to_transfer_buffer(vc_vec2, 
&transfer[pos], &transfer[header_size], col_pos, data_pos); + copy_varchar_vec_to_transfer_buffer(vc_vec3, &transfer[pos], &transfer[header_size], col_pos, data_pos); + copy_scalar_vec_to_transfer_buffer(sc_vec1, &transfer[pos], &transfer[header_size], col_pos, data_pos); + copy_scalar_vec_to_transfer_buffer(sc_vec2, &transfer[pos], &transfer[header_size], col_pos, data_pos); + copy_scalar_vec_to_transfer_buffer(sc_vec3, &transfer[pos], &transfer[header_size], col_pos, data_pos); + + uintptr_t* od = static_cast(malloc(sizeof(uintptr_t) * (5 + 3))); + + char* target[1] = {transfer}; + + int res = handle_transfer(target, od); + CHECK(res == 0); + + nullable_varchar_vector* varchars[3] = {vc_vec1, vc_vec2, vc_vec3}; + NullableScalarVec* scalars[3] = {sc_vec1, sc_vec2, sc_vec3}; + + nullable_varchar_vector* vc_merged = nullable_varchar_vector::merge(varchars, 3); + NullableScalarVec* sc_merged = NullableScalarVec::merge(scalars, 3); + + nullable_varchar_vector* transferred1 = reinterpret_cast(od[0]); + + std::cout << "vc_merged = " << std::endl; + vc_merged->print(); + std::cout << "transferred1 = " << std::endl; + transferred1->print(); + CHECK(transferred1->equals(vc_merged)); + + NullableScalarVec* transferred2 = reinterpret_cast*>(od[5]); + + std::cout << "sc_merged = " << std::endl; + sc_merged->print(); + std::cout << "transferred2 = " << std::endl; + transferred2->print(); + CHECK(transferred2->equals(sc_merged)); + } + + TEST_CASE_TEMPLATE("Unpacking works for three scalar vector of T=", T, int32_t, int64_t, float, double) { + uint64_t validity_buffers[3] = { + 0b0000000000000000000000000000000000000000000000000110011000110010, + // ^ valid until here + 0b0000000011001001011001001000100101001000011011101000001100101000, + // ^ valid until here + 0b0000011010100101101110010010101111000011010000000001100100111001 + // ^ valid until here + }; + const std::vector a{0, 4436, 0, 0, 9586, 2142, 0, 0, 0, 2149, 4297, 0, 0, 3278, 6668, 0}; + auto *vec1 = new 
NullableScalarVec(a); + vec1->validityBuffer = &validity_buffers[0]; + + const std::vector b{ + 0, 0, 0, 8051, 0, 1383, 0, 0, 2256, 5785, 0, 0, 0, 0, 0, 4693, 0, 1849, 3790, 8995, 0, 6961, 7132, 0, 0, 0, 0, 6968, 0, 0, 3763, 0, 3558, 0, 0, 2011, 0, 0, 0, 3273, 0, 0, 9428, 0, 0, 6408, 7940, 0, 9521, 0, 0, 5832, 0, 0, 5817, 5949 + }; + auto *vec2 = new NullableScalarVec(b); + vec2->validityBuffer = &validity_buffers[1]; + + const std::vector c{ + 7319, 0, 0, 4859, 524, 406, 0, 0, 1154, 0, 0, 1650, 8040, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1146, 0, 7268, 8197, 0, 0, 0, 0, 81, 2053, 6571, 4600, 0, 3699, 0, 8404, 0, 0, 8401, 0, 0, 6234, 6281, 7367, 0, 4688, 7490, 0, 5412, 0, 0, 871, 0, 9086, 0, 5362, 6516 + }; + auto *vec3 = new NullableScalarVec(c); + vec3->validityBuffer = &validity_buffers[2]; + + + + auto header_size = sizeof(transfer_header) + 3 * (sizeof(size_t) + sizeof(scalar_col_in)); + + size_t data_size = VECTOR_ALIGNED(sizeof(T) * vec1->count) + VECTOR_ALIGNED(sizeof(T) * vec2->count) + VECTOR_ALIGNED(sizeof(T) * vec3->count); + size_t validity_buffer_size = VECTOR_ALIGNED(frovedis::ceil_div(vec1->count, int32_t(64)) * sizeof(uint64_t)) + + VECTOR_ALIGNED(frovedis::ceil_div(vec2->count, int32_t(64)) * sizeof(uint64_t)) + + VECTOR_ALIGNED(frovedis::ceil_div(vec3->count, int32_t(64)) * sizeof(uint64_t)); + + char* transfer = static_cast(malloc(header_size + data_size + validity_buffer_size)); + + size_t pos = 0; + transfer_header* header = reinterpret_cast(&transfer[pos]); + header->header_size = header_size; + header->batch_count = 3; + header->column_count = 1; + pos += sizeof(transfer_header); + + size_t data_pos = 0; + size_t col_pos = 0; + copy_scalar_vec_to_transfer_buffer(vec1, &transfer[pos], &transfer[header_size], col_pos, data_pos); + copy_scalar_vec_to_transfer_buffer(vec2, &transfer[pos], &transfer[header_size], col_pos, data_pos); + copy_scalar_vec_to_transfer_buffer(vec3, &transfer[pos], &transfer[header_size], col_pos, data_pos); + + uintptr_t* od = 
static_cast(malloc(sizeof(uintptr_t) * 3)); + + char* target[1] = {transfer}; + + int res = handle_transfer(target, od); + CHECK(res == 0); + + NullableScalarVec* transferred = reinterpret_cast*>(od[0]); + + NullableScalarVec* scalars[3] = {vec1, vec2, vec3}; + NullableScalarVec* merged = NullableScalarVec::merge(scalars, 3); + + std::cout << "merged = " << std::endl; + merged->print(); + std::cout << "transferred = " << std::endl; + transferred->print(); + + CHECK(transferred->equals(merged)); + } + + TEST_CASE("Unpacking works for three varchar vectors") { + uint64_t validity_buffers[3] = { + 0b0000000000000000000000000000000000000000000000000110011000110010, +// ^ valid until here + 0b0000000011001001011001001000100101001000011011101000001100101000, +// ^ valid until here + 0b0000011010100101101110010010101111000011010000000001100100111001 +// ^ valid until here + }; + const std::vector a{"0", "4436", "0", "0", "9586", "2142", "0", "0", "0", "2149", "4297", "0", "0", "3278", "6668", "0"}; + auto *vec1 = new nullable_varchar_vector(a); + vec1->validityBuffer = &validity_buffers[0]; + + const std::vector b{ + "0", "0", "0", "8051", "0", "1383", "0", "0", "2256", "5785", "0", "0", "0", "0", "0", "4693", "0", "1849", "3790", "8995", "0", "6961", "7132", "0", "0", "0", "0", "6968", "0", "0", "3763", "0", "3558", "0", "0", "2011", "0", "0", "0", "3273", "0", "0", "9428", "0", "0", "6408", "7940", "0", "9521", "0", "0", "5832", "0", "0", "5817", "5949" + }; + auto *vec2 = new nullable_varchar_vector(b); + vec2->validityBuffer = &validity_buffers[1]; + + const std::vector c{ + "7319", "0", "0", "4859", "524", "406", "0", "0", "1154", "0", "0", "1650", "8040", "0", "0", "0", "0", "0", "0", "0", "0", "0", "1146", "0", "7268", "8197", "0", "0", "0", "0", "81", "2053", "6571", "4600", "0", "3699", "0", "8404", "0", "0", "8401", "0", "0", "6234", "6281", "7367", "0", "4688", "7490", "0", "5412", "0", "0", "871", "0", "9086", "0", "5362", "6516" + }; + auto *vec3 = new 
nullable_varchar_vector(c); + vec3->validityBuffer = &validity_buffers[2]; + + + + auto header_size = sizeof(transfer_header) + 3 * (sizeof(size_t) + sizeof(varchar_col_in)); + + size_t data_size = VECTOR_ALIGNED(vec1->dataSize * sizeof(int32_t)) + VECTOR_ALIGNED(vec2->dataSize * sizeof(int32_t)) + VECTOR_ALIGNED(vec3->dataSize * sizeof(int32_t)); + size_t validity_buffer_size = VECTOR_ALIGNED(frovedis::ceil_div(vec1->count, int32_t(64)) * sizeof(uint64_t)) + + VECTOR_ALIGNED(frovedis::ceil_div(vec2->count, int32_t(64)) * sizeof(uint64_t)) + + VECTOR_ALIGNED(frovedis::ceil_div(vec3->count, int32_t(64)) * sizeof(uint64_t)); + size_t offsets_size = VECTOR_ALIGNED(vec1->count * sizeof(int32_t)) + VECTOR_ALIGNED(vec2->count * sizeof(int32_t)) + VECTOR_ALIGNED(vec3->count * sizeof(int32_t)); + size_t lengths_size = VECTOR_ALIGNED(vec1->count * sizeof(int32_t)) + VECTOR_ALIGNED(vec2->count * sizeof(int32_t)) + VECTOR_ALIGNED(vec3->count * sizeof(int32_t)); + + char* transfer = static_cast(malloc(header_size + data_size + validity_buffer_size + offsets_size + lengths_size)); + + size_t pos = 0; + transfer_header* header = reinterpret_cast(&transfer[pos]); + header->header_size = header_size; + header->batch_count = 3; + header->column_count = 1; + pos += sizeof(transfer_header); + + size_t data_pos = 0; + size_t col_pos = 0; + copy_varchar_vec_to_transfer_buffer(vec1, &transfer[pos], &transfer[header_size], col_pos, data_pos); + copy_varchar_vec_to_transfer_buffer(vec2, &transfer[pos], &transfer[header_size], col_pos, data_pos); + copy_varchar_vec_to_transfer_buffer(vec3, &transfer[pos], &transfer[header_size], col_pos, data_pos); + + uintptr_t* od = static_cast(malloc(sizeof(uintptr_t) * 5)); + + char* target[1] = {transfer}; + + int res = handle_transfer(target, od); + CHECK(res == 0); + + nullable_varchar_vector* transferred = reinterpret_cast(od[0]); + + nullable_varchar_vector* varchars[3] = {vec1, vec2, vec3}; + nullable_varchar_vector* merged = 
nullable_varchar_vector::merge(varchars, 3); + + std::cout << "merged = " << std::endl; + merged->print(); + std::cout << "transferred = " << std::endl; + transferred->print(); + + CHECK(transferred->equals(merged)); + } } } diff --git a/src/main/scala/com/nec/cache/TransferDescriptor.scala b/src/main/scala/com/nec/cache/TransferDescriptor.scala index 30a6f9abb..3f194d6f1 100644 --- a/src/main/scala/com/nec/cache/TransferDescriptor.scala +++ b/src/main/scala/com/nec/cache/TransferDescriptor.scala @@ -18,6 +18,10 @@ case class TransferDescriptor( val batchCount: Long = batches.size + require(batchCount > 0, "Need more than 0 batches for transfer!") val columnCount: Long = batches.head.size + require(columnCount > 0, "Need more than 0 columns for transfer!") + require(batches.forall(_.size == columnCount), "All batches must have the same column count!") + logger.debug(s"Preparing transfer buffer for ${batchCount} batches of ${columnCount} columns") val sizeOfSizeT = 8 @@ -95,6 +99,13 @@ case class TransferDescriptor( buffer.position(0) } + def printBuffer(): Unit = { + println("Transfer Buffer = ") + val arr = Array.ofDim[Byte](buffer.limit().toInt) + buffer.get(arr) + println(arr.mkString("[", ", ", "]")) + } + def closeOutputBuffer(): Unit = outputBuffer.close() lazy val outputBuffer: BytePointer = { require(nonEmpty, "Can not create output buffer for empty TransferDescriptor!") diff --git a/src/main/scala/com/nec/colvector/SeqOptTConversions.scala b/src/main/scala/com/nec/colvector/SeqOptTConversions.scala index f645fa171..f7e538273 100644 --- a/src/main/scala/com/nec/colvector/SeqOptTConversions.scala +++ b/src/main/scala/com/nec/colvector/SeqOptTConversions.scala @@ -1,12 +1,13 @@ package com.nec.colvector -import ArrayTConversions._ +import com.nec.colvector.ArrayTConversions._ import com.nec.spark.agile.core._ -import scala.reflect.ClassTag -import scala.collection.mutable.{Seq => MSeq} import com.nec.util.FixedBitSet import org.bytedeco.javacpp._ +import 
scala.collection.mutable.{Seq => MSeq} +import scala.reflect.ClassTag + object SeqOptTConversions { private[colvector] def constructValidityBuffer[T](input: Seq[Option[T]]): BytePointer = { val bitset = new FixedBitSet(input.size) diff --git a/src/main/scala/com/nec/spark/planning/VERewriteStrategy.scala b/src/main/scala/com/nec/spark/planning/VERewriteStrategy.scala index d6a3807f2..772795170 100644 --- a/src/main/scala/com/nec/spark/planning/VERewriteStrategy.scala +++ b/src/main/scala/com/nec/spark/planning/VERewriteStrategy.scala @@ -223,7 +223,7 @@ final case class VERewriteStrategy(options: VeRewriteStrategyOptions) VeOneStageEvaluationPlan( outputExpressions = s.output, veFunction = veFunction, - child = SparkToVectorEnginePlan(planLater(child), veFunction) + child = SparkToVectorEnginePlan(planLater(child), veFunction, Some(orders)), ) ) ) diff --git a/src/main/scala/com/nec/spark/planning/VeColumnarRule.scala b/src/main/scala/com/nec/spark/planning/VeColumnarRule.scala index 0770a4268..2a64ec2ee 100644 --- a/src/main/scala/com/nec/spark/planning/VeColumnarRule.scala +++ b/src/main/scala/com/nec/spark/planning/VeColumnarRule.scala @@ -7,8 +7,9 @@ import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan} final class VeColumnarRule extends ColumnarRule { override def preColumnarTransitions: Rule[SparkPlan] = { case plan => plan.transform { - case SparkToVectorEnginePlan(VectorEngineToSparkPlan(child), _) => child - case SparkToVectorEnginePlan(child: SupportsVeColBatch, _) => child + // TODO: Decide what to do with required sortOrders + case SparkToVectorEnginePlan(VectorEngineToSparkPlan(child), _, _) => child + case SparkToVectorEnginePlan(child: SupportsVeColBatch, _, _) => child } } } diff --git a/src/main/scala/com/nec/spark/planning/plans/SparkToVectorEnginePlan.scala b/src/main/scala/com/nec/spark/planning/plans/SparkToVectorEnginePlan.scala index 37c2dd141..45938c276 100644 --- 
a/src/main/scala/com/nec/spark/planning/plans/SparkToVectorEnginePlan.scala +++ b/src/main/scala/com/nec/spark/planning/plans/SparkToVectorEnginePlan.scala @@ -12,7 +12,8 @@ import com.typesafe.scalalogging.LazyLogging import org.apache.arrow.memory.BufferAllocator import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution} import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.util.ArrowUtilsExposed @@ -20,7 +21,7 @@ import java.nio.file.Paths // SparkToVectorEnginePlan calls handleTransfer, a library function. It uses the parentVeFunction // to get access to the library. -case class SparkToVectorEnginePlan(childPlan: SparkPlan, parentVeFunction: VeFunction) +case class SparkToVectorEnginePlan(childPlan: SparkPlan, parentVeFunction: VeFunction, sortOrder: Option[Seq[SortOrder]] = None) extends UnaryExecNode with LazyLogging with SupportsVeColBatch @@ -35,6 +36,10 @@ case class SparkToVectorEnginePlan(childPlan: SparkPlan, parentVeFunction: VeFun override def output: Seq[Attribute] = child.output + override def outputOrdering: Seq[SortOrder] = sortOrder.getOrElse(Nil) + + override def requiredChildDistribution: Seq[Distribution] = if (sortOrder.isDefined) Seq(OrderedDistribution(sortOrder.get)) else Seq(UnspecifiedDistribution) + override def dataCleanup: DataCleanup = DataCleanup.cleanup(this.getClass) private def metricsFn[T](f:() => T): T = withInvocationMetrics(VE)(f.apply()) diff --git a/src/test/scala/com/nec/colvector/PackedTransferSpec.scala b/src/test/scala/com/nec/colvector/PackedTransferSpec.scala index 7bb41594a..dc3ff7716 100644 --- a/src/test/scala/com/nec/colvector/PackedTransferSpec.scala +++ b/src/test/scala/com/nec/colvector/PackedTransferSpec.scala @@ -4,16 
+4,21 @@ import com.nec.cache.TransferDescriptor import com.nec.colvector.ArrayTConversions._ import com.nec.colvector.SeqOptTConversions._ import com.nec.cyclone.annotations.VectorEngineTest -import com.nec.util.FixedBitSet import com.nec.ve.WithVeProcess import com.nec.vectorengine.LibCyclone import org.bytedeco.javacpp.LongPointer +import org.scalacheck.Gen import org.scalatest.matchers.should.Matchers._ +import org.scalatest.prop.TableDrivenPropertyChecks.whenever import org.scalatest.wordspec.AnyWordSpec -import java.nio.file.Paths +import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks.forAll + +import scala.reflect.ClassTag @VectorEngineTest final class PackedTransferSpec extends AnyWordSpec with WithVeProcess { + private lazy val libRef = veProcess.loadLibrary(LibCyclone.SoPath) + private def batch1() = Seq( Array[Double](1, 2, 3).toBytePointerColVector("col1_b1"), Array[String]("a", "b", "c").toBytePointerColVector("col2_b1") @@ -94,13 +99,13 @@ final class PackedTransferSpec extends AnyWordSpec with WithVeProcess { "handle_transfer" should { import com.nec.util.CallContextOps._ + "correctly unpack a single batch of mixed vector types" in { val cols = batch1() val descriptor = new TransferDescriptor.Builder() .newBatch().addColumns(cols) .build() - val libRef = veProcess.loadLibrary(LibCyclone.SoPath) val batch = veProcess.executeTransfer(libRef, descriptor) batch.numRows should be (3) @@ -118,12 +123,7 @@ final class PackedTransferSpec extends AnyWordSpec with WithVeProcess { "correctly unpack multiple batches of mixed vector types" in { val descriptor = transferDescriptor() - println("Transfer Buffer = ") - val buffer = Array.ofDim[Byte](descriptor.buffer.limit().toInt) - descriptor.buffer.get(buffer) - println(buffer.mkString("Array(", ", ", ")")) - val libRef = veProcess.loadLibrary(LibCyclone.SoPath) val batch = veProcess.executeTransfer(libRef, descriptor) batch.numRows should be (8) @@ -160,16 +160,13 @@ final class PackedTransferSpec extends 
AnyWordSpec with WithVeProcess { Seq(c1.toBytePointerColVector("_")) )) - - val lib = veProcess.loadLibrary(LibCyclone.SoPath) - val batch = veProcess.executeTransfer(lib, descriptor) - + val batch = veProcess.executeTransfer(libRef, descriptor) batch.columns.size should be (1) batch.columns(0).toBytePointerColVector.toSeqOpt[Int] should be (a1 ++ b1 ++ c1) } - "correctly transfer a batch of Seq[Option[Int]] to the VE and back without loss of data fidelity [2]" ignore { + "correctly transfer a batch of Seq[Option[Int]] to the VE and back without loss of data fidelity [2]" in { // Batch A val a1 = Seq(None, Some(4436), None, None, Some(9586), Some(2142), None, None, None, Some(2149), Some(4297), None, None, Some(3278), Some(6668), None) @@ -191,8 +188,7 @@ final class PackedTransferSpec extends AnyWordSpec with WithVeProcess { Seq(c1v) )) - val lib = veProcess.loadLibrary(LibCyclone.SoPath) - val batch = veProcess.executeTransfer(lib, descriptor) + val batch = veProcess.executeTransfer(libRef, descriptor) batch.columns.size should be (1) val output = batch.columns(0).toBytePointerColVector @@ -203,16 +199,192 @@ final class PackedTransferSpec extends AnyWordSpec with WithVeProcess { // The lengths match as well output.toArray[Int].size should be (a1.size + b1.size + c1.size) + import com.nec.util.FixedBitSet + val a1bits = FixedBitSet.from(a1v.buffers(1)) val b1bits = FixedBitSet.from(b1v.buffers(1)) val c1bits = FixedBitSet.from(c1v.buffers(1)) - val outbits = FixedBitSet.from(output.buffers(1)).toSeq + val outbits = FixedBitSet.from(output.buffers(1)) + + outbits.toSeq.take(a1.size + b1.size + c1.size) should equal(a1bits.toSeq.take(a1.size) ++ b1bits.toSeq.take(b1.size) ++ c1bits.toSeq.take(c1.size)) + } + + "correctly unpack a batch of two columns of a single item of an empty string" in { + val input = List(Array(""), Array("")) + val inputBatch = List(input.map(_.toBytePointerColVector("_"))) + val descriptor = new TransferDescriptor(inputBatch) + + val batch = 
veProcess.executeTransfer(libRef, descriptor) + + batch.numRows should be(1) + batch.columns.size should be(2) + + batch.columns.zip(input).foreach{ case (bc, inCol) => + bc.toBytePointerColVector.toBytes should equal(inCol.toBytePointerColVector("_").toBytes) + } + } + + "correctly unpack a batch of two columns of a 3-items of an non-empty string" in { + val input = List(Array("a", "c", "e"), Array("b", "d", "f")) + val inputBatch = List(input.map(_.toBytePointerColVector("_"))) + val descriptor = new TransferDescriptor(inputBatch) + + val batch = veProcess.executeTransfer(libRef, descriptor) + + batch.numRows should be(3) + batch.columns.size should be(2) + + batch.columns.zip(input).foreach{ case (bc, inCol) => + bc.toBytePointerColVector.toBytes should equal(inCol.toBytePointerColVector("_").toBytes) + } + } + + // TODO: Ignore the property based tests for now, they sometimes find ways to crash the JVM without any good reason. + // We will have to address that problem after the bugfixes contained in this branch. 
+ + def generatedColumn[T: ClassTag] = Gen.choose[Int](1, 512).map(InputSamples.seqOpt[T](_)) + def generatedColumns[T: ClassTag] = Gen.zip(Gen.choose[Int](1, 25), Gen.choose[Int](1, 512)).map{ case (colCount, colLength) => + (0 until colCount).map{ i => InputSamples.seqOpt[T](colLength) }.toList + } + def generatedBatches[T: ClassTag](maxLength: Int = 512) = Gen.zip(Gen.choose[Int](1, 10), Gen.choose[Int](1, 10), Gen.choose[Int](1, maxLength)).map{ case(batchCount, colCount, colLength) => + (0 until batchCount).map{batch => + (0 until colCount).map{col => + InputSamples.seqOpt[T](colLength) + }.toList + }.toList + } - // But the output bitset does not match the merge of the 3 input bitsets,,, - println(a1bits.toSeq.take(a1.size)) - println(b1bits.toSeq.take(b1.size)) - println(c1bits.toSeq.take(c1.size)) - println(outbits.toSeq) + def generatedMixedBatches(klasses: Class[_]*) = { + Gen.zip(Gen.choose[Int](1, 10), Gen.choose[Int](1, 10)).map{ case (batchCount, colLength) => + (0 until batchCount).map{batch => + klasses.map{ klass => + InputSamples.seqOpt(colLength)(ClassTag(klass)) + }.toList + }.toList + } } + + def generatedAnyMixBatches = { + Gen.choose[Int](1, 25).flatMap{ colCount => + Gen.pick(colCount, + Stream.continually(Seq(classOf[Int], classOf[Short], classOf[Long], classOf[Float], classOf[Double], classOf[String])).flatten + ).flatMap{ colClasses => + generatedMixedBatches(colClasses.toArray:_*) + } + } + } + + "satisfy the property: All single scalar batches unpack correctly" ignore { + forAll(generatedColumns[Int]){ cols => + whenever(cols.nonEmpty && cols.forall(_.nonEmpty)){ + val descriptor = new TransferDescriptor(List(cols.map(_.toBytePointerColVector("_")))) + + val batch = veProcess.executeTransfer(libRef, descriptor) + + batch.columns.size should be(cols.length) + + batch.columns.zipWithIndex.foreach{ case (col, i) => + col.toBytePointerColVector.toBytes should equal(cols(i).toBytePointerColVector("_").toBytes) + } + batch.free() + } + } + } 
+ + "satisfy the property: All single varchar batches unpack correctly" ignore { + forAll(generatedColumns[String]){ cols => + whenever(cols.nonEmpty && cols.forall(_.nonEmpty)) { + val descriptor = new TransferDescriptor(List(cols.map(_.toBytePointerColVector("_")))) + + val batch = veProcess.executeTransfer(libRef, descriptor) + + batch.columns.size should be(cols.length) + + batch.columns.zipWithIndex.foreach { case (col, i) => + col.toBytePointerColVector.toBytes should equal(cols(i).toBytePointerColVector("_").toBytes) + } + batch.free() + } + } + } + + "satisfy the property: All sets of scalar batches unpack correctly" ignore { + forAll(generatedBatches[Int](512)){ batches => + whenever(batches.nonEmpty && batches.forall{b => b.nonEmpty && b.forall(_.nonEmpty)} && batches.forall(_.size == batches.head.size)){ + val descriptor = new TransferDescriptor(batches.map(_.map(_.toBytePointerColVector("_")))) + + val mergedCols = batches.transpose.map(_.flatten) + + val batch = veProcess.executeTransfer(libRef, descriptor) + + batch.columns.size should be(batches.head.length) + batch.numRows should be(mergedCols.head.size) + + batch.columns.zipWithIndex.foreach{ case (col, i) => + col.toBytePointerColVector.toBytes should equal(mergedCols(i).toBytePointerColVector("_").toBytes) + } + batch.free() + } + } + } + + "satisfy the property: All sets of varchar batches unpack correctly" ignore { + forAll(generatedBatches[String](50)){ (batches) => + whenever(batches.nonEmpty && batches.forall{b => b.nonEmpty && b.forall(_.nonEmpty)} && batches.forall(_.size == batches.head.size)){ + val descriptor = new TransferDescriptor(batches.map(_.map(_.toArray.toBytePointerColVector("_")))) + + val mergedCols = batches.transpose.map(_.flatten) + + //println(s"batches: ${batches}") + //descriptor.printBuffer() + + val batch = veProcess.executeTransfer(libRef, descriptor) + + batch.columns.size should be(batches.head.length) + batch.numRows should be(mergedCols.head.size) + + 
batch.columns.zipWithIndex.foreach{ case (col, i) => + col.toBytePointerColVector.toBytes should equal(mergedCols(i).toBytePointerColVector("_").toBytes) + } + batch.free() + } + } + } + + // Doesn't compile with SeqOpt, need to figure out a better way of doing this. + /*"satisfy the property: All sets of mixed batches unpack correctly" ignore { + forAll(generatedAnyMixBatches) { batches => + whenever(batches.nonEmpty && batches.forall { b => b.nonEmpty && b.forall(_.nonEmpty) } && batches.forall(_.size == batches.head.size)) { + println("before descriptor") + val descriptor = new TransferDescriptor(batches.map(_.map(_.toArray.toBytePointerColVector("_")))) + + println("before merge") + val mergedCols = batches.transpose.map(_.flatten) + + println("before transfer") + val batch = veProcess.executeTransfer(libRef, descriptor) + + println("before sanity") + batch.columns.size should be(batches.head.length) + batch.numRows should be(mergedCols.head.size) + + println("before actual") + batch.columns.zipWithIndex.foreach { case (col, i) => + println(s"printing $i") + val curCol = mergedCols(i) + val curFirst = curCol.filter(_.isDefined).headOption + if(curFirst.isDefined && curFirst.get.get.isInstanceOf[String]){ + val bytes = SeqOptStringToBPCV(curCol).toBytePointerColVector("_").toBytes + col.toBytePointerColVector.toBytes should equal(bytes) + }else{ + val tag: ClassTag[_] = ClassTag(curFirst.get.get.getClass) + val bytes = SeqOptTToBPCV(curCol)(tag).toBytePointerColVector("_").toBytes + col.toBytePointerColVector.toBytes should equal(bytes) + } + } + batch.free() + } + } + }*/ } }