Skip to content

Commit

Permalink
Make all the PID options fully configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed May 3, 2024
1 parent 5eea7df commit aadb636
Showing 1 changed file with 67 additions and 17 deletions.
84 changes: 67 additions & 17 deletions core/src/worker/slot_supplier/resource_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use tokio::{sync::watch, task::JoinHandle};
/// Implements [SlotSupplier] and attempts to maintain certain levels of resource usage when
/// under load.
pub struct ResourceBasedSlots<MI> {
/// Stored outside the pid controller to enforce a hard limit where we _definitely_ won't allow
/// a new slot if already at the memory limit.
target_mem_usage: f64,
options: ResourceBasedSlotsOptions,
sys_info_supplier: MI,
metrics: OnceLock<JoinHandle<()>>,
pids: Mutex<PidControllers>,
Expand All @@ -40,6 +38,35 @@ pub struct ResourceBasedSlotsForType<MI, SK> {
last_slot_issued_rx: watch::Receiver<Instant>,
_slot_kind: PhantomData<SK>,
}
/// Tunable settings for [ResourceBasedSlots]: the target memory/CPU usage levels, plus the gains
/// and output thresholds of the memory and CPU PID controllers.
///
/// Built through the `derive_builder`-generated `ResourceBasedSlotsOptionsBuilder`. The two
/// target usages have no builder default and so must always be provided; every gain and
/// threshold falls back to the default listed on its field.
#[derive(Clone, Debug, derive_builder::Builder)]
#[non_exhaustive]
pub struct ResourceBasedSlotsOptions {
/// A value in the range [0.0, 1.0] representing the target memory usage.
///
/// Handed to the memory PID controller as its target, and also compared directly against the
/// currently used memory so that no new slot is reserved while usage is above this value.
// NOTE(review): nothing visible here validates the [0.0, 1.0] range -- confirm whether callers
// are expected to enforce it.
target_mem_usage: f64,
/// A value in the range [0.0, 1.0] representing the target CPU usage.
///
/// Handed to the CPU PID controller as its target.
target_cpu_usage: f64,

/// Proportional (P) gain of the memory PID controller. Defaults to `5.0`.
#[builder(default = "5.0")]
pub mem_p_gain: f64,
/// Integral (I) gain of the memory PID controller. Defaults to `0.0`.
#[builder(default = "0.0")]
pub mem_i_gain: f64,
/// Derivative (D) gain of the memory PID controller. Defaults to `1.0`.
#[builder(default = "1.0")]
pub mem_d_gain: f64,
/// If the mem PID controller outputs a value higher than this, we say the mem half of things
/// will allow a slot. The comparison is strict (`>`), and the CPU half must agree as well.
/// Defaults to `0.25`.
#[builder(default = "0.25")]
pub mem_output_threshold: f64,
/// Proportional (P) gain of the CPU PID controller. Defaults to `5.0`.
#[builder(default = "5.0")]
pub cpu_p_gain: f64,
/// Integral (I) gain of the CPU PID controller. Defaults to `0.0`.
#[builder(default = "0.0")]
pub cpu_i_gain: f64,
/// Derivative (D) gain of the CPU PID controller. Defaults to `1.0`.
#[builder(default = "1.0")]
pub cpu_d_gain: f64,
/// If the CPU PID controller outputs a value higher than this, we say the CPU half of things
/// will allow a slot. The comparison is strict (`>`), and the mem half must agree as well.
/// Defaults to `0.05`.
#[builder(default = "0.05")]
pub cpu_output_threshold: f64,
}
struct PidControllers {
mem: pid::Pid<f64>,
cpu: pid::Pid<f64>,
Expand All @@ -63,16 +90,30 @@ impl ResourceBasedSlots<RealSysInfo> {
/// Create an instance attempting to target the provided memory and cpu thresholds as values
/// between 0 and 1.
pub fn new(target_mem_usage: f64, target_cpu_usage: f64) -> Self {
Self::new_with_sysinfo(target_mem_usage, target_cpu_usage, RealSysInfo::new())
let opts = ResourceBasedSlotsOptionsBuilder::default()
.target_mem_usage(target_mem_usage)
.target_cpu_usage(target_cpu_usage)
.build()
.expect("default resource based slot options can't fail to build");
Self::new_with_sysinfo(opts, RealSysInfo::new())
}

/// Builds an instance from a complete, caller-supplied [ResourceBasedSlotsOptions], so every
/// PID controller setting can be customized. System readings are supplied by a fresh
/// [RealSysInfo].
pub fn new_from_options(options: ResourceBasedSlotsOptions) -> Self {
    let sys_info = RealSysInfo::new();
    Self::new_with_sysinfo(options, sys_info)
}
}

impl PidControllers {
fn new(mem_target: f64, cpu_target: f64) -> Self {
let mut mem = pid::Pid::new(mem_target, 100.0);
mem.p(5.0, 100).i(0.0, 100).d(1.0, 100);
let mut cpu = pid::Pid::new(cpu_target, 100.0);
cpu.p(5.0, 100.).i(0.0, 100.).d(1.0, 100.);
fn new(options: &ResourceBasedSlotsOptions) -> Self {
let mut mem = pid::Pid::new(options.target_mem_usage, 100.0);
mem.p(options.mem_p_gain, 100)
.i(options.mem_i_gain, 100)
.d(options.mem_d_gain, 100);
let mut cpu = pid::Pid::new(options.target_cpu_usage, 100.0);
cpu.p(options.cpu_p_gain, 100)
.i(options.cpu_i_gain, 100)
.d(options.cpu_d_gain, 100);
Self { mem, cpu }
}
}
Expand Down Expand Up @@ -230,18 +271,18 @@ impl<MI: SystemResourceInfo + Sync + Send> ResourceBasedSlots<MI> {
))
}

fn new_with_sysinfo(target_mem_usage: f64, target_cpu_usage: f64, sys_info: MI) -> Self {
fn new_with_sysinfo(options: ResourceBasedSlotsOptions, sys_info: MI) -> Self {
Self {
pids: Mutex::new(PidControllers::new(&options)),
options,
metrics: OnceLock::new(),
target_mem_usage,
sys_info_supplier: sys_info,
pids: Mutex::new(PidControllers::new(target_mem_usage, target_cpu_usage)),
last_metric_vals: Arc::new(AtomicCell::new(Default::default())),
}
}

fn can_reserve(&self) -> bool {
self.sys_info_supplier.used_mem_percent() <= self.target_mem_usage
self.sys_info_supplier.used_mem_percent() <= self.options.target_mem_usage
}

/// Returns true if the pid controllers think a new slot should be given out
Expand All @@ -257,7 +298,8 @@ impl<MI: SystemResourceInfo + Sync + Send> ResourceBasedSlots<MI> {
mem_used_percent,
cpu_used_percent,
});
mem_output > 0.25 && cpu_output > 0.05
mem_output > self.options.mem_output_threshold
&& cpu_output > self.options.cpu_output_threshold
}

