From aadb636a894f97319110f3a7dff3e7a15931ec4c Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 2 May 2024 17:40:45 -0700 Subject: [PATCH] Make all the PID options fully configurable --- .../worker/slot_supplier/resource_based.rs | 84 +++++++++++++++---- 1 file changed, 67 insertions(+), 17 deletions(-) diff --git a/core/src/worker/slot_supplier/resource_based.rs b/core/src/worker/slot_supplier/resource_based.rs index 70afff102..9ce36447f 100644 --- a/core/src/worker/slot_supplier/resource_based.rs +++ b/core/src/worker/slot_supplier/resource_based.rs @@ -17,9 +17,7 @@ use tokio::{sync::watch, task::JoinHandle}; /// Implements [SlotSupplier] and attempts to maintain certain levels of resource usage when /// under load. pub struct ResourceBasedSlots { - /// Stored outside the pid controller to enforce a hard limit where we _definitely_ won't allow - /// a new slot if already at the memory limit. - target_mem_usage: f64, + options: ResourceBasedSlotsOptions, sys_info_supplier: MI, metrics: OnceLock>, pids: Mutex, @@ -40,6 +38,35 @@ pub struct ResourceBasedSlotsForType { last_slot_issued_rx: watch::Receiver, _slot_kind: PhantomData, } +#[derive(Clone, Debug, derive_builder::Builder)] +#[non_exhaustive] +pub struct ResourceBasedSlotsOptions { + /// A value in the range [0.0, 1.0] representing the target memory usage. + target_mem_usage: f64, + /// A value in the range [0.0, 1.0] representing the target CPU usage. + target_cpu_usage: f64, + + #[builder(default = "5.0")] + pub mem_p_gain: f64, + #[builder(default = "0.0")] + pub mem_i_gain: f64, + #[builder(default = "1.0")] + pub mem_d_gain: f64, + /// If the mem PID controller outputs a value higher than this, we say the mem half of things + /// will allow a slot + #[builder(default = "0.25")] + pub mem_output_threshold: f64, + #[builder(default = "5.0")] + pub cpu_p_gain: f64, + #[builder(default = "0.0")] + pub cpu_i_gain: f64, + #[builder(default = "1.0")] + pub cpu_d_gain: f64, + /// If the CPU PID controller outputs a value higher than this, we say the CPU half of things + /// will allow a slot + #[builder(default = "0.05")] + pub cpu_output_threshold: f64, +} struct PidControllers { mem: pid::Pid, cpu: pid::Pid, @@ -63,16 +90,30 @@ impl ResourceBasedSlots { /// Create an instance attempting to target the provided memory and cpu thresholds as values /// between 0 and 1. pub fn new(target_mem_usage: f64, target_cpu_usage: f64) -> Self { - Self::new_with_sysinfo(target_mem_usage, target_cpu_usage, RealSysInfo::new()) + let opts = ResourceBasedSlotsOptionsBuilder::default() + .target_mem_usage(target_mem_usage) + .target_cpu_usage(target_cpu_usage) + .build() + .expect("default resource based slot options can't fail to build"); + Self::new_with_sysinfo(opts, RealSysInfo::new()) + } + + /// Create an instance using the fully configurable set of PID controller options + pub fn new_from_options(options: ResourceBasedSlotsOptions) -> Self { + Self::new_with_sysinfo(options, RealSysInfo::new()) } } impl PidControllers { - fn new(mem_target: f64, cpu_target: f64) -> Self { - let mut mem = pid::Pid::new(mem_target, 100.0); - mem.p(5.0, 100).i(0.0, 100).d(1.0, 100); - let mut cpu = pid::Pid::new(cpu_target, 100.0); - cpu.p(5.0, 100.).i(0.0, 100.).d(1.0, 100.); + fn new(options: &ResourceBasedSlotsOptions) -> Self { + let mut mem = pid::Pid::new(options.target_mem_usage, 100.0); + mem.p(options.mem_p_gain, 100) + .i(options.mem_i_gain, 100) + .d(options.mem_d_gain, 100); + let mut cpu = pid::Pid::new(options.target_cpu_usage, 100.0); + cpu.p(options.cpu_p_gain, 100) + .i(options.cpu_i_gain, 100) + .d(options.cpu_d_gain, 100); Self { mem, cpu } } } @@ -230,18 +271,18 @@ impl ResourceBasedSlots { )) } - fn new_with_sysinfo(target_mem_usage: f64, target_cpu_usage: f64, sys_info: MI) -> Self { + fn new_with_sysinfo(options: ResourceBasedSlotsOptions, sys_info: MI) -> Self { Self { + pids: Mutex::new(PidControllers::new(&options)), + options, metrics: OnceLock::new(), - target_mem_usage, sys_info_supplier: sys_info, - pids: Mutex::new(PidControllers::new(target_mem_usage, target_cpu_usage)), last_metric_vals: Arc::new(AtomicCell::new(Default::default())), } } fn can_reserve(&self) -> bool { - self.sys_info_supplier.used_mem_percent() <= self.target_mem_usage + self.sys_info_supplier.used_mem_percent() <= self.options.target_mem_usage } /// Returns true if the pid controllers think a new slot should be given out @@ -257,7 +298,8 @@ impl ResourceBasedSlots { mem_used_percent, cpu_used_percent, }); - mem_output > 0.25 && cpu_output > 0.05 + mem_output > self.options.mem_output_threshold + && cpu_output > self.options.cpu_output_threshold } fn attach_metrics(&self, metrics: TemporalMeter) { @@ -373,10 +415,18 @@ mod tests { } } + fn test_options() -> ResourceBasedSlotsOptions { + ResourceBasedSlotsOptionsBuilder::default() + .target_mem_usage(0.8) + .target_cpu_usage(1.0) + .build() + .expect("default resource based slot options can't fail to build") + } + #[test] fn mem_workflow_sync() { let (fmis, used) = FakeMIS::new(); - let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(0.8, 1.0, fmis)) + let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(test_options(), fmis)) .as_kind::(0, 100, Duration::from_millis(0)); let pd = MeteredPermitDealer::new(rbs.clone(), MetricsContext::no_op(), None); assert!(rbs.try_reserve_slot(&pd).is_some()); @@ -388,7 +438,7 @@ mod tests { async fn mem_workflow_async() { let (fmis, used) = FakeMIS::new(); used.store(90_000, Ordering::Release); - let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(0.8, 1.0, fmis)) + let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(test_options(), fmis)) .as_kind::(0, 100, Duration::from_millis(0)); let pd = MeteredPermitDealer::new(rbs.clone(), MetricsContext::no_op(), None); let order = crossbeam_queue::ArrayQueue::new(2); @@ -408,7 +458,7 @@ mod tests { #[test] fn minimum_respected() { let (fmis, used) = FakeMIS::new(); - let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(0.8, 1.0, fmis)) + let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(test_options(), fmis)) .as_kind::(2, 100, Duration::from_millis(0)); let pd = MeteredPermitDealer::new(rbs.clone(), MetricsContext::no_op(), None); used.store(90_000, Ordering::Release);