Skip to content

Commit 341d3c3

Browse files
committed
Address todos
1 parent 7d6e505 commit 341d3c3

24 files changed

+113
-237
lines changed

core-api/src/worker.rs

Lines changed: 30 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -173,16 +173,7 @@ impl WorkerConfigBuilder {
173173
if self.max_concurrent_at_polls == Some(0) {
174174
return Err("`max_concurrent_at_polls` must be at least 1".to_owned());
175175
}
176-
// TODO: Move these checks into config-for-default implementation
177-
// if self.max_cached_workflows > Some(0)
178-
// && self.max_outstanding_workflow_tasks > self.max_cached_workflows
179-
// {
180-
// return Err(
181-
// "Maximum concurrent workflow tasks cannot exceed the maximum number of cached \
182-
// workflows"
183-
// .to_owned(),
184-
// );
185-
// }
176+
186177
if let Some(Some(ref x)) = self.max_worker_activities_per_second {
187178
if !x.is_normal() || x.is_sign_negative() {
188179
return Err(
@@ -195,6 +186,9 @@ impl WorkerConfigBuilder {
195186
.max_concurrent_wft_polls
196187
.unwrap_or(MAX_CONCURRENT_WFT_POLLS_DEFAULT);
197188

189+
// It wouldn't make any sense to have more outstanding polls than workflows we can possibly
190+
// cache. If we allow this at low values it's possible for sticky pollers to reserve all
191+
// available slots, crowding out the normal queue and gumming things up.
198192
if let Some(max_cache) = self.max_cached_workflows {
199193
if max_cache > 0 && max_wft_polls > max_cache {
200194
return Err(
@@ -203,32 +197,6 @@ impl WorkerConfigBuilder {
203197
}
204198
}
205199

206-
// if matches!(self.max_concurrent_wft_polls, Some(1))
207-
// && self.max_cached_workflows > Some(0)
208-
// && self
209-
// .max_outstanding_workflow_tasks
210-
// .unwrap_or(MAX_OUTSTANDING_WFT_DEFAULT)
211-
// <= 1
212-
// {
213-
// return Err(
214-
// "`max_outstanding_workflow_tasks` must be at at least 2 when \
215-
// `max_cached_workflows` is nonzero"
216-
// .to_owned(),
217-
// );
218-
// }
219-
// if self
220-
// .max_concurrent_wft_polls
221-
// .unwrap_or(MAX_CONCURRENT_WFT_POLLS_DEFAULT)
222-
// > self
223-
// .max_outstanding_workflow_tasks
224-
// .unwrap_or(MAX_OUTSTANDING_WFT_DEFAULT)
225-
// {
226-
// return Err(
227-
// "`max_concurrent_wft_polls` cannot exceed `max_outstanding_workflow_tasks`"
228-
// .to_owned(),
229-
// );
230-
// }
231-
232200
if self.use_worker_versioning.unwrap_or_default()
233201
&& self
234202
.worker_build_id
@@ -245,6 +213,12 @@ impl WorkerConfigBuilder {
245213
}
246214
}
247215

216+
/// Implementing this trait allows users to customize how many tasks of certain kinds the worker
217+
/// will perform concurrently.
218+
///
219+
/// Note that, for implementations on workflow tasks ([WorkflowSlotKind]), workers that have the
220+
/// workflow cache enabled should be willing to hand out _at least_ two slots, to avoid the worker
221+
/// becoming stuck only polling on the worker's sticky queue.
248222
#[async_trait::async_trait]
249223
pub trait SlotSupplier {
250224
type SlotKind: SlotKind;
@@ -281,10 +255,26 @@ pub trait SlotReservationContext: Send + Sync {
281255
fn num_issued_slots(&self) -> usize;
282256
}
283257

284-
// TODO: Make this a struct with an optional field
285-
pub enum SlotSupplierPermit {
286-
Data(Box<dyn Any + Send + Sync>),
287-
NoData,
258+
#[derive(Default)]
259+
pub struct SlotSupplierPermit {
260+
user_data: Option<Box<dyn Any + Send + Sync>>,
261+
}
262+
impl SlotSupplierPermit {
263+
pub fn with_user_data<T: Any + Send + Sync>(user_data: T) -> Self {
264+
Self {
265+
user_data: Some(Box::new(user_data)),
266+
}
267+
}
268+
/// Attempts to downcast the inner data, if any, into the provided type and returns it.
269+
/// Returns `None` if there is no data or the data is not of the appropriate type.
270+
pub fn user_data<T: Any + Send + Sync>(&self) -> Option<&T> {
271+
self.user_data.as_ref().and_then(|b| b.downcast_ref())
272+
}
273+
/// Attempts to downcast the inner data, if any, into the provided type and returns it mutably.
274+
/// Returns `None` if there is no data or the data is not of the appropriate type.
275+
pub fn user_data_mut<T: Any + Send + Sync>(&mut self) -> Option<&mut T> {
276+
self.user_data.as_mut().and_then(|b| b.downcast_mut())
277+
}
288278
}
289279

290280
pub enum SlotReleaseReason {
@@ -336,30 +326,3 @@ impl SlotKind for LocalActivitySlotKind {
336326
"local_activity"
337327
}
338328
}
339-
340-
pub struct WorkflowSlotsInfo {
341-
// TODO: Use wf slot info
342-
pub used_slots: Vec<()>,
343-
/// Current size of the workflow cache.
344-
pub num_cached_workflows: usize,
345-
/// The limit on the size of the cache, if any. This is important for users to know as discussed below in the section
346-
/// on workflow cache management.
347-
pub max_cache_size: Option<usize>,
348-
// ... Possibly also metric information
349-
}
350-
351-
pub trait WorkflowCacheSizer {
352-
/// Return true if it is acceptable to cache a new workflow. Information about already-in-use
353-
/// slots, and just-received task is provided. Will not be called for an already-cached workflow
354-
/// who is receiving a new task.
355-
///
356-
/// Because the number of available slots must be <= the number of workflows cached, if this
357-
/// returns false when there are no idle workflows in the cache (IE: All other outstanding slots
358-
/// are in use), we will buffer the task and wait for another to complete so we can evict it and
359-
/// make room for the new one.
360-
fn can_allow_workflow(
361-
&self,
362-
slots_info: &WorkflowSlotsInfo,
363-
new_task: &WorkflowSlotInfo,
364-
) -> bool;
365-
}

core/src/abstractions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ where
104104
let supp_c_c = self.supplier.clone();
105105
let mets = self.metrics_ctx.clone();
106106
let metric_rec =
107-
// When being called from the drop impl, the semaphore permit isn't actually dropped yet,
108-
// so account for that. TODO: that should move somehow into fixed impl only
107+
// When being called from the drop impl, the permit isn't actually dropped yet, so
108+
// account for that with the `add_one` parameter.
109109
move |add_one: bool| {
110110
let extra = usize::from(add_one);
111111
let unused = uc_c.load(Ordering::Acquire);

core/src/core_tests/workflow_tasks.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2893,10 +2893,10 @@ async fn slot_provider_cant_hand_out_more_permits_than_cache_size() {
28932893
impl SlotSupplier for EndlessSupplier {
28942894
type SlotKind = WorkflowSlotKind;
28952895
async fn reserve_slot(&self, _: &dyn SlotReservationContext) -> SlotSupplierPermit {
2896-
SlotSupplierPermit::NoData
2896+
SlotSupplierPermit::default()
28972897
}
28982898
fn try_reserve_slot(&self, _: &dyn SlotReservationContext) -> Option<SlotSupplierPermit> {
2899-
Some(SlotSupplierPermit::NoData)
2899+
Some(SlotSupplierPermit::default())
29002900
}
29012901
fn mark_slot_used(&self, _: <Self::SlotKind as SlotKind>::Info<'_>) {}
29022902
fn release_slot(&self, _: SlotReleaseReason) {}

core/src/worker/slot_supplier.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use temporal_sdk_core_api::worker::{
66
use tokio::sync::Semaphore;
77

88
mod resource_based;
9-
mod tracking;
109

1110
pub use resource_based::{RealSysInfo, ResourceBasedSlots};
1211
use temporal_sdk_core_api::telemetry::metrics::TemporalMeter;
@@ -33,13 +32,18 @@ where
3332
type SlotKind = SK;
3433

3534
async fn reserve_slot(&self, _: &dyn SlotReservationContext) -> SlotSupplierPermit {
36-
let perm = self.sem.clone().acquire_owned().await.expect("todo");
37-
SlotSupplierPermit::Data(Box::new(perm))
35+
let perm = self
36+
.sem
37+
.clone()
38+
.acquire_owned()
39+
.await
40+
.expect("inner semaphore is never closed");
41+
SlotSupplierPermit::with_user_data(perm)
3842
}
3943

4044
fn try_reserve_slot(&self, _: &dyn SlotReservationContext) -> Option<SlotSupplierPermit> {
4145
let perm = self.sem.clone().try_acquire_owned();
42-
perm.ok().map(|p| SlotSupplierPermit::Data(Box::new(p)))
46+
perm.ok().map(SlotSupplierPermit::with_user_data)
4347
}
4448

4549
fn mark_slot_used(&self, _info: SK::Info<'_>) {}

core/src/worker/slot_supplier/resource_based.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use temporal_sdk_core_api::{
1313
telemetry::metrics::{CoreMeter, GaugeF64, MetricAttributes, TemporalMeter},
1414
worker::{
1515
SlotKind, SlotReleaseReason, SlotReservationContext, SlotSupplier, SlotSupplierPermit,
16-
WorkflowCacheSizer, WorkflowSlotInfo, WorkflowSlotsInfo,
1716
},
1817
};
1918
use tokio::sync::watch;
@@ -212,7 +211,7 @@ where
212211

213212
fn issue_slot(&self) -> SlotSupplierPermit {
214213
let _ = self.last_slot_issued_tx.send(Instant::now());
215-
SlotSupplierPermit::NoData
214+
SlotSupplierPermit::default()
216215
}
217216

218217
fn time_since_last_issued(&self) -> Duration {
@@ -241,15 +240,6 @@ where
241240
}
242241
}
243242

244-
impl<MI> WorkflowCacheSizer for ResourceBasedSlots<MI>
245-
where
246-
MI: SystemResourceInfo + Sync + Send,
247-
{
248-
fn can_allow_workflow(&self, _: &WorkflowSlotsInfo, _: &WorkflowSlotInfo) -> bool {
249-
self.can_reserve()
250-
}
251-
}
252-
253243
impl<MI: SystemResourceInfo + Sync + Send> ResourceBasedSlots<MI> {
254244
/// Create a [ResourceBasedSlotsForType] for this instance which is willing to hand out
255245
/// `minimum` slots with no checks at all and `max` slots ever. Otherwise the underlying

core/src/worker/slot_supplier/tracking.rs

Lines changed: 0 additions & 62 deletions
This file was deleted.

test-utils/src/lib.rs

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -333,38 +333,6 @@ impl CoreWfStarter {
333333
&self.task_queue_name
334334
}
335335

336-
pub fn max_cached_workflows(&mut self, num: usize) -> &mut Self {
337-
self.worker_config.max_cached_workflows(num);
338-
self
339-
}
340-
341-
// TODO: Eliminate these dumb pass-through ones
342-
pub fn max_wft(&mut self, max: usize) -> &mut Self {
343-
self.worker_config.max_outstanding_workflow_tasks(max);
344-
self
345-
}
346-
347-
pub fn max_at(&mut self, max: usize) -> &mut Self {
348-
self.worker_config.max_outstanding_activities(max);
349-
self
350-
}
351-
352-
pub fn max_local_at(&mut self, max: usize) -> &mut Self {
353-
self.worker_config.max_outstanding_local_activities(max);
354-
self
355-
}
356-
357-
pub fn max_at_polls(&mut self, max: usize) -> &mut Self {
358-
self.worker_config.max_concurrent_at_polls(max);
359-
360-
self
361-
}
362-
363-
pub fn no_remote_activities(&mut self) -> &mut Self {
364-
self.worker_config.no_remote_activities(true);
365-
self
366-
}
367-
368336
async fn get_or_init(&mut self) -> &InitializedWorker {
369337
self.initted_worker
370338
.get_or_init(|| async {

tests/heavy_tests.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,20 +84,15 @@ async fn activity_load() {
8484
}
8585

8686
#[tokio::test]
87-
async fn chunky_activities() {
87+
async fn chunky_activities_resource_based() {
8888
const WORKFLOWS: usize = 100;
8989

90-
let mut starter = CoreWfStarter::new("chunky_activities");
90+
let mut starter = CoreWfStarter::new("chunky_activities_resource_based");
9191
starter
9292
.worker_config
9393
.max_concurrent_wft_polls(10_usize)
9494
.max_concurrent_at_polls(10_usize);
95-
// starter
96-
// .worker_config
97-
// .max_outstanding_activities(25)
98-
// .max_outstanding_workflow_tasks(25);
99-
// TODO: Fix /1 or /100 thing
100-
let resource_slots = Arc::new(ResourceBasedSlots::new(0.7, 0.9));
95+
let resource_slots = Arc::new(ResourceBasedSlots::new(0.7, 0.7));
10196
starter
10297
.worker_config
10398
.workflow_task_slot_supplier(resource_slots.as_kind(
@@ -332,7 +327,7 @@ pub async fn many_parallel_timers_longhist(ctx: WfContext) -> WorkflowResult<()>
332327
async fn can_paginate_long_history() {
333328
let wf_name = "can_paginate_long_history";
334329
let mut starter = CoreWfStarter::new(wf_name);
335-
starter.no_remote_activities();
330+
starter.worker_config.no_remote_activities(true);
336331
// Do not use sticky queues so we are forced to paginate once history gets long
337332
starter.max_cached_workflows(0);
338333

tests/integ_tests/metrics_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ async fn query_of_closed_workflow_doesnt_tick_terminal_metric(
368368
let mut starter =
369369
CoreWfStarter::new_with_runtime("query_of_closed_workflow_doesnt_tick_terminal_metric", rt);
370370
// Disable cache to ensure replay happens completely
371-
starter.max_cached_workflows(0);
371+
starter.worker_config.max_cached_workflows(0_usize);
372372
let worker = starter.get_worker().await;
373373
let run_id = starter.start_wf().await;
374374
let task = worker.poll_workflow_activation().await.unwrap();

0 commit comments

Comments
 (0)