fn attach_metrics(&self, metrics: TemporalMeter) {
Expand Down Expand Up @@ -373,10 +415,18 @@ mod tests {
}
}

/// Options shared by the tests in this module: memory target of 0.8, CPU target of 1.0, and
/// the builder defaults for every PID gain and output threshold.
fn test_options() -> ResourceBasedSlotsOptions {
    let mut builder = ResourceBasedSlotsOptionsBuilder::default();
    builder.target_mem_usage(0.8).target_cpu_usage(1.0);
    let built = builder.build();
    built.expect("default resource based slot options can't fail to build")
}

#[test]
fn mem_workflow_sync() {
let (fmis, used) = FakeMIS::new();
let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(0.8, 1.0, fmis))
let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(test_options(), fmis))
.as_kind::<WorkflowSlotKind>(0, 100, Duration::from_millis(0));
let pd = MeteredPermitDealer::new(rbs.clone(), MetricsContext::no_op(), None);
assert!(rbs.try_reserve_slot(&pd).is_some());
Expand All @@ -388,7 +438,7 @@ mod tests {
async fn mem_workflow_async() {
let (fmis, used) = FakeMIS::new();
used.store(90_000, Ordering::Release);
let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(0.8, 1.0, fmis))
let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(test_options(), fmis))
.as_kind::<WorkflowSlotKind>(0, 100, Duration::from_millis(0));
let pd = MeteredPermitDealer::new(rbs.clone(), MetricsContext::no_op(), None);
let order = crossbeam_queue::ArrayQueue::new(2);
Expand All @@ -408,7 +458,7 @@ mod tests {
#[test]
fn minimum_respected() {
let (fmis, used) = FakeMIS::new();
let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(0.8, 1.0, fmis))
let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(test_options(), fmis))
.as_kind::<WorkflowSlotKind>(2, 100, Duration::from_millis(0));
let pd = MeteredPermitDealer::new(rbs.clone(), MetricsContext::no_op(), None);
used.store(90_000, Ordering::Release);
Expand Down

0 comments on commit aadb636

Please sign in to comment.