Skip to content

Commit 5886007

Browse files
committed
PID controller w/ mem & cpu
1 parent b6b91c9 commit 5886007

File tree

3 files changed

+124
-60
lines changed

3 files changed

+124
-60
lines changed

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ opentelemetry_sdk = { version = "0.22", features = ["rt-tokio", "metrics"], opti
4848
opentelemetry-otlp = { version = "0.15", features = ["tokio", "metrics"], optional = true }
4949
opentelemetry-prometheus = { version = "0.15", optional = true }
5050
parking_lot = { version = "0.12", features = ["send_guard"] }
51+
pid = "4.0"
5152
pin-project = "1.0"
5253
prometheus = "0.13"
5354
prost = { workspace = true }

core/src/worker/slot_supplier/resource_based.rs

Lines changed: 107 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ use crossbeam_utils::atomic::AtomicCell;
22
use parking_lot::Mutex;
33
use std::{
44
marker::PhantomData,
5-
ops::SubAssign,
6-
sync::{atomic::AtomicU64, Arc},
5+
sync::{
6+
atomic::{AtomicU32, AtomicU64, Ordering},
7+
Arc,
8+
},
79
time::{Duration, Instant},
810
};
911
use temporal_sdk_core_api::worker::{
@@ -14,40 +16,57 @@ use tokio::sync::watch;
1416

1517
pub struct ResourceBasedSlots<MI> {
1618
target_mem_usage: f64,
17-
assumed_maximum_marginal_contribution: f64,
18-
mem_info_supplier: MI,
19-
max: usize,
19+
target_cpu_usage: f32,
20+
sys_info_supplier: MI,
2021
}
2122
pub struct ResourceBasedSlotsForType<MI, SK> {
2223
inner: Arc<ResourceBasedSlots<MI>>,
2324
minimum: usize,
25+
/// Maximum amount of slots of this type permitted
26+
max: usize,
2427
/// Minimum time we will wait (after passing the minimum slots number) between handing out new
2528
/// slots
2629
ramp_throttle: Duration,
2730

31+
pids: Arc<Mutex<PidControllers>>,
2832
last_slot_issued_tx: watch::Sender<Instant>,
2933
last_slot_issued_rx: watch::Receiver<Instant>,
3034
_slot_kind: PhantomData<SK>,
3135
}
36+
struct PidControllers {
37+
mem: pid::Pid<f64>,
38+
cpu: pid::Pid<f32>,
39+
}
3240

33-
impl ResourceBasedSlots<SysinfoMem> {
34-
pub fn new(target_mem_usage: f64, marginal_contribution: f64, max: usize) -> Self {
41+
impl ResourceBasedSlots<RealSysInfo> {
42+
pub fn new(target_mem_usage: f64, target_cpu_usage: f32) -> Self {
3543
Self {
3644
target_mem_usage,
37-
assumed_maximum_marginal_contribution: marginal_contribution,
38-
mem_info_supplier: SysinfoMem::new(),
39-
max,
45+
target_cpu_usage,
46+
sys_info_supplier: RealSysInfo::new(),
4047
}
4148
}
4249
}
4350

51+
impl PidControllers {
52+
fn new(mem_target: f64, cpu_target: f32) -> Self {
53+
let mut mem = pid::Pid::new(mem_target, 100.0);
54+
mem.p(5.0, 100).i(0.0, 100).d(1.0, 100);
55+
let mut cpu = pid::Pid::new(cpu_target, 100.0);
56+
cpu.p(5.0, 100.).i(0.0, 100.).d(1.0, 100.);
57+
Self { mem, cpu }
58+
}
59+
}
60+
4461
trait MemoryInfo {
4562
/// Return total available system memory in bytes
4663
fn total_mem(&self) -> u64;
4764
/// Return memory used by this process in bytes
4865
// TODO: probably needs to be just overall used... won't work w/ subprocesses for example
4966
fn process_used_mem(&self) -> u64;
5067

68+
fn used_cpu_percent(&self) -> f32;
69+
5170
fn process_used_percent(&self) -> f64 {
5271
self.process_used_mem() as f64 / self.total_mem() as f64
5372
}
@@ -79,7 +98,9 @@ where
7998

8099
fn try_reserve_slot(&self, ctx: &dyn SlotReservationContext) -> Option<SlotSupplierPermit> {
81100
if self.time_since_last_issued() > self.ramp_throttle
82-
&& self.inner.can_reserve(ctx.num_issued_slots())
101+
&& ctx.num_issued_slots() < self.max
102+
&& self.pid_decision()
103+
&& self.inner.can_reserve()
83104
{
84105
let _ = self.last_slot_issued_tx.send(Instant::now());
85106
Some(SlotSupplierPermit::NoData)
@@ -102,84 +123,105 @@ where
102123
MI: MemoryInfo + Send + Sync,
103124
SK: SlotKind + Send + Sync,
104125
{
126+
fn new(
127+
inner: Arc<ResourceBasedSlots<MI>>,
128+
minimum: usize,
129+
max: usize,
130+
ramp_throttle: Duration,
131+
) -> Self {
132+
let (tx, rx) = watch::channel(Instant::now());
133+
Self {
134+
minimum,
135+
max,
136+
ramp_throttle,
137+
pids: Arc::new(Mutex::new(PidControllers::new(
138+
inner.target_mem_usage,
139+
inner.target_cpu_usage,
140+
))),
141+
inner,
142+
last_slot_issued_tx: tx,
143+
last_slot_issued_rx: rx,
144+
_slot_kind: PhantomData,
145+
}
146+
}
147+
105148
fn time_since_last_issued(&self) -> Duration {
106149
Instant::now()
107150
.checked_duration_since(*self.last_slot_issued_rx.borrow())
108151
.unwrap_or_default()
109152
}
153+
154+
/// Returns true if the pid controllers think a new slot should be given out
155+
fn pid_decision(&self) -> bool {
156+
let mut pids = self.pids.lock();
157+
let mem_output = pids
158+
.mem
159+
.next_control_output(self.inner.sys_info_supplier.process_used_percent())
160+
.output;
161+
let cpu_output = pids
162+
.cpu
163+
.next_control_output(self.inner.sys_info_supplier.used_cpu_percent())
164+
.output;
165+
mem_output > 0.25 && cpu_output > 0.25
166+
}
110167
}
111168

112169
impl<MI> WorkflowCacheSizer for ResourceBasedSlots<MI>
113170
where
114-
MI: MemoryInfo + Sync,
171+
MI: MemoryInfo + Sync + Send,
115172
{
116-
fn can_allow_workflow(
117-
&self,
118-
slots_info: &WorkflowSlotsInfo,
119-
_new_task: &WorkflowSlotInfo,
120-
) -> bool {
121-
self.can_reserve(slots_info.used_slots.len())
173+
fn can_allow_workflow(&self, _: &WorkflowSlotsInfo, _: &WorkflowSlotInfo) -> bool {
174+
self.can_reserve()
122175
}
123176
}
124177

125-
impl<MI: MemoryInfo + Sync> ResourceBasedSlots<MI> {
178+
impl<MI: MemoryInfo + Sync + Send> ResourceBasedSlots<MI> {
126179
// TODO: Can just be an into impl probably?
127180
pub fn as_kind<SK: SlotKind + Send + Sync>(
128181
self: &Arc<Self>,
129182
minimum: usize,
183+
max: usize,
130184
ramp_throttle: Duration,
131185
) -> Arc<ResourceBasedSlotsForType<MI, SK>> {
132-
let (tx, rx) = watch::channel(Instant::now());
133-
Arc::new(ResourceBasedSlotsForType {
134-
inner: self.clone(),
186+
Arc::new(ResourceBasedSlotsForType::new(
187+
self.clone(),
135188
minimum,
189+
max,
136190
ramp_throttle,
137-
last_slot_issued_tx: tx,
138-
last_slot_issued_rx: rx,
139-
_slot_kind: PhantomData,
140-
})
191+
))
141192
}
142193

143194
pub fn into_kind<SK: SlotKind + Send + Sync>(self) -> ResourceBasedSlotsForType<MI, SK> {
144-
let (tx, rx) = watch::channel(Instant::now());
145-
ResourceBasedSlotsForType {
146-
inner: Arc::new(self),
147-
// TODO: Configure
148-
minimum: 1,
149-
ramp_throttle: Duration::from_millis(0),
150-
last_slot_issued_tx: tx,
151-
last_slot_issued_rx: rx,
152-
_slot_kind: PhantomData,
153-
}
195+
// TODO: remove or parameterize
196+
ResourceBasedSlotsForType::new(Arc::new(self), 1, 1000, Duration::from_millis(0))
154197
}
155198

156-
fn can_reserve(&self, num_used: usize) -> bool {
157-
if num_used > self.max {
158-
return false;
159-
}
160-
self.mem_info_supplier.process_used_percent() + self.assumed_maximum_marginal_contribution
161-
<= self.target_mem_usage
199+
fn can_reserve(&self) -> bool {
200+
self.sys_info_supplier.process_used_percent() <= self.target_mem_usage
162201
}
163202
}
164203

165204
#[derive(Debug)]
166-
pub struct SysinfoMem {
205+
pub struct RealSysInfo {
167206
sys: Mutex<sysinfo::System>,
168207
pid: sysinfo::Pid,
169208
cur_mem_usage: AtomicU64,
209+
cur_cpu_usage: AtomicU32,
170210
last_refresh: AtomicCell<Instant>,
171211
}
172-
impl SysinfoMem {
212+
impl RealSysInfo {
173213
fn new() -> Self {
174214
let mut sys = sysinfo::System::new();
175215
let pid = sysinfo::get_current_pid().expect("get pid works");
176216
sys.refresh_processes();
177217
sys.refresh_memory();
218+
sys.refresh_cpu();
178219
Self {
179220
sys: Default::default(),
180221
last_refresh: AtomicCell::new(Instant::now()),
181222
pid,
182223
cur_mem_usage: AtomicU64::new(0),
224+
cur_cpu_usage: AtomicU32::new(0),
183225
}
184226
}
185227
fn refresh_if_needed(&self) {
@@ -189,23 +231,32 @@ impl SysinfoMem {
189231
let mut lock = self.sys.lock();
190232
lock.refresh_memory();
191233
lock.refresh_processes();
234+
lock.refresh_cpu_usage();
192235
let proc = lock.process(self.pid).expect("exists");
193236
self.cur_mem_usage
194-
.store(dbg!(proc.memory()), std::sync::atomic::Ordering::Release);
237+
.store(dbg!(proc.memory()), Ordering::Release);
238+
self.cur_cpu_usage.store(
239+
dbg!(lock.global_cpu_info().cpu_usage()).to_bits(),
240+
Ordering::Release,
241+
);
195242
self.last_refresh.store(Instant::now())
196243
}
197244
}
198245
}
199-
impl MemoryInfo for SysinfoMem {
246+
impl MemoryInfo for RealSysInfo {
200247
fn total_mem(&self) -> u64 {
201248
self.refresh_if_needed();
202249
self.sys.lock().total_memory()
203250
}
204251

205252
fn process_used_mem(&self) -> u64 {
206253
self.refresh_if_needed();
207-
self.cur_mem_usage
208-
.load(std::sync::atomic::Ordering::Acquire)
254+
self.cur_mem_usage.load(Ordering::Acquire)
255+
}
256+
257+
fn used_cpu_percent(&self) -> f32 {
258+
self.refresh_if_needed();
259+
f32::from_bits(self.cur_cpu_usage.load(Ordering::Acquire))
209260
}
210261
}
211262

@@ -235,6 +286,10 @@ mod tests {
235286
fn process_used_mem(&self) -> u64 {
236287
self.used.load(Ordering::Acquire)
237288
}
289+
290+
fn used_cpu_percent(&self) -> f32 {
291+
todo!()
292+
}
238293
}
239294
struct FakeResCtx {}
240295
impl SlotReservationContext for FakeResCtx {
@@ -248,9 +303,8 @@ mod tests {
248303
let (fmis, used) = FakeMIS::new();
249304
let rbs = ResourceBasedSlots {
250305
target_mem_usage: 0.8,
251-
assumed_maximum_marginal_contribution: 0.1,
252-
mem_info_supplier: fmis,
253-
max: 1000,
306+
target_cpu_usage: 1.0,
307+
sys_info_supplier: fmis,
254308
}
255309
.into_kind::<WorkflowSlotKind>();
256310
assert!(rbs.try_reserve_slot(&FakeResCtx {}).is_some());
@@ -264,9 +318,8 @@ mod tests {
264318
used.store(90_000, Ordering::Release);
265319
let rbs = ResourceBasedSlots {
266320
target_mem_usage: 0.8,
267-
assumed_maximum_marginal_contribution: 0.1,
268-
mem_info_supplier: fmis,
269-
max: 1000,
321+
target_cpu_usage: 1.0,
322+
sys_info_supplier: fmis,
270323
}
271324
.into_kind::<WorkflowSlotKind>();
272325
let order = crossbeam_queue::ArrayQueue::new(2);

tests/heavy_tests.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ use temporal_sdk_core_protos::{
1010
coresdk::{workflow_commands::ActivityCancellationType, AsJsonPayloadExt},
1111
temporal::api::enums::v1::WorkflowIdReusePolicy,
1212
};
13-
use temporal_sdk_core_test_utils::{workflows::la_problem_workflow, CoreWfStarter};
13+
use temporal_sdk_core_test_utils::{
14+
canned_histories::cancel_scheduled_activity, workflows::la_problem_workflow, CoreWfStarter,
15+
};
1416

1517
mod fuzzy_workflow;
1618

@@ -96,11 +98,16 @@ async fn chunky_activities() {
9698
// .worker_config
9799
// .max_outstanding_activities(25)
98100
// .max_outstanding_workflow_tasks(25);
99-
let resource_slots = Arc::new(ResourceBasedSlots::new(0.5, 0.01, 1024));
101+
// TODO: Fix /1 or /100 thing
102+
let resource_slots = Arc::new(ResourceBasedSlots::new(0.5, 90.0));
100103
starter
101104
.worker_config
102-
.workflow_task_slot_supplier(resource_slots.as_kind(25, Duration::from_millis(0)))
103-
.activity_task_slot_supplier(resource_slots.as_kind(5, Duration::from_millis(100)));
105+
.workflow_task_slot_supplier(resource_slots.as_kind(
106+
25,
107+
WORKFLOWS,
108+
Duration::from_millis(0),
109+
))
110+
.activity_task_slot_supplier(resource_slots.as_kind(5, 1000, Duration::from_millis(50)));
104111
let mut worker = starter.worker().await;
105112

106113
let activity_id = "act-1";
@@ -177,8 +184,11 @@ async fn workflow_load() {
177184
let mut starter = CoreWfStarter::new("workflow_load");
178185
starter.worker_config.max_concurrent_wft_polls(10_usize);
179186
starter.worker_config.workflow_task_slot_supplier(
180-
Arc::new(ResourceBasedSlots::new(0.7, 0.01, cache_size))
181-
.as_kind(10, Duration::from_millis(0)),
187+
Arc::new(ResourceBasedSlots::new(0.7, 0.9)).as_kind(
188+
10,
189+
cache_size,
190+
Duration::from_millis(0),
191+
),
182192
);
183193
starter
184194
// .max_wft(100)

0 commit comments

Comments
 (0)