From 826223f6056611f7802ee7ee6ffdca0ac09d3832 Mon Sep 17 00:00:00 2001
From: Adriaan Schmidt <>
Date: Wed, 22 May 2024 09:00:00 +0200
Subject: [PATCH] feat: add plugin_kthread

Signed-off-by: Adriaan Schmidt <>
 tuned/plugins/ | 562 ++++++++++++++++++++++++++++++++
 1 file changed, 562 insertions(+)
 create mode 100644 tuned/plugins/

diff --git a/tuned/plugins/ b/tuned/plugins/
new file mode 100644
index 00000000..41621470
--- /dev/null
+++ b/tuned/plugins/
@@ -0,0 +1,562 @@
+from . import hotplug
+from .decorators import *
+import tuned.consts as consts
+import tuned.logs
+# The scheduler interface in os was introduced in Python 3.3
+# To also support earlier versions, we import some logic from plugin_scheduler
+from .plugin_scheduler import SchedulerUtils, SchedulerUtilsSchedutils
+import perf
+import procfs
+import errno
+import os
+import re
+import select
+import threading
+log = tuned.logs.get()
+# threads can disappear at any time. in that case, we raise a custom exception
+class ThreadNoLongerExists(Exception):
+	pass
+# another custom exception to signal non-changeability of affinities
+class AffinityNotChangeable(Exception):
+	pass
+# the plugin manages each kthread as a "device", and keeps a KthreadInfo object
+class KthreadInfo(object):
+	def __init__(self, proc):
+ =
+		self.comm = procfs.process_cmdline(proc)
+		self.device = "%d:%s" % (, self.comm)
+		# it seems we can check for fixed affinities too early, i.e. procfs
+		# indicates "changeable", but we fail later when we try to actually change it
+		# so: delay the check until later
+		self.affinity_changeable = None
+		self.sched_orig = None
+# scheduling options
+class SchedOpts(object):
+	def __init__(self, policy=None, priority=None, affinity=None):
+		self.policy = policy
+		self.priority = priority
+		self.affinity = affinity
+# group.* definitions from the instance options
+class GroupCmd(object):
+	def __init__(self, name, prio, sched, regex):
+ = name
+		self.prio = prio
+		self.sched = sched
+		self.regex = regex
+class KthreadPlugin(hotplug.Plugin):
+	r"""
+	`kthread`::
+	Allows tuning of kernel threads by setting their CPU affinities and
+	scheduling parameters. The plugin re-implements functionality already
+	present in the `scheduler` plugin. However, this plugin offers more
+	flexibility, as it allows tuning of individual kernel threads, which
+	are handled as `devices`. Multiple plugin instances can be defined,
+	each addressing different groups of kernel threads.
+	When using the `kthread` plugin, make sure to disable processing of kernel
+	threads in the `scheduler` plugin by setting its option
+	[option]`kthread_process=false`.
+	===
+	Tuning options are controlled by [option]`group` definitions.
+	+
+	A group definition has the form
+	`group.<name> = <rule_prio>:<schedopts>:<affinity>:<regex>`
+	+
+	with four required fields:
+	+
+	--
+	`rule_prio`::
+	priority of the group within this plugin instance (lower number indicates
+	higher priority)
+	`schedopts`::
+	desired scheduling policy and priority, or either "*" or an empty string
+	to leave the scheduling options unchanged.
+	The first character defines the policy
+	- r: SCHED_RR
+	The remainder is the desired priority in the range 0..99.
+	For SCHED_OTHER, only a priority of 0 is allowed.
+	Examples: `f50` to set SCHED_FIFO with priority 50, `o0` for SCHED_OTHER
+	`affinity`::
+	desired affinity (as cpulist string), or either "*" or an empty string
+	to leave the affinity unchanged
+	`regex`::
+	regular expression to match kernel threads. Note that the thread name needs
+	to match the full regex, i.e. matching happens with re.fullmatch().
+	--
+	The [option]`group` options of the `kthread` plugin differ from those of
+	the `scheduler` plugin:
+	- scheduling policy and priority are combined into one option
+	- affinities are specified as cpulist strings instead of masks
+	- regular expressions need to fully match the thread names
+	- no square brackets are added to the kernel thread names
+	Example:
+	The `scheduler` definition
+	group.ksoftirqd=0:f:2:*:^\[ksoftirqd
+	is translated to the `kthread` definition
+	group.ksoftirqd=0:f2:*:ksoftirqd.*
+	"""
+	def __init__(self, monitor_repository, storage_factory, hardware_inventory, device_matcher, device_matcher_udev, plugin_instance_factory, global_cfg, variables):
+		super(KthreadPlugin, self).__init__(monitor_repository, storage_factory, hardware_inventory, device_matcher, device_matcher_udev, plugin_instance_factory, global_cfg, variables)
+		self._has_dynamic_options = True
+		self._kthreads = {}
+		self._lock = threading.RLock()
+		self._instance_count = 0
+		try:
+			self._scheduler_utils = SchedulerUtils()
+		except AttributeError:
+			self._scheduler_utils = SchedulerUtilsSchedutils()
+		self._perf_setup()
+	def cleanup(self):
+		super(KthreadPlugin, self).cleanup()
+		self._perf_shutdown()
+	#
+	# plugin-level methods: devices and plugin options
+	#
+	def _init_devices(self):
+		self._devices_supported = True
+		self._free_devices = set()
+		self._assigned_devices = set()
+		self._kthread_scan(initial=True)
+	@classmethod
+	def _get_config_options(cls):
+		return {
+			# nothing here, the group.* options are covered by self._has_dynamic_options
+		}
+	#
+	# helper functions
+	#
+	def _convert_device_to_pid(self, device):
+		"""Extract the PID (as int) from the device name string"""
+		pid, _ = device.split(":", 1)
+		return int(pid)
+	#
+	# instance-level methods: implement the Instance interface
+	#
+	def _instance_init(self, instance):
+		instance._has_static_tuning = True
+		instance._has_dynamic_tuning = False
+		# set our instance name, so the _instance_kthread command can find us
+		instance._options["_instance"] =
+		# process group.* options if not already done via _get_matching_devices
+		if not hasattr(instance, "_groups"):
+			self._instance_prepare_device_matching(instance)
+		# warn in case we have device or udev expressions... those don't work in this plugin
+		if instance._devices_expression not in [None, "*"]:
+			log.warn("Ignoring devices expression '%s' of instance '%s'" % (instance._devices_expression,
+		if instance._devices_udev_regex is not None:
+			log.warn("Ignoring devices udev regex '%s' of instance '%s'" % (instance._devices_udev_regex,
+	def _instance_cleanup(self, instance):
+		pass
+	def _instance_prepare_device_matching(self, instance):
+		"""Process all group.* options and populate instance._groups"""
+		groups = []
+		for k, v in instance._options.items():
+			# group definitions have the format:
+			# group.<name> = <rule_prio>:<schedopts>:<affinity>:<regex>
+			if not k.startswith("group."):
+				continue
+			name = k[len("group."):]
+			opt = v.split(":", 3)
+			if not len(opt) == 4:
+				log.error("Invalid definition for '%s': need exactly 4 arguments" % k)
+				continue
+			opt_rule_prio, opt_schedopts, opt_affinity, opt_regex = opt
+			# parse rule prio
+			try:
+				rule_prio = int(opt_rule_prio)
+			except ValueError:
+				log.error("Could not parse rule prio for '%s': '%s' is not a number" % (k, opt_rule_prio))
+				continue
+			# parse scheduling options
+			policy, priority, affinity = None, None, None
+			if opt_schedopts in ["", "*"]:
+				pass
+			elif len(opt_schedopts) > 1 and opt_schedopts[0] in self._scheduler_utils._dict_schedcfg2num.keys():
+				policy = self._scheduler_utils.sched_cfg_to_num(opt_schedopts[0])
+				try:
+					priority = int(opt_schedopts[1:])
+				except ValueError:
+					log.error("Could not parse scheduling priority for '%s': '%s' is not a number" % (k, opt_schedopts[1:]))
+					continue
+				if policy == os.SCHED_OTHER and priority != 0:
+					log.error("Could not parse scheduling priority for '%s': SCHED_OTHER requires priority 0" % k)
+					continue
+				if priority < 0 or priority > 99:
+					log.error("Could not parse scheduling priority for '%s': value '%d' out of range" % (k, priority))
+					continue
+			else:
+				log.error("Could not parse scheduling priority for '%s': '%s' has wrong format" % (k, opt_schedopts))
+				continue
+			if not opt_affinity in ["", "*"]:
+				affinity = set(self._cmd.cpulist_unpack(opt_affinity))
+				if len(affinity) == 0:
+					log.error("Could not parse affinity for '%s': '%s' has wrong format" % (k, opt_affinity))
+					continue
+			sched = SchedOpts(policy=policy, priority=priority, affinity=affinity)
+			# parse the regex
+			try:
+				regex = re.compile(opt_regex)
+			except re.error as e:
+				log.error("Could not compile regex for '%s': '%s'" % (k, e.msg))
+				continue
+			groups.append(GroupCmd(name, rule_prio, sched, regex))
+		instance._groups = sorted(groups, key=lambda x: x.prio)
+	def _get_instance_sched_options(self, instance, kthread):
+		"""
+		determine options an instance would set for a kthread, None if the
+		instance would not set any (because none of the group.* regexes matches)
+		"""
+		for group in instance._groups:
+			if group.regex.fullmatch(kthread.comm):
+				return group.sched
+		return None
+	def _get_matching_devices(self, instance, devices):
+		"""
+		overrides method in base.Plugin
+		instead of matching with devices/udev regexes, we use the group.*
+		definitions to determine threads that fit the instance
+		"""
+		# this can be called before the instance is initialized via
+		# _instance_init(), so we need to make sure that we process our
+		# group.* options
+		if not hasattr(instance, "_groups"):
+			self._instance_prepare_device_matching(instance)
+		matching_devices = set()
+		for device in devices:
+			pid = self._convert_device_to_pid(device)
+			try:
+				kthread = self._kthread_get(pid)
+			except ThreadNoLongerExists:
+				self._kthread_remove(pid)
+				continue
+			if self._get_instance_sched_options(instance, kthread) is not None:
+				matching_devices.add(device)
+		return matching_devices
+	def _instance_apply_static(self, instance):
+		if self._instance_count == 0:
+			# scan for kthreads that have appeared since plugin initialization
+			self._kthread_scan(initial=False)
+			self._perf_monitor_start()
+		self._instance_count += 1
+		super(KthreadPlugin, self)._instance_apply_static(instance)
+	def _instance_unapply_static(self, instance, rollback):
+		super(KthreadPlugin, self)._instance_unapply_static(instance, rollback)
+		self._instance_count -= 1
+		if self._instance_count == 0:
+			self._perf_monitor_shutdown()
+	#
+	# internal bookkeeping (self._kthreads)
+	# as these methods are called from the main thred and the perf monitor
+	# thread, we need to lock all accesses to self._kthreads and the 
+	# hotplug.Plugin methods _add_device()/_remove_device()
+	#
+	def _kthread_scan(self, initial=False):
+		"""Scan procfs for kernel threads and add them to out bookkeeping
+		Args:
+			initial (bool): is this the initial scan? passed on to _kthread_add()
+		"""
+		ps = procfs.pidstats()
+		for pid in ps.keys():
+			self._kthread_add(pid, initial)
+	def _kthread_add(self, pid, initial=False):
+		"""Add kernel thread to internal bookkeeping
+		Args:
+			pid (int): kernel thread pid
+			initial (bool): is this the initial scan? if yes, then add the new
+				kthread to _free_devices, else initiate hotplug mechanism via
+				_add_device()
+		"""
+		try:
+			proc = procfs.process(pid)
+			if not self._is_kthread(proc):
+				return
+			kthread = KthreadInfo(proc)
+		except (FileNotFoundError, ProcessLookupError):
+			return
+		with self._lock:
+			if in self._kthreads:
+				return
+			self._kthreads[] = kthread
+			if initial:
+				self._free_devices.add(kthread.device)
+			else:
+				self._add_device(kthread.device)
+		log.debug("Added kthread %s" % kthread.device)
+	def _kthread_remove(self, pid):
+		"""Remove kernel thread from internal bookkeeping
+		Args:
+			pid (int): kernel thread pid
+		"""
+		try:
+			with self._lock:
+				device = self._kthreads[pid].device
+				del self._kthreads[pid]
+				self._remove_device(device)
+		except KeyError:
+			return
+		log.debug("Removed kthread %d" % pid)
+	def _kthread_get(self, pid):
+		"""Get KthreadInfo object for a given PID
+		Args:
+			pid (int): kernel thread pid
+		"""
+		try:
+			with self._lock:
+				return self._kthreads[pid]
+		except KeyError:
+			raise ThreadNoLongerExists()
+	def _is_kthread(self, proc):
+		"""helper to determine if a procfs process is a kernel thread"""
+		return proc["stat"]["flags"] & procfs.pidstat.PF_KTHREAD != 0
+	#
+	# methods to interact with perf
+	#
+	def _perf_setup(self):
+		self._cpus = perf.cpu_map()
+		self._threads = perf.thread_map()
+		self._evlist = perf.evlist(self._cpus, self._threads)
+		evsel = perf.evsel(
+			type=perf.TYPE_SOFTWARE,
+			config=perf.COUNT_SW_DUMMY,
+			task=1,
+			comm=1,
+			mmap=0,
+			freq=0,
+			wakeup_events=1,
+			watermark=1,
+			sample_type=perf.SAMPLE_TID|perf.SAMPLE_CPU,
+		)
+, threads=self._threads)
+		self._evlist.add(evsel)
+		self._evlist.mmap()
+	def _perf_shutdown(self):
+		if self._evlist:
+			for fd in self._evlist.get_pollfd():
+				os.close(
+	def _perf_monitor_start(self):
+		self._terminate = threading.Event()
+		self._thread = threading.Thread(target=self._perf_monitor_thread)
+		self._thread.start()
+	def _perf_monitor_shutdown(self):
+		self._terminate.set()
+		self._thread.join()
+	def _perf_monitor_thread(self):
+		log.debug("perf monitor thread starting")
+		poll = select.poll()
+		fds = self._evlist.get_pollfd()
+		for fd in fds:
+			poll.register(fd)
+		while not self._terminate.is_set():
+			if len(poll.poll(1000)) == 0:
+				continue
+			have_events = True
+			while have_events:
+				have_events = False
+				for cpu in self._cpus:
+					event = self._evlist.read_on_cpu(cpu)
+					if event:
+						have_events = True
+						if event.type == perf.RECORD_COMM:
+							self._kthread_add(event.tid)
+						elif event.type == perf.RECORD_EXIT:
+							self._kthread_remove(event.tid)
+		log.debug("perf monitor thread shutting down")
+	#
+	# methods for low-level manipulation of scheduling options
+	# via SchedulerUtils from .plugin_scheduler
+	#
+	def _set_affinity(self, pid, affinity):
+		try:
+			self._scheduler_utils.set_affinity(pid, affinity)
+		except OSError as e:
+			if hasattr(e, "errno") and e.errno == errno.ESRCH:
+				log.debug("Failed to set affinity of PID %d, the task vanished." % pid)
+				raise ThreadNoLongerExists()
+			else:
+				try:
+					proc = procfs.process(pid)
+					changeable = not proc["stat"].is_bound_to_cpu()
+				except (OSError, IOError):
+					raise ThreadNoLongerExists()
+				if not changeable:
+					raise AffinityNotChangeable()
+				log.error("Failed to set affinity of PID %d to '%s': %s" % (pid, affinity, e))
+				raise e
+	def _get_affinity(self, pid):
+		try:
+			return self._scheduler_utils.get_affinity(pid)
+		except OSError as e:
+			if hasattr(e, "errno") and e.errno == errno.ESRCH:
+				log.debug("Failed to get affinity of PID %d, the task vanished." % pid)
+				raise ThreadNoLongerExists()
+			else:
+				log.error("Failed to get affinity of PID %d: %s" % (pid, e))
+				raise e
+	def _set_schedopts(self, pid, policy, priority):
+		try:
+			self._scheduler_utils.set_scheduler(pid, policy, priority)
+		except OSError as e:
+			if hasattr(e, "errno") and e.errno == errno.ESRCH:
+				log.debug("Failed to set scheduling of kthread %d, the task vanished." % pid)
+				raise ThreadNoLongerExists()
+			else:
+				log.error("Failed to set scheduling of kthread %d: %s" % (pid, e))
+				raise e
+	def _get_schedopts(self, pid):
+		try:
+			return self._scheduler_utils.get_scheduler(pid), self._scheduler_utils.get_priority(pid)
+		except OSError as e:
+			if hasattr(e, "errno") and e.errno == errno.ESRCH:
+				log.debug("Failed to get scheduling of kthread %d, the task vanished." % pid)
+				raise ThreadNoLongerExists()
+			else:
+				log.error("Failed to get scheduling of kthread %d: %s" % (pid, e))
+				raise e
+	def _format_schedopts(self, policy, priority):
+		return "%s:%d" % (self._scheduler_utils.sched_num_to_const(policy), priority)
+	#
+	# "high-level" methods that work on KthreadInfo objects:
+	# apply tuning while saving original settings
+	#
+	def _apply_kthread_tuning(self, kthread, opts):
+		current_affinity = self._get_affinity(
+		current_policy, current_priority = self._get_schedopts(
+		if kthread.sched_orig is None:
+			orig_opts = SchedOpts(policy=current_policy, priority=current_priority, affinity=current_affinity)
+			kthread.sched_orig = orig_opts
+		if opts.affinity is not None and opts.affinity != current_affinity:
+			try:
+				self._set_affinity(, opts.affinity)
+				kthread.affinity_changeable = True
+				log.debug("Set CPU affinity of kthread %s to '%s'" % (kthread.device, opts.affinity))
+			except AffinityNotChangeable:
+				kthread.affinity_changeable = False
+				log.debug("The CPU affinity of kthread %s is not changeable"% kthread.device)
+		if opts.policy is not None or opts.priority is not None:
+			if opts.policy != current_policy or opts.priority != current_priority:
+				self._set_schedopts(, opts.policy, opts.priority)
+				log.debug("Set scheduling of kthread %s to '%s'"
+						% (kthread.device, self._format_schedopts(opts.policy, opts.priority)))
+	def _restore_kthread_tuning(self, kthread):
+		opts = kthread.sched_orig
+		current_affinity = self._get_affinity(
+		current_policy, current_priority = self._get_schedopts(
+		if kthread.affinity_changeable and opts.affinity != current_affinity:
+			try:
+				self._set_affinity(, opts.affinity)
+				log.debug("Restored CPU affinity of kthread %s to '%s'"
+						% (kthread.device, opts.affinity))
+			except AffinityNotChangeable:
+				log.debug("Failed to restore CPU affinity of kthread %s to '%s'"
+						% (kthread.device, opts.affinity))
+		if opts.policy != current_policy or opts.priority != current_priority:
+			self._set_schedopts(, opts.policy, opts.priority)
+			log.debug("Restored scheduling of kthread %s to '%s'"
+					% (kthread.device, self._format_schedopts(opts.policy, opts.priority)))
+	def _verify_kthread_tuning(self, kthread, opts):
+		affinity_ok, priority_ok = True, True
+		current_affinity = self._get_affinity(
+		current_policy, current_priority = self._get_schedopts(
+		if opts.affinity is not None and kthread.affinity_changeable:
+			desc = "CPU affinity of kthread %s" % kthread.device
+			current = self._cmd.cpulist2string(self._cmd.cpulist_pack(current_affinity))
+			if opts.affinity == current_affinity:
+ % (desc, current))
+			else:
+				desired = self._cmd.cpulist2string(self._cmd.cpulist_pack(opts.affinity))
+				log.error(consts.STR_VERIFY_PROFILE_VALUE_FAIL % (desc, current, desired))
+				affinity_ok = False
+		if opts.policy is not None or opts.priority is not None:
+			desc = "scheduling of kthread %s" % kthread.device
+			current = self._format_schedopts(current_policy, current_priority)
+			if opts.policy == current_policy and opts.priority == current_priority:
+ % (desc, current))
+			else:
+				desired = self._format_schedopts(opts.policy, opts.priority)
+				log.error(consts.STR_VERIFY_PROFILE_VALUE_FAIL % (desc, current, desired))
+				priority_ok = False
+		return affinity_ok and priority_ok
+	#
+	# command definitions: entry point for device tuning
+	#
+	@command_custom("_instance", per_device=True)
+	def _instance_kthread(self, start, value, device, verify, ignore_missing):
+		"""
+		This is the actual entry point for tuning.
+		value (of the option "_instance") is the name of the instance, set in _instance_init
+		"""
+		pid = self._convert_device_to_pid(device)
+		try:
+			kthread = self._kthread_get(pid)
+			if verify:
+				instance = self._instances[value]
+				opts = self._get_instance_sched_options(instance, kthread)
+				return self._verify_kthread_tuning(kthread, opts)
+			if start:
+				instance = self._instances[value]
+				opts = self._get_instance_sched_options(instance, kthread)
+				self._apply_kthread_tuning(kthread, opts)
+			else:
+				self._restore_kthread_tuning(kthread)
+		except ThreadNoLongerExists:
+			self._kthread_remove(pid)
+		return None