Skip to content

Commit

Permalink
Interpreter Performance Tuning
Browse files Browse the repository at this point in the history
If we want to effectively use prefetching for our hash tables we need to
make sure that we don't evict some of the prefetched data because our
chunks are too large.

Our microbenchmarks show that this means we can go up to a maximum chunk
size of about 512 before hash table throughput starts dropping again.

This commit reduces the morsel size to 512 and improves performance in
the interpreter backend sufficiently that we are able to retain the same
performance.

We were doing an excessive number of dynamic casts in the interpreted
driver that started showing up in our perf traces once we dropped to a
chunk size of 512 or 256 rows per morsel.
We now only do those dynamic casts once during interpreter setup and
then cache the appropriately casted objects. This is especially
important for the special casing of the FuseChunks.
  • Loading branch information
wagjamin committed Nov 4, 2023
1 parent 8a44c3a commit 795e89d
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 14 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
# flto required as xxhash is also built with flto to allow efficient inlining
# of the hash functions.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -gdwarf-4 -stdlib=libc++ -flto")
# For benchmarks: easier profiling & links against system installed googlebench
# if that was build with non libcxx.
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -gdwarf-4 -flto -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g -O0 -fsanitize=address")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -g -O3")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3")
Expand Down
1 change: 0 additions & 1 deletion src/algebra/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,5 +330,4 @@ void Join::decayPkJoin(inkfuse::PipelineDAG& dag) const {
// End vectorized Block.
}
}

}
4 changes: 2 additions & 2 deletions src/exec/FuseChunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

namespace inkfuse {

/// Default chunk size (4096)
const uint64_t DEFAULT_CHUNK_SIZE = 4096;
/// Default chunk size 512
const uint64_t DEFAULT_CHUNK_SIZE = 512;

/// A column within a FuseChunk.
struct Column {
Expand Down
23 changes: 12 additions & 11 deletions src/exec/runners/InterpretedRunner.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "InterpretedRunner.h"
#include "algebra/suboperators/row_layout/KeyPackerSubop.h"
#include "algebra/suboperators/sources/TableScanSource.h"
#include "interpreter/FragmentCache.h"

Expand Down Expand Up @@ -27,6 +26,14 @@ InterpretedRunner::InterpretedRunner(const Pipeline& backing_pipeline, size_t id
.fuse_chunk_ptrs{original_context.getNumThreads()},
};
}

// Extract all key packer IUs as these need special treatment during interpretation.
for (const auto& subop : pipe->getSubops()) {
if (auto* as_key_packer = dynamic_cast<KeyPackerSubop*>(subop.get())) {
assert(as_key_packer->getSourceIUs().size() == 2);
key_packer_ius.push_back(as_key_packer->getSourceIUs()[1]);
}
}
}

InterpretedRunner::~InterpretedRunner() {
Expand Down Expand Up @@ -58,12 +65,8 @@ Suboperator::PickMorselResult InterpretedRunner::pickMorsel(size_t thread_id) {
// It could be that we require scratch pad IUs within this pipeline.
// As these are never officially produced by an expression, we need
// to make sure their sizes are set up properly.
for (const auto& subop : pipe->getSubops()) {
if (dynamic_cast<KeyPackerSubop*>(subop.get())) {
assert(subop->getSourceIUs().size() == 2);
const IU* scratch_pad_iu = subop->getSourceIUs()[1];
context.getColumn(*scratch_pad_iu, thread_id).size = picked->morsel_size;
}
for (const IU* key_packer_iu : key_packer_ius) {
context.getColumn(*key_packer_iu, thread_id).size = picked->morsel_size;
}
}

Expand Down Expand Up @@ -99,10 +102,8 @@ void InterpretedRunner::runZeroCopyScan(size_t thread_id) {
zero_copy_state->fuse_chunk_ptrs[thread_id] = context.getColumn(*zero_copy_state->output_iu, thread_id).raw_data;
}
// Get the state of the picked morsel.
TScanDriver* driver = dynamic_cast<TScanDriver*>(pipe->suboperators[0].get());
TScanIUProvider* provider = dynamic_cast<TScanIUProvider*>(pipe->suboperators[1].get());
assert(driver);
assert(provider);
TScanDriver* driver = reinterpret_cast<TScanDriver*>(pipe->suboperators[0].get());
TScanIUProvider* provider = reinterpret_cast<TScanIUProvider*>(pipe->suboperators[1].get());
const auto driver_state = reinterpret_cast<LoopDriverState*>(driver->accessState(thread_id));
const auto provider_state = reinterpret_cast<IndexedIUProviderState*>(provider->accessState(thread_id));
// And directly update the target fuse chunk column.
Expand Down
3 changes: 3 additions & 0 deletions src/exec/runners/InterpretedRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define INKFUSE_PIPELINEINTERPRETER_H

#include "PipelineRunner.h"
#include "algebra/suboperators/row_layout/KeyPackerSubop.h"
#include <functional>
#include <map>
#include <string>
Expand Down Expand Up @@ -52,6 +53,8 @@ struct InterpretedRunner final : public PipelineRunner {
std::vector<char*> fuse_chunk_ptrs;
};
std::optional<ZeroCopyScanState> zero_copy_state;
/// Key packing suboperators IUs in the interpreted graph.
std::vector<const IU*> key_packer_ius;
/// Custom interpreter for a zero copy scan.
void runZeroCopyScan(size_t thread_id);
};
Expand Down

0 comments on commit 795e89d

Please sign in to comment.