Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Prefetching to Hash Join #35

Merged
merged 4 commits into from
Oct 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/build_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:

jobs:
build:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
# Run builds and tests both in Debug and RelWithDebInfo
strategy:
matrix:
Expand All @@ -21,5 +21,8 @@ jobs:
build-type: ${{ matrix.build-type }}
- name: Test
working-directory: ${{github.workspace}}/build
run: ./tester
# Unfortunately we're running into https://github.com/llvm/llvm-project/issues/59432
# This is some Ubuntu packaging issue that causes alloc/dealloc mismatches when asan
# is enabled with libc++
run: ASAN_OPTIONS=alloc_dealloc_mismatch=0 ./tester

2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/")
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -rdynamic")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -rdynamic -stdlib=libc++")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -rdynamic -g -O0 -fsanitize=address")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO}")

Expand Down
3 changes: 1 addition & 2 deletions bench/ht_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,7 @@ BENCHMARK(ht_lookup_unordered_map_nomatch<std::pair<TB32, TB8>>)->ArgsProduct({{
BENCHMARK(ht_lookup_unordered_map_nomatch<std::pair<TB32, TB64>>)->ArgsProduct({{1'000, 100'000, 10'000'000, 50'000'000}});
BENCHMARK(ht_lookup_unordered_map_nomatch<std::pair<TB64, TB8>>)->ArgsProduct({{1'000, 100'000, 10'000'000, 50'000'000}});
BENCHMARK(ht_lookup_unordered_map_nomatch<std::pair<TB64, TB64>>)->ArgsProduct({{1'000, 100'000, 10'000'000, 50'000'000}});
*/

*/
}

}
72 changes: 68 additions & 4 deletions bench/vectorized_ht.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#include "benchmark/benchmark.h"
#include "runtime/NewHashTables.h"
#include "xxhash.h"
#include <algorithm>
#include <chrono>
#include <cstring>
#include <iostream>
#include <vector>

using namespace inkfuse;

/**
* Microbenchmarks inspired by Peter's feedback: In vectorized engines,
* parallel hash table access can be made more efficient than in a tuple-at-a time
Expand Down Expand Up @@ -48,7 +51,6 @@
* BM_ht_perf_vectorized/524288/256 10098416 ns 10093265 ns 72 items_per_second=51.9443M/s
* BM_ht_perf_vectorized/33554432/256 971872286 ns 971838853 ns 1 items_per_second=34.5267M/s
* BM_ht_perf_vectorized/1073741824/256 51425526675 ns 51422464322 ns 1 items_per_second=20.8808M/s
* BM_ht_perf_vectorized/33554432/256 933936147 ns 933873161 ns 1 items_per_second=35.9304M/s
*
*/
namespace {
Expand Down Expand Up @@ -188,10 +190,72 @@ void BM_ht_perf_vectorized(benchmark::State& state) {
state.SetItemsProcessed(state.iterations() * num_elems);
}

// Tuple-at-a-time probe benchmark against the inkfuse AtomicHashTable.
// Builds a table of `num_elems` (key, payload) pairs, then measures how fast
// every key can be looked up again one tuple at a time (no prefetching).
void BM_ht_perf_tat_inkfuse(benchmark::State& state) {
   const uint64_t num_elems = state.range(0);
   inkfuse::SimpleKeyComparator comp{8};
   // 16-byte slots (8-byte key + 8-byte payload), table sized at 2x the elements.
   AtomicHashTable<inkfuse::SimpleKeyComparator> ht{comp, 16, 2 * num_elems};
   // Build phase: insert keys 7, 14, ..., 7 * num_elems with payload idx.
   for (uint64_t idx = 1; idx <= num_elems; ++idx) {
      const uint64_t key = 7 * idx;
      char* slot = ht.insert<true>(reinterpret_cast<const char*>(&key));
      reinterpret_cast<uint64_t*>(slot)[1] = idx;
   }
   for (auto _ : state) {
      // Probe phase: every key was inserted, so each payload must be in [1, num_elems].
      for (uint64_t idx = 1; idx <= num_elems; ++idx) {
         const uint64_t key = 7 * idx;
         char* found = ht.lookup(reinterpret_cast<const char*>(&key));
         if (reinterpret_cast<const uint64_t*>(found)[1] > num_elems) {
            throw std::runtime_error("bad ht lookup for " + std::to_string(idx));
         }
      }
   }
   state.SetItemsProcessed(state.iterations() * num_elems);
}

// Vectorized probe benchmark against the inkfuse AtomicHashTable.
// Processes keys in batches of `batch_size`: first materialize the keys,
// then hash them, then prefetch all target slots, and only then probe.
// The prefetch pass issues independent loads that hide cache-miss latency.
void BM_ht_perf_vectorized_inkfuse(benchmark::State& state) {
   const uint64_t num_elems = state.range(0);
   const uint64_t batch_size = state.range(1);
   inkfuse::SimpleKeyComparator comp{8};
   // 16-byte slots (8-byte key + 8-byte payload), table sized at 2x the elements.
   AtomicHashTable<inkfuse::SimpleKeyComparator> ht{comp, 16, 2 * num_elems};
   // Build phase: insert keys 7, 14, ..., 7 * num_elems with payload idx.
   for (uint64_t idx = 1; idx <= num_elems; ++idx) {
      const uint64_t key = 7 * idx;
      char* slot = ht.insert<true>(reinterpret_cast<const char*>(&key));
      reinterpret_cast<uint64_t*>(slot)[1] = idx;
   }
   // Scratch space reused across batches.
   std::vector<uint64_t> keys(batch_size);
   std::vector<uint64_t> hashes(batch_size);
   for (auto _ : state) {
      // Lookup every key again, one batch at a time.
      for (uint64_t base = 1; base <= num_elems; base += batch_size) {
         // The final batch may be smaller than batch_size.
         const uint64_t curr = std::min(batch_size, num_elems - base + 1);
         // Phase 1: materialize the probe keys.
         for (uint64_t i = 0; i < curr; ++i) {
            keys[i] = 7 * (base + i);
         }
         // Phase 2: compute all hashes.
         for (uint64_t i = 0; i < curr; ++i) {
            hashes[i] = ht.compute_hash(reinterpret_cast<const char*>(&keys[i]));
         }
         // Phase 3: prefetch the target slots.
         for (uint64_t i = 0; i < curr; ++i) {
            ht.slot_prefetch(hashes[i]);
         }
         // Phase 4: probe; every payload must be in [1, num_elems].
         for (uint64_t i = 0; i < curr; ++i) {
            const auto* res = ht.lookup(reinterpret_cast<const char*>(&keys[i]), hashes[i]);
            if (reinterpret_cast<const uint64_t*>(res)[1] > num_elems) {
               throw std::runtime_error("bad ht lookup for " + std::to_string(base));
            }
         }
      }
   }
   state.SetItemsProcessed(state.iterations() * num_elems);
}

BENCHMARK(BM_ht_perf_tat)->Arg(1 << 9)->Arg(1 << 13)->Arg(1 << 15)->Arg(1 << 19)->Arg(1 << 25)->Arg(1 << 30);
// Different hash table sizes.
BENCHMARK(BM_ht_perf_tat_inkfuse)->Arg(1 << 9)->Arg(1 << 13)->Arg(1 << 15)->Arg(1 << 19)->Arg(1 << 25)->Arg(1 << 30);

BENCHMARK(BM_ht_perf_vectorized)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256);
// Different internal batch sizes.
BENCHMARK(BM_ht_perf_vectorized)->ArgPair(1 << 25, 64)->ArgPair(1 << 25, 128)->ArgPair(1 << 25, 256)->ArgPair(1 << 25, 512)->ArgPair(1 << 25, 1024)->ArgPair(1 << 25, 2024)->ArgPair(1 << 25, 4048)->ArgPair(1 << 25, 8096)->ArgPair(1<<25, 16192);
// Different internal batch sizes. 256 is a good value.
BENCHMARK(BM_ht_perf_vectorized)->ArgPair(1 << 25, 64)->ArgPair(1 << 25, 128)->ArgPair(1 << 25, 256)->ArgPair(1 << 25, 512)->ArgPair(1 << 25, 1024)->ArgPair(1 << 25, 2024)->ArgPair(1 << 25, 4048)->ArgPair(1 << 25, 8096)->ArgPair(1 << 25, 16192);

BENCHMARK(BM_ht_perf_vectorized_inkfuse)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256);
// Different internal batch sizes. 256 is a good value.
BENCHMARK(BM_ht_perf_vectorized_inkfuse)->ArgPair(1 << 25, 64)->ArgPair(1 << 25, 128)->ArgPair(1 << 25, 256)->ArgPair(1 << 25, 512)->ArgPair(1 << 25, 1024)->ArgPair(1 << 25, 2024)->ArgPair(1 << 25, 4048)->ArgPair(1 << 25, 8096)->ArgPair(1 << 25, 16192);

} // namespace
25 changes: 19 additions & 6 deletions src/algebra/CompilationContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

