Skip to content

Commit b97f398

Browse files
authored
Adding optional load shedding and concurrency limitation to bulk ingest. (#4991)
1 parent e7091f8 commit b97f398

File tree

6 files changed

+177
-4
lines changed

6 files changed

+177
-4
lines changed

quickwit/quickwit-common/src/lib.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,20 @@ pub fn get_from_env<T: FromStr + Debug>(key: &str, default_value: T) -> T {
9393
default_value
9494
}
9595

96+
pub fn get_from_env_opt<T: FromStr + Debug>(key: &str) -> Option<T> {
97+
let Some(value_str) = std::env::var(key).ok() else {
98+
info!("{key} is not set");
99+
return None;
100+
};
101+
if let Ok(value) = T::from_str(&value_str) {
102+
info!(value=?value, "setting `{}` from environment", key);
103+
Some(value)
104+
} else {
105+
error!(value_str=%value_str, "failed to parse `{}` from environment", key);
106+
None
107+
}
108+
}
109+
96110
pub fn truncate_str(text: &str, max_len: usize) -> &str {
97111
if max_len > text.len() {
98112
return text;

quickwit/quickwit-serve/src/decompression.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1919

2020
use std::io::Read;
21+
use std::sync::OnceLock;
2122

2223
use bytes::Bytes;
2324
use flate2::read::GzDecoder;
@@ -27,6 +28,13 @@ use thiserror::Error;
2728
use warp::reject::Reject;
2829
use warp::Filter;
2930

31+
use crate::load_shield::{LoadShield, LoadShieldPermit};
32+
33+
fn get_ingest_load_shield() -> &'static LoadShield {
34+
static LOAD_SHIELD: OnceLock<LoadShield> = OnceLock::new();
35+
LOAD_SHIELD.get_or_init(|| LoadShield::new("ingest"))
36+
}
37+
3038
/// There are two ways to decompress the body:
3139
/// - Stream the body through an async decompressor
3240
/// - Fetch the body and then decompress the bytes
@@ -83,22 +91,27 @@ pub(crate) fn get_body_bytes() -> impl Filter<Extract = (Body,), Error = warp::R
8391
warp::header::optional("content-encoding")
8492
.and(warp::body::bytes())
8593
.and_then(|encoding: Option<String>, body: Bytes| async move {
86-
decompress_body(encoding, body).await.map(Body::from)
94+
let permit = get_ingest_load_shield().acquire_permit().await?;
95+
decompress_body(encoding, body)
96+
.await
97+
.map(|content| Body::new(content, permit))
8798
})
8899
}
89100

90101
pub(crate) struct Body {
91102
pub content: Bytes,
92103
_gauge_guard: GaugeGuard<'static>,
104+
_permit: LoadShieldPermit,
93105
}
94106

95-
impl From<Bytes> for Body {
96-
fn from(content: Bytes) -> Self {
107+
impl Body {
108+
pub fn new(content: Bytes, load_shield_permit: LoadShieldPermit) -> Body {
97109
let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.rest_server);
98110
gauge_guard.add(content.len() as i64);
99111
Body {
100112
content,
101113
_gauge_guard: gauge_guard,
114+
_permit: load_shield_permit,
102115
}
103116
}
104117
}

quickwit/quickwit-serve/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ mod index_api;
3232
mod indexing_api;
3333
mod ingest_api;
3434
mod jaeger_api;
35+
mod load_shield;
3536
mod metrics;
3637
mod metrics_api;
3738
mod node_info_handler;
quickwit/quickwit-serve/src/load_shield.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright (C) 2024 Quickwit, Inc.
2+
//
3+
// Quickwit is offered under the AGPL v3.0 and as commercial software.
4+
// For commercial licensing, contact us at hello@quickwit.io.
5+
//
6+
// AGPL:
7+
// This program is free software: you can redistribute it and/or modify
8+
// it under the terms of the GNU Affero General Public License as
9+
// published by the Free Software Foundation, either version 3 of the
10+
// License, or (at your option) any later version.
11+
//
12+
// This program is distributed in the hope that it will be useful,
13+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
// GNU Affero General Public License for more details.
16+
//
17+
// You should have received a copy of the GNU Affero General Public License
18+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
20+
use std::time::Duration;
21+
22+
use quickwit_common::metrics::{GaugeGuard, IntGauge};
23+
use tokio::sync::{Semaphore, SemaphorePermit};
24+
25+
use crate::rest::TooManyRequests;
26+
27+
pub struct LoadShield {
28+
in_flight_semaphore_opt: Option<Semaphore>, // This one is doing the load shedding.
29+
concurrency_semaphore_opt: Option<Semaphore>,
30+
ongoing_gauge: IntGauge,
31+
pending_gauge: IntGauge,
32+
}
33+
34+
pub struct LoadShieldPermit {
35+
_concurrency_permit_opt: Option<SemaphorePermit<'static>>,
36+
_in_flight_permit_opt: Option<SemaphorePermit<'static>>,
37+
_ongoing_gauge_guard: GaugeGuard<'static>,
38+
}
39+
40+
impl LoadShield {
41+
pub fn new(endpoint_group: &'static str) -> LoadShield {
42+
let endpoint_group_uppercase = endpoint_group.to_ascii_uppercase();
43+
let max_in_flight_env_key = format!("QW_{endpoint_group_uppercase}_MAX_IN_FLIGHT");
44+
let max_concurrency_env_key = format!("QW_{endpoint_group_uppercase}_MAX_CONCURRENCY");
45+
let max_in_flight_opt: Option<usize> =
46+
quickwit_common::get_from_env_opt(&max_in_flight_env_key);
47+
let max_concurrency_opt: Option<usize> =
48+
quickwit_common::get_from_env_opt(&max_concurrency_env_key);
49+
let in_flight_semaphore_opt = max_in_flight_opt.map(Semaphore::new);
50+
let concurrency_semaphore_opt = max_concurrency_opt.map(Semaphore::new);
51+
let pending_gauge = crate::metrics::SERVE_METRICS
52+
.pending_requests
53+
.with_label_values([endpoint_group]);
54+
let ongoing_gauge = crate::metrics::SERVE_METRICS
55+
.ongoing_requests
56+
.with_label_values([endpoint_group]);
57+
LoadShield {
58+
in_flight_semaphore_opt,
59+
concurrency_semaphore_opt,
60+
ongoing_gauge,
61+
pending_gauge,
62+
}
63+
}
64+
65+
async fn acquire_in_flight_permit(
66+
&'static self,
67+
) -> Result<Option<SemaphorePermit<'static>>, warp::Rejection> {
68+
let Some(in_flight_semaphore) = &self.in_flight_semaphore_opt else {
69+
return Ok(None);
70+
};
71+
let Ok(in_flight_permit) = in_flight_semaphore.try_acquire() else {
72+
// Wait a little to deal before load shedding. The point is to lower the load associated
73+
// with super aggressive clients.
74+
tokio::time::sleep(Duration::from_millis(100)).await;
75+
return Err(warp::reject::custom(TooManyRequests));
76+
};
77+
Ok(Some(in_flight_permit))
78+
}
79+
80+
async fn acquire_concurrency_permit(&'static self) -> Option<SemaphorePermit<'static>> {
81+
let concurrency_semaphore = self.concurrency_semaphore_opt.as_ref()?;
82+
Some(concurrency_semaphore.acquire().await.unwrap())
83+
}
84+
85+
pub async fn acquire_permit(&'static self) -> Result<LoadShieldPermit, warp::Rejection> {
86+
let mut pending_gauge_guard = GaugeGuard::from_gauge(&self.pending_gauge);
87+
pending_gauge_guard.add(1);
88+
let in_flight_permit_opt = self.acquire_in_flight_permit().await?;
89+
let concurrency_permit_opt = self.acquire_concurrency_permit().await;
90+
drop(pending_gauge_guard);
91+
let mut ongoing_gauge_guard = GaugeGuard::from_gauge(&self.ongoing_gauge);
92+
ongoing_gauge_guard.add(1);
93+
Ok(LoadShieldPermit {
94+
_in_flight_permit_opt: in_flight_permit_opt,
95+
_concurrency_permit_opt: concurrency_permit_opt,
96+
_ongoing_gauge_guard: ongoing_gauge_guard,
97+
})
98+
}
99+
}

quickwit/quickwit-serve/src/metrics.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1919

2020
use once_cell::sync::Lazy;
21-
use quickwit_common::metrics::{new_counter_vec, new_histogram_vec, HistogramVec, IntCounterVec};
21+
use quickwit_common::metrics::{
22+
new_counter_vec, new_gauge_vec, new_histogram_vec, HistogramVec, IntCounterVec, IntGaugeVec,
23+
};
2224

2325
pub struct RestMetrics {
2426
pub http_requests_total: IntCounterVec<2>,
2527
pub request_duration_secs: HistogramVec<2>,
28+
pub ongoing_requests: IntGaugeVec<1>,
29+
pub pending_requests: IntGaugeVec<1>,
2630
}
2731

2832
impl Default for RestMetrics {
@@ -43,6 +47,20 @@ impl Default for RestMetrics {
4347
["method", "status_code"],
4448
quickwit_common::metrics::exponential_buckets(0.02, 2.0, 8).unwrap(),
4549
),
50+
ongoing_requests: new_gauge_vec(
51+
"ingest_ongoing_requests",
52+
"Number of ongoing ingest requests.",
53+
"",
54+
&[],
55+
["endpoint_group"],
56+
),
57+
pending_requests: new_gauge_vec(
58+
"ingest_pending_requests",
59+
"Number of pending ingest requests.",
60+
"",
61+
&[],
62+
["endpoint_group"],
63+
),
4664
}
4765
}
4866
}

