diff --git a/CMakeLists.txt b/CMakeLists.txt index efb2e31..e9737aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,9 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # flto required as xxhash is also built with flto to allow efficient inlining # of the hash functions. set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -gdwarf-4 -stdlib=libc++ -flto") +# For benchmarks: easier profiling, and links against a system-installed googlebench +# if that was built against a non-libc++ standard library. +# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -gdwarf-4 -flto -fno-omit-frame-pointer") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g -O0 -fsanitize=address") set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -g -O3") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3") diff --git a/src/algebra/Join.cpp b/src/algebra/Join.cpp index 6b11b75..1397716 100644 --- a/src/algebra/Join.cpp +++ b/src/algebra/Join.cpp @@ -330,5 +330,4 @@ void Join::decayPkJoin(inkfuse::PipelineDAG& dag) const { // End vectorized Block. } } - } diff --git a/src/exec/FuseChunk.h b/src/exec/FuseChunk.h index 6cc394a..f6dcef7 100644 --- a/src/exec/FuseChunk.h +++ b/src/exec/FuseChunk.h @@ -9,8 +9,8 @@ namespace inkfuse { -/// Default chunk size (4096) -const uint64_t DEFAULT_CHUNK_SIZE = 4096; +/// Default chunk size 512 +const uint64_t DEFAULT_CHUNK_SIZE = 512; /// A column within a FuseChunk. 
struct Column { diff --git a/src/exec/runners/InterpretedRunner.cpp b/src/exec/runners/InterpretedRunner.cpp index 19660ac..839b720 100644 --- a/src/exec/runners/InterpretedRunner.cpp +++ b/src/exec/runners/InterpretedRunner.cpp @@ -1,5 +1,4 @@ #include "InterpretedRunner.h" -#include "algebra/suboperators/row_layout/KeyPackerSubop.h" #include "algebra/suboperators/sources/TableScanSource.h" #include "interpreter/FragmentCache.h" @@ -27,6 +26,14 @@ InterpretedRunner::InterpretedRunner(const Pipeline& backing_pipeline, size_t id .fuse_chunk_ptrs{original_context.getNumThreads()}, }; } + + // Extract all key packer IUs as these need special treatment during interpretation. + for (const auto& subop : pipe->getSubops()) { + if (auto* as_key_packer = dynamic_cast(subop.get())) { + assert(as_key_packer->getSourceIUs().size() == 2); + key_packer_ius.push_back(as_key_packer->getSourceIUs()[1]); + } + } } InterpretedRunner::~InterpretedRunner() { @@ -58,12 +65,8 @@ Suboperator::PickMorselResult InterpretedRunner::pickMorsel(size_t thread_id) { // It could be that we require scratch pad IUs within this pipeline. // As these are never officially produced by an expression, we need // to make sure their sizes are set up properly. - for (const auto& subop : pipe->getSubops()) { - if (dynamic_cast(subop.get())) { - assert(subop->getSourceIUs().size() == 2); - const IU* scratch_pad_iu = subop->getSourceIUs()[1]; - context.getColumn(*scratch_pad_iu, thread_id).size = picked->morsel_size; - } + for (const IU* key_packer_iu : key_packer_ius) { + context.getColumn(*key_packer_iu, thread_id).size = picked->morsel_size; } } @@ -99,10 +102,8 @@ void InterpretedRunner::runZeroCopyScan(size_t thread_id) { zero_copy_state->fuse_chunk_ptrs[thread_id] = context.getColumn(*zero_copy_state->output_iu, thread_id).raw_data; } // Get the state of the picked morsel. 
- TScanDriver* driver = dynamic_cast(pipe->suboperators[0].get()); - TScanIUProvider* provider = dynamic_cast(pipe->suboperators[1].get()); - assert(driver); - assert(provider); + TScanDriver* driver = reinterpret_cast(pipe->suboperators[0].get()); + TScanIUProvider* provider = reinterpret_cast(pipe->suboperators[1].get()); const auto driver_state = reinterpret_cast(driver->accessState(thread_id)); const auto provider_state = reinterpret_cast(provider->accessState(thread_id)); // And directly update the target fuse chunk column. diff --git a/src/exec/runners/InterpretedRunner.h b/src/exec/runners/InterpretedRunner.h index eeed2db..1b29904 100644 --- a/src/exec/runners/InterpretedRunner.h +++ b/src/exec/runners/InterpretedRunner.h @@ -2,6 +2,7 @@ #define INKFUSE_PIPELINEINTERPRETER_H #include "PipelineRunner.h" +#include "algebra/suboperators/row_layout/KeyPackerSubop.h" #include #include #include @@ -52,6 +53,8 @@ struct InterpretedRunner final : public PipelineRunner { std::vector fuse_chunk_ptrs; }; std::optional zero_copy_state; + /// Scratch pad IUs (second source IU) of the key packer suboperators in the interpreted graph. + std::vector key_packer_ius; /// Custom interpreter for a zero copy scan. void runZeroCopyScan(size_t thread_id); };