Skip to content

Commit 8e2c9df

Browse files
committed
add request_timeout config
On very large datasets, the fixed timeouts are too low for some queries. This PR adds a setting to configure the timeout. Two settings are introduced: (1) `request_timeout` in the node config, and (2) the `QW_REQUEST_TIMEOUT` environment variable. Currently, there are two timeouts involved in a distributed search request: one from chitchat when opening a channel, and one from the search client. The configured timeout is applied to both. (This means that all chitchat connections have the same `request_timeout` applied, not only those to search nodes.) Related: #5241
1 parent 79acfe4 commit 8e2c9df

File tree

13 files changed

+74
-12
lines changed

13 files changed

+74
-12
lines changed

quickwit/quickwit-cli/src/tool.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -945,6 +945,7 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
945945
config.gossip_interval,
946946
FailureDetectorConfig::default(),
947947
&ChannelTransport::default(),
948+
config.request_timeout,
948949
)
949950
.await?;
950951

quickwit/quickwit-cluster/src/change.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::collections::btree_map::Entry;
2121
use std::collections::BTreeMap;
2222
use std::pin::Pin;
2323
use std::task::{Context, Poll};
24+
use std::time::Duration;
2425

2526
use chitchat::{ChitchatId, NodeState};
2627
use futures::Stream;
@@ -79,6 +80,7 @@ pub(crate) async fn compute_cluster_change_events(
7980
previous_nodes: &mut BTreeMap<NodeId, ClusterNode>,
8081
previous_node_states: &BTreeMap<ChitchatId, NodeState>,
8182
new_node_states: &BTreeMap<ChitchatId, NodeState>,
83+
request_timeout: Duration,
8284
) -> Vec<ClusterChange> {
8385
let mut cluster_events = Vec::new();
8486

@@ -95,6 +97,7 @@ pub(crate) async fn compute_cluster_change_events(
9597
chitchat_id,
9698
node_state,
9799
previous_nodes,
100+
request_timeout,
98101
)
99102
.await;
100103

@@ -139,6 +142,7 @@ async fn compute_cluster_change_events_on_added(
139142
new_chitchat_id: &ChitchatId,
140143
new_node_state: &NodeState,
141144
previous_nodes: &mut BTreeMap<NodeId, ClusterNode>,
145+
request_timeout: Duration,
142146
) -> Vec<ClusterChange> {
143147
let is_self_node = self_chitchat_id == new_chitchat_id;
144148
let new_node_id: NodeId = new_chitchat_id.node_id.clone().into();
@@ -166,8 +170,14 @@ async fn compute_cluster_change_events_on_added(
166170
events.push(ClusterChange::Remove(previous_node));
167171
}
168172
}
169-
let Some(new_node) =
170-
try_new_node(cluster_id, new_chitchat_id, new_node_state, is_self_node).await
173+
let Some(new_node) = try_new_node(
174+
cluster_id,
175+
new_chitchat_id,
176+
new_node_state,
177+
is_self_node,
178+
request_timeout,
179+
)
180+
.await
171181
else {
172182
return events;
173183
};
@@ -300,10 +310,11 @@ async fn try_new_node(
300310
chitchat_id: &ChitchatId,
301311
node_state: &NodeState,
302312
is_self_node: bool,
313+
request_timeout: Duration,
303314
) -> Option<ClusterNode> {
304315
match node_state.grpc_advertise_addr() {
305316
Ok(socket_addr) => {
306-
let channel = make_channel(socket_addr).await;
317+
let channel = make_channel(socket_addr, request_timeout).await;
307318
try_new_node_with_channel(cluster_id, chitchat_id, node_state, channel, is_self_node)
308319
}
309320
Err(error) => {
@@ -443,6 +454,7 @@ pub(crate) mod tests {
443454
&new_chitchat_id,
444455
&new_node_state,
445456
&mut previous_nodes,
457+
Duration::from_secs(30),
446458
)
447459
.await;
448460
assert!(events.is_empty());
@@ -465,6 +477,7 @@ pub(crate) mod tests {
465477
&new_chitchat_id,
466478
&new_node_state,
467479
&mut previous_nodes,
480+
Duration::from_secs(30),
468481
)
469482
.await;
470483
assert!(events.is_empty());
@@ -493,6 +506,7 @@ pub(crate) mod tests {
493506
&new_chitchat_id,
494507
&new_node_state,
495508
&mut previous_nodes,
509+
Duration::from_secs(30),
496510
)
497511
.await;
498512

@@ -515,6 +529,7 @@ pub(crate) mod tests {
515529
&rejoined_chitchat_id,
516530
&new_node_state,
517531
&mut previous_nodes,
532+
Duration::from_secs(30),
518533
)
519534
.await;
520535
assert_eq!(events.len(), 2);
@@ -543,6 +558,7 @@ pub(crate) mod tests {
543558
&new_chitchat_id,
544559
&new_node_state,
545560
&mut previous_nodes,
561+
Duration::from_secs(30),
546562
)
547563
.await;
548564
assert!(events.is_empty());
@@ -567,6 +583,7 @@ pub(crate) mod tests {
567583
&new_chitchat_id,
568584
&new_node_state,
569585
&mut previous_nodes,
586+
Duration::from_secs(30),
570587
)
571588
.await;
572589
assert_eq!(events.len(), 1);
@@ -897,6 +914,7 @@ pub(crate) mod tests {
897914
&mut previous_nodes,
898915
&previous_node_states,
899916
&new_node_states,
917+
Duration::from_secs(30),
900918
)
901919
.await;
902920
assert!(events.is_empty());
@@ -926,6 +944,7 @@ pub(crate) mod tests {
926944
&mut previous_nodes,
927945
&previous_node_states,
928946
&new_node_states,
947+
Duration::from_secs(30),
929948
)
930949
.await;
931950
assert!(events.is_empty());
@@ -943,6 +962,7 @@ pub(crate) mod tests {
943962
&mut previous_nodes,
944963
&previous_node_states,
945964
&new_node_states,
965+
Duration::from_secs(30),
946966
)
947967
.await;
948968
assert_eq!(events.len(), 1);
@@ -957,6 +977,7 @@ pub(crate) mod tests {
957977
&mut previous_nodes,
958978
&new_node_states,
959979
&new_node_states,
980+
Duration::from_secs(30),
960981
)
961982
.await;
962983
assert_eq!(events.len(), 0);
@@ -989,6 +1010,7 @@ pub(crate) mod tests {
9891010
&mut previous_nodes,
9901011
&previous_node_states,
9911012
&new_node_states,
1013+
Duration::from_secs(30),
9921014
)
9931015
.await;
9941016
assert_eq!(events.len(), 1);
@@ -1008,6 +1030,7 @@ pub(crate) mod tests {
10081030
&mut previous_nodes,
10091031
&previous_node_states,
10101032
&new_node_states,
1033+
Duration::from_secs(30),
10111034
)
10121035
.await;
10131036
assert_eq!(events.len(), 1);

quickwit/quickwit-cluster/src/cluster.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ pub struct Cluster {
6767
/// Socket address (UDP) the node listens on for receiving gossip messages.
6868
pub gossip_listen_addr: SocketAddr,
6969
gossip_interval: Duration,
70+
timeout: Duration,
7071
inner: Arc<RwLock<InnerCluster>>,
7172
}
7273

@@ -90,6 +91,9 @@ impl Cluster {
9091
pub fn cluster_id(&self) -> &str {
9192
&self.cluster_id
9293
}
94+
pub fn timeout(&self) -> Duration {
95+
self.timeout
96+
}
9397

9498
pub fn self_chitchat_id(&self) -> &ChitchatId {
9599
&self.self_chitchat_id
@@ -107,6 +111,7 @@ impl Cluster {
107111
self.self_chitchat_id.gossip_advertise_addr
108112
}
109113

114+
#[allow(clippy::too_many_arguments)]
110115
pub async fn join(
111116
cluster_id: String,
112117
self_node: ClusterMember,
@@ -115,6 +120,7 @@ impl Cluster {
115120
gossip_interval: Duration,
116121
failure_detector_config: FailureDetectorConfig,
117122
transport: &dyn Transport,
123+
request_timeout: Duration,
118124
) -> anyhow::Result<Self> {
119125
info!(
120126
cluster_id=%cluster_id,
@@ -185,6 +191,7 @@ impl Cluster {
185191
weak_chitchat,
186192
live_nodes_rx,
187193
catchup_callback_rx.clone(),
194+
request_timeout,
188195
)
189196
.await;
190197

@@ -201,6 +208,7 @@ impl Cluster {
201208
self_chitchat_id: self_node.chitchat_id(),
202209
gossip_listen_addr,
203210
gossip_interval,
211+
timeout: request_timeout,
204212
inner: Arc::new(RwLock::new(inner)),
205213
};
206214
spawn_change_stream_task(cluster.clone()).await;
@@ -549,6 +557,7 @@ fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option<IndexingTask>
549557
}
550558

551559
async fn spawn_change_stream_task(cluster: Cluster) {
560+
let request_timeout = cluster.timeout();
552561
let cluster_guard = cluster.inner.read().await;
553562
let cluster_id = cluster_guard.cluster_id.clone();
554563
let self_chitchat_id = cluster_guard.self_chitchat_id.clone();
@@ -574,6 +583,7 @@ async fn spawn_change_stream_task(cluster: Cluster) {
574583
previous_live_nodes,
575584
&previous_live_node_states,
576585
&new_live_node_states,
586+
request_timeout,
577587
)
578588
.await;
579589
if !events.is_empty() {
@@ -690,6 +700,7 @@ pub async fn create_cluster_for_test_with_id(
690700
Duration::from_millis(25),
691701
failure_detector_config,
692702
transport,
703+
Duration::from_secs(30),
693704
)
694705
.await?;
695706
cluster.set_self_node_readiness(self_node_readiness).await;

quickwit/quickwit-cluster/src/grpc_gossip.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ pub(crate) async fn spawn_catchup_callback_task(
4646
weak_chitchat: Weak<Mutex<Chitchat>>,
4747
live_nodes_rx: watch::Receiver<BTreeMap<ChitchatId, NodeState>>,
4848
mut catchup_callback_rx: watch::Receiver<()>,
49+
request_timeout: Duration,
4950
) {
5051
let catchup_callback_future = async move {
5152
let mut interval = tokio::time::interval(Duration::from_secs(60));
@@ -61,6 +62,7 @@ pub(crate) async fn spawn_catchup_callback_task(
6162
chitchat,
6263
live_nodes_rx.clone(),
6364
cluster_grpc_client,
65+
request_timeout,
6466
)
6567
.await;
6668

@@ -80,8 +82,9 @@ async fn perform_grpc_gossip_rounds<Factory, Fut>(
8082
chitchat: Arc<Mutex<Chitchat>>,
8183
live_nodes_rx: watch::Receiver<BTreeMap<ChitchatId, NodeState>>,
8284
grpc_client_factory: Factory,
85+
request_timeout: Duration,
8386
) where
84-
Factory: Fn(SocketAddr) -> Fut,
87+
Factory: Fn(SocketAddr, Duration) -> Fut,
8588
Fut: Future<Output = ClusterServiceClient>,
8689
{
8790
wait_for_gossip_candidates(
@@ -102,7 +105,7 @@ async fn perform_grpc_gossip_rounds<Factory, Fut>(
102105
info!("pulling cluster state from node(s): {node_ids:?}");
103106

104107
for (node_id, grpc_advertise_addr) in zip(node_ids, grpc_advertise_addrs) {
105-
let cluster_client = grpc_client_factory(grpc_advertise_addr).await;
108+
let cluster_client = grpc_client_factory(grpc_advertise_addr, request_timeout).await;
106109

107110
let request = FetchClusterStateRequest {
108111
cluster_id: cluster_id.clone(),
@@ -272,7 +275,7 @@ mod tests {
272275
let self_chitchat_id = cluster.self_chitchat_id();
273276
let chitchat = cluster.chitchat().await;
274277

275-
let grpc_client_factory = |_: SocketAddr| {
278+
let grpc_client_factory = |_: SocketAddr, _: Duration| {
276279
Box::pin(async {
277280
let mut mock_cluster_service = MockClusterService::new();
278281
mock_cluster_service
@@ -336,6 +339,7 @@ mod tests {
336339
chitchat.clone(),
337340
live_nodes_rx,
338341
grpc_client_factory,
342+
Duration::from_secs(30),
339343
)
340344
.await;
341345

quickwit/quickwit-cluster/src/grpc_service.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1919

2020
use std::net::SocketAddr;
21+
use std::time::Duration;
2122

2223
use bytesize::ByteSize;
2324
use itertools::Itertools;
@@ -40,8 +41,11 @@ static CLUSTER_GRPC_CLIENT_METRICS_LAYER: Lazy<GrpcMetricsLayer> =
4041
static CLUSTER_GRPC_SERVER_METRICS_LAYER: Lazy<GrpcMetricsLayer> =
4142
Lazy::new(|| GrpcMetricsLayer::new("cluster", "server"));
4243

43-
pub(crate) async fn cluster_grpc_client(socket_addr: SocketAddr) -> ClusterServiceClient {
44-
let channel = make_channel(socket_addr).await;
44+
pub(crate) async fn cluster_grpc_client(
45+
socket_addr: SocketAddr,
46+
request_timeout: Duration,
47+
) -> ClusterServiceClient {
48+
let channel = make_channel(socket_addr, request_timeout).await;
4549

4650
ClusterServiceClient::tower()
4751
.stack_layer(CLUSTER_GRPC_CLIENT_METRICS_LAYER.clone())

quickwit/quickwit-cluster/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
159159
node_config.gossip_interval,
160160
failure_detector_config,
161161
&CountingUdpTransport,
162+
node_config.request_timeout,
162163
)
163164
.await?;
164165
if node_config

quickwit/quickwit-cluster/src/node.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ impl ClusterNode {
6868
enabled_services: &[&str],
6969
indexing_tasks: &[IndexingTask],
7070
) -> Self {
71+
use std::time::Duration;
72+
7173
use quickwit_common::tower::make_channel;
7274

7375
use crate::cluster::set_indexing_tasks_in_node_state;
@@ -76,7 +78,7 @@ impl ClusterNode {
7678
let gossip_advertise_addr = ([127, 0, 0, 1], port).into();
7779
let grpc_advertise_addr = ([127, 0, 0, 1], port + 1).into();
7880
let chitchat_id = ChitchatId::new(node_id.to_string(), 0, gossip_advertise_addr);
79-
let channel = make_channel(grpc_advertise_addr).await;
81+
let channel = make_channel(grpc_advertise_addr, Duration::from_secs(30)).await;
8082
let mut node_state = NodeState::for_test();
8183
node_state.set(ENABLED_SERVICES_KEY, enabled_services.join(","));
8284
node_state.set(GRPC_ADVERTISE_ADDR_KEY, grpc_advertise_addr.to_string());

quickwit/quickwit-common/src/tower/transport.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ where K: Hash + Eq + Clone + Send + Sync + 'static
200200
/// Creates a channel from a socket address.
201201
///
202202
/// The function is marked as `async` because it requires an executor (`connect_lazy`).
203-
pub async fn make_channel(socket_addr: SocketAddr) -> Channel {
203+
pub async fn make_channel(socket_addr: SocketAddr, request_timeout: Duration) -> Channel {
204204
let uri = Uri::builder()
205205
.scheme("http")
206206
.authority(socket_addr.to_string())
@@ -209,7 +209,7 @@ pub async fn make_channel(socket_addr: SocketAddr) -> Channel {
209209
.expect("provided arguments should be valid");
210210
Endpoint::from(uri)
211211
.connect_timeout(Duration::from_secs(5))
212-
.timeout(Duration::from_secs(30))
212+
.timeout(request_timeout)
213213
.connect_lazy()
214214
}
215215

quickwit/quickwit-config/src/node_config/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,7 @@ pub struct NodeConfig {
418418
pub gossip_advertise_addr: SocketAddr,
419419
pub grpc_advertise_addr: SocketAddr,
420420
pub gossip_interval: Duration,
421+
pub request_timeout: Duration,
421422
pub peer_seeds: Vec<String>,
422423
pub data_dir_path: PathBuf,
423424
pub metastore_uri: Uri,

0 commit comments

Comments
 (0)