Skip to content

Commit

Permalink
Get Microbenchmarks to Build Again
Browse files Browse the repository at this point in the history
  • Loading branch information
wagjamin authored and Benjamin Wagner committed Nov 4, 2023
1 parent beb1a12 commit 8a44c3a
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 72 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
# Generate DWARF 4 in debug to work on older GDB versions
# flto required as xxhash is also built with flto to allow efficient inlining
# of the hash functions.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++ -gdwarf-4 -flto")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -gdwarf-4 -stdlib=libc++ -flto")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g -O0 -fsanitize=address")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -g -O3")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3")
Expand Down Expand Up @@ -292,6 +292,7 @@ if (WITH_BENCH)
bench/compiler_invoke.cpp
bench/ht_benchmark.cpp
bench/vectorized_ht.cpp
$<TARGET_OBJECTS:inkfuse_runtime>
)
target_link_libraries(inkbench PUBLIC benchmark::benchmark inkfuse)
# Move the testdata into the binary tree for easy ingest tests.
Expand Down
16 changes: 7 additions & 9 deletions bench/vectorized_ht.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ void BM_ht_perf_vectorized_inkfuse(benchmark::State& state) {
}
std::vector<uint64_t> keys(batch_size);
std::vector<uint64_t> hashes(batch_size);
std::vector<char*> results(batch_size);
for (auto _ : state) {
// Lookup every key again.
for (uint64_t k = 1; k <= num_elems; k += batch_size) {
Expand All @@ -277,28 +278,25 @@ void BM_ht_perf_vectorized_inkfuse(benchmark::State& state) {
hashes[tid] = ht.compute_hash_and_prefetch(reinterpret_cast<const char*>(&keys[tid]));
}
for (uint64_t tid = 0; tid < curr_batch; ++tid) {
const auto* res = ht.lookup(reinterpret_cast<const char*>(&keys[tid]), hashes[tid]);
if (reinterpret_cast<const uint64_t*>(res)[1] > num_elems) {
throw std::runtime_error("bad ht lookup for " + std::to_string(k));
}
results[tid] = ht.lookup(reinterpret_cast<const char*>(&keys[tid]), hashes[tid]);
}
}
}
state.SetItemsProcessed(state.iterations() * num_elems);
}

BENCHMARK(BM_ht_perf_tat)->Arg(1 << 9)->Arg(1 << 13)->Arg(1 << 15)->Arg(1 << 19)->Arg(1 << 25)->Arg(1 << 30);
BENCHMARK(BM_ht_perf_tat_inkfuse)->Arg(1 << 9)->Arg(1 << 13)->Arg(1 << 15)->Arg(1 << 19)->Arg(1 << 25)->Arg(1 << 30);
BENCHMARK(BM_ht_perf_tat)->Arg(1 << 9)->Arg(1 << 13)->Arg(1 << 15)->Arg(1 << 19)->Arg(1 << 21)->Arg(1 << 25)->Arg(1 << 30);
BENCHMARK(BM_ht_perf_tat_inkfuse)->Arg(1 << 9)->Arg(1 << 13)->Arg(1 << 15)->Arg(1 << 19)->Arg(1 << 21)->Arg(1 << 25)->Arg(1 << 30);

BENCHMARK(BM_ht_perf_vectorized)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256);
BENCHMARK(BM_ht_perf_vectorized)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 21, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256);
// Different internal batch sizes. 256 is a good value.
BENCHMARK(BM_ht_perf_vectorized)->ArgPair(1 << 25, 64)->ArgPair(1 << 25, 128)->ArgPair(1 << 25, 256)->ArgPair(1 << 25, 512)->ArgPair(1 << 25, 1024)->ArgPair(1 << 25, 2024)->ArgPair(1 << 25, 4048)->ArgPair(1 << 25, 8096)->ArgPair(1 << 25, 16192);

BENCHMARK(BM_ht_perf_vectorized_rof)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256);
BENCHMARK(BM_ht_perf_vectorized_rof)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 21, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256);
// Different internal batch sizes. 256 is a good value.
BENCHMARK(BM_ht_perf_vectorized_rof)->ArgPair(1 << 25, 64)->ArgPair(1 << 25, 128)->ArgPair(1 << 25, 256)->ArgPair(1 << 25, 512)->ArgPair(1 << 25, 1024)->ArgPair(1 << 25, 2024)->ArgPair(1 << 25, 4048)->ArgPair(1 << 25, 8096)->ArgPair(1 << 25, 16192);

BENCHMARK(BM_ht_perf_vectorized_inkfuse)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256);
BENCHMARK(BM_ht_perf_vectorized_inkfuse)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 21, 256)->ArgPair(1 << 25, 256)->ArgPair(1 << 30, 256);
// Different internal batch sizes. 256 is a good value.
BENCHMARK(BM_ht_perf_vectorized_inkfuse)->ArgPair(1 << 25, 64)->ArgPair(1 << 25, 128)->ArgPair(1 << 25, 256)->ArgPair(1 << 25, 512)->ArgPair(1 << 25, 1024)->ArgPair(1 << 25, 2024)->ArgPair(1 << 25, 4048)->ArgPair(1 << 25, 8096)->ArgPair(1 << 25, 16192);

Expand Down
119 changes: 59 additions & 60 deletions src/algebra/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,79 +257,78 @@ void Join::decayPkJoin(inkfuse::PipelineDAG& dag) const {

// 2.2 Probe.
{
{
// Perform the actual lookup in a fully vectorized fashion.
Pipeline::ROFScopeGuard rof_guard{probe_pipe};
// Perform the actual lookup in a fully vectorized fashion.
Pipeline::ROFScopeGuard rof_guard{probe_pipe};

std::vector<const IU*> pseudo;
for (const auto& pseudo_iu : right_pseudo_ius) {
pseudo.push_back(&pseudo_iu);
}
std::vector<const IU*> pseudo;
for (const auto& pseudo_iu : right_pseudo_ius) {
pseudo.push_back(&pseudo_iu);
}

// 2.2.1 Compute the hash and prefetch the slot.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htHashAndPrefetch<AtomicHashTable<SimpleKeyComparator>>(this, *hash_right, *scratch_pad_right, std::move(pseudo), key_size_left, &ht_state));
// 2.2.1 Compute the hash and prefetch the slot.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htHashAndPrefetch<AtomicHashTable<SimpleKeyComparator>>(this, *hash_right, *scratch_pad_right, std::move(pseudo), key_size_left, &ht_state));

// 2.2.2 Perform the lookup.
if (type == JoinType::LeftSemi) {
// Lookup on a slot disables the slot, giving semi-join behaviour.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash<AtomicHashTable<SimpleKeyComparator>, true>(this, *lookup_right, *scratch_pad_right, *hash_right, /* prefetch_pseudo = */ nullptr, &ht_state));
} else {
// Regular lookup that does not disable slots.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash<AtomicHashTable<SimpleKeyComparator>, false>(this, *lookup_right, *scratch_pad_right, *hash_right, /* prefetch_pseudo = */ nullptr, &ht_state));
}
// 2.2.2 Perform the lookup.
if (type == JoinType::LeftSemi) {
// Lookup on a slot disables the slot, giving semi-join behaviour.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash<AtomicHashTable<SimpleKeyComparator>, true>(this, *lookup_right, *scratch_pad_right, *hash_right, /* prefetch_pseudo = */ nullptr, &ht_state));
} else {
// Regular lookup that does not disable slots.
probe_pipe.attachSuboperator(RuntimeFunctionSubop::htLookupWithHash<AtomicHashTable<SimpleKeyComparator>, false>(this, *lookup_right, *scratch_pad_right, *hash_right, /* prefetch_pseudo = */ nullptr, &ht_state));
}
}

// 2.3 Filter on probe matches.
auto& filter_scope_subop = probe_pipe.attachSuboperator(ColumnFilterScope::build(this, *lookup_right, *filter_pseudo_iu));
auto& filter_scope = reinterpret_cast<ColumnFilterScope&>(filter_scope_subop);
// The filter on the build side filters "itself". This has some repercussions on the repiping
// behaviour of the suboperator and needs to be passed explicitly.
auto& filter_1 = probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *lookup_right, *filtered_build, /* filter_type= */ lookup_right->type, /* filters_itself= */ true));
filter_scope.attachFilterLogicDependency(filter_1, *lookup_right);
if (type != JoinType::LeftSemi) {
// If we need to produce columns on the probe side, we also have to filter the probe result.
// Note: the filtered ByteArray from the probe side becomes a Char* after filtering.
auto& filter_2 = probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *scratch_pad_right, *filtered_probe, /* filter_type_= */ lookup_right->type));
filter_scope.attachFilterLogicDependency(filter_2, *scratch_pad_right);
}
// 2.3 Filter on probe matches.
auto& filter_scope_subop = probe_pipe.attachSuboperator(ColumnFilterScope::build(this, *lookup_right, *filter_pseudo_iu));
auto& filter_scope = reinterpret_cast<ColumnFilterScope&>(filter_scope_subop);
// The filter on the build side filters "itself". This has some repercussions on the repiping
// behaviour of the suboperator and needs to be passed explicitly.
auto& filter_1 = probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *lookup_right, *filtered_build, /* filter_type= */ lookup_right->type, /* filters_itself= */ true));
filter_scope.attachFilterLogicDependency(filter_1, *lookup_right);
if (type != JoinType::LeftSemi) {
// If we need to produce columns on the probe side, we also have to filter the probe result.
// Note: the filtered ByteArray from the probe side becomes a Char* after filtering.
auto& filter_2 = probe_pipe.attachSuboperator(ColumnFilterLogic::build(this, *filter_pseudo_iu, *scratch_pad_right, *filtered_probe, /* filter_type_= */ lookup_right->type));
filter_scope.attachFilterLogicDependency(filter_2, *scratch_pad_right);
}

