Skip to content

Commit

Permalink
Add overall WorkerTuner trait to bring together suppliers
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed May 4, 2024
1 parent 5d567fb commit e934332
Show file tree
Hide file tree
Showing 18 changed files with 339 additions and 150 deletions.
69 changes: 53 additions & 16 deletions core-api/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,10 @@ pub struct WorkerConfig {
/// or failures.
#[builder(default = "0")]
pub max_cached_workflows: usize,
/// Set a [SlotSupplier] for workflow tasks.
#[builder(setter(into = false))]
pub workflow_task_slot_supplier:
Arc<dyn SlotSupplier<SlotKind = WorkflowSlotKind> + Send + Sync>,
/// Set a [SlotSupplier] for activity tasks.
#[builder(setter(into = false))]
pub activity_task_slot_supplier:
Arc<dyn SlotSupplier<SlotKind = ActivitySlotKind> + Send + Sync>,
/// Set a [SlotSupplier] for local activity tasks.
#[builder(setter(into = false))]
pub local_activity_task_slot_supplier:
Arc<dyn SlotSupplier<SlotKind = LocalActivitySlotKind> + Send + Sync>,
/// Set a [WorkerTuner] for this worker. Either this or at least one of the `max_outstanding_*`
/// fields must be set.
#[builder(setter(into = false, strip_option), default)]
pub tuner: Option<Arc<dyn WorkerTuner + Send + Sync>>,
/// Maximum number of concurrent poll workflow task requests we will perform at a time on this
/// worker's task queue. See also [WorkerConfig::nonsticky_to_sticky_poll_ratio]. Must be at
/// least 1.
Expand Down Expand Up @@ -137,6 +129,25 @@ pub struct WorkerConfig {
/// map key).
#[builder(default)]
pub workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,

/// The maximum allowed number of workflow tasks that will ever be given to this worker at one
/// time. Note that one workflow task may require multiple activations - so the WFT counts as
/// "outstanding" until all activations it requires have been completed.
///
/// Mutually exclusive with `tuner`
#[builder(setter(into, strip_option), default)]
pub max_outstanding_workflow_tasks: Option<usize>,
/// The maximum number of activity tasks that will ever be given to this worker concurrently
///
/// Mutually exclusive with `tuner`
#[builder(setter(into, strip_option), default)]
pub max_outstanding_activities: Option<usize>,
/// The maximum number of local activity tasks that will ever be given to this worker
/// concurrently
///
/// Mutually exclusive with `tuner`
#[builder(setter(into, strip_option), default)]
pub max_outstanding_local_activities: Option<usize>,
}

impl WorkerConfig {
Expand Down Expand Up @@ -182,6 +193,13 @@ impl WorkerConfigBuilder {
}
}

if self.tuner.is_some() && self.max_outstanding_workflow_tasks.is_some()
|| self.max_outstanding_activities.is_some()
|| self.max_outstanding_local_activities.is_some()
{
return Err("max_outstanding_* fields are mutually exclusive with `tuner`".to_owned());
}

let max_wft_polls = self
.max_concurrent_wft_polls
.unwrap_or(MAX_CONCURRENT_WFT_POLLS_DEFAULT);
Expand Down Expand Up @@ -213,6 +231,29 @@ impl WorkerConfigBuilder {
}
}

/// This trait allows users to customize the performance characteristics of workers dynamically.
/// For more, see the docstrings of the traits in the return types of its functions.
pub trait WorkerTuner {
/// Return a [SlotSupplier] for workflow tasks
fn workflow_task_slot_supplier(
&self,
) -> Arc<dyn SlotSupplier<SlotKind = WorkflowSlotKind> + Send + Sync>;

/// Return a [SlotSupplier] for activity tasks
fn activity_task_slot_supplier(
&self,
) -> Arc<dyn SlotSupplier<SlotKind = ActivitySlotKind> + Send + Sync>;

/// Return a [SlotSupplier] for local activities
fn local_activity_slot_supplier(
&self,
) -> Arc<dyn SlotSupplier<SlotKind = LocalActivitySlotKind> + Send + Sync>;

/// Core will call this at worker initialization time, allowing the implementation to hook up to
/// metrics if any are configured. If not, it will not be called.
fn attach_metrics(&self, metrics: TemporalMeter);
}

