From 77fe5221356748f8296a22a4d151686b6b54c074 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 23 Oct 2024 09:51:01 +0900 Subject: [PATCH] CR comments --- quickwit/quickwit-search/src/leaf.rs | 2 +- quickwit/quickwit-search/src/lib.rs | 2 +- ...h_permits.rs => search_permit_provider.rs} | 38 ++++++++++--------- quickwit/quickwit-search/src/service.rs | 6 +-- 4 files changed, 25 insertions(+), 23 deletions(-) rename quickwit/quickwit-search/src/{search_permits.rs => search_permit_provider.rs} (86%) diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 4104d80376e..00581612bb0 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -50,7 +50,7 @@ use tracing::*; use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector}; use crate::metrics::SEARCH_METRICS; use crate::root::is_metadata_count_request_with_ast; -use crate::search_permits::SearchPermit; +use crate::search_permit_provider::SearchPermit; use crate::service::{deserialize_doc_mapper, SearcherContext}; use crate::{QuickwitAggregations, SearchError}; diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index dec493fdb7b..58c464b7463 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -44,7 +44,7 @@ mod service; pub(crate) mod top_k_collector; mod metrics; -mod search_permits; +mod search_permit_provider; #[cfg(test)] mod tests; diff --git a/quickwit/quickwit-search/src/search_permits.rs b/quickwit/quickwit-search/src/search_permit_provider.rs similarity index 86% rename from quickwit/quickwit-search/src/search_permits.rs rename to quickwit/quickwit-search/src/search_permit_provider.rs index 3405e2f0df2..d506e69e813 100644 --- a/quickwit/quickwit-search/src/search_permits.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -28,14 +28,14 @@ use tokio::sync::oneshot; /// /// Requests are served in order. #[derive(Clone)] -pub struct SearchPermits { - inner: Arc>, +pub struct SearchPermitProvider { + inner: Arc>, } -impl SearchPermits { - pub fn new(num_permits: usize) -> SearchPermits { - SearchPermits { - inner: Arc::new(Mutex::new(InnerSearchPermits { +impl SearchPermitProvider { + pub fn new(num_permits: usize) -> SearchPermitProvider { + SearchPermitProvider { + inner: Arc::new(Mutex::new(InnerSearchPermitProvider { num_permits_available: num_permits, permits_requests: VecDeque::new(), })), @@ -59,16 +59,16 @@ impl SearchPermits { } } -struct InnerSearchPermits { +struct InnerSearchPermitProvider { num_permits_available: usize, permits_requests: VecDeque>, } -impl InnerSearchPermits { +impl InnerSearchPermitProvider { fn get_permits( &mut self, num_permits: usize, - inner: &Arc>, + inner_arc: &Arc>, ) -> Vec> { let mut permits = Vec::with_capacity(num_permits); for _ in 0..num_permits { @@ -76,13 +76,13 @@ impl InnerSearchPermits { self.permits_requests.push_back(tx); permits.push(rx); } - self.assign_available_permits(inner); + self.assign_available_permits(inner_arc); permits } fn get_permit( &mut self, - inner: &Arc>, + inner: &Arc>, ) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); self.permits_requests.push_back(tx); @@ -90,12 +90,12 @@ impl InnerSearchPermits { rx } - fn recycle_permit(&mut self, inner: &Arc>) { + fn recycle_permit(&mut self, inner_arc: &Arc>) { self.num_permits_available += 1; - self.assign_available_permits(inner); + self.assign_available_permits(inner_arc); } - fn assign_available_permits(&mut self, inner: &Arc>) { + fn assign_available_permits(&mut self, inner_arc: &Arc>) { while self.num_permits_available > 0 { let Some(sender) = self.permits_requests.pop_front() else { break; @@ -104,7 +104,7 @@ impl InnerSearchPermits { _ongoing_gauge_guard: GaugeGuard::from_gauge( &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, ), - inner_opt: Some(Arc::downgrade(inner)), + inner_opt: Some(Arc::downgrade(inner_arc)), }); match send_res { Ok(()) => { @@ -124,7 +124,7 @@ impl InnerSearchPermits { pub struct SearchPermit { _ongoing_gauge_guard: GaugeGuard<'static>, - inner_opt: Option>>, + inner_opt: Option>>, } impl SearchPermit { @@ -150,10 +150,12 @@ impl Drop for SearchPermit { mod tests { use tokio::task::JoinSet; + use super::*; + #[tokio::test] async fn test_search_permits_get_permits_future() { // We test here that `get_permits_futures` does not interleave - let search_permits = super::SearchPermits::new(1); + let search_permits = SearchPermitProvider::new(1); let mut all_futures = Vec::new(); let first_batch_of_permits = search_permits.get_permits_futures(10); assert_eq!(first_batch_of_permits.len(), 10); @@ -203,7 +205,7 @@ mod tests { // Here we test that we don't have a problem if the Receiver is dropped. // In particular, we want to check that there is not a race condition where drop attempts to // lock the mutex. - let search_permits = super::SearchPermits::new(1); + let search_permits = SearchPermitProvider::new(1); let permit_rx = search_permits.get_one_permit(); let permit_rx2 = search_permits.get_one_permit(); drop(permit_rx2); diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index b9bf596de21..1629ea6acb4 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -50,7 +50,7 @@ use crate::list_fields_cache::ListFieldsCache; use crate::list_terms::{leaf_list_terms, root_list_terms}; use crate::root::fetch_docs_phase; use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset}; -use crate::search_permits::SearchPermits; +use crate::search_permit_provider::SearchPermitProvider; use crate::search_stream::{leaf_search_stream, root_search_stream}; use crate::{fetch_docs, root_search, search_plan, ClusterClient, SearchError}; @@ -450,7 +450,7 @@ pub struct SearcherContext { /// Fast fields cache. pub fast_fields_cache: Arc, /// Counting semaphore to limit concurrent leaf search split requests. - pub leaf_search_split_semaphore: SearchPermits, + pub leaf_search_split_semaphore: SearchPermitProvider, /// Split footer cache. pub split_footer_cache: MemorySizedCache, /// Counting semaphore to limit concurrent split stream requests. @@ -489,7 +489,7 @@ impl SearcherContext { &quickwit_storage::STORAGE_METRICS.split_footer_cache, ); let leaf_search_split_semaphore = - SearchPermits::new(searcher_config.max_num_concurrent_split_searches); + SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches); let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams); let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize;