Skip to content

Commit 0e0544e

Browse files
committed
Add overall WorkerTuner trait to bring together suppliers
1 parent 5d567fb commit 0e0544e

File tree

18 files changed

+340
-150
lines changed

18 files changed

+340
-150
lines changed

core-api/src/worker.rs

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,10 @@ pub struct WorkerConfig {
3333
/// or failures.
3434
#[builder(default = "0")]
3535
pub max_cached_workflows: usize,
36-
/// Set a [SlotSupplier] for workflow tasks.
37-
#[builder(setter(into = false))]
38-
pub workflow_task_slot_supplier:
39-
Arc<dyn SlotSupplier<SlotKind = WorkflowSlotKind> + Send + Sync>,
40-
/// Set a [SlotSupplier] for activity tasks.
41-
#[builder(setter(into = false))]
42-
pub activity_task_slot_supplier:
43-
Arc<dyn SlotSupplier<SlotKind = ActivitySlotKind> + Send + Sync>,
44-
/// Set a [SlotSupplier] for local activity tasks.
45-
#[builder(setter(into = false))]
46-
pub local_activity_task_slot_supplier:
47-
Arc<dyn SlotSupplier<SlotKind = LocalActivitySlotKind> + Send + Sync>,
36+
/// Set a [WorkerTuner] for this worker. Either this or at least one of the `max_outstanding_*`
37+
/// fields must be set.
38+
#[builder(setter(into = false, strip_option), default)]
39+
pub tuner: Option<Arc<dyn WorkerTuner + Send + Sync>>,
4840
/// Maximum number of concurrent poll workflow task requests we will perform at a time on this
4941
/// worker's task queue. See also [WorkerConfig::nonsticky_to_sticky_poll_ratio]. Must be at
5042
/// least 1.
@@ -137,6 +129,25 @@ pub struct WorkerConfig {
137129
/// map key).
138130
#[builder(default)]
139131
pub workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
132+
133+
/// The maximum allowed number of workflow tasks that will ever be given to this worker at one
134+
/// time. Note that one workflow task may require multiple activations - so the WFT counts as
135+
/// "outstanding" until all activations it requires have been completed.
136+
///
137+
/// Mutually exclusive with `tuner`
138+
#[builder(setter(into, strip_option), default)]
139+
pub max_outstanding_workflow_tasks: Option<usize>,
140+
/// The maximum number of activity tasks that will ever be given to this worker concurrently
141+
///
142+
/// Mutually exclusive with `tuner`
143+
#[builder(setter(into, strip_option), default)]
144+
pub max_outstanding_activities: Option<usize>,
145+
/// The maximum number of local activity tasks that will ever be given to this worker
146+
/// concurrently
147+
///
148+
/// Mutually exclusive with `tuner`
149+
#[builder(setter(into, strip_option), default)]
150+
pub max_outstanding_local_activities: Option<usize>,
140151
}
141152

142153
impl WorkerConfig {
@@ -182,6 +193,14 @@ impl WorkerConfigBuilder {
182193
}
183194
}
184195

196+
if self.tuner.is_some()
197+
&& (self.max_outstanding_workflow_tasks.is_some()
198+
|| self.max_outstanding_activities.is_some()
199+
|| self.max_outstanding_local_activities.is_some())
200+
{
201+
return Err("max_outstanding_* fields are mutually exclusive with `tuner`".to_owned());
202+
}
203+
185204
let max_wft_polls = self
186205
.max_concurrent_wft_polls
187206
.unwrap_or(MAX_CONCURRENT_WFT_POLLS_DEFAULT);
@@ -213,6 +232,29 @@ impl WorkerConfigBuilder {
213232
}
214233
}
215234

