Skip to content

Commit 770d8a7

Browse files
fulmicotonguilload
andauthored
Compression cpu thread pool (#4970)
* Added a thread pool for ingest decompression * Running small but cpu intensive tasks in a thread pool. This PR removes usage of spawn_blocking for cpu intensive tasks. The problem with spawn_blocking is that these tasks get scheduled on a ever growing thread pool. For instance, when the server is under load, the GZIP decompression of payloads could considerably increase the load factor of quickwit, possibly making it unresponsive to healthcheck. This PR isolates the thread pool used for the searcher, and instantiates a second generic thread pool dedicated to those short cpu-intensive tasks. * Apply suggestions from code review Co-authored-by: Adrien Guillo <adrien@quickwit.io> --------- Co-authored-by: Adrien Guillo <adrien@quickwit.io>
1 parent c5bfe5c commit 770d8a7

File tree

23 files changed

+343
-216
lines changed

23 files changed

+343
-216
lines changed

quickwit/Cargo.lock

Lines changed: 21 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/Cargo.toml

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ pulsar = { git = "https://github.com/quickwit-oss/pulsar-rs.git", rev = "f9eff04
187187
quote = "1.0.23"
188188
rand = "0.8"
189189
rand_distr = "0.4"
190-
rayon = "1"
190+
rayon = "1.10"
191191
rdkafka = { version = "0.33", default-features = false, features = [
192192
"cmake-build",
193193
"libz",
@@ -269,16 +269,12 @@ wiremock = "0.5"
269269
zstd = "0.13.0"
270270

271271
aws-config = "1.2"
272-
aws-credential-types = { version = "1.2", features = [
273-
"hardcoded-credentials",
274-
] }
272+
aws-credential-types = { version = "1.2", features = ["hardcoded-credentials"] }
275273
aws-sdk-kinesis = "1.21"
276274
aws-sdk-s3 = "1.24"
277275
aws-smithy-async = "1.2"
278276
aws-smithy-runtime = "1.3"
279-
aws-smithy-types = { version = "1.1", features = [
280-
"byte-stream-poll-next"
281-
] }
277+
aws-smithy-types = { version = "1.1", features = ["byte-stream-poll-next"] }
282278
aws-types = "1.2"
283279

284280
azure_core = { version = "0.13.0", features = ["enable_reqwest_rustls"] }
@@ -321,7 +317,7 @@ quickwit-serve = { path = "quickwit-serve" }
321317
quickwit-storage = { path = "quickwit-storage" }
322318
quickwit-telemetry = { path = "quickwit-telemetry" }
323319

324-
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "1ee5f907", default-features = false, features = [
320+
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "6181c1e", default-features = false, features = [
325321
"lz4-compression",
326322
"mmap",
327323
"quickwit",

quickwit/quickwit-common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pin-project = { workspace = true }
3030
pnet = { workspace = true }
3131
prometheus = { workspace = true }
3232
rand = { workspace = true }
33+
rayon = { workspace = true }
3334
regex = { workspace = true }
3435
serde = { workspace = true }
3536
siphasher = { workspace = true }

quickwit/quickwit-common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub mod stream_utils;
4343
pub mod temp_dir;
4444
#[cfg(any(test, feature = "testsuite"))]
4545
pub mod test_utils;
46+
pub mod thread_pool;
4647
pub mod tower;
4748
pub mod type_map;
4849
pub mod uri;

quickwit/quickwit-common/src/metrics.rs

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -179,19 +179,19 @@ pub fn new_histogram_vec<const N: usize>(
179179
HistogramVec { underlying }
180180
}
181181

182-
pub struct GaugeGuard {
183-
gauge: &'static IntGauge,
182+
pub struct GaugeGuard<'a> {
183+
gauge: &'a IntGauge,
184184
delta: i64,
185185
}
186186

187-
impl std::fmt::Debug for GaugeGuard {
187+
impl<'a> std::fmt::Debug for GaugeGuard<'a> {
188188
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
189189
self.delta.fmt(f)
190190
}
191191
}
192192

193-
impl GaugeGuard {
194-
pub fn from_gauge(gauge: &'static IntGauge) -> Self {
193+
impl<'a> GaugeGuard<'a> {
194+
pub fn from_gauge(gauge: &'a IntGauge) -> Self {
195195
Self { gauge, delta: 0i64 }
196196
}
197197

@@ -210,7 +210,44 @@ impl GaugeGuard {
210210
}
211211
}
212212

213-
impl Drop for GaugeGuard {
213+
impl<'a> Drop for GaugeGuard<'a> {
214+
fn drop(&mut self) {
215+
self.gauge.sub(self.delta)
216+
}
217+
}
218+
219+
pub struct OwnedGaugeGuard {
220+
gauge: IntGauge,
221+
delta: i64,
222+
}
223+
224+
impl std::fmt::Debug for OwnedGaugeGuard {
225+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
226+
self.delta.fmt(f)
227+
}
228+
}
229+
230+
impl OwnedGaugeGuard {
231+
pub fn from_gauge(gauge: IntGauge) -> Self {
232+
Self { gauge, delta: 0i64 }
233+
}
234+
235+
pub fn get(&self) -> i64 {
236+
self.delta
237+
}
238+
239+
pub fn add(&mut self, delta: i64) {
240+
self.gauge.add(delta);
241+
self.delta += delta;
242+
}
243+
244+
pub fn sub(&mut self, delta: i64) {
245+
self.gauge.sub(delta);
246+
self.delta -= delta;
247+
}
248+
}
249+
250+
impl Drop for OwnedGaugeGuard {
214251
fn drop(&mut self) {
215252
self.gauge.sub(self.delta)
216253
}

quickwit/quickwit-common/src/stream_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ where T: RpcName
233233
}
234234
}
235235

236-
pub struct InFlightValue<T>(T, #[allow(dead_code)] GaugeGuard);
236+
pub struct InFlightValue<T>(T, #[allow(dead_code)] GaugeGuard<'static>);
237237

238238
impl<T> fmt::Debug for InFlightValue<T>
239239
where T: fmt::Debug

0 commit comments

Comments
 (0)