namespace inkfuse {

CompilationContext::CompilationContext(std::string program_name, const Pipeline& pipeline_)
: pipeline(pipeline_), program(std::make_shared<IR::Program>(std::move(program_name), false)), fct_name("execute") {
CompilationContext::CompilationContext(std::string program_name, const Pipeline& pipeline_, OptimizationHints hints_)
: pipeline(pipeline_), program(std::make_shared<IR::Program>(std::move(program_name), false)), fct_name("execute"), optimization_hints(hints_) {
}

CompilationContext::CompilationContext(IR::ProgramArc program_, std::string fct_name_, const Pipeline& pipeline_)
: pipeline(pipeline_), program(std::move(program_)), fct_name(std::move(fct_name_)) {
CompilationContext::CompilationContext(IR::ProgramArc program_, std::string fct_name_, const Pipeline& pipeline_, OptimizationHints hints_)
: pipeline(pipeline_), program(std::move(program_)), fct_name(std::move(fct_name_)), optimization_hints(hints_) {
}

void CompilationContext::compile() {
Expand Down Expand Up @@ -57,8 +57,17 @@ void CompilationContext::notifyIUsReady(Suboperator& op) {
// Consume in the original requestor.
requestor->consume(*iu, *this);
if (++properties[requestor].serviced_requests == requestor->getNumSourceIUs()) {
      // Consume in the original requestor notifying it that all children were produced successfully.
requestor->consumeAllChildren(*this);
const bool generates_fusing = optimization_hints.mode == OptimizationHints::CodegenMode::OperatorFusing;
const bool only_generate_when_vectorized = requestor->getOptimizationProperties().ct_only_vectorized;
if (generates_fusing && only_generate_when_vectorized) {
// We don't need to generate any code for this suboperator.
// Directly mark the output IUs as ready (those are all pseudo IUs).
notifyIUsReady(*requestor);
} else {
         // Consume in the original requestor notifying it that all children were produced successfully.
// Actually let the consumer generate the required code.
requestor->consumeAllChildren(*this);
}
}
}

Expand Down Expand Up @@ -139,6 +148,10 @@ IR::FunctionBuilder& CompilationContext::getFctBuilder() {
return builder->fct_builder;
}

/// Return the optimization hints this compilation context was constructed with.
const OptimizationHints& CompilationContext::getOptimizationHints() const {
   return optimization_hints;
}

/// Set up the builder state: an IR builder for the backing program and a
/// function builder for the pipeline's generated entry function.
/// NOTE: ir_builder must be initialized before fct_builder — the second
/// member initializer reads it.
CompilationContext::Builder::Builder(IR::Program& program, std::string fct_name)
   : ir_builder(program.getIRBuilder()), fct_builder(createFctBuilder(ir_builder, std::move(fct_name))) {
}
Expand Down
30 changes: 25 additions & 5 deletions src/algebra/CompilationContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,37 @@
#include "exec/FuseChunk.h"

#include <cstdint>
#include <optional>
#include <memory>
#include <unordered_map>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <unordered_map>

namespace inkfuse {

struct Suboperator;

/// Hints that can be used during the code generation to generate more optimized
/// code. Examples:
///
/// When we are generating `OperatorFusing` code, we do not issue prefetch instructions.
/// These are exclusively used in the vectorized backends to issue independent loads
/// and hide cache miss latency for followup operators.
struct OptimizationHints {
   /// Which kind of backend the code is being generated for.
   enum class CodegenMode {
      /// Compiled, operator-fusing code: prefetch instructions are not emitted.
      OperatorFusing,
      /// Vectorized, batch-at-a-time code: prefetching issues independent loads
      /// to hide cache-miss latency for followup operators.
      Vectorized,
   };

   /// Active code generation mode; operator fusing is the default.
   CodegenMode mode = CodegenMode::OperatorFusing;
};

/// Context for compiling a single pipeline.
struct CompilationContext {
/// Set up a compilation context for generating code for a full given pipeline.
CompilationContext(std::string program_name, const Pipeline& pipeline_);
CompilationContext(std::string program_name, const Pipeline& pipeline_, OptimizationHints hints_ = OptimizationHints{});
/// Set up a compilation context which will generate the code within a specific IR program for the full pipeline.
CompilationContext(IR::ProgramArc program_, std::string fct_name_, const Pipeline& pipeline_);
CompilationContext(IR::ProgramArc program_, std::string fct_name_, const Pipeline& pipeline_, OptimizationHints hints_ = OptimizationHints{});

/// Compile the code for this context.
void compile();
Expand Down Expand Up @@ -50,6 +65,9 @@ struct CompilationContext {
const IR::Program& getProgram();
IR::FunctionBuilder& getFctBuilder();

/// Get the optimization hints for the generated program.
const OptimizationHints& getOptimizationHints() const;

private:
static IR::FunctionBuilder createFctBuilder(IR::IRBuilder& program, std::string fct_name);

Expand All @@ -73,6 +91,8 @@ struct CompilationContext {
const std::string fct_name;
/// The backing IR program.
IR::ProgramArc program;
/// Optimization hints that can be used during code generation.
OptimizationHints optimization_hints;
/// The function builder for the generated code.
std::optional<Builder> builder;
/// Which sub-operators were computed already? Needed to prevent double-computation in DAGs.
Expand Down
36 changes: 31 additions & 5 deletions src/algebra/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,30 @@ void materializedTupleToHashTable(
assert(ht_state.hash_table);
assert(!mat.handles.empty());
assert(mat.handles.size() == mat.materializers.size());
const size_t batch_size = 256;
std::vector<uint64_t> hashes(batch_size);
for (auto& read_handle : mat.handles) {
// Pick morsels from the read handle.
while (const TupleMaterializer::MatChunk* chunk = read_handle->pullChunk()) {
// Materialize all tuples from the chunk.
// We traverse the materialized tuple in batches of 256 similar as a vectorized
// engine would. For large hash tables this increases throughput significantly.
const char* curr_tuple = reinterpret_cast<const char*>(chunk->data.get());
while (curr_tuple < chunk->end_ptr) {
// Copy over the whole tuple into the hash table.
ht_state.hash_table->insert<false>(curr_tuple);
size_t curr_batch_size = std::min(batch_size, (chunk->end_ptr - curr_tuple) / slot_size);
const char* curr_tuple_hash_it = curr_tuple;
for (size_t batch_idx = 0; batch_idx < curr_batch_size; ++batch_idx) {
hashes[batch_idx] = ht_state.hash_table->compute_hash(curr_tuple_hash_it);
curr_tuple_hash_it += slot_size;
}
for (size_t batch_idx = 0; batch_idx < curr_batch_size; ++batch_idx) {
ht_state.hash_table->slot_prefetch(hashes[batch_idx]);
}
for (size_t batch_idx = 0; batch_idx < curr_batch_size; ++batch_idx) {
ht_state.hash_table->insert<false>(curr_tuple, hashes[batch_idx]);
curr_tuple += slot_size;
}
// Move to the next tuple.
curr_tuple += slot_size;
}
}
}
Expand Down Expand Up @@ -131,6 +145,11 @@ void Join::plan() {
lookup_left.emplace(IR::Pointer::build(IR::Char::build()));
lookup_right.emplace(IR::Pointer::build(IR::Char::build()));
filter_pseudo_iu.emplace(IR::Void::build());

   // The probe hash is always a uint64_t.
hash_right.emplace(IR::UnsignedInt::build(8));
// Pseudo IU for making sure we prefetch before we probe.
prefetch_pseudo.emplace(IR::Void::build());
}

void Join::decay(inkfuse::PipelineDAG& dag) const {
Expand Down Expand Up @@ -245,12 +264,19 @@ void Join::decayPkJoin(inkfuse::PipelineDAG& dag) const {
pseudo.push_back(&pseudo_iu);
}

// 2.2.1 Compute the hash.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htHash<AtomicHashTable<SimpleKeyComparator>>(this, *hash_right, *scratch_pad_right, std::move(pseudo), &ht_state));

// 2.2.2 Prefetch the slot.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htPrefetch<AtomicHashTable<SimpleKeyComparator>>(this, &*prefetch_pseudo, *hash_right, &ht_state));

   // 2.2.3 Perform the lookup.
if (type == JoinType::LeftSemi) {
// Lookup on a slot disables the slot, giving semi-join behaviour.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupDisable(this, *lookup_right, *scratch_pad_right, std::move(pseudo), &ht_state));
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash<AtomicHashTable<SimpleKeyComparator>, true>(this, *lookup_right, *scratch_pad_right, *hash_right, &*prefetch_pseudo, &ht_state));
} else {
// Regular lookup that does not disable slots.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookup<AtomicHashTable<SimpleKeyComparator>>(this, *lookup_right, *scratch_pad_right, std::move(pseudo), &ht_state));
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash<AtomicHashTable<SimpleKeyComparator>, false>(this, *lookup_right, *scratch_pad_right, *hash_right, &*prefetch_pseudo, &ht_state));
}

// 2.3 Filter on probe matches.
Expand Down
5 changes: 5 additions & 0 deletions src/algebra/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ struct Join : public RelAlgOp {
/// Packed scratch pad IU right.
std::optional<IU> scratch_pad_right;

/// Computed hash on the probe side.
std::optional<IU> hash_right;
/// Prefetch pseudo IU - ensures that we prefetch before probing.
std::optional<IU> prefetch_pseudo;

/// Lookup result left.
std::optional<IU> lookup_left;
/// Lookup result right.
Expand Down
23 changes: 0 additions & 23 deletions src/algebra/suboperators/RuntimeFunctionSubop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,6 @@ std::unique_ptr<RuntimeFunctionSubop> RuntimeFunctionSubop::htInsert(const inkfu
pointers_));
}

/// Build a hash-table lookup suboperator that disables the matched slot.
/// Disabling a slot on lookup gives semi-join behaviour: a build-side tuple
/// can match at most one probe tuple.
/// @param source      Relational-algebra operator this suboperator belongs to.
/// @param pointers_   Output IU receiving the looked-up slot pointers.
/// @param keys_       Input IU holding the probe keys.
/// @param pseudo_ius_ Pseudo IUs wired as graph inputs only (no runtime arguments).
/// @param state_init_ Deferred initializer providing the hash-table state.
std::unique_ptr<RuntimeFunctionSubop> RuntimeFunctionSubop::htLookupDisable(const RelAlgOp* source, const IU& pointers_, const IU& keys_, std::vector<const IU*> pseudo_ius_, DefferredStateInitializer* state_init_) {
   std::string fct_name = "ht_at_sk_lookup_disable";
   std::vector<const IU*> in_ius{&keys_};
   for (auto pseudo : pseudo_ius_) {
      // Pseudo IUs are used as input IUs in the backing graph, but do not influence arguments.
      in_ius.push_back(pseudo);
   }
   // Pass the key by reference unless it is already a byte array or char pointer.
   std::vector<bool> ref{keys_.type->id() != "ByteArray" && keys_.type->id() != "Ptr_Char"};
   std::vector<const IU*> out_ius_{&pointers_};
   std::vector<const IU*> args{&keys_};
   const IU* out = &pointers_;
   return std::unique_ptr<RuntimeFunctionSubop>(
      new RuntimeFunctionSubop(
         source,
         state_init_,
         std::move(fct_name),
         std::move(in_ius),
         std::move(out_ius_),
         std::move(args),
         std::move(ref),
         out));
}

std::unique_ptr<RuntimeFunctionSubop> RuntimeFunctionSubop::htNoKeyLookup(const RelAlgOp* source, const IU& pointers_, const IU& input_dependency, DefferredStateInitializer* state_init_) {
std::string fct_name = "ht_nk_lookup";
std::vector<const IU*> in_ius{&input_dependency};
Expand Down
Loading
Loading