235+
/// This trait allows users to customize the performance characteristics of workers dynamically.
236+
/// For more, see the docstrings of the traits in the return types of its functions.
237+
pub trait WorkerTuner {
238+
/// Return a [SlotSupplier] for workflow tasks
239+
fn workflow_task_slot_supplier(
240+
&self,
241+
) -> Arc<dyn SlotSupplier<SlotKind = WorkflowSlotKind> + Send + Sync>;
242+
243+
/// Return a [SlotSupplier] for activity tasks
244+
fn activity_task_slot_supplier(
245+
&self,
246+
) -> Arc<dyn SlotSupplier<SlotKind = ActivitySlotKind> + Send + Sync>;
247+
248+
/// Return a [SlotSupplier] for local activities
249+
fn local_activity_slot_supplier(
250+
&self,
251+
) -> Arc<dyn SlotSupplier<SlotKind = LocalActivitySlotKind> + Send + Sync>;
252+
253+
/// Core will call this at worker initialization time, allowing the implementation to hook up to
254+
/// metrics if any are configured. If not, it will not be called.
255+
fn attach_metrics(&self, metrics: TemporalMeter);
256+
}
257+
216258
/// Implementing this trait allows users to customize how many tasks of certain kinds the worker
217259
/// will perform concurrently.
218260
///
@@ -246,10 +288,6 @@ pub trait SlotSupplier {
246288
fn available_slots(&self) -> Option<usize> {
247289
None
248290
}
249-
250-
/// Core will call this at worker initialization time, allowing the implementation to hook up to
251-
/// metrics if any are configured. If not, it will not be called.
252-
fn attach_metrics(&self, metrics: TemporalMeter);
253291
}
254292

255293
pub trait SlotReservationContext: Send + Sync {

core/src/core_tests/activity_tasks.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,8 @@ use crate::{
66
single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs, MocksHolder,
77
QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q,
88
},
9-
worker::{
10-
client::mocks::{mock_manual_workflow_client, mock_workflow_client},
11-
slot_supplier::FixedSizeSlotSupplier,
12-
},
13-
ActivityHeartbeat, Worker, WorkerConfigSlotSupplierExt,
9+
worker::client::mocks::{mock_manual_workflow_client, mock_workflow_client},
10+
ActivityHeartbeat, Worker,
1411
};
1512
use futures::FutureExt;
1613
use itertools::Itertools;
@@ -921,7 +918,7 @@ async fn activity_tasks_from_completion_reserve_slots() {
921918
let mut mock = build_mock_pollers(mh);
922919
mock.worker_cfg(|cfg| {
923920
cfg.max_cached_workflows = 2;
924-
cfg.activity_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(2));
921+
cfg.max_outstanding_activities = Some(2);
925922
});
926923
mock.set_act_poller(mock_poller_from_resps(act_tasks));
927924
let core = Arc::new(mock_worker(mock));

core/src/core_tests/local_activities.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ use crate::{
55
build_mock_pollers, hist_to_poll_resp, mock_sdk, mock_sdk_cfg, mock_worker,
66
single_hist_mock_sg, MockPollCfg, ResponseType, WorkerExt,
77
},
8-
worker::{
9-
client::mocks::mock_workflow_client, slot_supplier::FixedSizeSlotSupplier, LEGACY_QUERY_ID,
10-
},
8+
worker::{client::mocks::mock_workflow_client, LEGACY_QUERY_ID},
119
};
1210
use anyhow::anyhow;
1311
use crossbeam_queue::SegQueue;
@@ -189,7 +187,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) {
189187
mh.enforce_correct_number_of_polls = false;
190188
let mut worker = mock_sdk_cfg(mh, |wc| {
191189
wc.max_cached_workflows = 1;
192-
wc.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(1));
190+
wc.max_outstanding_workflow_tasks = Some(1);
193191
});
194192
let core = worker.core_worker.clone();
195193

@@ -1086,7 +1084,7 @@ async fn local_act_records_nonfirst_attempts_ok() {
10861084
}));
10871085
let mut worker = mock_sdk_cfg(mh, |wc| {
10881086
wc.max_cached_workflows = 1;
1089-
wc.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(1));
1087+
wc.max_outstanding_workflow_tasks = Some(1);
10901088
});
10911089

