From cb9a38e6da78f26b99d81df879b1d61ba7f7ec69 Mon Sep 17 00:00:00 2001
From: Benjamin Wagner
Date: Fri, 3 Nov 2023 14:16:35 +0100
Subject: [PATCH] Improve Prefetching Primitives

At the moment, our ROF backend doesn't perform better than the regular
JIT backend. We need to get to a point where the vectorized hash table
lookups actually help our overall performance. This commit makes a
first step in that direction by improving the hash/prefetch primitives.

Until now we had a single primitive that would hash, and another
primitive that would then prefetch based on the hash. The prefetching
strategy in the ROF paper is different: it fuses hashing and
prefetching into a single primitive. The prefetch issued for one key
then overlaps with the hash computation of the following keys in the
batch, leading to better CPU utilization. We now do the same and
provide a fused hash/prefetch primitive.
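As a rough sketch, the fused primitive has the following shape (names
are illustrative only; `Key`, `entries`, and `capacity` stand in for
the actual hash table internals touched by this patch):

   // Hash the key and immediately issue the prefetch for its slot.
   // The memory load then completes in the background while the
   // batch loop hashes the next keys.
   uint64_t hash_and_prefetch(const Key& key) const {
      const uint64_t hash = XXH3_64bits(&key, sizeof(key));
      __builtin_prefetch(&entries[hash % capacity]);
      return hash;
   }

The separate prefetch primitive goes away: a vectorized probe now runs
one hash-and-prefetch pass over the batch, followed by one lookup pass.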
---
 CMakeLists.txt                                     |  5 +-
 bench/benchmarks.cpp                               |  2 +-
 bench/compiler_invoke.cpp                          |  3 +-
 bench/vectorized_ht.cpp                            | 54 +++++++++++++++++--
 src/algebra/Join.cpp                               | 18 +++----
 .../suboperators/RuntimeFunctionSubop.h            | 34 ++----------
 .../RuntimeFunctionSubopFragmentizer.cpp           | 12 +----
 src/runtime/HashTableRuntime.cpp                   | 12 ++---
 src/runtime/HashTableRuntime.h                     |  4 +-
 src/runtime/NewHashTables.cpp                      | 10 +++-
 src/runtime/NewHashTables.h                        |  4 +-
 test/operators/test_expression.cpp                 |  7 +--
 test/operators/test_table_scan.cpp                 | 12 ++---
 test/runtime/test_atomic_hash_table.cpp            |  4 +-
 .../test_atomic_hash_table_complex_key.cpp         |  4 +-
 .../aggregation/test_agg_reader_subop.cpp          |  3 +-
 .../aggregation/test_aggregator_subop.cpp          |  5 +-
 test/suboperators/test_ht_inserts.cpp              |  9 ++--
 test/suboperators/test_ht_lookup.cpp               | 13 ++---
 test/suboperators/test_key_packing.cpp             |  7 +--
 .../test_runtime_key_expression.cpp                | 11 ++--
 21 files changed, 124 insertions(+), 109 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index eb65df8..5b96824 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12,10 +12,9 @@ set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/")
 set(CMAKE_CXX_STANDARD 20)
 set(CMAKE_CXX_STANDARD_REQUIRED ON)
 set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
-set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -rdynamic -stdlib=libc++")
-# set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -rdynamic -g -O0 -fsanitize=address")
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -rdynamic -stdlib=libc++ -gdwarf-4")
 # Generate DWARF 4 in debug to work on older GDB versions
-set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -rdynamic -g -gdwarf-4 -O0 -fsanitize=address")
+set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g -O0 -fsanitize=address")
 set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO}")
 
 # ---------------------------------------------------------------------------
diff --git a/bench/benchmarks.cpp b/bench/benchmarks.cpp
index 2a3fbf9..71fefa0 100644
--- a/bench/benchmarks.cpp
+++ b/bench/benchmarks.cpp
@@ -1,3 +1,3 @@
 #include <benchmark/benchmark.h>
 
-BENCHMARK_MAIN();
\ No newline at end of file
+BENCHMARK_MAIN();
diff --git a/bench/compiler_invoke.cpp b/bench/compiler_invoke.cpp
index f69fa6b..38bbcae 100644
--- a/bench/compiler_invoke.cpp
+++ b/bench/compiler_invoke.cpp
@@ -1,5 +1,6 @@
 #include "benchmark/benchmark.h"
 #include "exec/InterruptableJob.h"
+#include
 #include
 
 /// The benchmarks in this file test the overhead of invoking the
@@ -84,4 +85,4 @@ BENCHMARK(invoke_gcc_direct)->Arg(0)->Arg(1);
 
 }
 
-}
\ No newline at end of file
+}
diff --git a/bench/vectorized_ht.cpp b/bench/vectorized_ht.cpp
index a2823f8..a794b3c 100644
--- a/bench/vectorized_ht.cpp
+++ b/bench/vectorized_ht.cpp
@@ -107,6 +107,13 @@ struct BenchmarkHashTable {
       __builtin_prefetch(&entries[slot_idx]);
    };
 
+   uint64_t vec_slot_and_load(const KeyType& key) const {
+      const auto hash = XXH3_64bits(&key, key_size);
+      const auto slot = hash % capacity;
+      __builtin_prefetch(&entries[slot]);
+      return slot;
+   };
+
    const Entry* vec_lookup(const KeyType& key, uint64_t slot_idx) const {
       const Entry* entry = &entries[slot_idx];
       while (entry->key != 0) {
@@ -190,6 +197,42 @@ void BM_ht_perf_vectorized(benchmark::State& state) {
   state.SetItemsProcessed(state.iterations() * num_elems);
 }
 
+/**
+ * Vectorized hash table as in the ROF paper. Fused prefetching & hash
+ * computation to overlap loads and computation nicely.
+ */
+void BM_ht_perf_vectorized_rof(benchmark::State& state) {
+   const uint64_t num_elems = state.range(0);
+   const uint64_t batch_size = state.range(1);
+   BenchmarkHashTable<uint64_t> ht{static_cast<uint64_t>(num_elems) * 2, 8};
+   for (uint64_t k = 1; k <= num_elems; ++k) {
+      ht.tat_insert(7 * k, k);
+   }
+   std::vector<uint64_t> keys(batch_size);
+   std::vector<uint64_t> slots(batch_size);
+   for (auto _ : state) {
+      // Lookup every key again.
+      for (uint64_t k = 1; k <= num_elems; k += batch_size) {
+         const auto curr_batch = std::min(batch_size, num_elems - k + 1);
+         for (uint64_t tid = 0; tid < curr_batch; ++tid) {
+            keys[tid] = 7 * (k + tid);
+         }
+         for (uint64_t tid = 0; tid < curr_batch; ++tid) {
+            slots[tid] = ht.vec_slot_and_load(keys[tid]);
+         }
+         for (uint64_t tid = 0; tid < curr_batch; ++tid) {
+            const auto* res = ht.vec_lookup(keys[tid], slots[tid]);
+            // We have to use the result, otherwise the compiler is smart
+            // enough to optimize the memory accesses away.
+            if (res->value > num_elems) {
+               throw std::runtime_error("bad ht lookup for " + std::to_string(k));
+            }
+         }
+      }
+   }
+   state.SetItemsProcessed(state.iterations() * num_elems);
+}
+
 void BM_ht_perf_tat_inkfuse(benchmark::State& state) {
    const uint64_t num_elems = state.range(0);
    inkfuse::SimpleKeyComparator comp{8};
@@ -231,10 +274,7 @@ void BM_ht_perf_vectorized_inkfuse(benchmark::State& state) {
          keys[tid] = 7 * (k + tid);
       }
       for (uint64_t tid = 0; tid < curr_batch; ++tid) {
-         hashes[tid] = ht.compute_hash(reinterpret_cast<const char*>(&keys[tid]));
-      }
-      for (uint64_t tid = 0; tid < curr_batch; ++tid) {
-         ht.slot_prefetch(hashes[tid]);
+         hashes[tid] = ht.compute_hash_and_prefetch(reinterpret_cast<const char*>(&keys[tid]));
       }
       for (uint64_t tid = 0; tid < curr_batch; ++tid) {
          const auto* res = ht.lookup(reinterpret_cast<const char*>(&keys[tid]), hashes[tid]);
@@ -254,8 +294,12 @@ BENCHMARK(BM_ht_perf_vectorized)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->A
 // Different internal batch sizes. 256 is a good value.
 BENCHMARK(BM_ht_perf_vectorized)->ArgPair(1 << 25, 64)->ArgPair(1 << 25, 128)->ArgPair(1 << 25, 256)->ArgPair(1 << 25, 512)->ArgPair(1 << 25, 1024)->ArgPair(1 << 25, 2024)->ArgPair(1 << 25, 4048)->ArgPair(1 << 25, 8096)->ArgPair(1 << 25, 16192);
 
+BENCHMARK(BM_ht_perf_vectorized_rof)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256);
+// Different internal batch sizes. 256 is a good value.
+BENCHMARK(BM_ht_perf_vectorized_rof)->ArgPair(1 << 25, 64)->ArgPair(1 << 25, 128)->ArgPair(1 << 25, 256)->ArgPair(1 << 25, 512)->ArgPair(1 << 25, 1024)->ArgPair(1 << 25, 2024)->ArgPair(1 << 25, 4048)->ArgPair(1 << 25, 8096)->ArgPair(1 << 25, 16192);
+
 BENCHMARK(BM_ht_perf_vectorized_inkfuse)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256);
 // Different internal batch sizes. 256 is a good value.
 BENCHMARK(BM_ht_perf_vectorized_inkfuse)->ArgPair(1 << 25, 64)->ArgPair(1 << 25, 128)->ArgPair(1 << 25, 256)->ArgPair(1 << 25, 512)->ArgPair(1 << 25, 1024)->ArgPair(1 << 25, 2024)->ArgPair(1 << 25, 4048)->ArgPair(1 << 25, 8096)->ArgPair(1 << 25, 16192);
 
-} // namespacf
+} // namespace
diff --git a/src/algebra/Join.cpp b/src/algebra/Join.cpp
index 9c1795c..056d911 100644
--- a/src/algebra/Join.cpp
+++ b/src/algebra/Join.cpp
@@ -54,12 +54,9 @@ void materializedTupleToHashTable(
       size_t curr_batch_size = std::min(batch_size, (chunk->end_ptr - curr_tuple) / slot_size);
       const char* curr_tuple_hash_it = curr_tuple;
       for (size_t batch_idx = 0; batch_idx < curr_batch_size; ++batch_idx) {
-         hashes[batch_idx] = ht_state.hash_table->compute_hash(curr_tuple_hash_it);
+         hashes[batch_idx] = ht_state.hash_table->compute_hash_and_prefetch(curr_tuple_hash_it);
          curr_tuple_hash_it += slot_size;
      }
-      for (size_t batch_idx = 0; batch_idx < curr_batch_size; ++batch_idx) {
-         ht_state.hash_table->slot_prefetch(hashes[batch_idx]);
-      }
       for (size_t batch_idx = 0; batch_idx < curr_batch_size; ++batch_idx) {
          ht_state.hash_table->insert(curr_tuple, hashes[batch_idx]);
          curr_tuple += slot_size;
@@ -269,19 +266,16 @@ void Join::decayPkJoin(inkfuse::PipelineDAG& dag) const {
       pseudo.push_back(&pseudo_iu);
    }
 
-   // 2.2.1 Compute the hash.
-   probe_pipe.attachSuboperator(RuntimeFunctionSubop::htHash<AtomicHashTable<SimpleKeyComparator>>(this, *hash_right, *scratch_pad_right, std::move(pseudo), &ht_state));
-
-   // 2.2.2 Prefetch the slot.
-   probe_pipe.attachSuboperator(RuntimeFunctionSubop::htPrefetch<AtomicHashTable<SimpleKeyComparator>>(this, &*prefetch_pseudo, *hash_right, &ht_state));
+   // 2.2.1 Compute the hash and prefetch the slot.
+   probe_pipe.attachSuboperator(RuntimeFunctionSubop::htHashAndPrefetch<AtomicHashTable<SimpleKeyComparator>>(this, *hash_right, *scratch_pad_right, std::move(pseudo), &ht_state));
 
-   // 2.2.3 Perfom the lookup.
+   // 2.2.2 Perform the lookup.
    if (type == JoinType::LeftSemi) {
       // Lookup on a slot disables the slot, giving semi-join behaviour.
-      probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash<AtomicHashTable<SimpleKeyComparator>, true>(this, *lookup_right, *scratch_pad_right, *hash_right, &*prefetch_pseudo, &ht_state));
+      probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash<AtomicHashTable<SimpleKeyComparator>, true>(this, *lookup_right, *scratch_pad_right, *hash_right, /* prefetch_pseudo = */ nullptr, &ht_state));
    } else {
       // Regular lookup that does not disable slots.
-      probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash<AtomicHashTable<SimpleKeyComparator>, false>(this, *lookup_right, *scratch_pad_right, *hash_right, &*prefetch_pseudo, &ht_state));
+      probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash<AtomicHashTable<SimpleKeyComparator>, false>(this, *lookup_right, *scratch_pad_right, *hash_right, /* prefetch_pseudo = */ nullptr, &ht_state));
    }
 }
diff --git a/src/algebra/suboperators/RuntimeFunctionSubop.h b/src/algebra/suboperators/RuntimeFunctionSubop.h
index 6f9ddcd..3d94c02 100644
--- a/src/algebra/suboperators/RuntimeFunctionSubop.h
+++ b/src/algebra/suboperators/RuntimeFunctionSubop.h
@@ -30,10 +30,10 @@ struct RuntimeFunctionSubop : public TemplatedSuboperator
    static std::unique_ptr<RuntimeFunctionSubop> htInsert(const RelAlgOp* source, const IU* pointers_, const IU& key_, std::vector<const IU*> pseudo_ius_, DefferredStateInitializer* state_init_ = nullptr);
 
-   /// Hash a key with the hash table's hash function.
+   /// Hash a key with the hash table's hash function and prefetch the corresponding slot.
    template <class HashTable>
-   static std::unique_ptr<RuntimeFunctionSubop> htHash(const RelAlgOp* source, const IU& hash_, const IU& key_, std::vector<const IU*> pseudo_ius_, DefferredStateInitializer* state_init_ = nullptr) {
-      std::string fct_name = "ht_" + HashTable::ID + "_compute_hash";
+   static std::unique_ptr<RuntimeFunctionSubop> htHashAndPrefetch(const RelAlgOp* source, const IU& hash_, const IU& key_, std::vector<const IU*> pseudo_ius_, DefferredStateInitializer* state_init_ = nullptr) {
+      std::string fct_name = "ht_" + HashTable::ID + "_compute_hash_and_prefetch";
       std::vector<const IU*> in_ius{&key_};
       for (auto pseudo : pseudo_ius_) {
          // Pseudo IUs are used as input IUs in the backing graph, but do not influence arguments.
@@ -55,34 +55,6 @@ struct RuntimeFunctionSubop : public TemplatedSuboperator
       return result_subop;
    }
 
-   template <class HashTable>
-   static std::unique_ptr<RuntimeFunctionSubop> htPrefetch(const RelAlgOp* source, const IU* prefetch_pseudo, const IU& hash_, DefferredStateInitializer* state_init_ = nullptr) {
-      std::string fct_name = "ht_" + HashTable::ID + "_slot_prefetch";
-      std::vector<const IU*> in_ius{&hash_};
-      std::vector<bool> ref{false};
-      std::vector<const IU*> out_ius_{};
-      if (prefetch_pseudo) {
-         out_ius_.push_back(prefetch_pseudo);
-      }
-      std::vector<const IU*> args{&hash_};
-      std::unique_ptr<RuntimeFunctionSubop> result_subop{new RuntimeFunctionSubop(
-         source,
-         state_init_,
-         std::move(fct_name),
-         std::move(in_ius),
-         std::move(out_ius_),
-         std::move(args),
-         std::move(ref),
-         /* out = */ nullptr)};
-      // Prefetch instructions should never be generated in the operator-fusing code.
-      // When performing operator-fusing code generation, we are going through
-      // the code tuple-at-a time. As a result, the followup superator (e.g. HT lookup)
-      // will directly cause the cache miss anyways.
-      result_subop->optimization_properties.ct_only_vectorized = true;
-      return result_subop;
-   }
-
    /// Build a hash table lookup function.
    template <class HashTable, bool disable_slot>
    static std::unique_ptr<RuntimeFunctionSubop> htLookupWithHash(const RelAlgOp* source, const IU& pointers_, const IU& key_, const IU& hash_, const IU* prefetch_pseudo_, DefferredStateInitializer* state_init_ = nullptr) {
diff --git a/src/interpreter/RuntimeFunctionSubopFragmentizer.cpp b/src/interpreter/RuntimeFunctionSubopFragmentizer.cpp
index c55c78c..dd430ff 100644
--- a/src/interpreter/RuntimeFunctionSubopFragmentizer.cpp
+++ b/src/interpreter/RuntimeFunctionSubopFragmentizer.cpp
@@ -40,11 +40,11 @@ RuntimeFunctionSubopFragmentizer::RuntimeFunctionSubopFragmentizer() {
 
    // Fragmentize Vectorized Hash Table Primitives
    {
-      // Hash:
+      // Hash and prefetch:
       auto& [name, pipe] = pipes.emplace_back();
       const auto& key = generated_ius.emplace_back(in_type);
       const auto& hash = generated_ius.emplace_back(IR::UnsignedInt::build(8));
-      const auto& op = pipe.attachSuboperator(RuntimeFunctionSubop::htHash<AtomicHashTable<SimpleKeyComparator>>(nullptr, hash, key, {}));
+      const auto& op = pipe.attachSuboperator(RuntimeFunctionSubop::htHashAndPrefetch<AtomicHashTable<SimpleKeyComparator>>(nullptr, hash, key, {}));
       name = op.id();
    }
    {
@@ -106,14 +106,6 @@ RuntimeFunctionSubopFragmentizer::RuntimeFunctionSubopFragmentizer() {
       }
    }
 
-   // Fragmentize Prefetch.
-   {
-      auto& [name, pipe] = pipes.emplace_back();
-      const auto& hash = generated_ius.emplace_back(IR::UnsignedInt::build(8));
-      const auto& op = pipe.attachSuboperator(RuntimeFunctionSubop::htPrefetch<AtomicHashTable<SimpleKeyComparator>>(nullptr, nullptr, hash));
-      name = op.id();
-   }
-
    // Fragmentize tuple materialization.
    {
       auto& [name, pipe] = pipes.emplace_back();
diff --git a/src/runtime/HashTableRuntime.cpp b/src/runtime/HashTableRuntime.cpp
index 149197f..798667e 100644
--- a/src/runtime/HashTableRuntime.cpp
+++ b/src/runtime/HashTableRuntime.cpp
@@ -58,8 +58,8 @@ extern "C" void HashTableRuntime::ht_dl_it_advance(void* table, char** it_data,
 }
 
 // Atomic hash table.
-extern "C" uint64_t HashTableRuntime::ht_at_sk_compute_hash(void* table, char* key) { - return reinterpret_cast*>(table)->compute_hash(key); +extern "C" uint64_t HashTableRuntime::ht_at_sk_compute_hash_and_prefetch(void* table, char* key) { + return reinterpret_cast*>(table)->compute_hash_and_prefetch(key); } extern "C" void HashTableRuntime::ht_at_sk_slot_prefetch(void* table, uint64_t hash) { @@ -74,8 +74,8 @@ extern "C" char* HashTableRuntime::ht_at_sk_lookup_with_hash_disable(void* table return reinterpret_cast*>(table)->lookupDisable(key, hash); } -extern "C" uint64_t HashTableRuntime::ht_at_ck_compute_hash(void* table, char* key) { - return reinterpret_cast*>(table)->compute_hash(key); +extern "C" uint64_t HashTableRuntime::ht_at_ck_compute_hash_and_prefetch(void* table, char* key) { + return reinterpret_cast*>(table)->compute_hash_and_prefetch(key); } extern "C" void HashTableRuntime::ht_at_ck_slot_prefetch(void* table, uint64_t hash) { @@ -172,7 +172,7 @@ void HashTableRuntime::registerRuntime() { .addArg("table", IR::Pointer::build(IR::Void::build())) .addArg("key", IR::Pointer::build(IR::Char::build()), true); - RuntimeFunctionBuilder("ht_at_sk_compute_hash", IR::UnsignedInt::build(8)) + RuntimeFunctionBuilder("ht_at_sk_compute_hash_and_prefetch", IR::UnsignedInt::build(8)) .addArg("table", IR::Pointer::build(IR::Void::build())) .addArg("key", IR::Pointer::build(IR::Char::build()), true); @@ -190,7 +190,7 @@ void HashTableRuntime::registerRuntime() { .addArg("key", IR::Pointer::build(IR::Char::build())) .addArg("hash", IR::UnsignedInt::build(8), true); - RuntimeFunctionBuilder("ht_at_ck_compute_hash", IR::UnsignedInt::build(8)) + RuntimeFunctionBuilder("ht_at_ck_compute_hash_and_prefetch", IR::UnsignedInt::build(8)) .addArg("table", IR::Pointer::build(IR::Void::build())) .addArg("key", IR::Pointer::build(IR::Char::build()), true); diff --git a/src/runtime/HashTableRuntime.h b/src/runtime/HashTableRuntime.h index b9c041f..9d9bd61 100644 --- a/src/runtime/HashTableRuntime.h +++ b/src/runtime/HashTableRuntime.h @@ -29,12 +29,12 @@ extern "C" char* ht_at_sk_lookup(void* table, char* key); extern "C" char* ht_at_sk_lookup_disable(void* table, char* key); extern "C" char* ht_at_ck_lookup(void* table, char* key); -extern "C" uint64_t ht_at_sk_compute_hash(void* table, char* key); +extern "C" uint64_t ht_at_sk_compute_hash_and_prefetch(void* table, char* key); extern "C" void ht_at_sk_slot_prefetch(void* table, uint64_t hash); extern "C" char* ht_at_sk_lookup_with_hash(void* table, char* key, uint64_t hash); extern "C" char* ht_at_sk_lookup_with_hash_disable(void* table, char* key, uint64_t hash); -extern "C" uint64_t ht_at_ck_compute_hash(void* table, char* key); +extern "C" uint64_t ht_at_ck_compute_hash_and_prefetch(void* table, char* key); extern "C" void ht_at_ck_slot_prefetch(void* table, uint64_t hash); extern "C" char* ht_at_ck_lookup_with_hash(void* table, char* key, uint64_t hash); extern "C" char* ht_at_ck_lookup_with_hash_disable(void* table, char* key, uint64_t hash); diff --git a/src/runtime/NewHashTables.cpp b/src/runtime/NewHashTables.cpp index 995e4ef..f7eaa44 100644 --- a/src/runtime/NewHashTables.cpp +++ b/src/runtime/NewHashTables.cpp @@ -83,8 +83,14 @@ AtomicHashTable::AtomicHashTable(Comparator comp_, uint16_t total_sl } template -uint64_t AtomicHashTable::compute_hash(const char* key) const { - return comp.hash(key); +uint64_t AtomicHashTable::compute_hash_and_prefetch(const char* key) const { + uint64_t hash = comp.hash(key); + const uint64_t slot_id = hash & 
mod_mask; + // Prefetch the actual data array. + __builtin_prefetch(&data[slot_id * total_slot_size]); + // Prefetch the bitmask slot. + __builtin_prefetch(&tags[slot_id]); + return hash; } template diff --git a/src/runtime/NewHashTables.h b/src/runtime/NewHashTables.h index 446e6c1..e1f59f6 100644 --- a/src/runtime/NewHashTables.h +++ b/src/runtime/NewHashTables.h @@ -43,8 +43,8 @@ struct AtomicHashTable { AtomicHashTable(Comparator comp_, uint16_t total_slot_size_, size_t num_slots_); - /// Compute the hash for a given key. - uint64_t compute_hash(const char* key) const; + /// Compute the hash for a given key and prefetch the corresponding hash table slot. + uint64_t compute_hash_and_prefetch(const char* key) const; /// Prefetch the tag and data slots for a specific hash. void slot_prefetch(uint64_t hash) const; /// Get the pointer to a given key, or nullptr if the group does not exist. diff --git a/test/operators/test_expression.cpp b/test/operators/test_expression.cpp index 337fda1..5da10a6 100644 --- a/test/operators/test_expression.cpp +++ b/test/operators/test_expression.cpp @@ -4,6 +4,7 @@ #include "algebra/RelAlgOp.h" #include "codegen/Value.h" #include "codegen/backend_c/BackendC.h" +#include "exec/FuseChunk.h" #include "exec/PipelineExecutor.h" #include @@ -167,8 +168,8 @@ TEST_P(ExpressionTParametrized, hash) { auto& ctx = exec.getExecutionContext(); auto& c_in1 = ctx.getColumn(source, 0); - c_in1.size = 1000; - for (uint16_t k = 0; k < 1000; ++k) { + c_in1.size = DEFAULT_CHUNK_SIZE; + for (uint16_t k = 0; k < DEFAULT_CHUNK_SIZE; ++k) { reinterpret_cast(c_in1.raw_data)[k] = k; } @@ -180,7 +181,7 @@ TEST_P(ExpressionTParametrized, hash) { std::unordered_set seen; // This set should have no hash collisions. auto& hash_col = ctx.getColumn(hash_iu, 0); - for (uint16_t k = 0; k < 1000; ++k) { + for (uint16_t k = 0; k < DEFAULT_CHUNK_SIZE; ++k) { auto elem = reinterpret_cast(hash_col.raw_data)[k]; EXPECT_EQ(seen.count(elem), 0); seen.insert(elem); diff --git a/test/operators/test_table_scan.cpp b/test/operators/test_table_scan.cpp index c25460c..358faa3 100644 --- a/test/operators/test_table_scan.cpp +++ b/test/operators/test_table_scan.cpp @@ -4,6 +4,7 @@ #include "algebra/TableScan.h" #include "algebra/suboperators/sinks/FuseChunkSink.h" #include "codegen/backend_c/BackendC.h" +#include "exec/FuseChunk.h" #include "exec/PipelineExecutor.h" #include @@ -15,9 +16,9 @@ TEST(test_table_scan, scan_1) { StoredRelation rel; auto& col_1 = rel.attachPODColumn("col_1", IR::UnsignedInt::build(8)); auto& storage = col_1.getStorage(); - storage.resize(8 * 1000); - for (uint64_t k = 0; k < 1000; ++k) - { + // two full fuse chunks in the source table + storage.resize(8 * 2 * DEFAULT_CHUNK_SIZE); + for (uint64_t k = 0; k < 2 * DEFAULT_CHUNK_SIZE; ++k) { reinterpret_cast(storage.data())[k] = k; } @@ -40,9 +41,8 @@ TEST(test_table_scan, scan_1) { EXPECT_NO_THROW(exec.runPipeline()); auto& col = exec.getExecutionContext().getColumn(tscan_iu, 0); - for (uint64_t k = 0; k < 1000; ++k) - { - EXPECT_EQ(reinterpret_cast(col.raw_data)[k], k); + for (uint64_t k = 0; k < DEFAULT_CHUNK_SIZE; ++k) { + EXPECT_EQ(reinterpret_cast(col.raw_data)[k], DEFAULT_CHUNK_SIZE + k); } } diff --git a/test/runtime/test_atomic_hash_table.cpp b/test/runtime/test_atomic_hash_table.cpp index 3dd7b30..a37b1c6 100644 --- a/test/runtime/test_atomic_hash_table.cpp +++ b/test/runtime/test_atomic_hash_table.cpp @@ -62,7 +62,7 @@ struct AtomicHashTableTestT : public ::testing::TestWithParam { void checkContains(const RandomDataResult& 
      const char* key_ptr = &data.keys[idx * std::get<0>(GetParam())];
      const char* payload_ptr = &data.payloads[idx * 16];
-      const auto hash = ht.compute_hash(key_ptr);
+      const auto hash = ht.compute_hash_and_prefetch(key_ptr);
      ht.slot_prefetch(hash);
      const auto slot_lookup = ht.lookup(key_ptr, hash);
      ASSERT_NE(slot_lookup, nullptr);
@@ -74,7 +74,7 @@ struct AtomicHashTableTestT : public ::testing::TestWithParam {
 
    void checkNotContains(const RandomDataResult& data, size_t idx) {
      const char* key_ptr = &data.keys[idx * std::get<0>(GetParam())];
-      const auto hash = ht.compute_hash(key_ptr);
+      const auto hash = ht.compute_hash_and_prefetch(key_ptr);
      ht.slot_prefetch(hash);
      const auto slot = ht.lookup(key_ptr, hash);
      EXPECT_EQ(slot, nullptr);
diff --git a/test/runtime/test_atomic_hash_table_complex_key.cpp b/test/runtime/test_atomic_hash_table_complex_key.cpp
index 2cb2fb8..f7ade26 100644
--- a/test/runtime/test_atomic_hash_table_complex_key.cpp
+++ b/test/runtime/test_atomic_hash_table_complex_key.cpp
@@ -63,7 +63,7 @@ struct AtomicComplexHashTableTestT : public ::testing::TestWithParam {
    void checkContains(const std::vector<std::string>& data, size_t idx) {
      const char* raw_string = data[idx].data();
      const char* key_ptr = reinterpret_cast<const char*>(&raw_string);
-      const auto hash = ht.compute_hash(key_ptr);
+      const auto hash = ht.compute_hash_and_prefetch(key_ptr);
      ht.slot_prefetch(hash);
      const auto slot_lookup = ht.lookup(key_ptr, hash);
      ASSERT_NE(slot_lookup, nullptr);
@@ -77,7 +77,7 @@ struct AtomicComplexHashTableTestT : public ::testing::TestWithParam {
      if (std::find(data_exists.begin(), data_exists.end(), str) == data_exists.end()) {
        const char* raw_string = str.data();
        const char* key_ptr = reinterpret_cast<const char*>(&raw_string);
-        const auto hash = ht.compute_hash(key_ptr);
+        const auto hash = ht.compute_hash_and_prefetch(key_ptr);
        ht.slot_prefetch(hash);
        const auto slot = ht.lookup(key_ptr, hash);
        EXPECT_EQ(slot, nullptr);
diff --git a/test/suboperators/aggregation/test_agg_reader_subop.cpp b/test/suboperators/aggregation/test_agg_reader_subop.cpp
index 2ae995c..c78b249 100644
--- a/test/suboperators/aggregation/test_agg_reader_subop.cpp
+++ b/test/suboperators/aggregation/test_agg_reader_subop.cpp
@@ -2,6 +2,7 @@
 #include "algebra/suboperators/aggregation/AggComputeUnpack.h"
 #include "algebra/suboperators/aggregation/AggReaderSubop.h"
 #include "codegen/Type.h"
+#include "exec/FuseChunk.h"
 #include "exec/PipelineExecutor.h"
 #include "gtest/gtest.h"
@@ -9,7 +10,7 @@ namespace inkfuse {
 
 namespace {
 
-const size_t block_size = 1000;
+const size_t block_size = DEFAULT_CHUNK_SIZE;
 
 /// Test the aggregate state reader. Takes input column [Ptr]
 /// containing serialized aggregation state and produces aggregation result.
diff --git a/test/suboperators/aggregation/test_aggregator_subop.cpp b/test/suboperators/aggregation/test_aggregator_subop.cpp
index fe94448..8655c1e 100644
--- a/test/suboperators/aggregation/test_aggregator_subop.cpp
+++ b/test/suboperators/aggregation/test_aggregator_subop.cpp
@@ -2,6 +2,7 @@
 #include "algebra/suboperators/aggregation/AggStateCount.h"
 #include "algebra/suboperators/aggregation/AggregatorSubop.h"
 #include "codegen/Type.h"
+#include "exec/FuseChunk.h"
 #include "exec/PipelineExecutor.h"
 #include "gtest/gtest.h"
 #include
@@ -11,7 +12,7 @@ namespace inkfuse {
 
 namespace {
 
-const size_t block_size = 1000;
+const size_t block_size = DEFAULT_CHUNK_SIZE;
 
 /// Test the aggregator sub-operator. Takes input columns [Ptr (agg_state), Bool (not_init), I4, U8, F8].
 /// The pointers are the aggregation state target. Agg state size 32 bytes.
@@ -32,7 +33,7 @@ struct AggregatorSubopTest {
    cols.resize(5);
    for (const auto iu : target_iu_idxs) {
      auto& col = ctx.getColumn(src_ius[iu], 0);
-      col.size = 1000;
+      col.size = block_size;
      cols[iu] = &col;
    }
    for (size_t k = 0; k < block_size; ++k) {
diff --git a/test/suboperators/test_ht_inserts.cpp b/test/suboperators/test_ht_inserts.cpp
index 5da5f8a..b4b478d 100644
--- a/test/suboperators/test_ht_inserts.cpp
+++ b/test/suboperators/test_ht_inserts.cpp
@@ -2,6 +2,7 @@
 #include "algebra/suboperators/RuntimeFunctionSubop.h"
 #include "algebra/suboperators/row_layout/KeyPackerSubop.h"
 #include "codegen/Type.h"
+#include "exec/FuseChunk.h"
 #include "exec/PipelineExecutor.h"
 #include "gtest/gtest.h"
@@ -58,12 +59,12 @@ TEST_P(HtInsertTest, insert_trigger_resize) {
    // Morsel with 32 keys should trigger a first resize to 64 elements.
    prepareMorsel(ctx, 32, 0);
    exec.runMorsel(0);
-   // Morsel with 1024 keys should trigger multiple additional resizes.
-   prepareMorsel(ctx, 1024, 32);
+   // Morsel with DEFAULT_CHUNK_SIZE keys should trigger multiple additional resizes.
+   prepareMorsel(ctx, DEFAULT_CHUNK_SIZE, 32);
    exec.runMorsel(0);
    // Now we need to check that we have everything we need.
-   EXPECT_EQ(ht.size(), 1024 + 32);
-   for (uint32_t key = 0; key < 1024 + 32; ++key) {
+   EXPECT_EQ(ht.size(), DEFAULT_CHUNK_SIZE + 32);
+   for (uint32_t key = 0; key < DEFAULT_CHUNK_SIZE + 32; ++key) {
      auto res = reinterpret_cast<uint32_t*>(ht.lookup(reinterpret_cast<char*>(&key)));
      ASSERT_NE(res, nullptr);
      EXPECT_EQ(*res, key);
diff --git a/test/suboperators/test_ht_lookup.cpp b/test/suboperators/test_ht_lookup.cpp
index 0109632..85b1b18 100644
--- a/test/suboperators/test_ht_lookup.cpp
+++ b/test/suboperators/test_ht_lookup.cpp
@@ -1,6 +1,7 @@
 #include "algebra/Pipeline.h"
 #include "algebra/suboperators/RuntimeFunctionSubop.h"
 #include "codegen/Type.h"
+#include "exec/FuseChunk.h"
 #include "exec/PipelineExecutor.h"
 #include "gtest/gtest.h"
 #include "xxhash.h"
@@ -48,16 +49,16 @@ TEST_P(HtLookupSubopTest, lookup_existing) {
    // Prepare input chunk.
    auto& ctx = exec->getExecutionContext();
    auto& keys = ctx.getColumn(key_iu, 0);
-   for (uint64_t k = 0; k < 1000; ++k) {
+   for (uint64_t k = 0; k < DEFAULT_CHUNK_SIZE; ++k) {
      reinterpret_cast<uint64_t*>(keys.raw_data)[k] = k;
    }
-   keys.size = 1000;
+   keys.size = DEFAULT_CHUNK_SIZE;
 
    // Run the morsel doing the hash table lookups.
    exec->runMorsel(0);
 
    auto& pointers = ctx.getColumn(ptr_iu, 0);
-   for (uint64_t k = 0; k < 1000; ++k) {
+   for (uint64_t k = 0; k < DEFAULT_CHUNK_SIZE; ++k) {
      // Check that the output pointers are valid.
      char* ptr = table.lookup(reinterpret_cast<char*>(&k));
      EXPECT_EQ(ptr, reinterpret_cast<char**>(pointers.raw_data)[k]);
@@ -70,17 +71,17 @@ TEST_P(HtLookupSubopTest, lookup_nonexisting) {
    // Prepare input chunk.
    auto& ctx = exec->getExecutionContext();
    auto& keys = ctx.getColumn(key_iu, 0);
-   for (uint64_t k = 0; k < 1000; ++k) {
+   for (uint64_t k = 0; k < DEFAULT_CHUNK_SIZE; ++k) {
      uint64_t nonexitant = 20000 + k;
      reinterpret_cast<uint64_t*>(keys.raw_data)[k] = nonexitant;
    }
-   keys.size = 1000;
+   keys.size = DEFAULT_CHUNK_SIZE;
 
    // Run the morsel doing the hash table lookups.
    exec->runMorsel(0);
 
    auto& pointers = ctx.getColumn(ptr_iu, 0);
-   for (uint64_t k = 0; k < 1000; ++k) {
+   for (uint64_t k = 0; k < DEFAULT_CHUNK_SIZE; ++k) {
      // All the outputs should be nullptrs.
      EXPECT_EQ(nullptr, reinterpret_cast<char**>(pointers.raw_data)[k]);
    }
 }
diff --git a/test/suboperators/test_key_packing.cpp b/test/suboperators/test_key_packing.cpp
index 2515a51..4ee0f77 100644
--- a/test/suboperators/test_key_packing.cpp
+++ b/test/suboperators/test_key_packing.cpp
@@ -2,6 +2,7 @@
 #include "algebra/suboperators/row_layout/KeyPackerSubop.h"
 #include "algebra/suboperators/row_layout/KeyUnpackerSubop.h"
 #include "codegen/Type.h"
+#include "exec/FuseChunk.h"
 #include "exec/PipelineExecutor.h"
 #include "gtest/gtest.h"
 #include <cstring>
@@ -51,7 +52,7 @@ struct KeyPackingTest : public ::testing::TestWithParam {
    auto& ctx_pack = exec_pack->getExecutionContext();
    auto& ptrs_pack = ctx_pack.getColumn(ptr_iu, 0);
    auto& ptrs_unpack = ctx_unpack.getColumn(ptr_iu, 0);
-   const size_t num_rows = 1000;
+   const size_t num_rows = DEFAULT_CHUNK_SIZE;
    ptrs_unpack.size = num_rows;
    ptrs_pack.size = num_rows;
    // 13 bytes state on the compound key.
@@ -104,12 +105,12 @@ TEST_P(KeyPackingTest, test_pack_unpack) {
    // Pointers should be the same.
    const auto& ptrs_pack = ctx_pack.getColumn(ptr_iu, 0);
    const auto& ptrs_unpack = ctx_unpack.getColumn(ptr_iu, 0);
-   EXPECT_EQ(0, std::memcmp(ptrs_pack.raw_data, ptrs_unpack.raw_data, 1000 * sizeof(char*)));
+   EXPECT_EQ(0, std::memcmp(ptrs_pack.raw_data, ptrs_unpack.raw_data, DEFAULT_CHUNK_SIZE * sizeof(char*)));
    for (uint64_t k = 0; k < 3; ++k) {
      const auto& source = ctx_pack.getColumn(src_ius[k], 0);
      const auto& target = ctx_unpack.getColumn(out_ius[k], 0);
      // Input and output column should be the same.
-      EXPECT_EQ(0, std::memcmp(source.raw_data, target.raw_data, 1000 * bytes[k]));
+      EXPECT_EQ(0, std::memcmp(source.raw_data, target.raw_data, DEFAULT_CHUNK_SIZE * bytes[k]));
    }
 }
diff --git a/test/suboperators/test_runtime_key_expression.cpp b/test/suboperators/test_runtime_key_expression.cpp
index 1e76e86..a149196 100644
--- a/test/suboperators/test_runtime_key_expression.cpp
+++ b/test/suboperators/test_runtime_key_expression.cpp
@@ -1,6 +1,7 @@
 #include "algebra/Pipeline.h"
 #include "algebra/suboperators/expressions/RuntimeKeyExpressionSubop.h"
 #include "codegen/Type.h"
+#include "exec/FuseChunk.h"
 #include "exec/PipelineExecutor.h"
 #include "gtest/gtest.h"
@@ -34,9 +35,9 @@ struct RuntimeKeyExpressionTest : public ::testing::TestWithParam {
    auto& ctx = exec->getExecutionContext();
    auto& pointers = ctx.getColumn(ptr_iu, 0);
    auto& input_data = ctx.getColumn(src_iu, 0);
-   raw_data = std::make_unique<char[]>(12 * 1000);
+   raw_data = std::make_unique<char[]>(12 * DEFAULT_CHUNK_SIZE);
    uint32_t key = 1;
-   for (uint64_t k = 0; k < 1000; ++k) {
+   for (uint64_t k = 0; k < DEFAULT_CHUNK_SIZE; ++k) {
      reinterpret_cast<char**>(pointers.raw_data)[k] = &raw_data[12 * k];
      if (k % 2 == 0) {
        // Key equals.
@@ -49,8 +50,8 @@ struct RuntimeKeyExpressionTest : public ::testing::TestWithParam {
    auto& ctx = exec->getExecutionContext();
    auto& out = ctx.getColumn(provided_iu, 0);
-   for (uint64_t k = 0; k < 1000; ++k) {
+   for (uint64_t k = 0; k < DEFAULT_CHUNK_SIZE; ++k) {
      EXPECT_EQ(reinterpret_cast<bool*>(out.raw_data)[k], k % 2 == 0);
    }
 }