quickwit/quickwit-serve/src/rest.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
// You should have received a copy of the GNU Affero General Public License
1818
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1919

20+
use std::fmt::Formatter;
2021
use std::net::SocketAddr;
2122
use std::sync::Arc;
2223

@@ -65,6 +66,28 @@ pub(crate) struct InvalidArgument(pub String);
6566

6667
impl warp::reject::Reject for InvalidArgument {}
6768

69+
#[derive(Debug)]
70+
pub struct TooManyRequests;
71+
72+
impl warp::reject::Reject for TooManyRequests {}
73+
74+
impl std::fmt::Display for TooManyRequests {
75+
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
76+
write!(f, "too many requests")
77+
}
78+
}
79+
80+
#[derive(Debug)]
81+
pub struct InternalError(pub String);
82+
83+
impl warp::reject::Reject for InternalError {}
84+
85+
impl std::fmt::Display for InternalError {
86+
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
87+
write!(f, "internal error: {}", self.0)
88+
}
89+
}
90+
6891
/// Starts REST services.
6992
pub(crate) async fn start_rest_server(
7093
rest_listen_addr: SocketAddr,
@@ -325,6 +348,11 @@ fn get_status_with_error(rejection: Rejection) -> RestApiError {
325348
status_code: StatusCode::PAYLOAD_TOO_LARGE,
326349
message: error.to_string(),
327350
}
351+
} else if let Some(err) = rejection.find::<TooManyRequests>() {
352+
RestApiError {
353+
status_code: StatusCode::TOO_MANY_REQUESTS,
354+
message: err.to_string(),
355+
}
328356
} else {
329357
error!("REST server error: {:?}", rejection);
330358
RestApiError {

0 commit comments

Comments
 (0)