Skip to content

Commit 84e10bf

Browse files
authored
Worker Slots interface and Resource Based Autotuner First Cut (#719)
Implements the first portions of the slot management proposal and an initial take on resource-based auto-tuning.
1 parent f859376 commit 84e10bf

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1676
-458
lines changed

core-api/src/worker.rs

Lines changed: 184 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
use crate::errors::WorkflowErrorType;
1+
use crate::{errors::WorkflowErrorType, telemetry::metrics::TemporalMeter};
22
use std::{
3+
any::Any,
34
collections::{HashMap, HashSet},
5+
sync::Arc,
46
time::Duration,
57
};
68

7-
const MAX_OUTSTANDING_WFT_DEFAULT: usize = 100;
89
const MAX_CONCURRENT_WFT_POLLS_DEFAULT: usize = 5;
910

1011
/// Defines per-worker configuration options
11-
#[derive(Debug, Clone, derive_builder::Builder)]
12+
#[derive(Clone, derive_builder::Builder)]
1213
#[builder(setter(into), build_fn(validate = "Self::validate"))]
1314
#[non_exhaustive]
1415
pub struct WorkerConfig {
@@ -32,20 +33,10 @@ pub struct WorkerConfig {
3233
/// or failures.
3334
#[builder(default = "0")]
3435
pub max_cached_workflows: usize,
35-
/// The maximum allowed number of workflow tasks that will ever be given to this worker at one
36-
/// time. Note that one workflow task may require multiple activations - so the WFT counts as
37-
/// "outstanding" until all activations it requires have been completed.
38-
///
39-
/// Cannot be larger than `max_cached_workflows`.
40-
#[builder(default = "MAX_OUTSTANDING_WFT_DEFAULT")]
41-
pub max_outstanding_workflow_tasks: usize,
42-
/// The maximum number of activity tasks that will ever be given to this worker concurrently
43-
#[builder(default = "100")]
44-
pub max_outstanding_activities: usize,
45-
/// The maximum number of local activity tasks that will ever be given to this worker
46-
/// concurrently
47-
#[builder(default = "100")]
48-
pub max_outstanding_local_activities: usize,
36+
/// Set a [WorkerTuner] for this worker. Either this or at least one of the `max_outstanding_*`
37+
/// fields must be set.
38+
#[builder(setter(into = false, strip_option), default)]
39+
pub tuner: Option<Arc<dyn WorkerTuner + Send + Sync>>,
4940
/// Maximum number of concurrent poll workflow task requests we will perform at a time on this
5041
/// worker's task queue. See also [WorkerConfig::nonsticky_to_sticky_poll_ratio]. Must be at
5142
/// least 1.
@@ -138,6 +129,25 @@ pub struct WorkerConfig {
138129
/// map key).
139130
#[builder(default)]
140131
pub workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
132+
133+
/// The maximum allowed number of workflow tasks that will ever be given to this worker at one
134+
/// time. Note that one workflow task may require multiple activations - so the WFT counts as
135+
/// "outstanding" until all activations it requires have been completed.
136+
///
137+
/// Mutually exclusive with `tuner`
138+
#[builder(setter(into, strip_option), default)]
139+
pub max_outstanding_workflow_tasks: Option<usize>,
140+
/// The maximum number of activity tasks that will ever be given to this worker concurrently
141+
///
142+
/// Mutually exclusive with `tuner`
143+
#[builder(setter(into, strip_option), default)]
144+
pub max_outstanding_activities: Option<usize>,
145+
/// The maximum number of local activity tasks that will ever be given to this worker
146+
/// concurrently
147+
///
148+
/// Mutually exclusive with `tuner`
149+
#[builder(setter(into, strip_option), default)]
150+
pub max_outstanding_local_activities: Option<usize>,
141151
}
142152

143153
impl WorkerConfig {
@@ -167,53 +177,51 @@ impl WorkerConfig {
167177
}
168178

169179
impl WorkerConfigBuilder {
180+
/// Unset all `max_outstanding_*` fields (workflow tasks, activities, and local activities) and
/// return the builder so that further calls can be chained.
///
/// Builder validation rejects a configuration that sets `tuner` together with any
/// `max_outstanding_*` field, so this is presumably meant to be called before switching an
/// existing builder over to a tuner.
181+
pub fn clear_max_outstanding_opts(&mut self) -> &mut Self {
182+
self.max_outstanding_workflow_tasks = None;
183+
self.max_outstanding_activities = None;
184+
self.max_outstanding_local_activities = None;
185+
self
186+
}
187+
170188
fn validate(&self) -> Result<(), String> {
171189
if self.max_concurrent_wft_polls == Some(0) {
172190
return Err("`max_concurrent_wft_polls` must be at least 1".to_owned());
173191
}
174192
if self.max_concurrent_at_polls == Some(0) {
175193
return Err("`max_concurrent_at_polls` must be at least 1".to_owned());
176194
}
177-
if self.max_cached_workflows > Some(0)
178-
&& self.max_outstanding_workflow_tasks > self.max_cached_workflows
179-
{
180-
return Err(
181-
"Maximum concurrent workflow tasks cannot exceed the maximum number of cached \
182-
workflows"
183-
.to_owned(),
184-
);
185-
}
195+
186196
if let Some(Some(ref x)) = self.max_worker_activities_per_second {
187197
if !x.is_normal() || x.is_sign_negative() {
188198
return Err(
189199
"`max_worker_activities_per_second` must be positive and nonzero".to_owned(),
190200
);
191201
}
192202
}
193-
if matches!(self.max_concurrent_wft_polls, Some(1))
194-
&& self.max_cached_workflows > Some(0)
195-
&& self
196-
.max_outstanding_workflow_tasks
197-
.unwrap_or(MAX_OUTSTANDING_WFT_DEFAULT)
198-
<= 1
203+
204+
if self.tuner.is_some()
205+
&& (self.max_outstanding_workflow_tasks.is_some()
206+
|| self.max_outstanding_activities.is_some()
207+
|| self.max_outstanding_local_activities.is_some())
199208
{
200-
return Err(
201-
"`max_outstanding_workflow_tasks` must be at at least 2 when \
202-
`max_cached_workflows` is nonzero"
203-
.to_owned(),
204-
);
209+
return Err("max_outstanding_* fields are mutually exclusive with `tuner`".to_owned());
205210
}
206-
if self
211+
212+
let max_wft_polls = self
207213
.max_concurrent_wft_polls
208-
.unwrap_or(MAX_CONCURRENT_WFT_POLLS_DEFAULT)
209-
> self
210-
.max_outstanding_workflow_tasks
211-
.unwrap_or(MAX_OUTSTANDING_WFT_DEFAULT)
212-
{
213-
return Err(
214-
"`max_concurrent_wft_polls` cannot exceed `max_outstanding_workflow_tasks`"
215-
.to_owned(),
216-
);
214+
.unwrap_or(MAX_CONCURRENT_WFT_POLLS_DEFAULT);
215+
216+
// It wouldn't make any sense to have more outstanding polls than workflows we can possibly
217+
// cache. If we allow this at low values it's possible for sticky pollers to reserve all
218+
// available slots, crowding out the normal queue and gumming things up.
219+
if let Some(max_cache) = self.max_cached_workflows {
220+
if max_cache > 0 && max_wft_polls > max_cache {
221+
return Err(
222+
"`max_concurrent_wft_polls` cannot exceed `max_cached_workflows`".to_owned(),
223+
);
224+
}
217225
}
218226

219227
if self.use_worker_versioning.unwrap_or_default()
@@ -231,3 +239,132 @@ impl WorkerConfigBuilder {
231239
Ok(())
232240
}
233241
}
242+
243+
/// This trait allows users to customize the performance characteristics of workers dynamically.
244+
/// It does so by handing out one [SlotSupplier] per kind of task (workflow tasks, activity tasks,
/// and local activities). Each supplier is returned as a shared, `Send + Sync` trait object.
/// For more, see the docstrings of the traits in the return types of its functions.
245+
pub trait WorkerTuner {
246+
/// Return a [SlotSupplier] for workflow tasks (slot kind [WorkflowSlotKind])
247+
fn workflow_task_slot_supplier(
248+
&self,
249+
) -> Arc<dyn SlotSupplier<SlotKind = WorkflowSlotKind> + Send + Sync>;
250+
251+
/// Return a [SlotSupplier] for activity tasks (slot kind [ActivitySlotKind])
252+
fn activity_task_slot_supplier(
253+
&self,
254+
) -> Arc<dyn SlotSupplier<SlotKind = ActivitySlotKind> + Send + Sync>;
255+
256+
/// Return a [SlotSupplier] for local activities (slot kind [LocalActivitySlotKind])
257+
fn local_activity_slot_supplier(
258+
&self,
259+
) -> Arc<dyn SlotSupplier<SlotKind = LocalActivitySlotKind> + Send + Sync>;
260+
261+
/// Core will call this at worker initialization time, allowing the implementation to hook up to
262+
/// metrics if any are configured. If not, it will not be called.
///
/// Takes `&self`, so an implementation that wants to retain the meter needs some form of
/// interior mutability.
263+
fn attach_metrics(&self, metrics: TemporalMeter);
264+
}
265+
266+
/// Implementing this trait allows users to customize how many tasks of certain kinds the worker
267+
/// will perform concurrently.
268+
///
269+
/// Note that, for implementations on workflow tasks ([WorkflowSlotKind]), workers that have the
270+
/// workflow cache enabled should be willing to hand out _at least_ two slots, to avoid the worker
271+
/// becoming stuck only polling on the worker's sticky queue.
///
/// The trait is declared through the `async_trait` macro so that [SlotSupplier::reserve_slot]
/// can be an `async fn`.
272+
#[async_trait::async_trait]
273+
pub trait SlotSupplier {
/// The kind of slot this supplier hands out. It also determines, through [SlotKind::Info],
/// the type of information passed to [SlotSupplier::mark_slot_used].
274+
type SlotKind: SlotKind;
275+
/// Block until a slot is available, then return a permit for the slot.
///
/// `ctx` exposes the number of permits currently issued.
276+
async fn reserve_slot(&self, ctx: &dyn SlotReservationContext) -> SlotSupplierPermit;
277+
278+
/// Try to immediately reserve a slot, returning `None` if one is not available
279+
fn try_reserve_slot(&self, ctx: &dyn SlotReservationContext) -> Option<SlotSupplierPermit>;
280+
281+
/// Marks a slot as actually now being used. This is separate from reserving one because the
282+
/// pollers need to reserve a slot before they have actually obtained work from server. Once
283+
/// that task is obtained (and validated) then the slot can actually be used to work on the
284+
/// task.
285+
///
286+
/// Users' implementation of this can choose to emit metrics, or otherwise leverage the
287+
/// information provided by the `info` parameter to be better able to make future decisions
288+
/// about whether a slot should be handed out.
289+
fn mark_slot_used(&self, info: <Self::SlotKind as SlotKind>::Info<'_>);
290+
291+
/// Frees a slot.
///
/// NOTE(review): no permit or identifier is passed in, so implementations apparently cannot
/// tell which slot is being freed from this call alone — confirm against callers.
292+
fn release_slot(&self);
293+
294+
/// If this implementation knows how many slots are available at any moment, it should return
295+
/// that here.
///
/// The default implementation returns `None`.
296+
fn available_slots(&self) -> Option<usize> {
297+
None
298+
}
299+
}
300+
301+
/// Context passed to [SlotSupplier::reserve_slot] and [SlotSupplier::try_reserve_slot], exposing
/// information a supplier can consult when deciding whether to hand out a slot.
///
/// Implementations are required to be `Send + Sync`.
pub trait SlotReservationContext: Send + Sync {
302+
/// Returns the number of currently outstanding slot permits, whether used or un-used.
303+
fn num_issued_slots(&self) -> usize;
304+
}
305+
306+
/// A permit for a single slot, as returned by the reservation methods of a [SlotSupplier].
///
/// A permit may optionally carry an arbitrary, type-erased piece of user data. The data can be
/// read back later with [SlotSupplierPermit::user_data] or
/// [SlotSupplierPermit::user_data_mut]. The [Default] permit carries no data.
#[derive(Default, Debug)]
pub struct SlotSupplierPermit {
    /// Type-erased payload attached by the creator of the permit, if any.
    user_data: Option<Box<dyn Any + Send + Sync>>,
}

impl SlotSupplierPermit {
    /// Creates a permit that carries `user_data`.
    ///
    /// The value is boxed and type-erased; retrieve it again by asking for the same type `T`.
    pub fn with_user_data<T: Any + Send + Sync>(user_data: T) -> Self {
        Self {
            user_data: Some(Box::new(user_data)),
        }
    }

    /// Attempts to downcast the inner data, if any, into the provided type and returns it.
    /// Returns `None` if there is no data or the data is not of the appropriate type.
    pub fn user_data<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.user_data
            .as_deref()
            .and_then(|data| data.downcast_ref::<T>())
    }

    /// Attempts to downcast the inner data, if any, into the provided type and returns it mutably.
    /// Returns `None` if there is no data or the data is not of the appropriate type.
    pub fn user_data_mut<T: Any + Send + Sync>(&mut self) -> Option<&mut T> {
        self.user_data
            .as_deref_mut()
            .and_then(|data| data.downcast_mut::<T>())
    }
}
327+
328+
/// Information about the workflow task a slot is about to be used for. This is the
/// [SlotKind::Info] type of [WorkflowSlotKind].
#[derive(Debug)]
pub struct WorkflowSlotInfo<'a> {
    /// The type name of the workflow.
    pub workflow_type: &'a str,
    // etc...
}

/// Information about the activity task a slot is about to be used for. This is the
/// [SlotKind::Info] type of [ActivitySlotKind].
#[derive(Debug)]
pub struct ActivitySlotInfo<'a> {
    /// The type name of the activity.
    pub activity_type: &'a str,
    // etc...
}

/// Information about the local activity a slot is about to be used for. This is the
/// [SlotKind::Info] type of [LocalActivitySlotKind].
#[derive(Debug)]
pub struct LocalActivitySlotInfo<'a> {
    /// The type name of the local activity.
    pub activity_type: &'a str,
    // etc...
}

/// Marker type identifying workflow task slots.
#[derive(Debug)]
pub struct WorkflowSlotKind {}

/// Marker type identifying activity task slots.
#[derive(Debug)]
pub struct ActivitySlotKind {}

/// Marker type identifying local activity slots.
#[derive(Debug)]
pub struct LocalActivitySlotKind {}

/// Describes one kind of slot: the information type associated with using such a slot, and a
/// static name for the kind.
pub trait SlotKind {
    /// The information associated with a slot of this kind when it is marked as used.
    type Info<'a>;

    /// A static name for this kind of slot.
    fn kind_name() -> &'static str;
}

impl SlotKind for WorkflowSlotKind {
    type Info<'a> = WorkflowSlotInfo<'a>;

    fn kind_name() -> &'static str {
        "workflow"
    }
}

impl SlotKind for ActivitySlotKind {
    type Info<'a> = ActivitySlotInfo<'a>;

    fn kind_name() -> &'static str {
        "activity"
    }
}

impl SlotKind for LocalActivitySlotKind {
    type Info<'a> = LocalActivitySlotInfo<'a>;

    fn kind_name() -> &'static str {
        "local_activity"
    }
}

