Skip to content

Commit

Permalink
Set CPU Affinities for Thread Workers
Browse files Browse the repository at this point in the history
This doesn't have a significant impact on overall system performance,
but it's still the right thing to do.

We now ensure that worker threads are pinned to a CPU core during query
processing. Worker 0 is pinned to CPU 0, worker 1 is pinned to CPU 1,
and so on.

This makes sure that we don't bounce our caches so much when a thread
gets rescheduled during morsel execution. I had hoped this would
positively impact ROF performance, but it really doesn't.
  • Loading branch information
wagjamin committed Nov 5, 2023
1 parent 47a9700 commit acd0162
Showing 1 changed file with 31 additions and 0 deletions.
31 changes: 31 additions & 0 deletions src/exec/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "runtime/MemoryRuntime.h"

#include <chrono>
#include <iostream>

namespace inkfuse {

Expand All @@ -31,6 +32,33 @@ std::pair<size_t, size_t> computeTrials(double interpreted_throughput, double co
// Otherwise try out both 10% of the time.
return {4, 4};
}

/// Try to pin the current thread to a specific core.
void setCpuAffinity(size_t target_cpu) {
if (target_cpu >= std::thread::hardware_concurrency()) {
std::cerr << "Too many threads to set CPU affinity.\n";
return;
}
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(target_cpu, &cpuset);
int rc = pthread_setaffinity_np(pthread_self(),
sizeof(cpu_set_t), &cpuset);
if (rc != 0) {
std::cerr << "Could not set worker thread affinity: " << rc << "\n";
}
}

void resetCpuAffinity() {
cpu_set_t cpuset;
std::memset(&cpuset, ~0, sizeof(cpu_set_t));
int rc = pthread_setaffinity_np(pthread_self(),
sizeof(cpu_set_t), &cpuset);
if (rc != 0) {
std::cerr << "Could not reset worker thread affinity: " << rc << "\n";
}
}

};

using ROFStrategy = Suboperator::OptimizationProperties::ROFStrategy;
Expand Down Expand Up @@ -78,6 +106,8 @@ void PipelineExecutor::preparePipeline(ExecutionMode prep_mode) {
}

void PipelineExecutor::threadSwimlane(size_t thread_id, OnceBarrier& compile_prep_barrier) {
// Set the CPU affinity to make sure the thread doesn't jump across cores.
setCpuAffinity(thread_id);
// Scope guard for memory compile_state->context and flags.
ExecutionContext::RuntimeGuard guard{*context, thread_id};
if (mode == ExecutionMode::Fused) {
Expand Down Expand Up @@ -170,6 +200,7 @@ void PipelineExecutor::threadSwimlane(size_t thread_id, OnceBarrier& compile_pre
it_counter++;
}
}
resetCpuAffinity();
}

void PipelineExecutor::runSwimlanes() {
Expand Down

0 comments on commit acd0162

Please sign in to comment.