From e934332945536810a07f6e36e37ee4bc79045ca9 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 3 May 2024 15:33:57 -0700 Subject: [PATCH] Add overall WorkerTuner trait to bring together suppliers --- core-api/src/worker.rs | 69 ++++++-- core/src/core_tests/activity_tasks.rs | 9 +- core/src/core_tests/local_activities.rs | 8 +- core/src/core_tests/workflow_tasks.rs | 20 ++- core/src/lib.rs | 4 +- core/src/test_help/mod.rs | 4 - core/src/worker/mod.rs | 26 +-- core/src/worker/slot_supplier.rs | 44 +---- .../worker/slot_supplier/resource_based.rs | 159 ++++++++++++++---- core/src/worker/tuner.rs | 115 +++++++++++++ test-utils/src/lib.rs | 7 +- tests/fuzzy_workflow.rs | 1 - tests/heavy_tests.rs | 12 +- tests/integ_tests/metrics_tests.rs | 5 +- tests/integ_tests/workflow_tests.rs | 2 +- .../workflow_tests/continue_as_new.rs | 1 - .../workflow_tests/local_activities.rs | 2 +- .../integ_tests/workflow_tests/stickyness.rs | 1 - 18 files changed, 339 insertions(+), 150 deletions(-) create mode 100644 core/src/worker/tuner.rs diff --git a/core-api/src/worker.rs b/core-api/src/worker.rs index 36cb83942..4c5044fad 100644 --- a/core-api/src/worker.rs +++ b/core-api/src/worker.rs @@ -33,18 +33,10 @@ pub struct WorkerConfig { /// or failures. #[builder(default = "0")] pub max_cached_workflows: usize, - /// Set a [SlotSupplier] for workflow tasks. - #[builder(setter(into = false))] - pub workflow_task_slot_supplier: - Arc + Send + Sync>, - /// Set a [SlotSupplier] for activity tasks. - #[builder(setter(into = false))] - pub activity_task_slot_supplier: - Arc + Send + Sync>, - /// Set a [SlotSupplier] for local activity tasks. - #[builder(setter(into = false))] - pub local_activity_task_slot_supplier: - Arc + Send + Sync>, + /// Set a [WorkerTuner] for this worker. Either this or at least one of the `max_outstanding_*` + /// fields must be set. + #[builder(setter(into = false, strip_option), default)] + pub tuner: Option>, /// Maximum number of concurrent poll workflow task requests we will perform at a time on this /// worker's task queue. See also [WorkerConfig::nonsticky_to_sticky_poll_ratio]. Must be at /// least 1. @@ -137,6 +129,25 @@ pub struct WorkerConfig { /// map key). #[builder(default)] pub workflow_types_to_failure_errors: HashMap>, + + /// The maximum allowed number of workflow tasks that will ever be given to this worker at one + /// time. Note that one workflow task may require multiple activations - so the WFT counts as + /// "outstanding" until all activations it requires have been completed. + /// + /// Mutually exclusive with `tuner` + #[builder(setter(into, strip_option), default)] + pub max_outstanding_workflow_tasks: Option, + /// The maximum number of activity tasks that will ever be given to this worker concurrently + /// + /// Mutually exclusive with `tuner` + #[builder(setter(into, strip_option), default)] + pub max_outstanding_activities: Option, + /// The maximum number of local activity tasks that will ever be given to this worker + /// concurrently + /// + /// Mutually exclusive with `tuner` + #[builder(setter(into, strip_option), default)] + pub max_outstanding_local_activities: Option, } impl WorkerConfig { @@ -182,6 +193,13 @@ impl WorkerConfigBuilder { } } + if self.tuner.is_some() && self.max_outstanding_workflow_tasks.is_some() + || self.max_outstanding_activities.is_some() + || self.max_outstanding_local_activities.is_some() + { + return Err("max_outstanding_* fields are mutually exclusive with `tuner`".to_owned()); + } + let max_wft_polls = self .max_concurrent_wft_polls .unwrap_or(MAX_CONCURRENT_WFT_POLLS_DEFAULT); @@ -213,6 +231,29 @@ impl WorkerConfigBuilder { } } +/// This trait allows users to customize the performance characteristics of workers dynamically. +/// For more, see the docstrings of the traits in the return types of its functions. +pub trait WorkerTuner { + /// Return a [SlotSupplier] for workflow tasks + fn workflow_task_slot_supplier( + &self, + ) -> Arc + Send + Sync>; + + /// Return a [SlotSupplier] for activity tasks + fn activity_task_slot_supplier( + &self, + ) -> Arc + Send + Sync>; + + /// Return a [SlotSupplier] for local activities + fn local_activity_slot_supplier( + &self, + ) -> Arc + Send + Sync>; + + /// Core will call this at worker initialization time, allowing the implementation to hook up to + /// metrics if any are configured. If not, it will not be called. + fn attach_metrics(&self, metrics: TemporalMeter); +} + /// Implementing this trait allows users to customize how many tasks of certain kinds the worker /// will perform concurrently. /// @@ -246,10 +287,6 @@ pub trait SlotSupplier { fn available_slots(&self) -> Option { None } - - /// Core will call this at worker initialization time, allowing the implementation to hook up to - /// metrics if any are configured. If not, it will not be called. - fn attach_metrics(&self, metrics: TemporalMeter); } pub trait SlotReservationContext: Send + Sync { diff --git a/core/src/core_tests/activity_tasks.rs b/core/src/core_tests/activity_tasks.rs index d89ed74ff..aa9af6c7a 100644 --- a/core/src/core_tests/activity_tasks.rs +++ b/core/src/core_tests/activity_tasks.rs @@ -6,11 +6,8 @@ use crate::{ single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs, MocksHolder, QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q, }, - worker::{ - client::mocks::{mock_manual_workflow_client, mock_workflow_client}, - slot_supplier::FixedSizeSlotSupplier, - }, - ActivityHeartbeat, Worker, WorkerConfigSlotSupplierExt, + worker::client::mocks::{mock_manual_workflow_client, mock_workflow_client}, + ActivityHeartbeat, Worker, }; use futures::FutureExt; use itertools::Itertools; @@ -921,7 +918,7 @@ async fn activity_tasks_from_completion_reserve_slots() { let mut mock = build_mock_pollers(mh); mock.worker_cfg(|cfg| { cfg.max_cached_workflows = 2; - cfg.activity_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(2)); + cfg.max_outstanding_activities = Some(2); }); mock.set_act_poller(mock_poller_from_resps(act_tasks)); let core = Arc::new(mock_worker(mock)); diff --git a/core/src/core_tests/local_activities.rs b/core/src/core_tests/local_activities.rs index 847f5843d..6eca84ea8 100644 --- a/core/src/core_tests/local_activities.rs +++ b/core/src/core_tests/local_activities.rs @@ -5,9 +5,7 @@ use crate::{ build_mock_pollers, hist_to_poll_resp, mock_sdk, mock_sdk_cfg, mock_worker, single_hist_mock_sg, MockPollCfg, ResponseType, WorkerExt, }, - worker::{ - client::mocks::mock_workflow_client, slot_supplier::FixedSizeSlotSupplier, LEGACY_QUERY_ID, - }, + worker::{client::mocks::mock_workflow_client, LEGACY_QUERY_ID}, }; use anyhow::anyhow; use crossbeam_queue::SegQueue; @@ -189,7 +187,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) { mh.enforce_correct_number_of_polls = false; let mut worker = mock_sdk_cfg(mh, |wc| { wc.max_cached_workflows = 1; - wc.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(1)); + wc.max_outstanding_workflow_tasks = Some(1); }); let core = worker.core_worker.clone(); @@ -1086,7 +1084,7 @@ async fn local_act_records_nonfirst_attempts_ok() { })); let mut worker = mock_sdk_cfg(mh, |wc| { wc.max_cached_workflows = 1; - wc.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(1)); + wc.max_outstanding_workflow_tasks = Some(1); }); worker.register_wf( diff --git a/core/src/core_tests/workflow_tasks.rs b/core/src/core_tests/workflow_tasks.rs index 8ea903cec..1e7679cc4 100644 --- a/core/src/core_tests/workflow_tasks.rs +++ b/core/src/core_tests/workflow_tasks.rs @@ -12,9 +12,9 @@ use crate::{ }, worker::{ client::mocks::{mock_manual_workflow_client, mock_workflow_client}, - slot_supplier::FixedSizeSlotSupplier, + TunerBuilder, }, - Worker, WorkerConfigSlotSupplierExt, + Worker, }; use futures::{stream, FutureExt}; use rstest::{fixture, rstest}; @@ -31,7 +31,6 @@ use temporal_client::WorkflowOptions; use temporal_sdk::{ActivityOptions, CancellableFuture, WfContext}; use temporal_sdk_core_api::{ errors::PollWfError, - telemetry::metrics::TemporalMeter, worker::{ SlotKind, SlotReservationContext, SlotSupplier, SlotSupplierPermit, WorkflowSlotKind, }, @@ -921,7 +920,7 @@ async fn max_wft_respected() { let mh = MockPollCfg::new(hists.into_iter().collect(), true, 0); let mut worker = mock_sdk_cfg(mh, |cfg| { cfg.max_cached_workflows = total_wfs as usize; - cfg.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(1)); + cfg.max_outstanding_workflow_tasks = Some(1); }); let active_count: &'static _ = Box::leak(Box::new(Semaphore::new(1))); worker.register_wf(DEFAULT_WORKFLOW_TYPE, move |ctx: WfContext| async move { @@ -1516,7 +1515,7 @@ async fn failing_wft_doesnt_eat_permit_forever() { let mut mock = build_mock_pollers(mock); mock.worker_cfg(|cfg| { cfg.max_cached_workflows = 2; - cfg.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(2)); + cfg.max_outstanding_workflow_tasks = Some(2); }); let outstanding_mock_tasks = mock.outstanding_task_map.clone(); let worker = mock_worker(mock); @@ -1592,7 +1591,7 @@ async fn cache_miss_will_fetch_history() { mock.worker_cfg(|cfg| { cfg.max_cached_workflows = 1; // Also verifies tying the WFT permit to the fetch request doesn't get us stuck - cfg.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(1)); + cfg.max_outstanding_workflow_tasks = Some(1); }); let worker = mock_worker(mock); @@ -1818,7 +1817,7 @@ async fn poll_faster_than_complete_wont_overflow_cache() { let mut mock = build_mock_pollers(mock_cfg); mock.worker_cfg(|wc| { wc.max_cached_workflows = 3; - wc.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(3)); + wc.max_outstanding_workflow_tasks = Some(3); }); let core = mock_worker(mock); // Poll 4 times, completing once, such that max tasks are never exceeded @@ -2902,13 +2901,16 @@ async fn slot_provider_cant_hand_out_more_permits_than_cache_size() { fn available_slots(&self) -> Option { None } - fn attach_metrics(&self, _: TemporalMeter) {} } let worker = Worker::new_test( test_worker_cfg() .max_cached_workflows(10_usize) - .workflow_task_slot_supplier(Arc::new(EndlessSupplier {})) + .tuner( + TunerBuilder::default() + .workflow_slot_supplier(Arc::new(EndlessSupplier {})) + .build(), + ) .max_concurrent_wft_polls(10_usize) .no_remote_activities(true) .build() diff --git a/core/src/lib.rs b/core/src/lib.rs index bce9e13a8..e1d865410 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -39,8 +39,8 @@ pub use temporal_sdk_core_protos as protos; pub use temporal_sdk_core_protos::TaskToken; pub use url::Url; pub use worker::{ - RealSysInfo, ResourceBasedSlots, Worker, WorkerConfig, WorkerConfigBuilder, - WorkerConfigSlotSupplierExt, + RealSysInfo, ResourceBasedSlots, ResourceBasedTuner, ResourceSlotOptions, TunerBuilder, + TunerHolder, Worker, WorkerConfig, WorkerConfigBuilder, }; use crate::{ diff --git a/core/src/test_help/mod.rs b/core/src/test_help/mod.rs index 29e23c83a..7cc6a2f2d 100644 --- a/core/src/test_help/mod.rs +++ b/core/src/test_help/mod.rs @@ -9,7 +9,6 @@ use crate::{ client::{ mocks::mock_workflow_client, MockWorkerClient, WorkerClient, WorkflowTaskCompletion, }, - slot_supplier::FixedSizeSlotSupplier, TaskPollers, }, TaskToken, Worker, WorkerConfig, WorkerConfigBuilder, @@ -62,9 +61,6 @@ pub(crate) fn test_worker_cfg() -> WorkerConfigBuilder { let mut wcb = WorkerConfigBuilder::default(); wcb.namespace(NAMESPACE) .task_queue(TEST_Q) - .workflow_task_slot_supplier(Arc::new(FixedSizeSlotSupplier::new(100))) - .activity_task_slot_supplier(Arc::new(FixedSizeSlotSupplier::new(100))) - .local_activity_task_slot_supplier(Arc::new(FixedSizeSlotSupplier::new(100))) .worker_build_id("test_bin_id") .ignore_evicts_on_shutdown(true) // Serial polling since it makes mocking much easier. diff --git a/core/src/worker/mod.rs b/core/src/worker/mod.rs index fe96894c8..2cf8c5315 100644 --- a/core/src/worker/mod.rs +++ b/core/src/worker/mod.rs @@ -2,10 +2,12 @@ mod activities; pub(crate) mod client; mod slot_provider; pub(crate) mod slot_supplier; +mod tuner; mod workflow; -pub use slot_supplier::{RealSysInfo, ResourceBasedSlots, WorkerConfigSlotSupplierExt}; +pub use slot_supplier::{RealSysInfo, ResourceBasedSlots, ResourceBasedTuner, ResourceSlotOptions}; pub use temporal_sdk_core_api::worker::{WorkerConfig, WorkerConfigBuilder}; +pub use tuner::{TunerBuilder, TunerHolder}; pub(crate) use activities::{ ExecutingLAId, LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution, @@ -250,21 +252,19 @@ impl Worker { } else { (MetricsContext::no_op(), None) }; + let tuner = config + .tuner + .as_ref() + .cloned() + .unwrap_or_else(|| TunerBuilder::from_config(&config).build()); + metrics.worker_registered(); if let Some(meter) = meter { - config - .workflow_task_slot_supplier - .attach_metrics(meter.clone()); - config - .activity_task_slot_supplier - .attach_metrics(meter.clone()); - config - .local_activity_task_slot_supplier - .attach_metrics(meter); + tuner.attach_metrics(meter.clone()); } let shutdown_token = CancellationToken::new(); let wft_slots = Arc::new(MeteredPermitDealer::new( - config.workflow_task_slot_supplier.clone(), + tuner.workflow_task_slot_supplier(), metrics.with_new_attrs([workflow_worker_type()]), if config.max_cached_workflows > 0 { // Since we always need to be able to poll the normal task queue as well as the @@ -275,7 +275,7 @@ impl Worker { }, )); let act_slots = Arc::new(MeteredPermitDealer::new( - config.activity_task_slot_supplier.clone(), + tuner.activity_task_slot_supplier(), metrics.with_new_attrs([activity_worker_type()]), None, )); @@ -378,7 +378,7 @@ impl Worker { let (hb_tx, hb_rx) = unbounded_channel(); let local_act_mgr = Arc::new(LocalActivityManager::new( - config.local_activity_task_slot_supplier.clone(), + tuner.local_activity_slot_supplier(), config.namespace.clone(), hb_tx, metrics.with_new_attrs([local_activity_worker_type()]), diff --git a/core/src/worker/slot_supplier.rs b/core/src/worker/slot_supplier.rs index 84f98dc31..2ce0599a6 100644 --- a/core/src/worker/slot_supplier.rs +++ b/core/src/worker/slot_supplier.rs @@ -1,13 +1,14 @@ use std::{marker::PhantomData, sync::Arc}; use temporal_sdk_core_api::worker::{ - SlotKind, SlotReservationContext, SlotSupplier, SlotSupplierPermit, WorkerConfigBuilder, + SlotKind, SlotReservationContext, SlotSupplier, SlotSupplierPermit, }; use tokio::sync::Semaphore; mod resource_based; -pub use resource_based::{RealSysInfo, ResourceBasedSlots}; -use temporal_sdk_core_api::telemetry::metrics::TemporalMeter; +pub use resource_based::{ + RealSysInfo, ResourceBasedSlots, ResourceBasedTuner, ResourceSlotOptions, +}; pub(crate) struct FixedSizeSlotSupplier { sem: Arc, @@ -52,41 +53,4 @@ where fn available_slots(&self) -> Option { Some(self.sem.available_permits()) } - - fn attach_metrics(&self, _: TemporalMeter) { - // Doesn't need to do anything. Metrics tracking is handled by `MeteredPermitDealer`. - } -} - -/// Extension trait providing backwards compatibility with old fixed-size slot options -pub trait WorkerConfigSlotSupplierExt { - /// Creates a [FixedSizeSlotSupplier] using the provided max and assigns it as the workflow - /// task slot supplier - fn max_outstanding_workflow_tasks(&mut self, max: usize) -> &mut Self; - /// Creates a [FixedSizeSlotSupplier] using the provided max and assigns it as the activity task - /// slot supplier - fn max_outstanding_activities(&mut self, max: usize) -> &mut Self; - /// Creates a [FixedSizeSlotSupplier] using the provided max and assigns it as the local - /// activity task slot supplier - fn max_outstanding_local_activities(&mut self, max: usize) -> &mut Self; -} - -impl WorkerConfigSlotSupplierExt for WorkerConfigBuilder { - fn max_outstanding_workflow_tasks(&mut self, max: usize) -> &mut Self { - let fsss = FixedSizeSlotSupplier::new(max); - self.workflow_task_slot_supplier(Arc::new(fsss)); - self - } - - fn max_outstanding_activities(&mut self, max: usize) -> &mut Self { - let fsss = FixedSizeSlotSupplier::new(max); - self.activity_task_slot_supplier(Arc::new(fsss)); - self - } - - fn max_outstanding_local_activities(&mut self, max: usize) -> &mut Self { - let fsss = FixedSizeSlotSupplier::new(max); - self.local_activity_task_slot_supplier(Arc::new(fsss)); - self - } } diff --git a/core/src/worker/slot_supplier/resource_based.rs b/core/src/worker/slot_supplier/resource_based.rs index 2c794743a..629984068 100644 --- a/core/src/worker/slot_supplier/resource_based.rs +++ b/core/src/worker/slot_supplier/resource_based.rs @@ -10,10 +10,80 @@ use std::{ }; use temporal_sdk_core_api::{ telemetry::metrics::{CoreMeter, GaugeF64, MetricAttributes, TemporalMeter}, - worker::{SlotKind, SlotReservationContext, SlotSupplier, SlotSupplierPermit}, + worker::{ + ActivitySlotKind, LocalActivitySlotKind, SlotKind, SlotReservationContext, SlotSupplier, + SlotSupplierPermit, WorkerTuner, WorkflowSlotKind, + }, }; use tokio::{sync::watch, task::JoinHandle}; +/// Implements [WorkerTuner] and attempts to maintain certain levels of resource usage when +/// under load. +/// +/// It does so by using two PID controllers, one for memory and one for CPU, which are fed the +/// current usage levels of their respective resource as measurements. The user specifies a target +/// threshold for each, and slots are handed out if the output of both PID controllers is above some +/// defined threshold. See [ResourceBasedSlotsOptions] for the default PID controller settings. +pub struct ResourceBasedTuner { + slots: Arc>, + wf_opts: Option, + act_opts: Option, + la_opts: Option, +} + +impl ResourceBasedTuner { + /// Build a new tuner from a [ResourceBasedSlots] instance + pub fn new(resourcer: ResourceBasedSlots) -> Self { + Self { + slots: Arc::new(resourcer), + wf_opts: None, + act_opts: None, + la_opts: None, + } + } + + /// Set workflow slot options + pub fn with_workflow_slots_options(&mut self, opts: ResourceSlotOptions) -> &mut Self { + self.wf_opts = Some(opts); + self + } + + /// Set activity slot options + pub fn with_activity_slots_options(&mut self, opts: ResourceSlotOptions) -> &mut Self { + self.act_opts = Some(opts); + self + } + + /// Set local activity slot options + pub fn with_local_activity_slots_options(&mut self, opts: ResourceSlotOptions) -> &mut Self { + self.la_opts = Some(opts); + self + } +} + +const DEFAULT_WF_SLOT_OPTS: ResourceSlotOptions = ResourceSlotOptions { + min_slots: 2, + max_slots: 10_000, + ramp_throttle: Duration::from_millis(0), +}; +const DEFAULT_ACT_SLOT_OPTS: ResourceSlotOptions = ResourceSlotOptions { + min_slots: 1, + max_slots: 10_000, + ramp_throttle: Duration::from_millis(50), +}; + +/// Options for a specific slot type +#[derive(Debug, Clone, Copy, derive_more::Constructor)] +pub struct ResourceSlotOptions { + /// Amount of slots of this type that will be issued regardless of any other checks + min_slots: usize, + /// Maximum amount of slots of this type permitted + max_slots: usize, + /// Minimum time we will wait (after passing the minimum slots number) between handing out new + /// slots + ramp_throttle: Duration, +} + /// Implements [SlotSupplier] and attempts to maintain certain levels of resource usage when /// under load. /// @@ -32,12 +102,7 @@ pub struct ResourceBasedSlots { pub struct ResourceBasedSlotsForType { inner: Arc>, - minimum: usize, - /// Maximum amount of slots of this type permitted - max: usize, - /// Minimum time we will wait (after passing the minimum slots number) between handing out new - /// slots - ramp_throttle: Duration, + opts: ResourceSlotOptions, last_slot_issued_tx: watch::Sender, last_slot_issued_rx: watch::Receiver, @@ -169,10 +234,11 @@ where async fn reserve_slot(&self, ctx: &dyn SlotReservationContext) -> SlotSupplierPermit { loop { - if ctx.num_issued_slots() < self.minimum { + if ctx.num_issued_slots() < self.opts.min_slots { return self.issue_slot(); } else { let must_wait_for = self + .opts .ramp_throttle .saturating_sub(self.time_since_last_issued()); if must_wait_for > Duration::from_millis(0) { @@ -189,9 +255,9 @@ where fn try_reserve_slot(&self, ctx: &dyn SlotReservationContext) -> Option { let num_issued = ctx.num_issued_slots(); - if num_issued < self.minimum - || (self.time_since_last_issued() > self.ramp_throttle - && num_issued < self.max + if num_issued < self.opts.min_slots + || (self.time_since_last_issued() > self.opts.ramp_throttle + && num_issued < self.opts.max_slots && self.inner.pid_decision() && self.inner.can_reserve()) { @@ -204,10 +270,6 @@ where fn mark_slot_used(&self, _info: SK::Info<'_>) {} fn release_slot(&self) {} - - fn attach_metrics(&self, metrics: TemporalMeter) { - self.inner.attach_metrics(metrics); - } } impl ResourceBasedSlotsForType @@ -222,17 +284,10 @@ where MI: SystemResourceInfo + Send + Sync, SK: SlotKind + Send + Sync, { - fn new( - inner: Arc>, - minimum: usize, - max: usize, - ramp_throttle: Duration, - ) -> Self { + fn new(inner: Arc>, opts: ResourceSlotOptions) -> Self { let (tx, rx) = watch::channel(Instant::now()); Self { - minimum, - max, - ramp_throttle, + opts, last_slot_issued_tx: tx, last_slot_issued_rx: rx, inner, @@ -252,6 +307,33 @@ where } } +impl WorkerTuner for ResourceBasedTuner { + fn workflow_task_slot_supplier( + &self, + ) -> Arc + Send + Sync> { + let o = self.wf_opts.unwrap_or(DEFAULT_WF_SLOT_OPTS); + self.slots.as_kind(o) + } + + fn activity_task_slot_supplier( + &self, + ) -> Arc + Send + Sync> { + let o = self.act_opts.unwrap_or(DEFAULT_ACT_SLOT_OPTS); + self.slots.as_kind(o) + } + + fn local_activity_slot_supplier( + &self, + ) -> Arc + Send + Sync> { + let o = self.la_opts.unwrap_or(DEFAULT_ACT_SLOT_OPTS); + self.slots.as_kind(o) + } + + fn attach_metrics(&self, metrics: TemporalMeter) { + self.slots.attach_metrics(metrics); + } +} + impl ResourceBasedSlots { /// Create a [ResourceBasedSlotsForType] for this instance which is willing to hand out /// `minimum` slots with no checks at all and `max` slots ever. Otherwise the underlying @@ -264,16 +346,9 @@ impl ResourceBasedSlots { /// resulting in OOM (for example). pub fn as_kind( self: &Arc, - minimum: usize, - max: usize, - ramp_throttle: Duration, + opts: ResourceSlotOptions, ) -> Arc> { - Arc::new(ResourceBasedSlotsForType::new( - self.clone(), - minimum, - max, - ramp_throttle, - )) + Arc::new(ResourceBasedSlotsForType::new(self.clone(), opts)) } fn new_with_sysinfo(options: ResourceBasedSlotsOptions, sys_info: MI) -> Self { @@ -432,7 +507,11 @@ mod tests { fn mem_workflow_sync() { let (fmis, used) = FakeMIS::new(); let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(test_options(), fmis)) - .as_kind::(0, 100, Duration::from_millis(0)); + .as_kind::(ResourceSlotOptions { + min_slots: 0, + max_slots: 100, + ramp_throttle: Duration::from_millis(0), + }); let pd = MeteredPermitDealer::new(rbs.clone(), MetricsContext::no_op(), None); assert!(rbs.try_reserve_slot(&pd).is_some()); used.store(90_000, Ordering::Release); @@ -444,7 +523,11 @@ mod tests { let (fmis, used) = FakeMIS::new(); used.store(90_000, Ordering::Release); let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(test_options(), fmis)) - .as_kind::(0, 100, Duration::from_millis(0)); + .as_kind::(ResourceSlotOptions { + min_slots: 0, + max_slots: 100, + ramp_throttle: Duration::from_millis(0), + }); let pd = MeteredPermitDealer::new(rbs.clone(), MetricsContext::no_op(), None); let order = crossbeam_queue::ArrayQueue::new(2); let waits_free = async { @@ -464,7 +547,11 @@ mod tests { fn minimum_respected() { let (fmis, used) = FakeMIS::new(); let rbs = Arc::new(ResourceBasedSlots::new_with_sysinfo(test_options(), fmis)) - .as_kind::(2, 100, Duration::from_millis(0)); + .as_kind::(ResourceSlotOptions { + min_slots: 2, + max_slots: 100, + ramp_throttle: Duration::from_millis(0), + }); let pd = MeteredPermitDealer::new(rbs.clone(), MetricsContext::no_op(), None); used.store(90_000, Ordering::Release); let _p1 = pd.try_acquire_owned().unwrap(); diff --git a/core/src/worker/tuner.rs b/core/src/worker/tuner.rs new file mode 100644 index 000000000..e0ed4dae7 --- /dev/null +++ b/core/src/worker/tuner.rs @@ -0,0 +1,115 @@ +use crate::worker::slot_supplier::FixedSizeSlotSupplier; +use std::sync::{Arc, OnceLock}; +use temporal_sdk_core_api::{ + telemetry::metrics::TemporalMeter, + worker::{ + ActivitySlotKind, LocalActivitySlotKind, SlotSupplier, WorkerConfig, WorkerTuner, + WorkflowSlotKind, + }, +}; + +/// Allows for the composition of different slot suppliers into a [WorkerTuner] +pub struct TunerHolder { + wft_supplier: Arc + Send + Sync>, + act_supplier: Arc + Send + Sync>, + la_supplier: Arc + Send + Sync>, + metrics: OnceLock, +} + +/// Can be used to construct a `TunerHolder` from individual slot suppliers. Any supplier which is +/// not provided will default to a [FixedSizeSlotSupplier] with a capacity of 100. +#[derive(Default, Clone)] +pub struct TunerBuilder { + workflow_slot_supplier: + Option + Send + Sync>>, + activity_slot_supplier: + Option + Send + Sync>>, + local_activity_slot_supplier: + Option + Send + Sync>>, +} + +impl TunerBuilder { + pub(crate) fn from_config(cfg: &WorkerConfig) -> Self { + let mut builder = Self::default(); + if let Some(m) = cfg.max_outstanding_workflow_tasks { + builder.workflow_slot_supplier(Arc::new(FixedSizeSlotSupplier::new(m))); + } + if let Some(m) = cfg.max_outstanding_activities { + builder.activity_slot_supplier(Arc::new(FixedSizeSlotSupplier::new(m))); + } + if let Some(m) = cfg.max_outstanding_local_activities { + builder.local_activity_slot_supplier(Arc::new(FixedSizeSlotSupplier::new(m))); + } + builder + } + + /// Set a workflow slot supplier + pub fn workflow_slot_supplier( + &mut self, + supplier: Arc + Send + Sync>, + ) -> &mut Self { + self.workflow_slot_supplier = Some(supplier); + self + } + + /// Set an activity slot supplier + pub fn activity_slot_supplier( + &mut self, + supplier: Arc + Send + Sync>, + ) -> &mut Self { + self.activity_slot_supplier = Some(supplier); + self + } + + /// Set a local activity slot supplier + pub fn local_activity_slot_supplier( + &mut self, + supplier: Arc + Send + Sync>, + ) -> &mut Self { + self.local_activity_slot_supplier = Some(supplier); + self + } + + /// Build a [WorkerTuner] from the configured slot suppliers + pub fn build(&mut self) -> Arc { + Arc::new(TunerHolder { + wft_supplier: self + .workflow_slot_supplier + .clone() + .unwrap_or_else(|| Arc::new(FixedSizeSlotSupplier::new(100))), + act_supplier: self + .activity_slot_supplier + .clone() + .unwrap_or_else(|| Arc::new(FixedSizeSlotSupplier::new(100))), + la_supplier: self + .local_activity_slot_supplier + .clone() + .unwrap_or_else(|| Arc::new(FixedSizeSlotSupplier::new(100))), + metrics: OnceLock::new(), + }) + } +} + +impl WorkerTuner for TunerHolder { + fn workflow_task_slot_supplier( + &self, + ) -> Arc + Send + Sync> { + self.wft_supplier.clone() + } + + fn activity_task_slot_supplier( + &self, + ) -> Arc + Send + Sync> { + self.act_supplier.clone() + } + + fn local_activity_slot_supplier( + &self, + ) -> Arc + Send + Sync> { + self.la_supplier.clone() + } + + fn attach_metrics(&self, m: TemporalMeter) { + let _ = self.metrics.set(m); + } +} diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index f2a30c086..576f790b4 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -36,7 +36,6 @@ use temporal_sdk_core::{ replay::ReplayWorkerInput, telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter}, ClientOptions, ClientOptionsBuilder, CoreRuntime, WorkerConfigBuilder, - WorkerConfigSlotSupplierExt, }; use temporal_sdk_core_api::{ errors::{PollActivityError, PollWfError}, @@ -101,9 +100,9 @@ pub fn integ_worker_config(tq: &str) -> WorkerConfigBuilder { let mut b = WorkerConfigBuilder::default(); b.namespace(NAMESPACE) .task_queue(tq) - .max_outstanding_activities(100) - .max_outstanding_local_activities(100) - .max_outstanding_workflow_tasks(100) + .max_outstanding_activities(100_usize) + .max_outstanding_local_activities(100_usize) + .max_outstanding_workflow_tasks(100_usize) .worker_build_id("test_build_id"); b } diff --git a/tests/fuzzy_workflow.rs b/tests/fuzzy_workflow.rs index 03d937e01..21eaa43ac 100644 --- a/tests/fuzzy_workflow.rs +++ b/tests/fuzzy_workflow.rs @@ -3,7 +3,6 @@ use rand::{prelude::Distribution, rngs::SmallRng, Rng, SeedableRng}; use std::{future, time::Duration}; use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowOptions}; use temporal_sdk::{ActContext, ActivityOptions, LocalActivityOptions, WfContext, WorkflowResult}; -use temporal_sdk_core::WorkerConfigSlotSupplierExt; use temporal_sdk_core_protos::coresdk::{AsJsonPayloadExt, FromJsonPayloadExt, IntoPayloadsExt}; use temporal_sdk_core_test_utils::CoreWfStarter; use tokio_util::sync::CancellationToken; diff --git a/tests/heavy_tests.rs b/tests/heavy_tests.rs index 24c1de528..70684c6fd 100644 --- a/tests/heavy_tests.rs +++ b/tests/heavy_tests.rs @@ -5,7 +5,7 @@ use std::{ }; use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowOptions}; use temporal_sdk::{ActContext, ActivityOptions, WfContext, WorkflowResult}; -use temporal_sdk_core::{ResourceBasedSlots, WorkerConfigSlotSupplierExt}; +use temporal_sdk_core::{ResourceBasedSlots, ResourceBasedTuner, ResourceSlotOptions}; use temporal_sdk_core_protos::{ coresdk::{workflow_commands::ActivityCancellationType, AsJsonPayloadExt}, temporal::api::enums::v1::WorkflowIdReusePolicy, @@ -93,15 +93,15 @@ async fn chunky_activities_resource_based() { .worker_config .max_concurrent_wft_polls(10_usize) .max_concurrent_at_polls(10_usize); - let resource_slots = Arc::new(ResourceBasedSlots::new(0.7, 0.7)); - starter - .worker_config - .workflow_task_slot_supplier(resource_slots.as_kind( + let mut tuner = ResourceBasedTuner::new(ResourceBasedSlots::new(0.7, 0.7)); + tuner + .with_workflow_slots_options(ResourceSlotOptions::new( 25, WORKFLOWS, Duration::from_millis(0), )) - .activity_task_slot_supplier(resource_slots.as_kind(5, 1000, Duration::from_millis(50))); + .with_activity_slots_options(ResourceSlotOptions::new(5, 1000, Duration::from_millis(50))); + starter.worker_config.tuner(Arc::new(tuner)); let mut worker = starter.worker().await; let activity_id = "act-1"; diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs index e7b8d2ce4..a19053eb8 100644 --- a/tests/integ_tests/metrics_tests.rs +++ b/tests/integ_tests/metrics_tests.rs @@ -1,10 +1,7 @@ use assert_matches::assert_matches; use std::{net::SocketAddr, sync::Arc, time::Duration}; use temporal_client::{WorkflowClientTrait, WorkflowOptions, WorkflowService}; -use temporal_sdk_core::{ - init_worker, telemetry::start_prometheus_metric_exporter, CoreRuntime, - WorkerConfigSlotSupplierExt, -}; +use temporal_sdk_core::{init_worker, telemetry::start_prometheus_metric_exporter, CoreRuntime}; use temporal_sdk_core_api::{ telemetry::{ metrics::{CoreMeter, MetricAttributes, MetricParameters}, diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs index 9d639dded..a965a345f 100644 --- a/tests/integ_tests/workflow_tests.rs +++ b/tests/integ_tests/workflow_tests.rs @@ -29,7 +29,7 @@ use std::{ }; use temporal_client::{WorkflowClientTrait, WorkflowOptions}; use temporal_sdk::{interceptors::WorkerInterceptor, ActivityOptions, WfContext, WorkflowResult}; -use temporal_sdk_core::{replay::HistoryForReplay, CoreRuntime, WorkerConfigSlotSupplierExt}; +use temporal_sdk_core::{replay::HistoryForReplay, CoreRuntime}; use temporal_sdk_core_api::{ errors::{PollWfError, WorkflowErrorType}, Worker, diff --git a/tests/integ_tests/workflow_tests/continue_as_new.rs b/tests/integ_tests/workflow_tests/continue_as_new.rs index 041b40a42..262612c50 100644 --- a/tests/integ_tests/workflow_tests/continue_as_new.rs +++ b/tests/integ_tests/workflow_tests/continue_as_new.rs @@ -1,7 +1,6 @@ use std::time::Duration; use temporal_client::WorkflowOptions; use temporal_sdk::{WfContext, WfExitValue, WorkflowResult}; -use temporal_sdk_core::WorkerConfigSlotSupplierExt; use temporal_sdk_core_protos::coresdk::workflow_commands::ContinueAsNewWorkflowExecution; use temporal_sdk_core_test_utils::CoreWfStarter; diff --git a/tests/integ_tests/workflow_tests/local_activities.rs b/tests/integ_tests/workflow_tests/local_activities.rs index a6027fe3a..762ca19da 100644 --- a/tests/integ_tests/workflow_tests/local_activities.rs +++ b/tests/integ_tests/workflow_tests/local_activities.rs @@ -10,7 +10,7 @@ use temporal_sdk::{ interceptors::WorkerInterceptor, ActContext, ActivityCancelledError, CancellableFuture, LocalActivityOptions, WfContext, WorkflowResult, }; -use temporal_sdk_core::{replay::HistoryForReplay, WorkerConfigSlotSupplierExt}; +use temporal_sdk_core::replay::HistoryForReplay; use temporal_sdk_core_protos::{ coresdk::{ workflow_commands::{workflow_command::Variant, ActivityCancellationType}, diff --git a/tests/integ_tests/workflow_tests/stickyness.rs b/tests/integ_tests/workflow_tests/stickyness.rs index 0d588bfa1..4969071ca 100644 --- a/tests/integ_tests/workflow_tests/stickyness.rs +++ b/tests/integ_tests/workflow_tests/stickyness.rs @@ -5,7 +5,6 @@ use std::{ }; use temporal_client::WorkflowOptions; use temporal_sdk::{WfContext, WorkflowResult}; -use temporal_sdk_core::WorkerConfigSlotSupplierExt; use temporal_sdk_core_test_utils::CoreWfStarter; use tokio::sync::Barrier;