Skip to content

Commit

Permalink
Merge pull request #213 from CESNET/workers-cpu-affinity
Browse files Browse the repository at this point in the history
Workers cpu affinity
  • Loading branch information
SiskaPavel authored Aug 20, 2024
2 parents 4be259b + 9c6c5df commit 5ba318c
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 21 deletions.
19 changes: 19 additions & 0 deletions include/ipfixprobe/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
#include <stdexcept>
#include <cstdint>
#include <sys/time.h>
#include <vector>
#include <sstream>

namespace ipxp {

Expand Down Expand Up @@ -152,6 +154,23 @@ T str2num(std::string str, typename std::enable_if<is_uint<T>()>::type * = nullp
*/
uint64_t timeval2usec(const struct timeval& tv);

/**
* @brief Convert vector to string, e.g. for error messages
*/
template <typename T>
std::string vec2str(const std::vector<T> &vec) {
std::stringstream ss;
bool first = true;
for (auto &item : vec)
{
if (!first)
ss << ", ";
ss << item;
first = false;
}
return ss.str();
}

}

#endif /* IPXP_UTILS_HPP */
53 changes: 37 additions & 16 deletions init/ipfixprobed
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,31 @@ if [ -e "$CONFFILE" ]; then
exit 1
fi

if [ ! -z "$DPDK_LCORES" ]; then
DPDK_LCORES="--lcores $DPDK_LCORES"
fi

if [ -n "$DPDK_OPTS" -a "${DPDK_OPTS:0:1}" != ";" ]; then
DPDK_OPTS=";$DPDK_OPTS"
fi

# create array with input workers affinities
if [ ! -z "$DPDK_INPUT_WORKER_CPUS" ]; then
if `declare -p DPDK_INPUT_WORKER_CPUS > /dev/null 2>/dev/null`; then
if [ "${#DPDK_INPUT_WORKER_CPUS[@]}" -ne "$DPDK_QUEUES_COUNT" ]; then
echo "DPDK_INPUT_WORKER_CPUS length must be the same as queues count."
exit 1
fi
fi
fi
for ((i = 0; i < DPDK_QUEUES_COUNT; i++)); do
if [ ! -z "$DPDK_INPUT_WORKER_CPUS" ]; then
affinities[i]="@${DPDK_INPUT_WORKER_CPUS[$i]}"
else
affinities[i]=""
fi
done

# set up DPDK interface(s)
if [ "$DPDK_RING" = "1" ]; then
# checks
Expand All @@ -24,19 +49,19 @@ if [ -e "$CONFFILE" ]; then
echo "Missing DPDK_RING_STARTIDX in configuration of DPDK_RING mode, using 0."
DPDK_RING_STARTIDX=0
fi

# mring interfaces
dpdkinput=("-i" "dpdk-ring;r=$(printf "$DPDK_RING_PATTERN" "$DPDK_RING_STARTIDX");e=--lcores $DPDK_LCORES $DPDK_EXTRA_EAL")
dpdkinput=("-i" "dpdk-ring${affinities[0]};r=$(printf "$DPDK_RING_PATTERN" "$DPDK_RING_STARTIDX")${DPDK_OPTS};e=$DPDK_LCORES $DPDK_EXTRA_EAL")
plugin_idx=1
for ((ifc=($DPDK_RING_STARTIDX+1); ifc<($DPDK_RING_STARTIDX + $DPDK_QUEUES_COUNT);ifc++)); do
dpdkinput+=("-i" "dpdk-ring;r=$(printf "$DPDK_RING_PATTERN" "$ifc")")
dpdkinput+=("-i" "dpdk-ring${affinities[$plugin_idx]};r=$(printf "$DPDK_RING_PATTERN" "$ifc")")
((plugin_idx++))
done
else
# DPDK port interface
if [ -n "$DPDK_PORTOPTS" -a "${DPDK_PORTOPTS:0:1}" != ";" ]; then
DPDK_PORTOPTS=";$DPDK_PORTOPTS"
fi
dpdkinput=("-i" "dpdk;p=${DPDK_PORT}${DPDK_PORTOPTS};q=$DPDK_QUEUES_COUNT;e=--lcores $DPDK_LCORES $DPDK_EXTRA_EAL -a $DPDK_DEVICE")
dpdkinput=("-i" "dpdk${affinities[0]};p=${DPDK_PORT}${DPDK_OPTS};q=$DPDK_QUEUES_COUNT;e=$DPDK_LCORES $DPDK_EXTRA_EAL -a $DPDK_DEVICE")
for ((ifc=1; ifc<$DPDK_QUEUES_COUNT;ifc++)); do
dpdkinput+=("-i" "dpdk")
dpdkinput+=("-i" "dpdk${affinities[$ifc]}")
done
fi
fi
Expand Down Expand Up @@ -88,23 +113,19 @@ if [ -e "$CONFFILE" ]; then
NON_BLOCKING_TCP_PARAM="non-blocking-tcp";
fi

output="-o ipfix;host=${HOST:-127.0.0.1};port=${PORT:-4739};id=${LINK:-0};dir=${DIR:-0};${UDP_PARAM};${NON_BLOCKING_TCP_PARAM};template=${TEMPLATE_REFRESH_RATE:-300}"

telemetry=""
if [ "$USE_FUSE" = "1" ]; then
telemetry="-t ${FUSE_MOUNT_POINT}"
output_affinity=""
if [ ! -z "$OUTPUT_WORKER_CPU" ]; then
output_affinity="@$OUTPUT_WORKER_CPU"
fi

exec /usr/bin/ipfixprobe "${dpdkinput[@]}" $input $storage $process $output $telemetry
output="-o ipfix$output_affinity;host=${HOST:-127.0.0.1};port=${PORT:-4739};id=${LINK:-0};dir=${DIR:-0};${UDP_PARAM};${NON_BLOCKING_TCP_PARAM};template=${TEMPLATE_REFRESH_RATE:-300}"

telemetry=""
if [ "$USE_FUSE" = "1" ]; then
telemetry="-t ${FUSE_MOUNT_POINT}"
fi

exec /usr/bin/ipfixprobe "${dpdkinput[@]}" $input $storage $process $output $telemetry

exec /usr/bin/ipfixprobe "${dpdkinput[@]}" $input $storage $process $output
exec /usr/bin/ipfixprobe "${dpdkinput[@]}" $input $storage $process $output $telemetry $EXTRA_ARGS
else
echo "Configuration file '$CONFFILE' does not exist, exitting." >&2
exit 1
Expand Down
11 changes: 11 additions & 0 deletions init/link0.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
# Set mapping of DPDK lcores to threads:
#DPDK_LCORES="(0-7)@(0,2,4,6,8,10,12,14)"

# Set input workers CPU affinity, each worker is mapped on single core
# array must have the same size as DPDK_QUEUES_COUNT
# when DPDK_INPUT_WORKER_CPUS is specified, DPDK_LCORES does not affect input workers
#DPDK_INPUT_WORKER_CPUS=(0 2 4 6 8 10 12 14)

# Extra options for DPDK EAL, passed to e= option of `dpdk` or `dpdk-ring` plugin.
# * Use --file-prefix to separate DPDK application into new namespace.
# * Use --proc-type=secondary for Option B) to receive packets via mrings created
Expand Down Expand Up @@ -189,7 +194,13 @@ NON_BLOCKING_TCP=no
# Export ipfix template every N seconds (UDP)
TEMPLATE_REFRESH_RATE=300

# Define output worker (thread) affinity, e.g. CPU core isolated from the scheduler
#OUTPUT_WORKER_CPU=12

####### Fuse telemetry

USE_FUSE=0
FUSE_MOUNT_POINT="/var/run/ipfixprobe"

# Specify any extra global arguments, e.g. size of input queue
#EXTRA_ARGS="-q 2048"
55 changes: 50 additions & 5 deletions ipfixprobe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ void print_help(ipxp_conf_t &conf, const std::string &arg)
}
}

