Skip to content

Commit

Permalink
CR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Oct 23, 2024
1 parent 61483b9 commit 77fe522
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 23 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use tracing::*;
use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
use crate::metrics::SEARCH_METRICS;
use crate::root::is_metadata_count_request_with_ast;
use crate::search_permits::SearchPermit;
use crate::search_permit_provider::SearchPermit;
use crate::service::{deserialize_doc_mapper, SearcherContext};
use crate::{QuickwitAggregations, SearchError};

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mod service;
pub(crate) mod top_k_collector;

mod metrics;
mod search_permits;
mod search_permit_provider;

#[cfg(test)]
mod tests;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ use tokio::sync::oneshot;
///
/// Requests are served in order.
#[derive(Clone)]
pub struct SearchPermits {
inner: Arc<Mutex<InnerSearchPermits>>,
pub struct SearchPermitProvider {
inner: Arc<Mutex<InnerSearchPermitProvider>>,
}

impl SearchPermits {
pub fn new(num_permits: usize) -> SearchPermits {
SearchPermits {
inner: Arc::new(Mutex::new(InnerSearchPermits {
impl SearchPermitProvider {
pub fn new(num_permits: usize) -> SearchPermitProvider {
SearchPermitProvider {
inner: Arc::new(Mutex::new(InnerSearchPermitProvider {
num_permits_available: num_permits,
permits_requests: VecDeque::new(),
})),
Expand All @@ -59,43 +59,43 @@ impl SearchPermits {
}
}

struct InnerSearchPermits {
struct InnerSearchPermitProvider {
num_permits_available: usize,
permits_requests: VecDeque<oneshot::Sender<SearchPermit>>,
}

impl InnerSearchPermits {
impl InnerSearchPermitProvider {
fn get_permits(
&mut self,
num_permits: usize,
inner: &Arc<Mutex<InnerSearchPermits>>,
inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>,
) -> Vec<oneshot::Receiver<SearchPermit>> {
let mut permits = Vec::with_capacity(num_permits);
for _ in 0..num_permits {
let (tx, rx) = oneshot::channel();
self.permits_requests.push_back(tx);
permits.push(rx);
}
self.assign_available_permits(inner);
self.assign_available_permits(inner_arc);
permits
}

fn get_permit(
&mut self,
inner: &Arc<Mutex<InnerSearchPermits>>,
inner: &Arc<Mutex<InnerSearchPermitProvider>>,
) -> oneshot::Receiver<SearchPermit> {
let (tx, rx) = oneshot::channel();
self.permits_requests.push_back(tx);
self.assign_available_permits(inner);
rx
}

fn recycle_permit(&mut self, inner: &Arc<Mutex<InnerSearchPermits>>) {
fn recycle_permit(&mut self, inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>) {
self.num_permits_available += 1;
self.assign_available_permits(inner);
self.assign_available_permits(inner_arc);
}

fn assign_available_permits(&mut self, inner: &Arc<Mutex<InnerSearchPermits>>) {
fn assign_available_permits(&mut self, inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>) {
while self.num_permits_available > 0 {
let Some(sender) = self.permits_requests.pop_front() else {
break;
Expand All @@ -104,7 +104,7 @@ impl InnerSearchPermits {
_ongoing_gauge_guard: GaugeGuard::from_gauge(
&crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing,
),
inner_opt: Some(Arc::downgrade(inner)),
inner_opt: Some(Arc::downgrade(inner_arc)),
});
match send_res {
Ok(()) => {
Expand All @@ -124,7 +124,7 @@ impl InnerSearchPermits {

pub struct SearchPermit {
_ongoing_gauge_guard: GaugeGuard<'static>,
inner_opt: Option<Weak<Mutex<InnerSearchPermits>>>,
inner_opt: Option<Weak<Mutex<InnerSearchPermitProvider>>>,
}

impl SearchPermit {
Expand All @@ -150,10 +150,12 @@ impl Drop for SearchPermit {
mod tests {
use tokio::task::JoinSet;

use super::*;

#[tokio::test]
async fn test_search_permits_get_permits_future() {
// We test here that `get_permits_futures` does not interleave
let search_permits = super::SearchPermits::new(1);
let search_permits = SearchPermitProvider::new(1);
let mut all_futures = Vec::new();
let first_batch_of_permits = search_permits.get_permits_futures(10);
assert_eq!(first_batch_of_permits.len(), 10);
Expand Down Expand Up @@ -203,7 +205,7 @@ mod tests {
// Here we test that we don't have a problem if the Receiver is dropped.
// In particular, we want to check that there is not a race condition where drop attempts to
// lock the mutex.
let search_permits = super::SearchPermits::new(1);
let search_permits = SearchPermitProvider::new(1);
let permit_rx = search_permits.get_one_permit();
let permit_rx2 = search_permits.get_one_permit();
drop(permit_rx2);
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-search/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::list_fields_cache::ListFieldsCache;
use crate::list_terms::{leaf_list_terms, root_list_terms};
use crate::root::fetch_docs_phase;
use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset};
use crate::search_permits::SearchPermits;
use crate::search_permit_provider::SearchPermitProvider;
use crate::search_stream::{leaf_search_stream, root_search_stream};
use crate::{fetch_docs, root_search, search_plan, ClusterClient, SearchError};

Expand Down Expand Up @@ -450,7 +450,7 @@ pub struct SearcherContext {
/// Fast fields cache.
pub fast_fields_cache: Arc<dyn StorageCache>,
/// Counting semaphore to limit concurrent leaf search split requests.
pub leaf_search_split_semaphore: SearchPermits,
pub leaf_search_split_semaphore: SearchPermitProvider,
/// Split footer cache.
pub split_footer_cache: MemorySizedCache<String>,
/// Counting semaphore to limit concurrent split stream requests.
Expand Down Expand Up @@ -489,7 +489,7 @@ impl SearcherContext {
&quickwit_storage::STORAGE_METRICS.split_footer_cache,
);
let leaf_search_split_semaphore =
SearchPermits::new(searcher_config.max_num_concurrent_split_searches);
SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches);
let split_stream_semaphore =
Semaphore::new(searcher_config.max_num_concurrent_split_streams);
let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize;
Expand Down

0 comments on commit 77fe522

Please sign in to comment.