// 2.4 Unpack everything.
// 2.4.1 Unpack Build Side IUs
size_t build_unpack_offset = 0;
for (const auto& iu : keys_left_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_build, iu));
// 2.4 Unpack everything.
// 2.4.1 Unpack Build Side IUs
size_t build_unpack_offset = 0;
for (const auto& iu : keys_left_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_build, iu));
KeyPackingRuntimeParams param;
param.offsetSet(IR::UI<2>::build(build_unpack_offset));
reinterpret_cast<KeyUnpackerSubop&>(unpacker).attachRuntimeParams(std::move(param));
build_unpack_offset += iu.type->numBytes();
}
for (const auto& iu : payload_left_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_build, iu));
KeyPackingRuntimeParams param;
param.offsetSet(IR::UI<2>::build(build_unpack_offset));
reinterpret_cast<KeyUnpackerSubop&>(unpacker).attachRuntimeParams(std::move(param));
build_unpack_offset += iu.type->numBytes();
}
// 2.4.2 Unpack Probe Side IUs. Not needed for semi joins.
if (type != JoinType::LeftSemi) {
size_t probe_unpack_offset = 0;
for (const auto& iu : keys_right_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_probe, iu));
KeyPackingRuntimeParams param;
param.offsetSet(IR::UI<2>::build(build_unpack_offset));
param.offsetSet(IR::UI<2>::build(probe_unpack_offset));
reinterpret_cast<KeyUnpackerSubop&>(unpacker).attachRuntimeParams(std::move(param));
build_unpack_offset += iu.type->numBytes();
probe_unpack_offset += iu.type->numBytes();
}
for (const auto& iu : payload_left_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_build, iu));
for (const auto& iu : payload_right_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_probe, iu));
KeyPackingRuntimeParams param;
param.offsetSet(IR::UI<2>::build(build_unpack_offset));
param.offsetSet(IR::UI<2>::build(probe_unpack_offset));
reinterpret_cast<KeyUnpackerSubop&>(unpacker).attachRuntimeParams(std::move(param));
build_unpack_offset += iu.type->numBytes();
probe_unpack_offset += iu.type->numBytes();
}
// 2.4.2 Unpack Probe Side IUs. Not needed for semi joins.
if (type != JoinType::LeftSemi) {
size_t probe_unpack_offset = 0;
for (const auto& iu : keys_right_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_probe, iu));
KeyPackingRuntimeParams param;
param.offsetSet(IR::UI<2>::build(probe_unpack_offset));
reinterpret_cast<KeyUnpackerSubop&>(unpacker).attachRuntimeParams(std::move(param));
probe_unpack_offset += iu.type->numBytes();
}
for (const auto& iu : payload_right_out) {
auto& unpacker = probe_pipe.attachSuboperator(KeyUnpackerSubop::build(this, *filtered_probe, iu));
KeyPackingRuntimeParams param;
param.offsetSet(IR::UI<2>::build(probe_unpack_offset));
reinterpret_cast<KeyUnpackerSubop&>(unpacker).attachRuntimeParams(std::move(param));
probe_unpack_offset += iu.type->numBytes();
}
}
// End vectorized Block.
}
// End vectorized Block.
}
}

}
4 changes: 2 additions & 2 deletions src/exec/FuseChunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

namespace inkfuse {

/// Default chunk size (8192)
const uint64_t DEFAULT_CHUNK_SIZE = 1024;
/// Default chunk size (4096)
const uint64_t DEFAULT_CHUNK_SIZE = 4096;

/// A column within a FuseChunk.
struct Column {
Expand Down

0 comments on commit 8a44c3a

Please sign in to comment.