void process_plugin_argline(const std::string &args, std::string &plugin, std::string &params)
void process_plugin_argline(const std::string &args, std::string &plugin, std::string &params, std::vector<int> &affinity)
{
size_t delim;

Expand All @@ -148,6 +148,16 @@ void process_plugin_argline(const std::string &args, std::string &plugin, std::s
plugin = params.substr(0, delim);
params.erase(0, delim == std::string::npos ? delim : delim + 1);

delim = plugin.find('@');
if (delim != std::string::npos) {
try {
affinity.emplace_back(std::stoi(plugin.substr(delim + 1)));
} catch (const std::invalid_argument &ex) {
throw IPXPError("CPU affinity must be single number: " + std::string(ex.what()));
}
}
plugin = plugin.substr(0, delim);

trim_str(plugin);
trim_str(params);
}
Expand All @@ -168,6 +178,28 @@ telemetry::Content get_ipx_ring_telemetry(ipx_ring_t* ring)
return dict;
}

void set_thread_details(pthread_t thread, const std::string &name, const std::vector<int> &affinity)
{
// Set thread name and affinity
if (name.length() > 0) {
pthread_setname_np(thread, name.substr(0, 15).c_str());
}
if (affinity.size() > 0) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
for (auto cpu : affinity) {
CPU_SET(cpu, &cpuset);
}
int ret = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
if (ret != 0) {
throw IPXPError(
"pthread_setaffinity_np failed, CPU(s) "
+ vec2str(affinity) + " probably cannot be set"
);
}
}
}

bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser)
{
auto deleter = [&](OutputPlugin::Plugins *p) {
Expand All @@ -183,18 +215,27 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser)
std::string output_params = "";

if (parser.m_storage.size()) {
process_plugin_argline(parser.m_storage[0], storage_name, storage_params);
std::vector<int> affinity;
process_plugin_argline(parser.m_storage[0], storage_name, storage_params, affinity);
if (affinity.size() != 0) {
throw IPXPError("cannot set CPU affinity for storage plugin (storage plugin is invoked inside input threads)");
}
}
std::vector<int> output_worker_affinity;
if (parser.m_output.size()) {
process_plugin_argline(parser.m_output[0], output_name, output_params);
process_plugin_argline(parser.m_output[0], output_name, output_params, output_worker_affinity);
}

// Process
for (auto &it : parser.m_process) {
ProcessPlugin *process_plugin = nullptr;
std::string process_params;
std::string process_name;
process_plugin_argline(it, process_name, process_params);
std::vector<int> affinity;
process_plugin_argline(it, process_name, process_params, affinity);
if (affinity.size() != 0) {
throw IPXPError("cannot set CPU affinity for process plugin (process plugins are invoked inside input threads)");
}
for (auto &it : *process_plugins) {
std::string plugin_name = it.first;
if (plugin_name == process_name) {
Expand Down Expand Up @@ -272,6 +313,7 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser)
output_stats,
output_queue
};
set_thread_details(tmp.thread->native_handle(), "out_" + output_name, output_worker_affinity);
conf.outputs.push_back(tmp);
conf.output_fut.push_back(output_res->get_future());
}
Expand All @@ -286,7 +328,8 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser)
StoragePlugin *storage_plugin = nullptr;
std::string input_params;
std::string input_name;
process_plugin_argline(it, input_name, input_params);
std::vector<int> affinity;
process_plugin_argline(it, input_name, input_params, affinity);

auto input_plugin_dir = input_dir->addDir(input_name);
auto pipeline_queue_dir = pipeline_dir->addDir("queues")->addDir(std::to_string(pipeline_idx));
Expand Down Expand Up @@ -358,6 +401,7 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser)
storage_process_plugins
}
};
set_thread_details(tmp.input.thread->native_handle(), "in_"+ std::to_string(pipeline_idx) + "_" + input_name, affinity);
conf.pipelines.push_back(tmp);
pipeline_idx++;
}
Expand Down Expand Up @@ -654,6 +698,7 @@ int run(int argc, char *argv[])
conf.pkt_bufsize = parser.m_pkt_bufsize;
conf.max_pkts = parser.m_max_pkts;

set_thread_details(pthread_self(), "", parser.m_cpu_mask);

try {
if (process_plugin_args(conf, parser)) {
Expand Down
13 changes: 13 additions & 0 deletions ipfixprobe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class IpfixprobeOptParser : public OptionsParser {
bool m_help;
std::string m_help_str;
bool m_version;
std::vector<int> m_cpu_mask;

IpfixprobeOptParser() : OptionsParser("ipfixprobe", "flow exporter supporting various custom IPFIX elements"),
m_pid(""), m_appfs_mount_point(""), m_daemon(false),
Expand Down Expand Up @@ -170,6 +171,18 @@ class IpfixprobeOptParser : public OptionsParser {
m_version = true;
return true;
}, OptionFlags::NoArgument);
register_option("-C", "--cpus", "CPU_LIST", "Set global CPU mask for main thread and subthreads", [this](const char *arg) {
try {
std::stringstream ss(arg);
std::string tmp;
while (std::getline(ss, tmp, ',')) {
m_cpu_mask.emplace_back(str2num<uint16_t>(tmp));
}
return true;
} catch (std::invalid_argument &e) {
return false;
}
});
}
};

Expand Down

0 comments on commit 5ba318c

Please sign in to comment.