Skip to content

Commit

Permalink
Adds a concept of request_id for logging/correlation purpose.
Browse files Browse the repository at this point in the history
We also measure the amount of memory taken by a split search, and log
this.
  • Loading branch information
fulmicoton committed Oct 25, 2024
1 parent 02a5b6a commit 3dc9cb2
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 22 deletions.
18 changes: 12 additions & 6 deletions quickwit/quickwit-directories/src/caching_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,18 @@ impl CachingDirectory {
/// Warming: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
CachingDirectory {
underlying,
cache: Arc::new(ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
)),
}
let byte_range_cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
);
CachingDirectory::new(underlying, Arc::new(byte_range_cache))
}

/// Creates a new CachingDirectory.
///
/// Warming: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new(underlying: Arc<dyn Directory>, cache: Arc<ByteRangeCache>) -> CachingDirectory {
CachingDirectory { underlying, cache }
}
}

Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-directories/src/hot_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ pub fn write_hotcache<D: Directory>(
// We use the caching directory here in order to defensively ensure that
// the content of the directory that will be written in the hotcache is precisely
// the same that was read on the first pass.

let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory));
let debug_proxy_directory = DebugProxyDirectory::wrap(caching_directory);
let index = Index::open(debug_proxy_directory.clone())?;
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ message ListFields {
// -- Search -------------------

message SearchRequest {
// id used for logging/debugging purpose.
// If left empty, the root node will generate one.
string request_id = 18;

// Index ID patterns
repeated string index_id_patterns = 1;

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/fetch_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async fn fetch_docs_in_split(
index_storage,
split,
Some(doc_mapper.tokenizer_manager()),
false,
None,
)
.await
.context("open-index-for-split")?;
Expand Down
18 changes: 12 additions & 6 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use quickwit_proto::search::{
use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery};
use quickwit_query::tokenizers::TokenizerManager;
use quickwit_storage::{
wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage,
StorageResolver, TimeoutAndRetryStorage,
wrap_storage_with_cache, BundleStorage, ByteRangeCache, MemorySizedCache, OwnedBytes,
SplitCache, Storage, StorageResolver, TimeoutAndRetryStorage,
};
use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
use tantivy::aggregation::AggregationLimitsGuard;
Expand Down Expand Up @@ -133,7 +133,7 @@ pub(crate) async fn open_index_with_caches(
index_storage: Arc<dyn Storage>,
split_and_footer_offsets: &SplitIdAndFooterOffsets,
tokenizer_manager: Option<&TokenizerManager>,
ephemeral_unbounded_cache: bool,
ephemeral_unbounded_cache: Option<Arc<ByteRangeCache>>,
) -> anyhow::Result<Index> {
// Let's add a storage proxy to retry `get_slice` requests if they are taking too long,
// if configured in the searcher config.
Expand Down Expand Up @@ -165,8 +165,8 @@ pub(crate) async fn open_index_with_caches(

let directory = StorageDirectory::new(bundle_storage_with_cache);

let hot_directory = if ephemeral_unbounded_cache {
let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory));
let hot_directory = if let Some(cache) = ephemeral_unbounded_cache {
let caching_directory = CachingDirectory::new(Arc::new(directory), cache);
HotDirectory::open(caching_directory, hotcache_bytes.read_bytes()?)?
} else {
HotDirectory::open(directory, hotcache_bytes.read_bytes()?)?
Expand Down Expand Up @@ -399,12 +399,15 @@ async fn leaf_search_single_split(
}

let split_id = split.split_id.to_string();
let byte_range_cache = Arc::new(ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
));
let index = open_index_with_caches(
searcher_context,
storage,
&split,
Some(doc_mapper.tokenizer_manager()),
true,
Some(byte_range_cache.clone()),
)
.await?;
let split_schema = index.schema();
Expand All @@ -425,6 +428,9 @@ async fn leaf_search_single_split(
warmup_info.simplify();

warmup(&searcher, &warmup_info).await?;

info!(request=%search_request.request_id, input_data=byte_range_cache.get_num_bytes(), "split search input data memory");

let span = info_span!("tantivy_search");

let (search_request, leaf_search_response) = {
Expand Down
13 changes: 11 additions & 2 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use quickwit_proto::search::{
SplitIdAndFooterOffsets, SplitSearchError,
};
use quickwit_proto::types::IndexUid;
use quickwit_storage::Storage;
use quickwit_storage::{ByteRangeCache, Storage};
use tantivy::schema::{Field, FieldType};
use tantivy::{ReloadPolicy, Term};
use tracing::{debug, error, info, instrument};
Expand Down Expand Up @@ -216,7 +216,16 @@ async fn leaf_list_terms_single_split(
storage: Arc<dyn Storage>,
split: SplitIdAndFooterOffsets,
) -> crate::Result<LeafListTermsResponse> {
let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?;
let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let index = open_index_with_caches(
searcher_context,
storage,
&split,
None,
Some(Arc::new(cache)),
)
.await?;
let split_schema = index.schema();
let reader = index
.reader_builder()
Expand Down
20 changes: 17 additions & 3 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use tantivy::collector::Collector;
use tantivy::schema::{Field, FieldEntry, FieldType, Schema};
use tantivy::TantivyError;
use tracing::{debug, info, info_span, instrument};
use ulid::Ulid;

use crate::cluster_client::ClusterClient;
use crate::collector::{make_merge_collector, QuickwitAggregations};
Expand Down Expand Up @@ -352,6 +353,7 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result<

// We do not mutate
Ok(SearchRequest {
request_id: req.request_id.clone(),
index_id_patterns: req.index_id_patterns.clone(),
query_ast: req.query_ast.clone(),
start_timestamp: req.start_timestamp,
Expand Down Expand Up @@ -1107,14 +1109,21 @@ async fn refine_and_list_matches(
/// 2. Merges the search results.
/// 3. Sends fetch docs requests to multiple leaf nodes.
/// 4. Builds the response with docs and returns.
#[instrument(skip_all)]
#[instrument(skip_all, fields(request_id))]
pub async fn root_search(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
mut metastore: MetastoreServiceClient,
cluster_client: &ClusterClient,
) -> crate::Result<SearchResponse> {
info!(searcher_context = ?searcher_context, search_request = ?search_request);
if search_request.request_id.is_empty() {
search_request.request_id = Ulid::new().to_string();
}

let request_id = search_request.request_id.clone();
tracing::Span::current().record("request_id", request_id.as_str());

info!(search_request = ?search_request);
let start_instant = tokio::time::Instant::now();
let list_indexes_metadatas_request = ListIndexesMetadataRequest {
index_id_patterns: search_request.index_id_patterns.clone(),
Expand Down Expand Up @@ -1169,8 +1178,13 @@ pub async fn root_search(
)
.await;

let elapsed = start_instant.elapsed();

info!(request_id=%request_id.as_str(), num_docs=num_docs, num_splits=num_splits, elapsed_time_millis=%elapsed.as_millis(), "search completed");
En0KEwoEMTIzNBgDIgkKBwgKEgMYgAgSJAgAEiAs2CFWr5WyHHWEiMhTXxVNw4gP7PlADPaGfr_AQk9WohpA6LZTjFfFhcFQrMsp2O7bOI9BOzP-jIE5PGhha62HDfX4t5FLQivX5rUhH5iTv2c-rd0kDSazrww4cD1UCeytDSIiCiCfMgpVPOuqq371l1wHVhCXoIscKW-wrwiKN80vR_Rfzg==

if let Ok(search_response) = &mut search_response_result {
search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64;
search_response.elapsed_time_micros = elapsed.as_micros() as u64;
}
let label_values = if search_response_result.is_ok() {
["success"]
Expand Down
7 changes: 5 additions & 2 deletions quickwit/quickwit-search/src/search_stream/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use quickwit_proto::search::{
LeafSearchStreamResponse, OutputFormat, SearchRequest, SearchStreamRequest,
SplitIdAndFooterOffsets,
};
use quickwit_storage::Storage;
use quickwit_storage::{ByteRangeCache, Storage};
use tantivy::columnar::{DynamicColumn, HasAssociatedColumnType};
use tantivy::fastfield::Column;
use tantivy::query::Query;
Expand Down Expand Up @@ -127,12 +127,15 @@ async fn leaf_search_stream_single_split(
&split,
);

let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);

let index = open_index_with_caches(
&searcher_context,
storage,
&split,
Some(doc_mapper.tokenizer_manager()),
true,
Some(Arc::new(cache)),
)
.await?;
let split_schema = index.schema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ fn build_request_for_es_api(

Ok((
quickwit_proto::search::SearchRequest {
request_id: String::default(),
index_id_patterns,
query_ast: serde_json::to_string(&query_ast).expect("Failed to serialize QueryAst"),
max_hits,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/src/search_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ pub fn search_request_from_api_request(
let query_ast = query_ast_from_user_text(&search_request.query, search_request.search_fields);
let query_ast_json = serde_json::to_string(&query_ast)?;
let search_request = quickwit_proto::search::SearchRequest {
request_id: String::default(),
index_id_patterns,
query_ast: query_ast_json,
snippet_fields: search_request.snippet_fields.unwrap_or_default(),
Expand Down
9 changes: 9 additions & 0 deletions quickwit/quickwit-storage/src/cache/byte_range_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
}
}

fn get_num_bytes(&self) -> u64 {
self.num_bytes
}

fn get_slice(&mut self, tag: &T, byte_range: Range<usize>) -> Option<OwnedBytes> {
if byte_range.start == byte_range.end {
return Some(OwnedBytes::empty());
Expand Down Expand Up @@ -356,6 +360,11 @@ impl ByteRangeCache {
}
}

/// Overall amount of bytes stored in the cache.
pub fn get_num_bytes(&self) -> u64 {
self.inner.lock().unwrap().get_num_bytes()
}

/// If available, returns the cached view of the slice.
pub fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes> {
self.inner.lock().unwrap().get_slice(path, byte_range)
Expand Down

0 comments on commit 3dc9cb2

Please sign in to comment.