/// Implementing this trait allows users to customize how many tasks of certain kinds the worker
/// will perform concurrently.
///
Expand Down Expand Up @@ -246,10 +287,6 @@ pub trait SlotSupplier {
fn available_slots(&self) -> Option<usize> {
None
}

/// Core will call this at worker initialization time, allowing the implementation to hook up to
/// metrics if any are configured. If not, it will not be called.
fn attach_metrics(&self, metrics: TemporalMeter);
}

pub trait SlotReservationContext: Send + Sync {
Expand Down
9 changes: 3 additions & 6 deletions core/src/core_tests/activity_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ use crate::{
single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs, MocksHolder,
QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q,
},
worker::{
client::mocks::{mock_manual_workflow_client, mock_workflow_client},
slot_supplier::FixedSizeSlotSupplier,
},
ActivityHeartbeat, Worker, WorkerConfigSlotSupplierExt,
worker::client::mocks::{mock_manual_workflow_client, mock_workflow_client},
ActivityHeartbeat, Worker,
};
use futures::FutureExt;
use itertools::Itertools;
Expand Down Expand Up @@ -921,7 +918,7 @@ async fn activity_tasks_from_completion_reserve_slots() {
let mut mock = build_mock_pollers(mh);
mock.worker_cfg(|cfg| {
cfg.max_cached_workflows = 2;
cfg.activity_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(2));
cfg.max_outstanding_activities = Some(2);
});
mock.set_act_poller(mock_poller_from_resps(act_tasks));
let core = Arc::new(mock_worker(mock));
Expand Down
8 changes: 3 additions & 5 deletions core/src/core_tests/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use crate::{
build_mock_pollers, hist_to_poll_resp, mock_sdk, mock_sdk_cfg, mock_worker,
single_hist_mock_sg, MockPollCfg, ResponseType, WorkerExt,
},
worker::{
client::mocks::mock_workflow_client, slot_supplier::FixedSizeSlotSupplier, LEGACY_QUERY_ID,
},
worker::{client::mocks::mock_workflow_client, LEGACY_QUERY_ID},
};
use anyhow::anyhow;
use crossbeam_queue::SegQueue;
Expand Down Expand Up @@ -189,7 +187,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) {
mh.enforce_correct_number_of_polls = false;
let mut worker = mock_sdk_cfg(mh, |wc| {
wc.max_cached_workflows = 1;
wc.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(1));
wc.max_outstanding_workflow_tasks = Some(1);
});
let core = worker.core_worker.clone();

Expand Down Expand Up @@ -1086,7 +1084,7 @@ async fn local_act_records_nonfirst_attempts_ok() {
}));
let mut worker = mock_sdk_cfg(mh, |wc| {
wc.max_cached_workflows = 1;
wc.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(1));
wc.max_outstanding_workflow_tasks = Some(1);
});