core/Cargo.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ ephemeral-server = ["dep:flate2", "dep:nix", "dep:reqwest", "dep:tar", "dep:zip"
2222
[dependencies]
2323
anyhow = "1.0"
2424
async-trait = "0.1"
25-
base64 = "0.21"
2625
console-subscriber = { version = "0.2", optional = true }
2726
crossbeam-channel = "0.5"
2827
crossbeam-queue = "0.3"
28+
crossbeam-utils = "0.8"
2929
dashmap = "5.5"
3030
derive_builder = { workspace = true }
3131
derive_more = { workspace = true }
@@ -48,20 +48,22 @@ opentelemetry_sdk = { version = "0.22", features = ["rt-tokio", "metrics"], opti
4848
opentelemetry-otlp = { version = "0.15", features = ["tokio", "metrics"], optional = true }
4949
opentelemetry-prometheus = { version = "0.15", optional = true }
5050
parking_lot = { version = "0.12", features = ["send_guard"] }
51+
pid = "4.0"
5152
pin-project = "1.0"
5253
prometheus = "0.13"
5354
prost = { workspace = true }
5455
prost-types = { version = "0.5", package = "prost-wkt-types" }
5556
rand = "0.8.3"
56-
reqwest = { version = "0.11", features = ["json", "stream", "rustls-tls", "tokio-rustls"], default-features = false, optional = true }
57+
reqwest = { version = "0.11", features = ["json", "stream", "rustls-tls"], default-features = false, optional = true }
5758
ringbuf = "0.3"
5859
serde = "1.0"
5960
serde_json = "1.0"
6061
siphasher = "1.0"
6162
slotmap = "1.0"
63+
sysinfo = "0.30"
6264
tar = { version = "0.4", optional = true }
6365
thiserror = "1.0"
64-
tokio = { version = "1.26", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs", "process"] }
66+
tokio = { version = "1.37", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs", "process"] }
6567
tokio-util = { version = "0.7", features = ["io", "io-util"] }
6668
tokio-stream = "0.1"
6769
tonic = { workspace = true, features = ["tls", "tls-roots"] }
@@ -70,7 +72,6 @@ tracing-subscriber = { version = "0.3", features = ["parking_lot", "env-filter",
7072
url = "2.2"
7173
uuid = { version = "1.1", features = ["v4"] }
7274
zip = { version = "0.6.3", optional = true }
73-
log = "0.4.20"
7475

7576
# 1st party local deps
7677
[dependencies.temporal-sdk-core-api]

0 commit comments

Comments
 (0)