10921090
worker.register_wf(

core/src/core_tests/workflow_tasks.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ use crate::{
1212
},
1313
worker::{
1414
client::mocks::{mock_manual_workflow_client, mock_workflow_client},
15-
slot_supplier::FixedSizeSlotSupplier,
15+
TunerBuilder,
1616
},
17-
Worker, WorkerConfigSlotSupplierExt,
17+
Worker,
1818
};
1919
use futures::{stream, FutureExt};
2020
use rstest::{fixture, rstest};
@@ -31,7 +31,6 @@ use temporal_client::WorkflowOptions;
3131
use temporal_sdk::{ActivityOptions, CancellableFuture, WfContext};
3232
use temporal_sdk_core_api::{
3333
errors::PollWfError,
34-
telemetry::metrics::TemporalMeter,
3534
worker::{
3635
SlotKind, SlotReservationContext, SlotSupplier, SlotSupplierPermit, WorkflowSlotKind,
3736
},
@@ -921,7 +920,7 @@ async fn max_wft_respected() {
921920
let mh = MockPollCfg::new(hists.into_iter().collect(), true, 0);
922921
let mut worker = mock_sdk_cfg(mh, |cfg| {
923922
cfg.max_cached_workflows = total_wfs as usize;
924-
cfg.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(1));
923+
cfg.max_outstanding_workflow_tasks = Some(1);
925924
});
926925
let active_count: &'static _ = Box::leak(Box::new(Semaphore::new(1)));
927926
worker.register_wf(DEFAULT_WORKFLOW_TYPE, move |ctx: WfContext| async move {
@@ -1516,7 +1515,7 @@ async fn failing_wft_doesnt_eat_permit_forever() {
15161515
let mut mock = build_mock_pollers(mock);
15171516
mock.worker_cfg(|cfg| {
15181517
cfg.max_cached_workflows = 2;
1519-
cfg.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(2));
1518+
cfg.max_outstanding_workflow_tasks = Some(2);
15201519
});
15211520
let outstanding_mock_tasks = mock.outstanding_task_map.clone();
15221521
let worker = mock_worker(mock);
@@ -1592,7 +1591,7 @@ async fn cache_miss_will_fetch_history() {
15921591
mock.worker_cfg(|cfg| {
15931592
cfg.max_cached_workflows = 1;
15941593
// Also verifies tying the WFT permit to the fetch request doesn't get us stuck
1595-
cfg.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(1));
1594+
cfg.max_outstanding_workflow_tasks = Some(1);
15961595
});
15971596
let worker = mock_worker(mock);
15981597

@@ -1818,7 +1817,7 @@ async fn poll_faster_than_complete_wont_overflow_cache() {
18181817
let mut mock = build_mock_pollers(mock_cfg);
18191818
mock.worker_cfg(|wc| {
18201819
wc.max_cached_workflows = 3;
1821-
wc.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(3));
1820+
wc.max_outstanding_workflow_tasks = Some(3);
18221821
});
18231822
let core = mock_worker(mock);
18241823
// Poll 4 times, completing once, such that max tasks are never exceeded
@@ -2902,13 +2901,16 @@ async fn slot_provider_cant_hand_out_more_permits_than_cache_size() {
29022901
fn available_slots(&self) -> Option<usize> {
29032902
None
29042903
}
2905-
fn attach_metrics(&self, _: TemporalMeter) {}
29062904
}
29072905

29082906
let worker = Worker::new_test(
29092907
test_worker_cfg()
29102908
.max_cached_workflows(10_usize)
2911-
.workflow_task_slot_supplier(Arc::new(EndlessSupplier {}))
2909+
.tuner(
2910+
TunerBuilder::default()
2911+
.workflow_slot_supplier(Arc::new(EndlessSupplier {}))
2912+
.build(),
2913+
)
29122914
.max_concurrent_wft_polls(10_usize)
29132915
.no_remote_activities(true)
29142916
.build()

core/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ pub use temporal_sdk_core_protos as protos;
3939
pub use temporal_sdk_core_protos::TaskToken;
4040
pub use url::Url;
4141
pub use worker::{
42-
RealSysInfo, ResourceBasedSlots, Worker, WorkerConfig, WorkerConfigBuilder,
43-
WorkerConfigSlotSupplierExt,
42+
RealSysInfo, ResourceBasedSlots, ResourceBasedTuner, ResourceSlotOptions, TunerBuilder,
43+
TunerHolder, Worker, WorkerConfig, WorkerConfigBuilder,
4444
};
4545

4646
use crate::{

core/src/test_help/mod.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use crate::{
99
client::{
1010
mocks::mock_workflow_client, MockWorkerClient, WorkerClient, WorkflowTaskCompletion,
1111
},
12-
slot_supplier::FixedSizeSlotSupplier,
1312
TaskPollers,
1413
},
1514
TaskToken, Worker, WorkerConfig, WorkerConfigBuilder,
@@ -62,9 +61,6 @@ pub(crate) fn test_worker_cfg() -> WorkerConfigBuilder {
6261
let mut wcb = WorkerConfigBuilder::default();
6362
wcb.namespace(NAMESPACE)
6463
.task_queue(TEST_Q)
65-
.workflow_task_slot_supplier(Arc::new(FixedSizeSlotSupplier::new(100)))
66-
.activity_task_slot_supplier(Arc::new(FixedSizeSlotSupplier::new(100)))
67-
.local_activity_task_slot_supplier(Arc::new(FixedSizeSlotSupplier::new(100)))
6864
.worker_build_id("test_bin_id")
6965
.ignore_evicts_on_shutdown(true)
7066
// Serial polling since it makes mocking much easier.

core/src/worker/mod.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ mod activities;
22
pub(crate) mod client;
33
mod slot_provider;
44
pub(crate) mod slot_supplier;
5+
mod tuner;
56
mod workflow;
67

7-
pub use slot_supplier::{RealSysInfo, ResourceBasedSlots, WorkerConfigSlotSupplierExt};
8+
pub use slot_supplier::{RealSysInfo, ResourceBasedSlots, ResourceBasedTuner, ResourceSlotOptions};
89
pub use temporal_sdk_core_api::worker::{WorkerConfig, WorkerConfigBuilder};
10+
pub use tuner::{TunerBuilder, TunerHolder};
911

1012
pub(crate) use activities::{
1113
ExecutingLAId, LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution,
@@ -250,21 +252,19 @@ impl Worker {
250252
} else {
251253
(MetricsContext::no_op(), None)
252254
};
255+
let tuner = config
256+
.tuner
257+
.as_ref()
258+
.cloned()
259+
.unwrap_or_else(|| TunerBuilder::from_config(&config).build());
260+
253261
metrics.worker_registered();
254262
if let Some(meter) = meter {
255-
config
256-
.workflow_task_slot_supplier
257-
.attach_metrics(meter.clone());
258-
config
259-
.activity_task_slot_supplier
260-
.attach_metrics(meter.clone());
261-
config
262-
.local_activity_task_slot_supplier
263-
.attach_metrics(meter);
263+
tuner.attach_metrics(meter.clone());
264264
}
265265
let shutdown_token = CancellationToken::new();
266266
let wft_slots = Arc::new(MeteredPermitDealer::new(
267-
config.workflow_task_slot_supplier.clone(),
267+
tuner.workflow_task_slot_supplier(),
268268
metrics.with_new_attrs([workflow_worker_type()]),
269269
if config.max_cached_workflows > 0 {
270270
// Since we always need to be able to poll the normal task queue as well as the
@@ -275,7 +275,7 @@ impl Worker {
275275
},
276276
));
277277
let act_slots = Arc::new(MeteredPermitDealer::new(
278-
config.activity_task_slot_supplier.clone(),
278+
tuner.activity_task_slot_supplier(),
279279
metrics.with_new_attrs([activity_worker_type()]),
280280
None,
281281
));
@@ -378,7 +378,7 @@ impl Worker {
378378

379379
let (hb_tx, hb_rx) = unbounded_channel();
380380
let local_act_mgr = Arc::new(LocalActivityManager::new(
381-
config.local_activity_task_slot_supplier.clone(),
381+
tuner.local_activity_slot_supplier(),
382382
config.namespace.clone(),
383383
hb_tx,
384384
metrics.with_new_attrs([local_activity_worker_type()]),

core/src/worker/slot_supplier.rs

Lines changed: 4 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use std::{marker::PhantomData, sync::Arc};
22
use temporal_sdk_core_api::worker::{
3-
SlotKind, SlotReservationContext, SlotSupplier, SlotSupplierPermit, WorkerConfigBuilder,
3+
SlotKind, SlotReservationContext, SlotSupplier, SlotSupplierPermit,
44
};
55
use tokio::sync::Semaphore;
66

77
mod resource_based;
88

9-
pub use resource_based::{RealSysInfo, ResourceBasedSlots};
10-
use temporal_sdk_core_api::telemetry::metrics::TemporalMeter;
9+
pub use resource_based::{
10+
RealSysInfo, ResourceBasedSlots, ResourceBasedTuner, ResourceSlotOptions,
11+
};
1112

1213
pub(crate) struct FixedSizeSlotSupplier<SK> {
1314
sem: Arc<Semaphore>,
@@ -52,41 +53,4 @@ where
5253
fn available_slots(&self) -> Option<usize> {
5354
Some(self.sem.available_permits())
5455
}
55-
56-
fn attach_metrics(&self, _: TemporalMeter) {
57-
// Doesn't need to do anything. Metrics tracking is handled by `MeteredPermitDealer`.
58-
}
59-
}
60-
61-
/// Extension trait providing backwards compatibility with old fixed-size slot options
62-
pub trait WorkerConfigSlotSupplierExt {
63-
/// Creates a [FixedSizeSlotSupplier] using the provided max and assigns it as the workflow
64-
/// task slot supplier
65-
fn max_outstanding_workflow_tasks(&mut self, max: usize) -> &mut Self;
66-
/// Creates a [FixedSizeSlotSupplier] using the provided max and assigns it as the activity task
67-
/// slot supplier
68-
fn max_outstanding_activities(&mut self, max: usize) -> &mut Self;
69-
/// Creates a [FixedSizeSlotSupplier] using the provided max and assigns it as the local
70-
/// activity task slot supplier
71-
fn max_outstanding_local_activities(&mut self, max: usize) -> &mut Self;
72-
}
73-
74-
impl WorkerConfigSlotSupplierExt for WorkerConfigBuilder {
75-
fn max_outstanding_workflow_tasks(&mut self, max: usize) -> &mut Self {
76-
let fsss = FixedSizeSlotSupplier::new(max);
77-
self.workflow_task_slot_supplier(Arc::new(fsss));
78-
self
79-
}
80-
81-
fn max_outstanding_activities(&mut self, max: usize) -> &mut Self {
82-
let fsss = FixedSizeSlotSupplier::new(max);
83-
self.activity_task_slot_supplier(Arc::new(fsss));
84-
self
85-
}
86-
87-
fn max_outstanding_local_activities(&mut self, max: usize) -> &mut Self {
88-
let fsss = FixedSizeSlotSupplier::new(max);
89-
self.local_activity_task_slot_supplier(Arc::new(fsss));
90-
self
91-
}
9256
}

0 commit comments

Comments
 (0)