From 61483b9932e90b2bf45ef2f253ca528a9a77ddb5 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 22 Oct 2024 15:59:27 +0900 Subject: [PATCH 1/2] Prevents single split searches from different leaf_search from interleaving. Before this PR, we just used a semaphore to acquire a permit and start a new tokio task to run our single split search. In pseudo code, a leaf_search would look like: ``` for split in splits { let permit = semaphore.acquire().await; tokio::spawn(async move { single_split_search(split); drop(permit) }) } ``` The problem with this is that it allows interleaving split search from one search request with another one. This interleaving strongly impacts search latency. For instance, one can imagine two queries A and B with 3 splits each. Executing as follows | A | A | A | B | B | B | offers a much short latency for A than | A | B | B | A | B | A | This PR also adds two metrics to monitor the number of queue single split search. --- quickwit/Cargo.lock | 1 + quickwit/quickwit-search/Cargo.toml | 1 + quickwit/quickwit-search/src/leaf.rs | 18 +- quickwit/quickwit-search/src/lib.rs | 1 + quickwit/quickwit-search/src/list_terms.rs | 6 +- quickwit/quickwit-search/src/metrics.rs | 18 +- .../quickwit-search/src/search_permits.rs | 213 ++++++++++++++++++ quickwit/quickwit-search/src/service.rs | 12 +- 8 files changed, 252 insertions(+), 18 deletions(-) create mode 100644 quickwit/quickwit-search/src/search_permits.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index b26b4272d0a..e18bf82144f 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6473,6 +6473,7 @@ dependencies = [ "quickwit-proto", "quickwit-query", "quickwit-storage", + "rand 0.8.5", "rayon", "serde", "serde_json", diff --git a/quickwit/quickwit-search/Cargo.toml b/quickwit/quickwit-search/Cargo.toml index fa815f47b56..f1c1f8dbdad 100644 --- a/quickwit/quickwit-search/Cargo.toml +++ b/quickwit/quickwit-search/Cargo.toml @@ -50,6 +50,7 @@ quickwit-storage = { workspace = true } [dev-dependencies] assert-json-diff = { workspace = true } proptest = { workspace = true } +rand = { workspace = true } serde_json = { workspace = true } typetag = { workspace = true } diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 819015b6d26..4104d80376e 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -50,6 +50,7 @@ use tracing::*; use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector}; use crate::metrics::SEARCH_METRICS; use crate::root::is_metadata_count_request_with_ast; +use crate::search_permits::SearchPermit; use crate::service::{deserialize_doc_mapper, SearcherContext}; use crate::{QuickwitAggregations, SearchError}; @@ -1183,7 +1184,6 @@ async fn resolve_storage_and_leaf_search( aggregations_limits: AggregationLimitsGuard, ) -> crate::Result { let storage = storage_resolver.resolve(&index_uri).await?; - leaf_search( searcher_context.clone(), search_request.clone(), @@ -1259,10 +1259,16 @@ pub async fn leaf_search( let incremental_merge_collector = IncrementalCollector::new(merge_collector); let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector)); - for (split, mut request) in split_with_req { - let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore - .clone() - .acquire_owned() + // We acquire all of the leaf search permits to make sure our single split search tasks + // do no interleave with other leaf search requests. + let permit_futures = searcher_context + .leaf_search_split_semaphore + .get_permits_futures(split_with_req.len()); + + for ((split, mut request), permit_fut) in + split_with_req.into_iter().zip(permit_futures.into_iter()) + { + let leaf_split_search_permit = permit_fut .instrument(info_span!("waiting_for_leaf_search_split_semaphore")) .await .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); @@ -1355,7 +1361,7 @@ async fn leaf_search_single_split_wrapper( split: SplitIdAndFooterOffsets, split_filter: Arc>, incremental_merge_collector: Arc>, - leaf_split_search_permit: tokio::sync::OwnedSemaphorePermit, + leaf_split_search_permit: SearchPermit, aggregations_limits: AggregationLimitsGuard, ) { crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index a09545d92b8..dec493fdb7b 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -44,6 +44,7 @@ mod service; pub(crate) mod top_k_collector; mod metrics; +mod search_permits; #[cfg(test)] mod tests; diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 0a781355162..a9d0ddf204e 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -331,10 +331,12 @@ pub async fn leaf_list_terms( let index_storage_clone = index_storage.clone(); let searcher_context_clone = searcher_context.clone(); async move { - let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore.clone() - .acquire_owned() + let _leaf_split_search_permit = searcher_context_clone + .leaf_search_split_semaphore + .get_one_permit() .await .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); + // TODO dedicated counter and timer? crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); let timer = crate::SEARCH_METRICS diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index ccdcc6a519c..287c8711377 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -21,8 +21,8 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_histogram, - new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, + exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_gauge_vec, + new_histogram, new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, }; pub struct SearchMetrics { @@ -35,6 +35,8 @@ pub struct SearchMetrics { pub leaf_searches_splits_total: IntCounter, pub leaf_search_split_duration_secs: Histogram, pub job_assigned_total: IntCounterVec<1>, + pub leaf_search_single_split_tasks_pending: IntGauge, + pub leaf_search_single_split_tasks_ongoing: IntGauge, } impl Default for SearchMetrics { @@ -50,6 +52,14 @@ impl Default for SearchMetrics { .copied() .collect(); + let leaf_search_single_split_tasks = new_gauge_vec::<1>( + "leaf_search_single_split_tasks", + "Number of single split search tasks pending or ongoing", + "search", + &[], + ["status"], // takes values "ongoing" or "pending" + ); + SearchMetrics { root_search_requests_total: new_counter_vec( "root_search_requests_total", @@ -110,6 +120,10 @@ impl Default for SearchMetrics { "search", exponential_buckets(0.001, 2.0, 15).unwrap(), ), + leaf_search_single_split_tasks_ongoing: leaf_search_single_split_tasks + .with_label_values(["ongoing"]), + leaf_search_single_split_tasks_pending: leaf_search_single_split_tasks + .with_label_values(["pending"]), job_assigned_total: new_counter_vec( "job_assigned_total", "Number of job assigned to searchers, per affinity rank.", diff --git a/quickwit/quickwit-search/src/search_permits.rs b/quickwit/quickwit-search/src/search_permits.rs new file mode 100644 index 00000000000..3405e2f0df2 --- /dev/null +++ b/quickwit/quickwit-search/src/search_permits.rs @@ -0,0 +1,213 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::VecDeque; +use std::sync::{Arc, Mutex, Weak}; + +use quickwit_common::metrics::GaugeGuard; +use tokio::sync::oneshot; + +/// `SearchPermits` is a distributor of permits to perform single split +/// search operation. +/// +/// Requests are served in order. +#[derive(Clone)] +pub struct SearchPermits { + inner: Arc>, +} + +impl SearchPermits { + pub fn new(num_permits: usize) -> SearchPermits { + SearchPermits { + inner: Arc::new(Mutex::new(InnerSearchPermits { + num_permits_available: num_permits, + permits_requests: VecDeque::new(), + })), + } + } + + /// Returns a list of future permits in the form of a Receiver channel. + pub fn get_one_permit(&self) -> oneshot::Receiver { + let mut permits_lock = self.inner.lock().unwrap(); + permits_lock.get_permit(&self.inner) + } + + /// Returns a list of future permits in the form of a Receiver channel. + /// + /// The permits returned are guaranteed to be resolved in order. + /// In addition, the permits are guaranteed to be resolved before permits returned by + /// subsequent calls to this function (or get_permit). + pub fn get_permits_futures(&self, num_permits: usize) -> Vec> { + let mut permits_lock = self.inner.lock().unwrap(); + permits_lock.get_permits(num_permits, &self.inner) + } +} + +struct InnerSearchPermits { + num_permits_available: usize, + permits_requests: VecDeque>, +} + +impl InnerSearchPermits { + fn get_permits( + &mut self, + num_permits: usize, + inner: &Arc>, + ) -> Vec> { + let mut permits = Vec::with_capacity(num_permits); + for _ in 0..num_permits { + let (tx, rx) = oneshot::channel(); + self.permits_requests.push_back(tx); + permits.push(rx); + } + self.assign_available_permits(inner); + permits + } + + fn get_permit( + &mut self, + inner: &Arc>, + ) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + self.permits_requests.push_back(tx); + self.assign_available_permits(inner); + rx + } + + fn recycle_permit(&mut self, inner: &Arc>) { + self.num_permits_available += 1; + self.assign_available_permits(inner); + } + + fn assign_available_permits(&mut self, inner: &Arc>) { + while self.num_permits_available > 0 { + let Some(sender) = self.permits_requests.pop_front() else { + break; + }; + let send_res = sender.send(SearchPermit { + _ongoing_gauge_guard: GaugeGuard::from_gauge( + &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, + ), + inner_opt: Some(Arc::downgrade(inner)), + }); + match send_res { + Ok(()) => { + self.num_permits_available -= 1; + } + Err(mut search_permit) => { + search_permit.disable_drop(); + drop(search_permit); + } + } + } + crate::SEARCH_METRICS + .leaf_search_single_split_tasks_pending + .set(self.permits_requests.len() as i64); + } +} + +pub struct SearchPermit { + _ongoing_gauge_guard: GaugeGuard<'static>, + inner_opt: Option>>, +} + +impl SearchPermit { + fn disable_drop(&mut self) { + self.inner_opt = None; + } +} + +impl Drop for SearchPermit { + fn drop(&mut self) { + let Some(inner) = self.inner_opt.take() else { + return; + }; + let Some(inner) = inner.upgrade() else { + return; + }; + let mut inner_guard = inner.lock().unwrap(); + inner_guard.recycle_permit(&inner); + } +} + +#[cfg(test)] +mod tests { + use tokio::task::JoinSet; + + #[tokio::test] + async fn test_search_permits_get_permits_future() { + // We test here that `get_permits_futures` does not interleave + let search_permits = super::SearchPermits::new(1); + let mut all_futures = Vec::new(); + let first_batch_of_permits = search_permits.get_permits_futures(10); + assert_eq!(first_batch_of_permits.len(), 10); + all_futures.extend( + first_batch_of_permits + .into_iter() + .enumerate() + .map(move |(i, fut)| ((1, i), fut)), + ); + + let second_batch_of_permits = search_permits.get_permits_futures(10); + assert_eq!(second_batch_of_permits.len(), 10); + all_futures.extend( + second_batch_of_permits + .into_iter() + .enumerate() + .map(move |(i, fut)| ((2, i), fut)), + ); + + use rand::seq::SliceRandom; + // not super useful, considering what join set does, but still a tiny bit more sound. + all_futures.shuffle(&mut rand::thread_rng()); + + let mut join_set = JoinSet::new(); + for (res, fut) in all_futures { + join_set.spawn(async move { + let permit = fut.await; + (res, permit) + }); + } + let mut ordered_result: Vec<(usize, usize)> = Vec::with_capacity(20); + while let Some(Ok(((batch_id, order), _permit))) = join_set.join_next().await { + ordered_result.push((batch_id, order)); + } + + assert_eq!(ordered_result.len(), 20); + for (i, res) in ordered_result[0..10].iter().enumerate() { + assert_eq!(res, &(1, i)); + } + for (i, res) in ordered_result[10..20].iter().enumerate() { + assert_eq!(res, &(2, i)); + } + } + + #[tokio::test] + async fn test_search_permits_receiver_race_condition() { + // Here we test that we don't have a problem if the Receiver is dropped. + // In particular, we want to check that there is not a race condition where drop attempts to + // lock the mutex. + let search_permits = super::SearchPermits::new(1); + let permit_rx = search_permits.get_one_permit(); + let permit_rx2 = search_permits.get_one_permit(); + drop(permit_rx2); + drop(permit_rx); + let _permit_rx = search_permits.get_one_permit(); + } +} diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 65516a99e76..b9bf596de21 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -50,6 +50,7 @@ use crate::list_fields_cache::ListFieldsCache; use crate::list_terms::{leaf_list_terms, root_list_terms}; use crate::root::fetch_docs_phase; use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset}; +use crate::search_permits::SearchPermits; use crate::search_stream::{leaf_search_stream, root_search_stream}; use crate::{fetch_docs, root_search, search_plan, ClusterClient, SearchError}; @@ -449,7 +450,7 @@ pub struct SearcherContext { /// Fast fields cache. pub fast_fields_cache: Arc, /// Counting semaphore to limit concurrent leaf search split requests. - pub leaf_search_split_semaphore: Arc, + pub leaf_search_split_semaphore: SearchPermits, /// Split footer cache. pub split_footer_cache: MemorySizedCache, /// Counting semaphore to limit concurrent split stream requests. @@ -468,10 +469,6 @@ impl std::fmt::Debug for SearcherContext { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("SearcherContext") .field("searcher_config", &self.searcher_config) - .field( - "leaf_search_split_semaphore", - &self.leaf_search_split_semaphore, - ) .field("split_stream_semaphore", &self.split_stream_semaphore) .finish() } @@ -491,9 +488,8 @@ impl SearcherContext { capacity_in_bytes, &quickwit_storage::STORAGE_METRICS.split_footer_cache, ); - let leaf_search_split_semaphore = Arc::new(Semaphore::new( - searcher_config.max_num_concurrent_split_searches, - )); + let leaf_search_split_semaphore = + SearchPermits::new(searcher_config.max_num_concurrent_split_searches); let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams); let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize; From e02874662afd9ecac2d5aab03e5c0d7dea9c8404 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 23 Oct 2024 09:51:01 +0900 Subject: [PATCH 2/2] CR comments --- quickwit/quickwit-search/src/leaf.rs | 10 +- quickwit/quickwit-search/src/lib.rs | 2 +- quickwit/quickwit-search/src/list_terms.rs | 4 +- ...h_permits.rs => search_permit_provider.rs} | 123 +++++++++--------- quickwit/quickwit-search/src/service.rs | 8 +- 5 files changed, 77 insertions(+), 70 deletions(-) rename quickwit/quickwit-search/src/{search_permits.rs => search_permit_provider.rs} (63%) diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 4104d80376e..8c5c7095422 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -50,7 +50,7 @@ use tracing::*; use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector}; use crate::metrics::SEARCH_METRICS; use crate::root::is_metadata_count_request_with_ast; -use crate::search_permits::SearchPermit; +use crate::search_permit_provider::SearchPermit; use crate::service::{deserialize_doc_mapper, SearcherContext}; use crate::{QuickwitAggregations, SearchError}; @@ -1262,8 +1262,8 @@ pub async fn leaf_search( // We acquire all of the leaf search permits to make sure our single split search tasks // do no interleave with other leaf search requests. let permit_futures = searcher_context - .leaf_search_split_semaphore - .get_permits_futures(split_with_req.len()); + .search_permit_provider + .get_permits(split_with_req.len()); for ((split, mut request), permit_fut) in split_with_req.into_iter().zip(permit_futures.into_iter()) @@ -1361,7 +1361,7 @@ async fn leaf_search_single_split_wrapper( split: SplitIdAndFooterOffsets, split_filter: Arc>, incremental_merge_collector: Arc>, - leaf_split_search_permit: SearchPermit, + search_permit: SearchPermit, aggregations_limits: AggregationLimitsGuard, ) { crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); @@ -1380,7 +1380,7 @@ async fn leaf_search_single_split_wrapper( .await; // We explicitly drop it, to highlight it to the reader - std::mem::drop(leaf_split_search_permit); + std::mem::drop(search_permit); if leaf_search_single_split_res.is_ok() { timer.observe_duration(); diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index dec493fdb7b..58c464b7463 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -44,7 +44,7 @@ mod service; pub(crate) mod top_k_collector; mod metrics; -mod search_permits; +mod search_permit_provider; #[cfg(test)] mod tests; diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index a9d0ddf204e..765203438d1 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -332,8 +332,8 @@ pub async fn leaf_list_terms( let searcher_context_clone = searcher_context.clone(); async move { let _leaf_split_search_permit = searcher_context_clone - .leaf_search_split_semaphore - .get_one_permit() + .search_permit_provider + .get_permit() .await .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); diff --git a/quickwit/quickwit-search/src/search_permits.rs b/quickwit/quickwit-search/src/search_permit_provider.rs similarity index 63% rename from quickwit/quickwit-search/src/search_permits.rs rename to quickwit/quickwit-search/src/search_permit_provider.rs index 3405e2f0df2..f6883efb34b 100644 --- a/quickwit/quickwit-search/src/search_permits.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -18,57 +18,71 @@ // along with this program. If not, see . use std::collections::VecDeque; -use std::sync::{Arc, Mutex, Weak}; +use std::sync::{Arc, Mutex}; use quickwit_common::metrics::GaugeGuard; use tokio::sync::oneshot; -/// `SearchPermits` is a distributor of permits to perform single split +/// `SearchPermitProvider` is a distributor of permits to perform single split /// search operation. /// /// Requests are served in order. #[derive(Clone)] -pub struct SearchPermits { - inner: Arc>, +pub struct SearchPermitProvider { + inner_arc: Arc>, } -impl SearchPermits { - pub fn new(num_permits: usize) -> SearchPermits { - SearchPermits { - inner: Arc::new(Mutex::new(InnerSearchPermits { +impl SearchPermitProvider { + pub fn new(num_permits: usize) -> SearchPermitProvider { + SearchPermitProvider { + inner_arc: Arc::new(Mutex::new(InnerSearchPermitProvider { num_permits_available: num_permits, permits_requests: VecDeque::new(), })), } } - /// Returns a list of future permits in the form of a Receiver channel. - pub fn get_one_permit(&self) -> oneshot::Receiver { - let mut permits_lock = self.inner.lock().unwrap(); - permits_lock.get_permit(&self.inner) + /// Returns a future permit in the form of a oneshot Receiver channel. + /// + /// At this point the permit is not acquired yet. + #[must_use] + pub fn get_permit(&self) -> oneshot::Receiver { + let mut permits_lock = self.inner_arc.lock().unwrap(); + permits_lock.get_permit(&self.inner_arc) } - /// Returns a list of future permits in the form of a Receiver channel. + /// Returns a list of future permits in the form of oneshot Receiver channels. /// /// The permits returned are guaranteed to be resolved in order. /// In addition, the permits are guaranteed to be resolved before permits returned by - /// subsequent calls to this function (or get_permit). - pub fn get_permits_futures(&self, num_permits: usize) -> Vec> { - let mut permits_lock = self.inner.lock().unwrap(); - permits_lock.get_permits(num_permits, &self.inner) + /// subsequent calls to this function (or `get_permit`). + #[must_use] + pub fn get_permits(&self, num_permits: usize) -> Vec> { + let mut permits_lock = self.inner_arc.lock().unwrap(); + permits_lock.get_permits(num_permits, &self.inner_arc) } } -struct InnerSearchPermits { +struct InnerSearchPermitProvider { num_permits_available: usize, permits_requests: VecDeque>, } -impl InnerSearchPermits { +impl InnerSearchPermitProvider { + fn get_permit( + &mut self, + inner_arc: &Arc>, + ) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + self.permits_requests.push_back(tx); + self.assign_available_permits(inner_arc); + rx + } + fn get_permits( &mut self, num_permits: usize, - inner: &Arc>, + inner_arc: &Arc>, ) -> Vec> { let mut permits = Vec::with_capacity(num_permits); for _ in 0..num_permits { @@ -76,43 +90,35 @@ impl InnerSearchPermits { self.permits_requests.push_back(tx); permits.push(rx); } - self.assign_available_permits(inner); + self.assign_available_permits(inner_arc); permits } - fn get_permit( - &mut self, - inner: &Arc>, - ) -> oneshot::Receiver { - let (tx, rx) = oneshot::channel(); - self.permits_requests.push_back(tx); - self.assign_available_permits(inner); - rx - } - - fn recycle_permit(&mut self, inner: &Arc>) { + fn recycle_permit(&mut self, inner_arc: &Arc>) { self.num_permits_available += 1; - self.assign_available_permits(inner); + self.assign_available_permits(inner_arc); } - fn assign_available_permits(&mut self, inner: &Arc>) { + fn assign_available_permits(&mut self, inner_arc: &Arc>) { while self.num_permits_available > 0 { let Some(sender) = self.permits_requests.pop_front() else { break; }; + let mut ongoing_gauge_guard = GaugeGuard::from_gauge( + &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, + ); + ongoing_gauge_guard.add(1); let send_res = sender.send(SearchPermit { - _ongoing_gauge_guard: GaugeGuard::from_gauge( - &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, - ), - inner_opt: Some(Arc::downgrade(inner)), + _ongoing_gauge_guard: ongoing_gauge_guard, + inner_arc: inner_arc.clone(), + recycle_on_drop: true, }); match send_res { Ok(()) => { self.num_permits_available -= 1; } - Err(mut search_permit) => { - search_permit.disable_drop(); - drop(search_permit); + Err(search_permit) => { + search_permit.drop_without_recycling_permit(); } } } @@ -124,25 +130,24 @@ impl InnerSearchPermits { pub struct SearchPermit { _ongoing_gauge_guard: GaugeGuard<'static>, - inner_opt: Option>>, + inner_arc: Arc>, + recycle_on_drop: bool, } impl SearchPermit { - fn disable_drop(&mut self) { - self.inner_opt = None; + fn drop_without_recycling_permit(mut self) { + self.recycle_on_drop = false; + drop(self); } } impl Drop for SearchPermit { fn drop(&mut self) { - let Some(inner) = self.inner_opt.take() else { - return; - }; - let Some(inner) = inner.upgrade() else { + if !self.recycle_on_drop { return; - }; - let mut inner_guard = inner.lock().unwrap(); - inner_guard.recycle_permit(&inner); + } + let mut inner_guard = self.inner_arc.lock().unwrap(); + inner_guard.recycle_permit(&self.inner_arc.clone()); } } @@ -150,12 +155,14 @@ impl Drop for SearchPermit { mod tests { use tokio::task::JoinSet; + use super::*; + #[tokio::test] async fn test_search_permits_get_permits_future() { // We test here that `get_permits_futures` does not interleave - let search_permits = super::SearchPermits::new(1); + let search_permits = SearchPermitProvider::new(1); let mut all_futures = Vec::new(); - let first_batch_of_permits = search_permits.get_permits_futures(10); + let first_batch_of_permits = search_permits.get_permits(10); assert_eq!(first_batch_of_permits.len(), 10); all_futures.extend( first_batch_of_permits @@ -164,7 +171,7 @@ mod tests { .map(move |(i, fut)| ((1, i), fut)), ); - let second_batch_of_permits = search_permits.get_permits_futures(10); + let second_batch_of_permits = search_permits.get_permits(10); assert_eq!(second_batch_of_permits.len(), 10); all_futures.extend( second_batch_of_permits @@ -203,11 +210,11 @@ mod tests { // Here we test that we don't have a problem if the Receiver is dropped. // In particular, we want to check that there is not a race condition where drop attempts to // lock the mutex. - let search_permits = super::SearchPermits::new(1); - let permit_rx = search_permits.get_one_permit(); - let permit_rx2 = search_permits.get_one_permit(); + let search_permits = SearchPermitProvider::new(1); + let permit_rx = search_permits.get_permit(); + let permit_rx2 = search_permits.get_permit(); drop(permit_rx2); drop(permit_rx); - let _permit_rx = search_permits.get_one_permit(); + let _permit_rx = search_permits.get_permit(); } } diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index b9bf596de21..7b288cefc3d 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -50,7 +50,7 @@ use crate::list_fields_cache::ListFieldsCache; use crate::list_terms::{leaf_list_terms, root_list_terms}; use crate::root::fetch_docs_phase; use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset}; -use crate::search_permits::SearchPermits; +use crate::search_permit_provider::SearchPermitProvider; use crate::search_stream::{leaf_search_stream, root_search_stream}; use crate::{fetch_docs, root_search, search_plan, ClusterClient, SearchError}; @@ -450,7 +450,7 @@ pub struct SearcherContext { /// Fast fields cache. pub fast_fields_cache: Arc, /// Counting semaphore to limit concurrent leaf search split requests. - pub leaf_search_split_semaphore: SearchPermits, + pub search_permit_provider: SearchPermitProvider, /// Split footer cache. pub split_footer_cache: MemorySizedCache, /// Counting semaphore to limit concurrent split stream requests. @@ -489,7 +489,7 @@ impl SearcherContext { &quickwit_storage::STORAGE_METRICS.split_footer_cache, ); let leaf_search_split_semaphore = - SearchPermits::new(searcher_config.max_num_concurrent_split_searches); + SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches); let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams); let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize; @@ -506,7 +506,7 @@ impl SearcherContext { Self { searcher_config, fast_fields_cache: storage_long_term_cache, - leaf_search_split_semaphore, + search_permit_provider: leaf_search_split_semaphore, split_footer_cache: global_split_footer_cache, split_stream_semaphore, leaf_search_cache,