From 795e89db059c913f18fde1802e1a1233531b816a Mon Sep 17 00:00:00 2001 From: Benjamin Wagner Date: Sat, 4 Nov 2023 20:50:33 +0100 Subject: [PATCH] Interpreter Performance Tuning If we want to effectively use prefetching for our hash tables we need to make sure that we don't evict some of the prefetched data because our chunks are too large. Our microbenchmarks show that this means we can go up to a maximum chunk size of about 512 before hash table throughput starts dropping again. This commit reduces the morsel size to 512 and improves performance in the interpreter backend sufficiently that we are able to retain the same performance. We were doing an excessive number of dynamic casts in the interpreted driver that started showing up in our perf traces once we dropped to a chunk size of 512 or 256 rows per morsel. We now only do those dynamic casts once during interpreter setup and then cache the appropriately casted objects. This is especially important for the special casing of the FuseChunks. --- CMakeLists.txt | 3 +++ src/algebra/Join.cpp | 1 - src/exec/FuseChunk.h | 4 ++-- src/exec/runners/InterpretedRunner.cpp | 23 ++++++++++++----------- src/exec/runners/InterpretedRunner.h | 3 +++ 5 files changed, 20 insertions(+), 14 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index efb2e31..e9737aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,9 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # flto required as xxhash is also built with flto to allow efficient inlining # of the hash functions. set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -gdwarf-4 -stdlib=libc++ -flto") +# For benchmarks: easier profiling & links against system installed googlebench +# if that was build with non libcxx. +# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -gdwarf-4 -flto -fno-omit-frame-pointer") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g -O0 -fsanitize=address") set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -g -O3") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3") diff --git a/src/algebra/Join.cpp b/src/algebra/Join.cpp index 6b11b75..1397716 100644 --- a/src/algebra/Join.cpp +++ b/src/algebra/Join.cpp @@ -330,5 +330,4 @@ void Join::decayPkJoin(inkfuse::PipelineDAG& dag) const { // End vectorized Block. } } - } diff --git a/src/exec/FuseChunk.h b/src/exec/FuseChunk.h index 6cc394a..f6dcef7 100644 --- a/src/exec/FuseChunk.h +++ b/src/exec/FuseChunk.h @@ -9,8 +9,8 @@ namespace inkfuse { -/// Default chunk size (4096) -const uint64_t DEFAULT_CHUNK_SIZE = 4096; +/// Default chunk size 512 +const uint64_t DEFAULT_CHUNK_SIZE = 512; /// A column within a FuseChunk. struct Column { diff --git a/src/exec/runners/InterpretedRunner.cpp b/src/exec/runners/InterpretedRunner.cpp index 19660ac..839b720 100644 --- a/src/exec/runners/InterpretedRunner.cpp +++ b/src/exec/runners/InterpretedRunner.cpp @@ -1,5 +1,4 @@ #include "InterpretedRunner.h" -#include "algebra/suboperators/row_layout/KeyPackerSubop.h" #include "algebra/suboperators/sources/TableScanSource.h" #include "interpreter/FragmentCache.h" @@ -27,6 +26,14 @@ InterpretedRunner::InterpretedRunner(const Pipeline& backing_pipeline, size_t id .fuse_chunk_ptrs{original_context.getNumThreads()}, }; } + + // Extract all key packer IUs as these need special treatment during interpretation. + for (const auto& subop : pipe->getSubops()) { + if (auto* as_key_packer = dynamic_cast(subop.get())) { + assert(as_key_packer->getSourceIUs().size() == 2); + key_packer_ius.push_back(as_key_packer->getSourceIUs()[1]); + } + } } InterpretedRunner::~InterpretedRunner() { @@ -58,12 +65,8 @@ Suboperator::PickMorselResult InterpretedRunner::pickMorsel(size_t thread_id) { // It could be that we require scratch pad IUs within this pipeline. // As these are never officially produced by an expression, we need // to make sure their sizes are set up properly. - for (const auto& subop : pipe->getSubops()) { - if (dynamic_cast(subop.get())) { - assert(subop->getSourceIUs().size() == 2); - const IU* scratch_pad_iu = subop->getSourceIUs()[1]; - context.getColumn(*scratch_pad_iu, thread_id).size = picked->morsel_size; - } + for (const IU* key_packer_iu : key_packer_ius) { + context.getColumn(*key_packer_iu, thread_id).size = picked->morsel_size; } } @@ -99,10 +102,8 @@ void InterpretedRunner::runZeroCopyScan(size_t thread_id) { zero_copy_state->fuse_chunk_ptrs[thread_id] = context.getColumn(*zero_copy_state->output_iu, thread_id).raw_data; } // Get the state of the picked morsel. - TScanDriver* driver = dynamic_cast(pipe->suboperators[0].get()); - TScanIUProvider* provider = dynamic_cast(pipe->suboperators[1].get()); - assert(driver); - assert(provider); + TScanDriver* driver = reinterpret_cast(pipe->suboperators[0].get()); + TScanIUProvider* provider = reinterpret_cast(pipe->suboperators[1].get()); const auto driver_state = reinterpret_cast(driver->accessState(thread_id)); const auto provider_state = reinterpret_cast(provider->accessState(thread_id)); // And directly update the target fuse chunk column. diff --git a/src/exec/runners/InterpretedRunner.h b/src/exec/runners/InterpretedRunner.h index eeed2db..1b29904 100644 --- a/src/exec/runners/InterpretedRunner.h +++ b/src/exec/runners/InterpretedRunner.h @@ -2,6 +2,7 @@ #define INKFUSE_PIPELINEINTERPRETER_H #include "PipelineRunner.h" +#include "algebra/suboperators/row_layout/KeyPackerSubop.h" #include #include #include @@ -52,6 +53,8 @@ struct InterpretedRunner final : public PipelineRunner { std::vector fuse_chunk_ptrs; }; std::optional zero_copy_state; + /// Key packing suboperators IUs in the interpreted graph. + std::vector key_packer_ius; /// Custom interpreter for a zero copy scan. void runZeroCopyScan(size_t thread_id); };