Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevents single split searches from different leaf_search from interleaving #5509

Merged
merged 2 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-search/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ quickwit-storage = { workspace = true }
[dev-dependencies]
assert-json-diff = { workspace = true }
proptest = { workspace = true }
rand = { workspace = true }
serde_json = { workspace = true }
typetag = { workspace = true }

Expand Down
18 changes: 12 additions & 6 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use tracing::*;
use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
use crate::metrics::SEARCH_METRICS;
use crate::root::is_metadata_count_request_with_ast;
use crate::search_permits::SearchPermit;
use crate::service::{deserialize_doc_mapper, SearcherContext};
use crate::{QuickwitAggregations, SearchError};

Expand Down Expand Up @@ -1183,7 +1184,6 @@ async fn resolve_storage_and_leaf_search(
aggregations_limits: AggregationLimitsGuard,
) -> crate::Result<LeafSearchResponse> {
let storage = storage_resolver.resolve(&index_uri).await?;

leaf_search(
searcher_context.clone(),
search_request.clone(),
Expand Down Expand Up @@ -1259,10 +1259,16 @@ pub async fn leaf_search(
let incremental_merge_collector = IncrementalCollector::new(merge_collector);
let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector));

for (split, mut request) in split_with_req {
let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore
.clone()
.acquire_owned()
// We acquire all of the leaf search permits to make sure our single split search tasks
// do not interleave with other leaf search requests.
let permit_futures = searcher_context
.leaf_search_split_semaphore
.get_permits_futures(split_with_req.len());

for ((split, mut request), permit_fut) in
split_with_req.into_iter().zip(permit_futures.into_iter())
{
let leaf_split_search_permit = permit_fut
.instrument(info_span!("waiting_for_leaf_search_split_semaphore"))
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
Expand Down Expand Up @@ -1355,7 +1361,7 @@ async fn leaf_search_single_split_wrapper(
split: SplitIdAndFooterOffsets,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
incremental_merge_collector: Arc<Mutex<IncrementalCollector>>,
leaf_split_search_permit: tokio::sync::OwnedSemaphorePermit,
leaf_split_search_permit: SearchPermit,
aggregations_limits: AggregationLimitsGuard,
) {
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ mod service;
pub(crate) mod top_k_collector;

mod metrics;
mod search_permits;

#[cfg(test)]
mod tests;
Expand Down
6 changes: 4 additions & 2 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,12 @@ pub async fn leaf_list_terms(
let index_storage_clone = index_storage.clone();
let searcher_context_clone = searcher_context.clone();
async move {
let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore.clone()
.acquire_owned()
let _leaf_split_search_permit = searcher_context_clone
.leaf_search_split_semaphore
.get_one_permit()
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");

// TODO dedicated counter and timer?
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
let timer = crate::SEARCH_METRICS
Expand Down
18 changes: 16 additions & 2 deletions quickwit/quickwit-search/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

use once_cell::sync::Lazy;
use quickwit_common::metrics::{
exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_histogram,
new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec,
exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_gauge_vec,
new_histogram, new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
};

pub struct SearchMetrics {
Expand All @@ -35,6 +35,8 @@ pub struct SearchMetrics {
pub leaf_searches_splits_total: IntCounter,
pub leaf_search_split_duration_secs: Histogram,
pub job_assigned_total: IntCounterVec<1>,
pub leaf_search_single_split_tasks_pending: IntGauge,
pub leaf_search_single_split_tasks_ongoing: IntGauge,
}

impl Default for SearchMetrics {
Expand All @@ -50,6 +52,14 @@ impl Default for SearchMetrics {
.copied()
.collect();

let leaf_search_single_split_tasks = new_gauge_vec::<1>(
"leaf_search_single_split_tasks",
"Number of single split search tasks pending or ongoing",
"search",
&[],
["status"], // takes values "ongoing" or "pending"
);

SearchMetrics {
root_search_requests_total: new_counter_vec(
"root_search_requests_total",
Expand Down Expand Up @@ -110,6 +120,10 @@ impl Default for SearchMetrics {
"search",
exponential_buckets(0.001, 2.0, 15).unwrap(),
),
leaf_search_single_split_tasks_ongoing: leaf_search_single_split_tasks
.with_label_values(["ongoing"]),
leaf_search_single_split_tasks_pending: leaf_search_single_split_tasks
.with_label_values(["pending"]),
job_assigned_total: new_counter_vec(
"job_assigned_total",
"Number of job assigned to searchers, per affinity rank.",
Expand Down
213 changes: 213 additions & 0 deletions quickwit/quickwit-search/src/search_permits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at hello@quickwit.io.
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Weak};

use quickwit_common::metrics::GaugeGuard;
use tokio::sync::oneshot;

/// `SearchPermits` is a distributor of permits to perform single split
/// search operations.
///
/// Requests are served in order.
#[derive(Clone)]
pub struct SearchPermits {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a misnomer; it sounds like it is akin to a tokio SemaphorePermit with num_permits > 1, but this is actually a semaphore, or a PermitProvider, or something of the sort

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah you are right.

inner: Arc<Mutex<InnerSearchPermits>>,
}

impl SearchPermits {
    /// Creates a permit distributor with `num_permits` permits initially available.
    pub fn new(num_permits: usize) -> SearchPermits {
        SearchPermits {
            inner: Arc::new(Mutex::new(InnerSearchPermits {
                num_permits_available: num_permits,
                permits_requests: VecDeque::new(),
            })),
        }
    }

    /// Returns a single future permit in the form of a Receiver channel.
    ///
    /// The permit is guaranteed to be resolved after the permits requested by
    /// previous calls to this function (or `get_permits_futures`): requests
    /// are queued and served in FIFO order.
    pub fn get_one_permit(&self) -> oneshot::Receiver<SearchPermit> {
        let mut permits_lock = self.inner.lock().unwrap();
        permits_lock.get_permit(&self.inner)
    }

    /// Returns a list of future permits in the form of Receiver channels.
    ///
    /// The permits returned are guaranteed to be resolved in order.
    /// In addition, the permits are guaranteed to be resolved before permits returned by
    /// subsequent calls to this function (or `get_one_permit`).
    pub fn get_permits_futures(&self, num_permits: usize) -> Vec<oneshot::Receiver<SearchPermit>> {
        let mut permits_lock = self.inner.lock().unwrap();
        permits_lock.get_permits(num_permits, &self.inner)
    }
}

/// State shared behind `SearchPermits`: the count of free permits and the
/// queue of requesters still waiting for one.
struct InnerSearchPermits {
    // Number of permits that have not been handed out (or that have been
    // recycled and not yet reassigned).
    num_permits_available: usize,
    // FIFO queue of pending requests; senders are fulfilled in request order.
    permits_requests: VecDeque<oneshot::Sender<SearchPermit>>,
}

impl InnerSearchPermits {
fn get_permits(
&mut self,
num_permits: usize,
inner: &Arc<Mutex<InnerSearchPermits>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think `inner` is a very confusing name: the InnerSearchPermits is already "the inner thing", so an Arc<Mutex<_>> should be the outer thing, or a handle

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite get the confusion/comment.
The "outer" is SearchPermits.

I have not found a common practice on how to name the object of this pattern in Rust (in C++ pimpl is common, even if this is not the common usage of the pimpl pattern).

What would be a better name? would inner_arc help a bit?

) -> Vec<oneshot::Receiver<SearchPermit>> {
let mut permits = Vec::with_capacity(num_permits);
for _ in 0..num_permits {
let (tx, rx) = oneshot::channel();
self.permits_requests.push_back(tx);
permits.push(rx);
}
self.assign_available_permits(inner);
permits
}

/// Enqueues a single permit request and returns the receiving half of the
/// channel through which the permit will eventually be delivered.
///
/// Requests are served in FIFO order; assignment is attempted immediately in
/// case a permit is already available.
fn get_permit(
    &mut self,
    inner: &Arc<Mutex<InnerSearchPermits>>,
) -> oneshot::Receiver<SearchPermit> {
    let (permit_sender, permit_receiver) = oneshot::channel();
    self.permits_requests.push_back(permit_sender);
    self.assign_available_permits(inner);
    permit_receiver
}

/// Returns a permit to the pool and immediately tries to hand it to the
/// next waiting requester (if any).
fn recycle_permit(&mut self, inner: &Arc<Mutex<InnerSearchPermits>>) {
    self.num_permits_available += 1;
    self.assign_available_permits(inner);
}

/// Hands out available permits to waiting requesters, in FIFO order, until
/// either no permit or no requester is left, then refreshes the pending
/// tasks gauge.
fn assign_available_permits(&mut self, inner: &Arc<Mutex<InnerSearchPermits>>) {
    while self.num_permits_available > 0 {
        let Some(sender) = self.permits_requests.pop_front() else {
            break;
        };
        let send_res = sender.send(SearchPermit {
            // Tracks how many single split search tasks are currently ongoing.
            _ongoing_gauge_guard: GaugeGuard::from_gauge(
                &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing,
            ),
            // Weak reference so the permit can recycle itself on drop.
            inner_opt: Some(Arc::downgrade(inner)),
        });
        match send_res {
            Ok(()) => {
                self.num_permits_available -= 1;
            }
            Err(mut search_permit) => {
                // The receiver was dropped, so the permit comes back to us and
                // stays available (the counter is intentionally not
                // decremented). We must disarm the permit's `Drop` impl before
                // dropping it: `Drop` locks the inner mutex, which the caller
                // of this method already holds, so letting it run here would
                // self-deadlock on the non-reentrant `std::sync::Mutex`.
                search_permit.disable_drop();
                drop(search_permit);
            }
        }
    }
    crate::SEARCH_METRICS
        .leaf_search_single_split_tasks_pending
        .set(self.permits_requests.len() as i64);
}
}

pub struct SearchPermit {
_ongoing_gauge_guard: GaugeGuard<'static>,
inner_opt: Option<Weak<Mutex<InnerSearchPermits>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe there is need for Weak here: InnerSearchPermits doesn't hold any SearchPermit (a oneshot sender doesn't hold the thing to transmit, the oneshot receiver do)

I find this type, Option<Weak<Mutex<InnerSearchPermits>>>, rather confusing, in part because SearchPermits isn't the right name imo, and in part because the Option<> very much looks like a way to store a bool alongside the rest of the structure to reduce the size of the struct by one usize. I'm not against using an Option that way, but if that's the reason, it ought to be written down, and if it isn't the goal, then I'm missing what the goal of this Option is, rather than a disable_drop: bool

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer the Option. Setting it to None seems easier to "proofread". I agree that it makes the code less readable however, as the intent is way less clear than with the boolean solution.

I'll add some comments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you ended up changing it to a bool. If you think an Option is better, you can put it back, just add a 1-2 line comment on the purpose of None

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I thought after packing everything as drop_without_recycling_permit it was fine like that.

}

impl SearchPermit {
    /// Disarms the `Drop` impl: after this call, dropping the permit will not
    /// attempt to recycle it back to the pool. Used when delivery of the
    /// permit failed while the pool's mutex is already held (see
    /// `assign_available_permits`), where running `Drop` would deadlock.
    fn disable_drop(&mut self) {
        self.inner_opt = None;
    }
}

impl Drop for SearchPermit {
    /// Hands the permit back to the distributor on drop, so that the next
    /// waiting requester can be served. Does nothing if the drop was disarmed
    /// via `disable_drop` or if the distributor no longer exists.
    fn drop(&mut self) {
        let weak_inner = match self.inner_opt.take() {
            Some(weak_inner) => weak_inner,
            None => return,
        };
        if let Some(inner) = weak_inner.upgrade() {
            inner.lock().unwrap().recycle_permit(&inner);
        }
    }
}

#[cfg(test)]
mod tests {
    use tokio::task::JoinSet;

    #[tokio::test]
    async fn test_search_permits_get_permits_future() {
        // We test here that `get_permits_futures` does not interleave
        // A single permit forces all 20 requests to resolve strictly one at a time.
        let search_permits = super::SearchPermits::new(1);
        let mut all_futures = Vec::new();
        // First batch: tag each future with (batch_id = 1, index within batch).
        let first_batch_of_permits = search_permits.get_permits_futures(10);
        assert_eq!(first_batch_of_permits.len(), 10);
        all_futures.extend(
            first_batch_of_permits
                .into_iter()
                .enumerate()
                .map(move |(i, fut)| ((1, i), fut)),
        );

        // Second batch, requested after the first: tagged (batch_id = 2, index).
        let second_batch_of_permits = search_permits.get_permits_futures(10);
        assert_eq!(second_batch_of_permits.len(), 10);
        all_futures.extend(
            second_batch_of_permits
                .into_iter()
                .enumerate()
                .map(move |(i, fut)| ((2, i), fut)),
        );

        use rand::seq::SliceRandom;
        // not super useful, considering what join set does, but still a tiny bit more sound.
        all_futures.shuffle(&mut rand::thread_rng());

        // Spawn every future; each task yields its (batch_id, order) tag once
        // its permit resolves.
        let mut join_set = JoinSet::new();
        for (res, fut) in all_futures {
            join_set.spawn(async move {
                let permit = fut.await;
                (res, permit)
            });
        }
        // Collect tags in permit-resolution order (only one permit exists, so
        // resolution order is total).
        let mut ordered_result: Vec<(usize, usize)> = Vec::with_capacity(20);
        while let Some(Ok(((batch_id, order), _permit))) = join_set.join_next().await {
            ordered_result.push((batch_id, order));
        }

        // All of batch 1 must resolve (in order) before any of batch 2.
        assert_eq!(ordered_result.len(), 20);
        for (i, res) in ordered_result[0..10].iter().enumerate() {
            assert_eq!(res, &(1, i));
        }
        for (i, res) in ordered_result[10..20].iter().enumerate() {
            assert_eq!(res, &(2, i));
        }
    }

    #[tokio::test]
    async fn test_search_permits_receiver_race_condition() {
        // Here we test that we don't have a problem if the Receiver is dropped.
        // In particular, we want to check that there is not a race condition where drop attempts to
        // lock the mutex.
        let search_permits = super::SearchPermits::new(1);
        let permit_rx = search_permits.get_one_permit();
        let permit_rx2 = search_permits.get_one_permit();
        drop(permit_rx2);
        drop(permit_rx);
        let _permit_rx = search_permits.get_one_permit();
    }
}
12 changes: 4 additions & 8 deletions quickwit/quickwit-search/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::list_fields_cache::ListFieldsCache;
use crate::list_terms::{leaf_list_terms, root_list_terms};
use crate::root::fetch_docs_phase;
use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset};
use crate::search_permits::SearchPermits;
use crate::search_stream::{leaf_search_stream, root_search_stream};
use crate::{fetch_docs, root_search, search_plan, ClusterClient, SearchError};

Expand Down Expand Up @@ -449,7 +450,7 @@ pub struct SearcherContext {
/// Fast fields cache.
pub fast_fields_cache: Arc<dyn StorageCache>,
/// Counting semaphore to limit concurrent leaf search split requests.
pub leaf_search_split_semaphore: Arc<Semaphore>,
pub leaf_search_split_semaphore: SearchPermits,
/// Split footer cache.
pub split_footer_cache: MemorySizedCache<String>,
/// Counting semaphore to limit concurrent split stream requests.
Expand All @@ -468,10 +469,6 @@ impl std::fmt::Debug for SearcherContext {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("SearcherContext")
.field("searcher_config", &self.searcher_config)
.field(
"leaf_search_split_semaphore",
&self.leaf_search_split_semaphore,
)
.field("split_stream_semaphore", &self.split_stream_semaphore)
.finish()
}
Expand All @@ -491,9 +488,8 @@ impl SearcherContext {
capacity_in_bytes,
&quickwit_storage::STORAGE_METRICS.split_footer_cache,
);
let leaf_search_split_semaphore = Arc::new(Semaphore::new(
searcher_config.max_num_concurrent_split_searches,
));
let leaf_search_split_semaphore =
SearchPermits::new(searcher_config.max_num_concurrent_split_searches);
let split_stream_semaphore =
Semaphore::new(searcher_config.max_num_concurrent_split_streams);
let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize;
Expand Down
Loading