Add Multithreaded Hash Table Benchmarks
With multithreading, the vectorized hash tables still perform better than the tuple-at-a-time variant.
wagjamin committed Nov 6, 2023
1 parent acd0162 commit 0070b4a
Showing 3 changed files with 87 additions and 3 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -18,7 +18,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -gdwarf-4 -stdlib=libc++")
# For benchmarks: easier profiling & linking against a system-installed googlebench
# if that was built with a non-libcxx standard library.
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -gdwarf-4 -flto -fno-omit-frame-pointer")
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -gdwarf-4 -flto")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g -O0 -fsanitize=address -flto=thin")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -g -O3 -flto")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -flto")
86 changes: 85 additions & 1 deletion bench/vectorized_ht.cpp
@@ -4,7 +4,7 @@
#include <algorithm>
#include <chrono>
#include <cstring>
#include <iostream>
#include <thread>
#include <vector>

using namespace inkfuse;
@@ -285,6 +285,86 @@ void BM_ht_perf_vectorized_inkfuse(benchmark::State& state) {
   state.SetItemsProcessed(state.iterations() * num_elems);
}

void BM_ht_perf_tat_inkfuse_mt(benchmark::State& state) {
   const uint64_t num_elems = state.range(0);
   const uint64_t num_threads = 16;
   inkfuse::SimpleKeyComparator comp{8};
   AtomicHashTable<inkfuse::SimpleKeyComparator> ht{comp, 16, 2 * num_elems};
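   // Pre-fill the table with keys 7 * k so that every probe in the measured loop hits an existing entry.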
   for (uint64_t k = 1; k <= num_elems; ++k) {
      const uint64_t key = 7 * k;
      char* value = ht.insert<true>(reinterpret_cast<const char*>(&key));
      reinterpret_cast<uint64_t*>(value)[1] = k;
   }
   for (auto _ : state) {
      auto start = std::chrono::high_resolution_clock::now();
      std::vector<std::thread> threads;
      for (size_t th = 0; th < num_threads; ++th) {
         threads.emplace_back([&, th]() {
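            // Each thread probes all num_elems keys, starting at a thread-specific offset
            // so the threads do not walk the table in lockstep.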
            size_t offset = th * (num_elems / num_threads);
            for (uint64_t k = 1; k <= num_elems; ++k) {
               const uint64_t key = 7 * (((k + offset) % num_elems) + 1);
               const uint64_t hash = ht.compute_hash_and_prefetch_fixed<8>(reinterpret_cast<const char*>(&key));
               const auto res = ht.lookup(reinterpret_cast<const char*>(&key), hash);
               if (reinterpret_cast<const uint64_t*>(res)[1] > num_elems) {
                  throw std::runtime_error("bad ht lookup for " + std::to_string(k));
               }
            }
         });
      }
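      // Wait for all probe threads before stopping the manual timer.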
      for (auto& thread : threads) {
         thread.join();
      }
      auto stop = std::chrono::high_resolution_clock::now();
      auto elapsed_s = std::chrono::duration_cast<std::chrono::duration<double>>(stop - start);
      state.SetIterationTime(elapsed_s.count());
   }
   state.SetItemsProcessed(state.iterations() * num_elems * num_threads);
}

void BM_ht_perf_vectorized_inkfuse_mt(benchmark::State& state) {
   const uint64_t num_elems = state.range(0);
   const uint64_t batch_size = state.range(1);
   const uint64_t num_threads = 16;
   inkfuse::SimpleKeyComparator comp{8};
   AtomicHashTable<inkfuse::SimpleKeyComparator> ht{comp, 16, 2 * num_elems};
   for (uint64_t k = 1; k <= num_elems; ++k) {
      const uint64_t key = 7 * k;
      char* value = ht.insert<true>(reinterpret_cast<const char*>(&key));
      reinterpret_cast<uint64_t*>(value)[1] = k;
   }
   for (auto _ : state) {
      auto start = std::chrono::high_resolution_clock::now();
      std::vector<std::thread> threads;
      for (size_t th = 0; th < num_threads; ++th) {
         threads.emplace_back([&, th]() {
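            // Per-thread scratch buffers for one batch of keys, hashes, and probe results.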
            std::vector<uint64_t> keys(batch_size);
            std::vector<uint64_t> hashes(batch_size);
            std::vector<char*> results(batch_size);
            size_t offset = th * (num_elems / num_threads);
            for (uint64_t k = 1; k <= num_elems; k += batch_size) {
               const auto curr_batch = std::min(batch_size, num_elems - k + 1);
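               // Three passes over the batch: materialize the keys, compute hashes and issue
               // prefetches, then probe. Separating hash/prefetch from the probe lets the
               // cache misses of a batch overlap instead of being resolved one at a time.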
               for (uint64_t tid = 0; tid < curr_batch; ++tid) {
                  keys[tid] = 7 * (((k + tid + offset) % num_elems) + 1);
               }
               for (uint64_t tid = 0; tid < curr_batch; ++tid) {
                  hashes[tid] = ht.compute_hash_and_prefetch_fixed<8>(reinterpret_cast<const char*>(&keys[tid]));
               }
               for (uint64_t tid = 0; tid < curr_batch; ++tid) {
                  results[tid] = ht.lookup(reinterpret_cast<const char*>(&keys[tid]), hashes[tid]);
               }
            }
         });
      }
      for (auto& thread : threads) {
         thread.join();
      }
      auto stop = std::chrono::high_resolution_clock::now();
      auto elapsed_s = std::chrono::duration_cast<std::chrono::duration<double>>(stop - start);
      state.SetIterationTime(elapsed_s.count());
   }
   state.SetItemsProcessed(state.iterations() * num_elems * num_threads);
}

BENCHMARK(BM_ht_perf_tat)->Arg(1 << 9)->Arg(1 << 13)->Arg(1 << 15)->Arg(1 << 19)->Arg(1 << 21)->Arg(1 << 25)->Arg(1 << 30);
BENCHMARK(BM_ht_perf_tat_inkfuse)->Arg(1 << 9)->Arg(1 << 13)->Arg(1 << 15)->Arg(1 << 19)->Arg(1 << 21)->Arg(1 << 25)->Arg(1 << 30);

@@ -300,4 +380,8 @@ BENCHMARK(BM_ht_perf_vectorized_inkfuse)->ArgPair(1 << 9, 256)->ArgPair(1 << 13,
// Different internal batch sizes. 256 is a good value.
BENCHMARK(BM_ht_perf_vectorized_inkfuse)->ArgPair(1 << 25, 64)->ArgPair(1 << 25, 128)->ArgPair(1 << 25, 256)->ArgPair(1 << 25, 512)->ArgPair(1 << 25, 1024)->ArgPair(1 << 25, 2024)->ArgPair(1 << 25, 4048)->ArgPair(1 << 25, 8096)->ArgPair(1 << 25, 16192);

// Multithreaded hash table benchmarks.
BENCHMARK(BM_ht_perf_tat_inkfuse_mt)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 21, 256)->ArgPair(1 << 25, 256)->UseManualTime();
BENCHMARK(BM_ht_perf_vectorized_inkfuse_mt)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 21, 256)->ArgPair(1 << 25, 256)->UseManualTime();

} // namespace
2 changes: 1 addition & 1 deletion src/exec/PipelineExecutor.cpp
@@ -7,6 +7,7 @@
#include "runtime/MemoryRuntime.h"

#include <chrono>
#include <cstring>
#include <iostream>

namespace inkfuse {
@@ -58,7 +59,6 @@ void resetCpuAffinity() {
      std::cerr << "Could not reset worker thread affinity: " << rc << "\n";
   }
}

};

using ROFStrategy = Suboperator::OptimizationProperties::ROFStrategy;
