Skip to content

Commit 7d6e505

Browse files
committed
Fix a handful of integ test problems or sensititivy to new server
1 parent 93dfcea commit 7d6e505

File tree

6 files changed

+25
-12
lines changed

6 files changed

+25
-12
lines changed

core-api/src/worker.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,19 @@ impl WorkerConfigBuilder {
190190
);
191191
}
192192
}
193+
194+
let max_wft_polls = self
195+
.max_concurrent_wft_polls
196+
.unwrap_or(MAX_CONCURRENT_WFT_POLLS_DEFAULT);
197+
198+
if let Some(max_cache) = self.max_cached_workflows {
199+
if max_cache > 0 && max_wft_polls > max_cache {
200+
return Err(
201+
"`max_concurrent_wft_polls` cannot exceed `max_cached_workflows`".to_owned(),
202+
);
203+
}
204+
}
205+
193206
// if matches!(self.max_concurrent_wft_polls, Some(1))
194207
// && self.max_cached_workflows > Some(0)
195208
// && self

core/src/worker/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,9 @@ impl Worker {
267267
config.workflow_task_slot_supplier.clone(),
268268
metrics.with_new_attrs([workflow_worker_type()]),
269269
if config.max_cached_workflows > 0 {
270-
Some(config.max_cached_workflows)
270+
// Since we always need to be able to poll the normal task queue as well as the
271+
// sticky queue, we need a value of at least 2 here.
272+
Some(std::cmp::max(2, config.max_cached_workflows))
271273
} else {
272274
None
273275
},

tests/integ_tests/polling_tests.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use assert_matches::assert_matches;
2-
use std::sync::Arc;
3-
use std::time::Duration;
2+
use std::{sync::Arc, time::Duration};
43
use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowOptions};
5-
use temporal_sdk_core::ephemeral_server::TemporalDevServerConfigBuilder;
6-
use temporal_sdk_core::{init_worker, ClientOptionsBuilder, WorkerConfigBuilder};
4+
use temporal_sdk_core::{
5+
ephemeral_server::TemporalDevServerConfigBuilder, init_worker, ClientOptionsBuilder,
6+
};
77
use temporal_sdk_core_api::Worker;
88
use temporal_sdk_core_protos::coresdk::{
99
activity_task::activity_task as act_task,
@@ -14,7 +14,7 @@ use temporal_sdk_core_protos::coresdk::{
1414
};
1515
use temporal_sdk_core_test_utils::{
1616
default_cached_download, drain_pollers_and_shutdown, init_core_and_create_wf, init_integ_telem,
17-
schedule_activity_cmd, WorkerTestHelpers,
17+
integ_worker_config, schedule_activity_cmd, WorkerTestHelpers,
1818
};
1919
use tokio::time::timeout;
2020
use tracing::info;
@@ -165,10 +165,7 @@ async fn switching_worker_client_changes_poll() {
165165
// Create a worker only on the first server
166166
let worker = init_worker(
167167
init_integ_telem(),
168-
WorkerConfigBuilder::default()
169-
.namespace("default")
170-
.task_queue("my-task-queue")
171-
.worker_build_id("test_build_id")
168+
integ_worker_config("my-task-queue")
172169
// We want a cache so we don't get extra remove-job activations
173170
.max_cached_workflows(100_usize)
174171
.build()

tests/integ_tests/update_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -664,7 +664,7 @@ async fn update_timer_sequence() {
664664
Ok("done")
665665
},
666666
);
667-
ctx.timer(Duration::from_secs(1)).await;
667+
ctx.timer(Duration::from_secs(2)).await;
668668
Ok(().into())
669669
});
670670

tests/integ_tests/visibility_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ async fn client_create_namespace() {
9797
);
9898

9999
let register_options = RegisterNamespaceOptions::builder()
100-
.namespace("test-create-namespace")
100+
.namespace(uuid::Uuid::new_v4().to_string())
101101
.description("it's alive")
102102
.build()
103103
.unwrap();

tests/integ_tests/workflow_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ pub(crate) async fn cache_evictions_wf(command_sink: WfContext) -> WorkflowResul
126126
async fn workflow_lru_cache_evictions() {
127127
let wf_type = "workflow_lru_cache_evictions";
128128
let mut starter = CoreWfStarter::new(wf_type);
129+
starter.worker_config.max_concurrent_wft_polls(1_usize);
129130
starter.no_remote_activities().max_cached_workflows(1);
130131
let mut worker = starter.worker().await;
131132
worker.register_wf(wf_type.to_string(), cache_evictions_wf);

0 commit comments

Comments
 (0)