diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1589d11..fe4cba1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -18,7 +18,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
 set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -gdwarf-4 -stdlib=libc++")
 # For benchmarks: easier profiling & links against system installed googlebench
 # if that was build with non libcxx.
-# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -gdwarf-4 -flto -fno-omit-frame-pointer")
+# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -gdwarf-4 -flto")
 set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g -O0 -fsanitize=address -flto=thin")
 set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -g -O3 -flto")
 set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -flto")
diff --git a/bench/vectorized_ht.cpp b/bench/vectorized_ht.cpp
index 9cd8510..36d019b 100644
--- a/bench/vectorized_ht.cpp
+++ b/bench/vectorized_ht.cpp
@@ -4,7 +4,7 @@
 #include
 #include
 #include
-#include
+#include
 #include

 using namespace inkfuse;
@@ -285,6 +285,86 @@ void BM_ht_perf_vectorized_inkfuse(benchmark::State& state) {
    state.SetItemsProcessed(state.iterations() * num_elems);
 }

+void BM_ht_perf_tat_inkfuse_mt(benchmark::State& state) {
+   const uint64_t num_elems = state.range(0);
+   const uint64_t num_threads = 16;
+   inkfuse::SimpleKeyComparator comp{8};
+   AtomicHashTable<SimpleKeyComparator> ht{comp, 16, 2 * num_elems};
+   for (uint64_t k = 1; k <= num_elems; ++k) {
+      const uint64_t key = 7 * k;
+      char* value = ht.insert(reinterpret_cast<const char*>(&key));
+      reinterpret_cast<uint64_t*>(value)[1] = k;
+   }
+   for (auto _ : state) {
+      auto start = std::chrono::high_resolution_clock::now();
+      std::vector<std::thread> threads;
+      for (size_t th = 0; th < num_threads; ++th) {
+         threads.emplace_back([&, th]() {
+            size_t offset = th * (num_elems / num_threads);
+            for (uint64_t k = 1; k <= num_elems; ++k) {
+               const uint64_t key = 7 * (((k + offset) % num_elems) + 1);
+               const uint64_t hash = ht.compute_hash_and_prefetch_fixed<8>(reinterpret_cast<const char*>(&key));
+               const auto res = ht.lookup(reinterpret_cast<const char*>(&key), hash);
+               if (reinterpret_cast<const uint64_t*>(res)[1] > num_elems) {
+                  throw std::runtime_error("bad ht lookup for " + std::to_string(k));
+               }
+            }
+         });
+      }
+      for (auto& thread : threads) {
+         thread.join();
+      }
+      auto stop = std::chrono::high_resolution_clock::now();
+      auto elapsed_s = std::chrono::duration_cast<std::chrono::duration<double>>(stop - start);
+      state.SetIterationTime(elapsed_s.count());
+   }
+   state.SetItemsProcessed(state.iterations() * num_elems * num_threads);
+}
+
+void BM_ht_perf_vectorized_inkfuse_mt(benchmark::State& state) {
+   const uint64_t num_elems = state.range(0);
+   const uint64_t batch_size = state.range(1);
+   const uint64_t num_threads = 16;
+   inkfuse::SimpleKeyComparator comp{8};
+   AtomicHashTable<SimpleKeyComparator> ht{comp, 16, 2 * num_elems};
+   for (uint64_t k = 1; k <= num_elems; ++k) {
+      const uint64_t key = 7 * k;
+      char* value = ht.insert(reinterpret_cast<const char*>(&key));
+      reinterpret_cast<uint64_t*>(value)[1] = k;
+   }
+   for (auto _ : state) {
+      auto start = std::chrono::high_resolution_clock::now();
+      std::vector<std::thread> threads;
+      for (size_t th = 0; th < num_threads; ++th) {
+         threads.emplace_back([&, th]() {
+            std::vector<uint64_t> keys(batch_size);
+            std::vector<uint64_t> hashes(batch_size);
+            std::vector<const char*> results(batch_size);
+            size_t offset = th * (num_elems / num_threads);
+            for (uint64_t k = 1; k <= num_elems; k += batch_size) {
+               const auto curr_batch = std::min(batch_size, num_elems - k + 1);
+               for (uint64_t tid = 0; tid < curr_batch; ++tid) {
+                  keys[tid] = 7 * (((k + tid + offset) % num_elems) + 1);
+               }
+               for (uint64_t tid = 0; tid < curr_batch; ++tid) {
+                  hashes[tid] = ht.compute_hash_and_prefetch_fixed<8>(reinterpret_cast<const char*>(&keys[tid]));
+               }
+               for (uint64_t tid = 0; tid < curr_batch; ++tid) {
+                  results[tid] = ht.lookup(reinterpret_cast<const char*>(&keys[tid]), hashes[tid]);
+               }
+            }
+         });
+      }
+      for (auto& thread : threads) {
+         thread.join();
+      }
+      auto stop = std::chrono::high_resolution_clock::now();
+      auto elapsed_s = std::chrono::duration_cast<std::chrono::duration<double>>(stop - start);
+      state.SetIterationTime(elapsed_s.count());
+   }
+   state.SetItemsProcessed(state.iterations() * num_elems * num_threads);
+}
+
 BENCHMARK(BM_ht_perf_tat)->Arg(1 << 9)->Arg(1 << 13)->Arg(1 << 15)->Arg(1 << 19)->Arg(1 << 21)->Arg(1 << 25)->Arg(1 << 30);

 BENCHMARK(BM_ht_perf_tat_inkfuse)->Arg(1 << 9)->Arg(1 << 13)->Arg(1 << 15)->Arg(1 << 19)->Arg(1 << 21)->Arg(1 << 25)->Arg(1 << 30);
@@ -300,4 +380,8 @@ BENCHMARK(BM_ht_perf_vectorized_inkfuse)->ArgPair(1 << 9, 256)->ArgPair(1 << 13,
 // Different internal batch sizes. 256 is a good value.
 BENCHMARK(BM_ht_perf_vectorized_inkfuse)->ArgPair(1 << 25, 64)->ArgPair(1 << 25, 128)->ArgPair(1 << 25, 256)->ArgPair(1 << 25, 512)->ArgPair(1 << 25, 1024)->ArgPair(1 << 25, 2024)->ArgPair(1 << 25, 4048)->ArgPair(1 << 25, 8096)->ArgPair(1 << 25, 16192);

+// Multithreaded hash table benchmarks.
+BENCHMARK(BM_ht_perf_tat_inkfuse_mt)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 21, 256)->ArgPair(1 << 25, 256)->UseManualTime();
+BENCHMARK(BM_ht_perf_vectorized_inkfuse_mt)->ArgPair(1 << 9, 256)->ArgPair(1 << 13, 256)->ArgPair(1 << 15, 256)->ArgPair(1 << 19, 256)->ArgPair(1 << 21, 256)->ArgPair(1 << 25, 256)->UseManualTime();
+
 } // namespace
diff --git a/src/exec/PipelineExecutor.cpp b/src/exec/PipelineExecutor.cpp
index 9c072cb..6ca4b70 100644
--- a/src/exec/PipelineExecutor.cpp
+++ b/src/exec/PipelineExecutor.cpp
@@ -7,6 +7,7 @@
 #include "runtime/MemoryRuntime.h"

 #include
+#include
 #include

 namespace inkfuse {
@@ -58,7 +59,6 @@ void resetCpuAffinity() {
       std::cerr << "Could not reset worker thread affinity: " << rc << "\n";
    }
 }
-
 };

 using ROFStrategy = Suboperator::OptimizationProperties::ROFStrategy;