diff --git a/CMakeLists.txt b/CMakeLists.txt index 1c05bdc..efb2e31 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,7 +15,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # Generate DWARF 4 in debug to work on older GDB versions # flto required as xxhash is also built with flto to allow efficient inlining # of the hash functions. -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++ -gdwarf-4 -flto") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -gdwarf-4 -stdlib=libc++ -flto") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g -O0 -fsanitize=address") set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -g -O3") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3") @@ -292,6 +292,6 @@ if (WITH_BENCH) bench/compiler_invoke.cpp bench/ht_benchmark.cpp bench/vectorized_ht.cpp ) target_link_libraries(inkbench PUBLIC benchmark::benchmark inkfuse) # Move the testdata into the binary tree for easy ingest tests. diff --git a/bench/vectorized_ht.cpp b/bench/vectorized_ht.cpp index a794b3c..9cd8510 100644 --- a/bench/vectorized_ht.cpp +++ b/bench/vectorized_ht.cpp @@ -266,6 +266,7 @@ void BM_ht_perf_vectorized_inkfuse(benchmark::State& state) { } std::vector keys(batch_size); std::vector hashes(batch_size); + std::vector results(batch_size); for (auto _ : state) { // Lookup every key again.
for (uint64_t k = 1; k <= num_elems; k += batch_size) { @@ -277,28 +278,27 @@ void BM_ht_perf_vectorized_inkfuse(benchmark::State& state) { hashes[tid] = ht.compute_hash_and_prefetch(reinterpret_cast(&keys[tid])); } for (uint64_t tid = 0; tid < curr_batch; ++tid) { - const auto* res = ht.lookup(reinterpret_cast(&keys[tid]), hashes[tid]); - if (reinterpret_cast(res)[1] > num_elems) { - throw std::runtime_error("bad ht lookup for " + std::to_string(k)); - } + results[tid] = ht.lookup(reinterpret_cast(&keys[tid]), hashes[tid]); } } + // Keep the stored lookup results observable so the optimizer cannot + // dead-code-eliminate the lookup loop now that the validation is gone. + benchmark::DoNotOptimize(results.data()); + benchmark::ClobberMemory(); } state.SetItemsProcessed(state.iterations() * num_elems); } -BENCHMARK(BM_ht_perf_tat)->Arg(1 << 9)->Arg(1 << 13)->Arg(1 << 15)->Arg(1 << 19)->Arg(1 << 25)->Arg(1 << 30); -BENCHMARK(BM_ht_perf_tat_inkfuse)->Arg(1 << 9)->Arg(1 << 13)->Arg(1 << 15)->Arg(1 << 19)->Arg(1 << 25)->Arg(1 << 30); +BENCHMARK(BM_ht_perf_tat)->Arg(1 << 9)->Arg(1 << 13)->Arg(1 << 15)->Arg(1 << 19)->Arg(1 << 21)->Arg(1 << 25)->Arg(1 << 30); +BENCHMARK(BM_ht_perf_tat_inkfuse)->Arg(1 << 9)->Arg(1 << 13)->Arg(1 << 15)->Arg(1 << 19)->Arg(1 << 21)->Arg(1 << 25)->Arg(1 << 30); -BENCHMARK(BM_ht_perf_vectorized)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256); +BENCHMARK(BM_ht_perf_vectorized)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 21, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256); // Different internal batch sizes. 256 is a good value.
BENCHMARK(BM_ht_perf_vectorized)->ArgPair(1 << 25, 64)->ArgPair(1 << 25, 128)->ArgPair(1 << 25, 256)->ArgPair(1 << 25, 512)->ArgPair(1 << 25, 1024)->ArgPair(1 << 25, 2024)->ArgPair(1 << 25, 4048)->ArgPair(1 << 25, 8096)->ArgPair(1 << 25, 16192); -BENCHMARK(BM_ht_perf_vectorized_rof)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256); +BENCHMARK(BM_ht_perf_vectorized_rof)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 21, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256); // Different internal batch sizes. 256 is a good value. BENCHMARK(BM_ht_perf_vectorized_rof)->ArgPair(1 << 25, 64)->ArgPair(1 << 25, 128)->ArgPair(1 << 25, 256)->ArgPair(1 << 25, 512)->ArgPair(1 << 25, 1024)->ArgPair(1 << 25, 2024)->ArgPair(1 << 25, 4048)->ArgPair(1 << 25, 8096)->ArgPair(1 << 25, 16192); -BENCHMARK(BM_ht_perf_vectorized_inkfuse)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256); +BENCHMARK(BM_ht_perf_vectorized_inkfuse)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 21, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256); // Different internal batch sizes. 256 is a good value. BENCHMARK(BM_ht_perf_vectorized_inkfuse)->ArgPair(1 << 25, 64)->ArgPair(1 << 25, 128)->ArgPair(1 << 25, 256)->ArgPair(1 << 25, 512)->ArgPair(1 << 25, 1024)->ArgPair(1 << 25, 2024)->ArgPair(1 << 25, 4048)->ArgPair(1 << 25, 8096)->ArgPair(1 << 25, 16192); diff --git a/src/algebra/Join.cpp b/src/algebra/Join.cpp index 9d40475..6b11b75 100644 --- a/src/algebra/Join.cpp +++ b/src/algebra/Join.cpp @@ -257,79 +257,78 @@ void Join::decayPkJoin(inkfuse::PipelineDAG& dag) const { // 2.2 Probe. { - { - // Perform the actual lookup in a fully vectorized fashion. 
- Pipeline::ROFScopeGuard rof_guard{probe_pipe}; + // Perform the actual lookup in a fully vectorized fashion. + Pipeline::ROFScopeGuard rof_guard{probe_pipe}; - std::vector pseudo; - for (const auto& pseudo_iu : right_pseudo_ius) { - pseudo.push_back(&pseudo_iu); - } + std::vector pseudo; + for (const auto& pseudo_iu : right_pseudo_ius) { + pseudo.push_back(&pseudo_iu); + } - // 2.2.1 Compute the hash and prefetch the slot. - probe_pipe.attachSuboperator(RuntimeFunctionSubop::htHashAndPrefetch>(this, *hash_right, *scratch_pad_right, std::move(pseudo), key_size_left, &ht_state)); + // 2.2.1 Compute the hash and prefetch the slot. + probe_pipe.attachSuboperator(RuntimeFunctionSubop::htHashAndPrefetch>(this, *hash_right, *scratch_pad_right, std::move(pseudo), key_size_left, &ht_state)); - // 2.2.2 Perfom the lookup. - if (type == JoinType::LeftSemi) { - // Lookup on a slot disables the slot, giving semi-join behaviour. - probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash, true>(this, *lookup_right, *scratch_pad_right, *hash_right, /* prefetch_pseudo = */ nullptr, &ht_state)); - } else { - // Regular lookup that does not disable slots. - probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash, false>(this, *lookup_right, *scratch_pad_right, *hash_right, /* prefetch_pseudo = */ nullptr, &ht_state)); - } + // 2.2.2 Perfom the lookup. + if (type == JoinType::LeftSemi) { + // Lookup on a slot disables the slot, giving semi-join behaviour. + probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash, true>(this, *lookup_right, *scratch_pad_right, *hash_right, /* prefetch_pseudo = */ nullptr, &ht_state)); + } else { + // Regular lookup that does not disable slots. + probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash, false>(this, *lookup_right, *scratch_pad_right, *hash_right, /* prefetch_pseudo = */ nullptr, &ht_state)); } + } - // 2.3 Filter on probe matches. 
- auto& filter_scope_subop = probe_pipe.attachSuboperator(ColumnFilterScope::build(this, *lookup_right, *filter_pseudo_iu)); - auto& filter_scope = reinterpret_cast(filter_scope_subop); - // The filter on the build site filters "itself". This has some repercussions on the repiping - // behaviour of the suboperator and needs to be passed explicitly. - auto& filter_1 = probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *lookup_right, *filtered_build, /* filter_type= */ lookup_right->type, /* filters_itself= */ true)); - filter_scope.attachFilterLogicDependency(filter_1, *lookup_right); - if (type != JoinType::LeftSemi) { - // If we need to produce columns on the probe side, we also have to filter the probe result. - // Note: the filtered ByteArray from the probe side becomes a Char* after filtering. - auto& filter_2 = probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *scratch_pad_right, *filtered_probe, /* filter_type_= */ lookup_right->type)); - filter_scope.attachFilterLogicDependency(filter_2, *scratch_pad_right); - } + // 2.3 Filter on probe matches. + auto& filter_scope_subop = probe_pipe.attachSuboperator(ColumnFilterScope::build(this, *lookup_right, *filter_pseudo_iu)); + auto& filter_scope = reinterpret_cast(filter_scope_subop); + // The filter on the build site filters "itself". This has some repercussions on the repiping + // behaviour of the suboperator and needs to be passed explicitly. + auto& filter_1 = probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *lookup_right, *filtered_build, /* filter_type= */ lookup_right->type, /* filters_itself= */ true)); + filter_scope.attachFilterLogicDependency(filter_1, *lookup_right); + if (type != JoinType::LeftSemi) { + // If we need to produce columns on the probe side, we also have to filter the probe result. + // Note: the filtered ByteArray from the probe side becomes a Char* after filtering. 
+ auto& filter_2 = probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *scratch_pad_right, *filtered_probe, /* filter_type_= */ lookup_right->type)); + filter_scope.attachFilterLogicDependency(filter_2, *scratch_pad_right); + } - // 2.4 Unpack everything. - // 2.4.1 Unpack Build Side IUs - size_t build_unpack_offset = 0; - for (const auto& iu : keys_left_out) { - auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_build, iu)); + // 2.4 Unpack everything. + // 2.4.1 Unpack Build Side IUs + size_t build_unpack_offset = 0; + for (const auto& iu : keys_left_out) { + auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_build, iu)); + KeyPackingRuntimeParams param; + param.offsetSet(IR::UI<2>::build(build_unpack_offset)); + reinterpret_cast(unpacker).attachRuntimeParams(std::move(param)); + build_unpack_offset += iu.type->numBytes(); + } + for (const auto& iu : payload_left_out) { + auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_build, iu)); + KeyPackingRuntimeParams param; + param.offsetSet(IR::UI<2>::build(build_unpack_offset)); + reinterpret_cast(unpacker).attachRuntimeParams(std::move(param)); + build_unpack_offset += iu.type->numBytes(); + } + // 2.4.1 Unpack Probe Side IUs. Not needed for semi joins. 
+ if (type != JoinType::LeftSemi) { + size_t probe_unpack_offset = 0; + for (const auto& iu : keys_right_out) { + auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_probe, iu)); KeyPackingRuntimeParams param; - param.offsetSet(IR::UI<2>::build(build_unpack_offset)); + param.offsetSet(IR::UI<2>::build(probe_unpack_offset)); reinterpret_cast(unpacker).attachRuntimeParams(std::move(param)); - build_unpack_offset += iu.type->numBytes(); + probe_unpack_offset += iu.type->numBytes(); } - for (const auto& iu : payload_left_out) { - auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_build, iu)); + for (const auto& iu : payload_right_out) { + auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_probe, iu)); KeyPackingRuntimeParams param; - param.offsetSet(IR::UI<2>::build(build_unpack_offset)); + param.offsetSet(IR::UI<2>::build(probe_unpack_offset)); reinterpret_cast(unpacker).attachRuntimeParams(std::move(param)); - build_unpack_offset += iu.type->numBytes(); + probe_unpack_offset += iu.type->numBytes(); } - // 2.4.1 Unpack Probe Side IUs. Not needed for semi joins. - if (type != JoinType::LeftSemi) { - size_t probe_unpack_offset = 0; - for (const auto& iu : keys_right_out) { - auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_probe, iu)); - KeyPackingRuntimeParams param; - param.offsetSet(IR::UI<2>::build(probe_unpack_offset)); - reinterpret_cast(unpacker).attachRuntimeParams(std::move(param)); - probe_unpack_offset += iu.type->numBytes(); - } - for (const auto& iu : payload_right_out) { - auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_probe, iu)); - KeyPackingRuntimeParams param; - param.offsetSet(IR::UI<2>::build(probe_unpack_offset)); - reinterpret_cast(unpacker).attachRuntimeParams(std::move(param)); - probe_unpack_offset += iu.type->numBytes(); - } - } - // End vectorized Block. 
} + // End vectorized Block. } } + } diff --git a/src/exec/FuseChunk.h b/src/exec/FuseChunk.h index f83336b..6cc394a 100644 --- a/src/exec/FuseChunk.h +++ b/src/exec/FuseChunk.h @@ -9,8 +9,8 @@ namespace inkfuse { -/// Default chunk size (8192) -const uint64_t DEFAULT_CHUNK_SIZE = 1024; +/// Default chunk size (4096) +const uint64_t DEFAULT_CHUNK_SIZE = 4096; /// A column within a FuseChunk. struct Column {