worker.register_wf(
Expand Down
20 changes: 11 additions & 9 deletions core/src/core_tests/workflow_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use crate::{
},
worker::{
client::mocks::{mock_manual_workflow_client, mock_workflow_client},
slot_supplier::FixedSizeSlotSupplier,
TunerBuilder,
},
Worker, WorkerConfigSlotSupplierExt,
Worker,
};
use futures::{stream, FutureExt};
use rstest::{fixture, rstest};
Expand All @@ -31,7 +31,6 @@ use temporal_client::WorkflowOptions;
use temporal_sdk::{ActivityOptions, CancellableFuture, WfContext};
use temporal_sdk_core_api::{
errors::PollWfError,
telemetry::metrics::TemporalMeter,
worker::{
SlotKind, SlotReservationContext, SlotSupplier, SlotSupplierPermit, WorkflowSlotKind,
},
Expand Down Expand Up @@ -921,7 +920,7 @@ async fn max_wft_respected() {
let mh = MockPollCfg::new(hists.into_iter().collect(), true, 0);
let mut worker = mock_sdk_cfg(mh, |cfg| {
cfg.max_cached_workflows = total_wfs as usize;
cfg.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(1));
cfg.max_outstanding_workflow_tasks = Some(1);
});
let active_count: &'static _ = Box::leak(Box::new(Semaphore::new(1)));
worker.register_wf(DEFAULT_WORKFLOW_TYPE, move |ctx: WfContext| async move {
Expand Down Expand Up @@ -1516,7 +1515,7 @@ async fn failing_wft_doesnt_eat_permit_forever() {
let mut mock = build_mock_pollers(mock);
mock.worker_cfg(|cfg| {
cfg.max_cached_workflows = 2;
cfg.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(2));
cfg.max_outstanding_workflow_tasks = Some(2);
});
let outstanding_mock_tasks = mock.outstanding_task_map.clone();
let worker = mock_worker(mock);
Expand Down Expand Up @@ -1592,7 +1591,7 @@ async fn cache_miss_will_fetch_history() {
mock.worker_cfg(|cfg| {
cfg.max_cached_workflows = 1;
// Also verifies tying the WFT permit to the fetch request doesn't get us stuck
cfg.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(1));
cfg.max_outstanding_workflow_tasks = Some(1);
});
let worker = mock_worker(mock);

Expand Down Expand Up @@ -1818,7 +1817,7 @@ async fn poll_faster_than_complete_wont_overflow_cache() {
let mut mock = build_mock_pollers(mock_cfg);
mock.worker_cfg(|wc| {
wc.max_cached_workflows = 3;
wc.workflow_task_slot_supplier = Arc::new(FixedSizeSlotSupplier::new(3));
wc.max_outstanding_workflow_tasks = Some(3);
});
let core = mock_worker(mock);
// Poll 4 times, completing once, such that max tasks are never exceeded
Expand Down Expand Up @@ -2902,13 +2901,16 @@ async fn slot_provider_cant_hand_out_more_permits_than_cache_size() {
fn available_slots(&self) -> Option<usize> {
None
}
fn attach_metrics(&self, _: TemporalMeter) {}
}

let worker = Worker::new_test(
test_worker_cfg()
.max_cached_workflows(10_usize)
.workflow_task_slot_supplier(Arc::new(EndlessSupplier {}))
.tuner(
TunerBuilder::default()
.workflow_slot_supplier(Arc::new(EndlessSupplier {}))
.build(),
)
.max_concurrent_wft_polls(10_usize)
.no_remote_activities(true)
.build()
Expand Down
4 changes: 2 additions & 2 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ pub use temporal_sdk_core_protos as protos;
pub use temporal_sdk_core_protos::TaskToken;
pub use url::Url;
pub use worker::{
RealSysInfo, ResourceBasedSlots, Worker, WorkerConfig, WorkerConfigBuilder,
WorkerConfigSlotSupplierExt,
RealSysInfo, ResourceBasedSlots, ResourceBasedTuner, ResourceSlotOptions, TunerBuilder,
TunerHolder, Worker, WorkerConfig, WorkerConfigBuilder,
};

use crate::{
Expand Down
4 changes: 0 additions & 4 deletions core/src/test_help/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{
client::{
mocks::mock_workflow_client, MockWorkerClient, WorkerClient, WorkflowTaskCompletion,
},
slot_supplier::FixedSizeSlotSupplier,
TaskPollers,
},
TaskToken, Worker, WorkerConfig, WorkerConfigBuilder,
Expand Down Expand Up @@ -62,9 +61,6 @@ pub(crate) fn test_worker_cfg() -> WorkerConfigBuilder {
let mut wcb = WorkerConfigBuilder::default();
wcb.namespace(NAMESPACE)
.task_queue(TEST_Q)
.workflow_task_slot_supplier(Arc::new(FixedSizeSlotSupplier::new(100)))
.activity_task_slot_supplier(Arc::new(FixedSizeSlotSupplier::new(100)))
.local_activity_task_slot_supplier(Arc::new(FixedSizeSlotSupplier::new(100)))
.worker_build_id("test_bin_id")
.ignore_evicts_on_shutdown(true)
// Serial polling since it makes mocking much easier.
Expand Down
26 changes: 13 additions & 13 deletions core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ mod activities;
pub(crate) mod client;
mod slot_provider;
pub(crate) mod slot_supplier;
mod tuner;
mod workflow;

pub use slot_supplier::{RealSysInfo, ResourceBasedSlots, WorkerConfigSlotSupplierExt};
pub use slot_supplier::{RealSysInfo, ResourceBasedSlots, ResourceBasedTuner, ResourceSlotOptions};
pub use temporal_sdk_core_api::worker::{WorkerConfig, WorkerConfigBuilder};
pub use tuner::{TunerBuilder, TunerHolder};

pub(crate) use activities::{
ExecutingLAId, LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution,
Expand Down Expand Up @@ -250,21 +252,19 @@ impl Worker {
} else {
(MetricsContext::no_op(), None)
};
let tuner = config
.tuner
.as_ref()
.cloned()
.unwrap_or_else(|| TunerBuilder::from_config(&config).build());

metrics.worker_registered();
if let Some(meter) = meter {
config
.workflow_task_slot_supplier
.attach_metrics(meter.clone());
config
.activity_task_slot_supplier
.attach_metrics(meter.clone());
config
.local_activity_task_slot_supplier
.attach_metrics(meter);
tuner.attach_metrics(meter.clone());
}
let shutdown_token = CancellationToken::new();
let wft_slots = Arc::new(MeteredPermitDealer::new(
config.workflow_task_slot_supplier.clone(),
tuner.workflow_task_slot_supplier(),
metrics.with_new_attrs([workflow_worker_type()]),
if config.max_cached_workflows > 0 {
// Since we always need to be able to poll the normal task queue as well as the
Expand All @@ -275,7 +275,7 @@ impl Worker {
},
));
let act_slots = Arc::new(MeteredPermitDealer::new(
config.activity_task_slot_supplier.clone(),
tuner.activity_task_slot_supplier(),
metrics.with_new_attrs([activity_worker_type()]),
None,
));
Expand Down Expand Up @@ -378,7 +378,7 @@ impl Worker {

let (hb_tx, hb_rx) = unbounded_channel();
let local_act_mgr = Arc::new(LocalActivityManager::new(
config.local_activity_task_slot_supplier.clone(),
tuner.local_activity_slot_supplier(),
config.namespace.clone(),
hb_tx,
metrics.with_new_attrs([local_activity_worker_type()]),
Expand Down
44 changes: 4 additions & 40 deletions core/src/worker/slot_supplier.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::{marker::PhantomData, sync::Arc};
use temporal_sdk_core_api::worker::{
SlotKind, SlotReservationContext, SlotSupplier, SlotSupplierPermit, WorkerConfigBuilder,
SlotKind, SlotReservationContext, SlotSupplier, SlotSupplierPermit,
};
use tokio::sync::Semaphore;

mod resource_based;

pub use resource_based::{RealSysInfo, ResourceBasedSlots};
use temporal_sdk_core_api::telemetry::metrics::TemporalMeter;
pub use resource_based::{
RealSysInfo, ResourceBasedSlots, ResourceBasedTuner, ResourceSlotOptions,
};

pub(crate) struct FixedSizeSlotSupplier<SK> {
sem: Arc<Semaphore>,
Expand Down Expand Up @@ -52,41 +53,4 @@ where
fn available_slots(&self) -> Option<usize> {
Some(self.sem.available_permits())
}

fn attach_metrics(&self, _: TemporalMeter) {
// Doesn't need to do anything. Metrics tracking is handled by `MeteredPermitDealer`.
}
}

/// Extension trait providing backwards compatibility with old fixed-size slot options
pub trait WorkerConfigSlotSupplierExt {
/// Creates a [FixedSizeSlotSupplier] using the provided max and assigns it as the workflow
/// task slot supplier
fn max_outstanding_workflow_tasks(&mut self, max: usize) -> &mut Self;
/// Creates a [FixedSizeSlotSupplier] using the provided max and assigns it as the activity task
/// slot supplier
fn max_outstanding_activities(&mut self, max: usize) -> &mut Self;
/// Creates a [FixedSizeSlotSupplier] using the provided max and assigns it as the local
/// activity task slot supplier
fn max_outstanding_local_activities(&mut self, max: usize) -> &mut Self;
}

impl WorkerConfigSlotSupplierExt for WorkerConfigBuilder {
fn max_outstanding_workflow_tasks(&mut self, max: usize) -> &mut Self {
let fsss = FixedSizeSlotSupplier::new(max);
self.workflow_task_slot_supplier(Arc::new(fsss));
self
}

fn max_outstanding_activities(&mut self, max: usize) -> &mut Self {
let fsss = FixedSizeSlotSupplier::new(max);
self.activity_task_slot_supplier(Arc::new(fsss));
self
}

fn max_outstanding_local_activities(&mut self, max: usize) -> &mut Self {
let fsss = FixedSizeSlotSupplier::new(max);
self.local_activity_task_slot_supplier(Arc::new(fsss));
self
}
}
Loading

0 comments on commit e934332

Please sign in to comment.