Worker Slots interface and Resource Based Autotuner First Cut #719

Merged: 30 commits, May 13, 2024. The changes shown below are from 20 of the 30 commits.

Commits
6175849 Super basic implementation for workflow task slots (Sushisource, Mar 9, 2024)
5ba52c4 Mostly there to the interface changes (Sushisource, Mar 11, 2024)
dba422c Add slot suppliers to config (Sushisource, Mar 12, 2024)
cf871f0 Unit tests passing (Sushisource, Mar 15, 2024)
b0b2fd3 Integ tests passing (Sushisource, Mar 15, 2024)
07ad7e6 Implement resource-based basics & preliminary testing (Sushisource, Mar 18, 2024)
06e1d45 Added generics to permits (Sushisource, Mar 19, 2024)
8055d98 Added a heavier activity test, and basic multi-slot supplier (Sushisource, Mar 27, 2024)
b6b91c9 Semi-reasonable but simple algorithm which performs decently (Sushisource, Apr 15, 2024)
5886007 PID controller w/ mem & cpu (Sushisource, Apr 16, 2024)
d848b99 Respect minimum / remove dbg (Sushisource, Apr 22, 2024)
adda8d9 Metrics emission for load testing (Sushisource, Apr 25, 2024)
611b1f2 Fix cpu being out of 100 instead of 1 (Sushisource, Apr 29, 2024)
e57b354 Add missing docstrings (Sushisource, Apr 29, 2024)
7bd7c5a Add tests for slots used metric (Sushisource, Apr 29, 2024)
935248a Fix unit tests / lints / other cleanup (Sushisource, Apr 30, 2024)
b719ecc Make sure we don't try to reserve slot if it would exceed WF cache (Sushisource, Apr 30, 2024)
93dfcea Merge branch 'master' into resource-slots-poc (Sushisource, Apr 30, 2024)
7d6e505 Fix a handful of integ test problems or sensitivity to new server (Sushisource, Apr 30, 2024)
eb23321 Address todos (Sushisource, May 1, 2024)
c9e80c9 Docstring / naming fixes from review comments (Sushisource, May 1, 2024)
a459380 Fix periodic metric emission (Sushisource, May 1, 2024)
5d136c9 Merge branch 'master' into resource-slots-poc (Sushisource, May 2, 2024)
daaf52a Default available_slots implementation to None (Sushisource, May 2, 2024)
5eea7df Remove release reason for now (Sushisource, May 3, 2024)
aadb636 Make all the PID options fully configurable (Sushisource, May 3, 2024)
cdcdd33 Add docstring about algorithm (Sushisource, May 3, 2024)
5d567fb Fix possible underflow when recording metrics (Sushisource, May 3, 2024)
42140a6 Add overall WorkerTuner trait to bring together suppliers (Sushisource, May 3, 2024)
97741c2 Package rename / pub fixed size (Sushisource, May 13, 2024)
193 changes: 144 additions & 49 deletions core-api/src/worker.rs
@@ -1,14 +1,15 @@
use crate::errors::WorkflowErrorType;
use crate::{errors::WorkflowErrorType, telemetry::metrics::TemporalMeter};
use std::{
any::Any,
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};

const MAX_OUTSTANDING_WFT_DEFAULT: usize = 100;
const MAX_CONCURRENT_WFT_POLLS_DEFAULT: usize = 5;

/// Defines per-worker configuration options
#[derive(Debug, Clone, derive_builder::Builder)]
#[derive(Clone, derive_builder::Builder)]
#[builder(setter(into), build_fn(validate = "Self::validate"))]
#[non_exhaustive]
pub struct WorkerConfig {
@@ -32,20 +33,18 @@ pub struct WorkerConfig {
/// or failures.
#[builder(default = "0")]
pub max_cached_workflows: usize,
/// The maximum allowed number of workflow tasks that will ever be given to this worker at one
/// time. Note that one workflow task may require multiple activations - so the WFT counts as
/// "outstanding" until all activations it requires have been completed.
///
/// Cannot be larger than `max_cached_workflows`.
#[builder(default = "MAX_OUTSTANDING_WFT_DEFAULT")]
pub max_outstanding_workflow_tasks: usize,
/// The maximum number of activity tasks that will ever be given to this worker concurrently
#[builder(default = "100")]
pub max_outstanding_activities: usize,
/// The maximum number of local activity tasks that will ever be given to this worker
/// concurrently
#[builder(default = "100")]
pub max_outstanding_local_activities: usize,
/// Set a [SlotSupplier] for workflow tasks.
#[builder(setter(into = false))]
pub workflow_task_slot_supplier:
Arc<dyn SlotSupplier<SlotKind = WorkflowSlotKind> + Send + Sync>,
/// Set a [SlotSupplier] for activity tasks.
#[builder(setter(into = false))]
pub activity_task_slot_supplier:
Arc<dyn SlotSupplier<SlotKind = ActivitySlotKind> + Send + Sync>,
/// Set a [SlotSupplier] for local activity tasks.
#[builder(setter(into = false))]
pub local_activity_task_slot_supplier:
Arc<dyn SlotSupplier<SlotKind = LocalActivitySlotKind> + Send + Sync>,
cretz (Member): I still think, as I did on the Java SDK, that these should be one worker tuner object that a user can provide (even if that object is just these three suppliers). It is better for callers. Granted, this is more of a comment about lang than core.

Sushisource (Member, Author): There are tradeoffs with both. I did encounter a couple of patterns where having just one would have made things slightly easier, but there are just as many situations where having them separate was really useful. Ultimately I think this is just a matter of taste.

cretz: It matters from a caller POV too. I suspect many callers will configure workers with tuners created by someone else (same with interceptors). Setting a single resource-based tuner is easier than setting three resource-based slot suppliers. It also makes instance reuse across workers easier, since a single instance is shared instead of three.

Sushisource: I'm going to try it out before merging, just to see whether it's much simpler. If I like the end result we'll go that way, but if it ends up uglier I'll keep the three options. My main concern is what happens when you want different kinds of suppliers for different task types, e.g. fixed-size for workflow tasks and resource-based for activities. You can stuff those inside your own implementation, but in the (I would guess fairly common) case where you want to do exactly that, you now have to write your own implementation where before you could just set ours, or we have to provide combinators.

cretz (May 3, 2024): 👍 FWIW, your main concern also applies to interceptors and data converters in many langs (interceptors are just combinations of client, activity, and worker interceptors; data converters are just combinations of payload codecs, payload converters, and failure converters), so there is precedent for this kind of combining. Many languages already have these combinators if you provide a simple three-item struct instead of just the interface (e.g. "with" in C#, dataclasses.replace in Python, ".." struct-update syntax in Rust, "..." spread in JS). But I think we should optimize for what I expect will be the common use case: users wanting to just provide a "tuner".
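A minimal, std-only sketch of the combined-tuner shape discussed above. This is illustrative only: the actual WorkerTuner trait was added later in this PR (commit 42140a6) and may differ in detail, the supplier trait here is a simplified stand-in for the real async SlotSupplier, and all names are assumptions:

```rust
use std::sync::Arc;

// Illustrative stand-ins for the crate's slot-kind marker types.
pub struct WorkflowSlotKind;
pub struct ActivitySlotKind;
pub struct LocalActivitySlotKind;

// Simplified supplier trait; the real one is async and richer.
pub trait SlotSupplier<K>: Send + Sync {
    fn available_slots(&self) -> Option<usize>;
}

// One "tuner" object bundling all three suppliers, as the review suggests.
pub trait WorkerTuner: Send + Sync {
    fn workflow_task_slot_supplier(&self) -> Arc<dyn SlotSupplier<WorkflowSlotKind>>;
    fn activity_task_slot_supplier(&self) -> Arc<dyn SlotSupplier<ActivitySlotKind>>;
    fn local_activity_slot_supplier(&self) -> Arc<dyn SlotSupplier<LocalActivitySlotKind>>;
}

// A fixed-size supplier usable for any slot kind.
pub struct FixedSize(pub usize);
impl<K> SlotSupplier<K> for FixedSize {
    fn available_slots(&self) -> Option<usize> {
        Some(self.0)
    }
}

// A composite tuner: different strategies per task type can still be
// mixed behind the single tuner interface, addressing the concern above.
pub struct CompositeTuner {
    pub wf: Arc<FixedSize>,
    pub act: Arc<FixedSize>,
    pub la: Arc<FixedSize>,
}
impl WorkerTuner for CompositeTuner {
    fn workflow_task_slot_supplier(&self) -> Arc<dyn SlotSupplier<WorkflowSlotKind>> {
        self.wf.clone()
    }
    fn activity_task_slot_supplier(&self) -> Arc<dyn SlotSupplier<ActivitySlotKind>> {
        self.act.clone()
    }
    fn local_activity_slot_supplier(&self) -> Arc<dyn SlotSupplier<LocalActivitySlotKind>> {
        self.la.clone()
    }
}
```

A caller then sets one tuner on the worker config rather than three separate suppliers, which is the ergonomic benefit argued for above.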

/// Maximum number of concurrent poll workflow task requests we will perform at a time on this
/// worker's task queue. See also [WorkerConfig::nonsticky_to_sticky_poll_ratio]. Must be at
/// least 1.
@@ -174,46 +173,28 @@ impl WorkerConfigBuilder {
if self.max_concurrent_at_polls == Some(0) {
return Err("`max_concurrent_at_polls` must be at least 1".to_owned());
}
if self.max_cached_workflows > Some(0)
&& self.max_outstanding_workflow_tasks > self.max_cached_workflows
{
return Err(
"Maximum concurrent workflow tasks cannot exceed the maximum number of cached \
workflows"
.to_owned(),
);
}

if let Some(Some(ref x)) = self.max_worker_activities_per_second {
if !x.is_normal() || x.is_sign_negative() {
return Err(
"`max_worker_activities_per_second` must be positive and nonzero".to_owned(),
);
}
}
if matches!(self.max_concurrent_wft_polls, Some(1))
&& self.max_cached_workflows > Some(0)
&& self
.max_outstanding_workflow_tasks
.unwrap_or(MAX_OUTSTANDING_WFT_DEFAULT)
<= 1
{
return Err(
"`max_outstanding_workflow_tasks` must be at at least 2 when \
`max_cached_workflows` is nonzero"
.to_owned(),
);
}
if self

let max_wft_polls = self
.max_concurrent_wft_polls
.unwrap_or(MAX_CONCURRENT_WFT_POLLS_DEFAULT)
> self
.max_outstanding_workflow_tasks
.unwrap_or(MAX_OUTSTANDING_WFT_DEFAULT)
{
return Err(
"`max_concurrent_wft_polls` cannot exceed `max_outstanding_workflow_tasks`"
.to_owned(),
);
.unwrap_or(MAX_CONCURRENT_WFT_POLLS_DEFAULT);

// It wouldn't make any sense to have more outstanding polls than workflows we can possibly
// cache. If we allow this at low values it's possible for sticky pollers to reserve all
// available slots, crowding out the normal queue and gumming things up.
if let Some(max_cache) = self.max_cached_workflows {
if max_cache > 0 && max_wft_polls > max_cache {
return Err(
"`max_concurrent_wft_polls` cannot exceed `max_cached_workflows`".to_owned(),
);
}
}

if self.use_worker_versioning.unwrap_or_default()
@@ -231,3 +212,117 @@ impl WorkerConfigBuilder {
Ok(())
}
}

/// Implementing this trait allows users to customize how many tasks of certain kinds the worker
/// will perform concurrently.
///
/// Note that, for implementations on workflow tasks ([WorkflowSlotKind]), workers that have the
/// workflow cache enabled should be willing to hand out _at least_ two slots, to avoid the worker
/// becoming stuck only polling on the worker's sticky queue.
#[async_trait::async_trait]
pub trait SlotSupplier {
type SlotKind: SlotKind;
/// Block until a slot is available, then return a permit for the slot.
async fn reserve_slot(&self, ctx: &dyn SlotReservationContext) -> SlotSupplierPermit;

/// Try to immediately reserve a slot, returning None if one is not available
fn try_reserve_slot(&self, ctx: &dyn SlotReservationContext) -> Option<SlotSupplierPermit>;

/// Marks a slot as actually now being used. This is separate from reserving one because the
/// pollers need to reserve a slot before they have actually obtained work from server. Once
/// that task is obtained (and validated) then the slot can actually be used to work on the
/// task.
///
/// Users' implementation of this can choose to emit metrics, or otherwise leverage the
/// information provided by the `info` parameter to be better able to make future decisions
/// about whether a slot should be handed out.
fn mark_slot_used(&self, info: <Self::SlotKind as SlotKind>::Info<'_>);

/// Frees a slot.
fn release_slot(&self, info: SlotReleaseReason);

/// If this implementation knows how many slots are available at any moment, it should return
/// that here.
fn available_slots(&self) -> Option<usize>;

/// Core will call this at worker initialization time, allowing the implementation to hook up to
/// metrics if any are configured. If not, it will not be called.
fn attach_metrics(&self, metrics: TemporalMeter);
}
cretz (Member, May 1, 2024): Hrmm. Everywhere else that implementers want to use metrics, they access them via the runtime (e.g. .telemetry().get_metric_meter()). Do we need this here? Can't the user wire up metrics themselves? Is there a general-purpose initialization method that may be needed?

Sushisource (Member, Author): At least the way TS is structured, it's not really possible, and it is likely to be annoying in other langs. The problem is that you need to have initialized the runtime before you construct worker options, in order to provide the telemetry instance when constructing your slot supplier; but since the runtime is often constructed implicitly when a user initializes a worker, the worker config is already created before the runtime exists.

cretz (May 1, 2024): > but since the runtime is often constructed implicitly when a user initializes a worker

Hrmm, is this true? From the TS bridge code I see, the client already has the runtime before the worker is created. I would expect it can provide the metric meter to the slot supplier instances it creates at worker creation time. For TS-side implementations, when they get around to temporalio/sdk-typescript#1229, the runtime object should provide an accessor to the metric meter (and if that object is created implicitly, so be it).

Sushisource (May 1, 2024): Yeah, fair point; it's not impossible. I actually had it roughly like this originally, but it made for a bit of a mess in TS's bridge because of type erasure: you only had access to the erased slot supplier, so the trait needed this way to attach the metrics without changing things such that the slot suppliers are passed separately from the worker config, which would have been annoying. The nice thing about this approach is that lang doesn't need to do anything. It is all handled inside Core, there is going to have to be some kind of LangWrapper implementation of SlotSupplier anyway, and lang can provide the attached meter to the user's implementation in a context object or however else it sees fit.

cretz (May 1, 2024): > lang can provide the attached meter to the user's implementation in a context object or however else it feels like

I think lang may want to ask users to do this themselves and not provide anything in this abstraction specifically. At least in .NET and Python, any user who wants to emit metrics from their custom tuner implementation can use their own metrics impl, or use ours the same way they would anywhere else outside of workflows/activities (via the runtime metric meter).

Sushisource: Yeah, they can still do that just fine, and we can also provide it to them for access whenever they like outside of this. For now, within Core, this makes the most sense and has no impact on lang.
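To make the SlotSupplier contract above concrete, here is a minimal, std-only sketch of a fixed-size supplier. It is deliberately simplified: the real trait is async (reserve_slot awaits rather than blocking a thread), is generic over SlotKind, takes a SlotReservationContext, and releases slots via SlotReleaseReason rather than by consuming the permit. All names here are illustrative stand-ins:

```rust
use std::sync::{Condvar, Mutex};

// Stand-in for the crate's SlotSupplierPermit.
pub struct SlotPermit;

pub struct FixedSizeSlotSupplier {
    capacity: usize,
    issued: Mutex<usize>, // number of currently issued slots
    cond: Condvar,
}

impl FixedSizeSlotSupplier {
    pub fn new(capacity: usize) -> Self {
        Self {
            capacity,
            issued: Mutex::new(0),
            cond: Condvar::new(),
        }
    }

    /// Block until a slot is available, then hand out a permit
    /// (the real trait does this asynchronously).
    pub fn reserve_slot(&self) -> SlotPermit {
        let mut issued = self.issued.lock().unwrap();
        while *issued >= self.capacity {
            issued = self.cond.wait(issued).unwrap();
        }
        *issued += 1;
        SlotPermit
    }

    /// Try to reserve a slot without blocking, returning None if full.
    pub fn try_reserve_slot(&self) -> Option<SlotPermit> {
        let mut issued = self.issued.lock().unwrap();
        if *issued < self.capacity {
            *issued += 1;
            Some(SlotPermit)
        } else {
            None
        }
    }

    /// Free a slot and wake one waiter.
    pub fn release_slot(&self, _permit: SlotPermit) {
        let mut issued = self.issued.lock().unwrap();
        *issued -= 1;
        self.cond.notify_one();
    }

    /// This implementation always knows its availability exactly.
    pub fn available_slots(&self) -> usize {
        self.capacity - *self.issued.lock().unwrap()
    }
}
```

Note how available_slots is trivial here, which is exactly why the trait makes it optional (defaulting to None): a resource-based supplier has no fixed number to report.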


pub trait SlotReservationContext: Send + Sync {
/// Returns the number of currently in-use slots
fn num_issued_slots(&self) -> usize;
}

#[derive(Default)]
pub struct SlotSupplierPermit {
user_data: Option<Box<dyn Any + Send + Sync>>,
}
impl SlotSupplierPermit {
pub fn with_user_data<T: Any + Send + Sync>(user_data: T) -> Self {
Self {
user_data: Some(Box::new(user_data)),
}
}
/// Attempts to downcast the inner data, if any, into the provided type and returns it.
/// Returns none if there is no data or the data is not of the appropriate type.
pub fn user_data<T: Any + Send + Sync>(&self) -> Option<&T> {
self.user_data.as_ref().and_then(|b| b.downcast_ref())
}
/// Attempts to downcast the inner data, if any, into the provided type and returns it mutably.
/// Returns none if there is no data or the data is not of the appropriate type.
pub fn user_data_mut<T: Any + Send + Sync>(&mut self) -> Option<&mut T> {
self.user_data.as_mut().and_then(|b| b.downcast_mut())
}
}
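The type-erased user_data on the permit lets a supplier attach its own bookkeeping at reservation time and recover it later (e.g. on release) via downcasting. A short sketch of that round trip, with the permit type reproduced locally so the example is self-contained (MyPermitData and its field are hypothetical):

```rust
use std::any::Any;

// Local copy of the permit type from the diff above.
#[derive(Default)]
pub struct SlotSupplierPermit {
    user_data: Option<Box<dyn Any + Send + Sync>>,
}

impl SlotSupplierPermit {
    pub fn with_user_data<T: Any + Send + Sync>(user_data: T) -> Self {
        Self {
            user_data: Some(Box::new(user_data)),
        }
    }
    /// Downcast the inner data to T; None if absent or the wrong type.
    pub fn user_data<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.user_data.as_ref().and_then(|b| b.downcast_ref())
    }
}

// Hypothetical per-reservation bookkeeping a supplier might stash.
pub struct MyPermitData {
    pub mem_usage_at_reservation: f64,
}
```

Because the data is erased to dyn Any, a mismatched type in user_data::<T>() simply yields None rather than panicking, so suppliers can safely ignore permits they did not create.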

pub enum SlotReleaseReason {
TaskComplete,
NeverUsed,
Error, // TODO: Details
}

pub struct WorkflowSlotInfo<'a> {
pub workflow_type: &'a str,
// etc...
}

pub struct ActivitySlotInfo<'a> {
pub activity_type: &'a str,
// etc...
}
pub struct LocalActivitySlotInfo<'a> {
pub activity_type: &'a str,
// etc...
}

#[derive(Debug)]
pub struct WorkflowSlotKind {}
#[derive(Debug)]
pub struct ActivitySlotKind {}
#[derive(Debug)]
pub struct LocalActivitySlotKind {}
pub trait SlotKind {
type Info<'a>;
fn kind_name() -> &'static str;
}
impl SlotKind for WorkflowSlotKind {
type Info<'a> = WorkflowSlotInfo<'a>;

fn kind_name() -> &'static str {
"workflow"
}
}
impl SlotKind for ActivitySlotKind {
type Info<'a> = ActivitySlotInfo<'a>;
fn kind_name() -> &'static str {
"activity"
}
}
impl SlotKind for LocalActivitySlotKind {
type Info<'a> = LocalActivitySlotInfo<'a>;
fn kind_name() -> &'static str {
"local_activity"
}
}
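The kind_name on SlotKind exists so generic code can label things (e.g. metrics) per slot kind without knowing the concrete type. A small sketch of that pattern, with the trait reproduced locally; the slots_metric_name helper and metric-name format are hypothetical, not from this PR:

```rust
// Local copies of the trait and two kinds from the diff above.
pub trait SlotKind {
    type Info<'a>;
    fn kind_name() -> &'static str;
}

pub struct WorkflowSlotKind;
pub struct ActivitySlotKind;

pub struct WorkflowSlotInfo<'a> {
    pub workflow_type: &'a str,
}
pub struct ActivitySlotInfo<'a> {
    pub activity_type: &'a str,
}

impl SlotKind for WorkflowSlotKind {
    type Info<'a> = WorkflowSlotInfo<'a>;
    fn kind_name() -> &'static str {
        "workflow"
    }
}
impl SlotKind for ActivitySlotKind {
    type Info<'a> = ActivitySlotInfo<'a>;
    fn kind_name() -> &'static str {
        "activity"
    }
}

// Generic code derives a per-kind label without knowing the concrete type.
pub fn slots_metric_name<K: SlotKind>() -> String {
    format!("worker_{}_slots_available", K::kind_name())
}
```

The generic associated type Info<'a> similarly lets mark_slot_used receive kind-specific, borrowed task info through a single trait method.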
7 changes: 4 additions & 3 deletions core/Cargo.toml
@@ -22,10 +22,10 @@ ephemeral-server = ["dep:flate2", "dep:nix", "dep:reqwest", "dep:tar", "dep:zip"
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
base64 = "0.21"
console-subscriber = { version = "0.2", optional = true }
crossbeam-channel = "0.5"
crossbeam-queue = "0.3"
crossbeam-utils = "0.8"
dashmap = "5.5"
derive_builder = { workspace = true }
derive_more = { workspace = true }
@@ -48,17 +48,19 @@ opentelemetry_sdk = { version = "0.22", features = ["rt-tokio", "metrics"], opti
opentelemetry-otlp = { version = "0.15", features = ["tokio", "metrics"], optional = true }
opentelemetry-prometheus = { version = "0.15", optional = true }
parking_lot = { version = "0.12", features = ["send_guard"] }
pid = "4.0"
pin-project = "1.0"
prometheus = "0.13"
prost = { workspace = true }
prost-types = { version = "0.5", package = "prost-wkt-types" }
rand = "0.8.3"
reqwest = { version = "0.11", features = ["json", "stream", "rustls-tls", "tokio-rustls"], default-features = false, optional = true }
reqwest = { version = "0.11", features = ["json", "stream", "rustls-tls"], default-features = false, optional = true }
ringbuf = "0.3"
serde = "1.0"
serde_json = "1.0"
siphasher = "1.0"
slotmap = "1.0"
sysinfo = "0.30"
tar = { version = "0.4", optional = true }
thiserror = "1.0"
tokio = { version = "1.26", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs", "process"] }
@@ -70,7 +72,6 @@ tracing-subscriber = { version = "0.3", features = ["parking_lot", "env-filter",
url = "2.2"
uuid = { version = "1.1", features = ["v4"] }
zip = { version = "0.6.3", optional = true }
log = "0.4.20"

# 1st party local deps
[dependencies.temporal-sdk-core-api]
Expand Down