Fix broken traces + add minor enhancements (#3794)
* Jaeger now accepts the OTLP protocol; remove the Jaeger exporter code.

* Fix broken traces.
fmassot authored Sep 4, 2023
1 parent 00f2304 commit 0e70d28
Showing 7 changed files with 32 additions and 25 deletions.
5 changes: 5 additions & 0 deletions quickwit/quickwit-cli/src/main.rs
@@ -22,6 +22,7 @@ use std::env;
use anyhow::Context;
use colored::Colorize;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::trace::BatchConfig;
use opentelemetry::sdk::{trace, Resource};
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::WithExportConfig;
@@ -70,6 +71,9 @@ fn setup_logging_and_tracing(
// It is thus set on layers, see https://github.com/tokio-rs/tracing/issues/1817
if std::env::var_os(QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER_ENV_KEY).is_some() {
let otlp_exporter = opentelemetry_otlp::new_exporter().tonic().with_env();
// In debug mode, Quickwit can generate a lot of spans, and the default queue size of 2048
// is too small.
let batch_config = BatchConfig::default().with_max_queue_size(32768);
let trace_config = trace::config().with_resource(Resource::new([
KeyValue::new("service.name", "quickwit"),
KeyValue::new("service.version", build_info.version.clone()),
@@ -78,6 +82,7 @@
.tracing()
.with_exporter(otlp_exporter)
.with_trace_config(trace_config)
.with_batch_config(batch_config)
.install_batch(opentelemetry::runtime::Tokio)
.context("Failed to initialize OpenTelemetry OTLP exporter.")?;
registry
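
For context, the sketch below condenses how the pieces of `setup_logging_and_tracing` shown in this hunk fit together: the OTLP exporter is built from the standard `OTEL_*` environment variables, and the enlarged batch queue (32768 instead of the default 2048) is passed to the pipeline before it is installed on the Tokio runtime. This is a minimal sketch, not the full Quickwit function; `init_otlp_tracer` and `service_version` are illustrative names, and the wiring of the returned tracer into the `tracing_subscriber` registry is omitted.

```rust
use anyhow::Context;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::trace::BatchConfig;
use opentelemetry::sdk::{trace, Resource};
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::WithExportConfig;

fn init_otlp_tracer(service_version: &str) -> anyhow::Result<trace::Tracer> {
    // Propagate trace context in the W3C TraceContext format (typical use of the
    // `TraceContextPropagator` import seen in the hunk above).
    global::set_text_map_propagator(TraceContextPropagator::new());
    // Endpoint, headers, etc. are read from the standard OTEL_* environment variables.
    let otlp_exporter = opentelemetry_otlp::new_exporter().tonic().with_env();
    // Larger queue than the default 2048 so bursts of debug-level spans are not dropped.
    let batch_config = BatchConfig::default().with_max_queue_size(32_768);
    let trace_config = trace::config().with_resource(Resource::new([
        KeyValue::new("service.name", "quickwit"),
        KeyValue::new("service.version", service_version.to_string()),
    ]));
    opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(otlp_exporter)
        .with_trace_config(trace_config)
        .with_batch_config(batch_config)
        .install_batch(opentelemetry::runtime::Tokio)
        .context("Failed to initialize OpenTelemetry OTLP exporter.")
}
```
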
2 changes: 1 addition & 1 deletion quickwit/quickwit-directories/src/storage_directory.rs
@@ -58,7 +58,7 @@ impl FileHandle for StorageDirectoryFileHandle {
Err(unsupported_operation(&self.path))
}

#[instrument(level = "debug", fields(path = %self.path.to_string_lossy()), skip(self))]
#[instrument(level = "debug", fields(path = %self.path.to_string_lossy(), byte_range_size = byte_range.end - byte_range.start), skip(self))]
async fn read_bytes_async(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
if byte_range.is_empty() {
return Ok(OwnedBytes::empty());
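
The change above adds a `byte_range_size` span field computed from the method argument. A minimal, self-contained sketch of the same `#[instrument]` pattern, with `skip(self)` keeping the receiver off the span while `fields(...)` records values derived from `self` and the arguments (the struct and method below are illustrative stand-ins, not Quickwit's types):

```rust
use std::ops::Range;
use tracing::instrument;

struct FileHandle {
    path: std::path::PathBuf,
}

impl FileHandle {
    // `skip(self)` avoids Debug-formatting the handle; the explicit fields are
    // evaluated once, when the span is created.
    #[instrument(
        level = "debug",
        fields(path = %self.path.to_string_lossy(), byte_range_size = byte_range.end - byte_range.start),
        skip(self)
    )]
    fn read_bytes(&self, byte_range: Range<usize>) -> Vec<u8> {
        vec![0u8; byte_range.len()]
    }
}
```
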
@@ -147,7 +147,7 @@ impl DeleteTaskPlanner {
.get_relevant_stale_splits(self.index_uid.clone(), last_delete_opstamp, ctx)
.await?;
ctx.record_progress();
info!(
debug!(
index_id = self.index_uid.index_id(),
last_delete_opstamp = last_delete_opstamp,
num_stale_splits = stale_splits.len()
19 changes: 12 additions & 7 deletions quickwit/quickwit-search/src/leaf.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;
use anyhow::Context;
use futures::future::try_join_all;
use itertools::{Either, Itertools};
use quickwit_common::PrettySample;
use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory};
use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo};
use quickwit_proto::search::{
@@ -47,7 +48,7 @@ use crate::collector::{make_collector_for_split, make_merge_collector};
use crate::service::SearcherContext;
use crate::SearchError;

#[instrument(skip(index_storage, footer_cache))]
#[instrument(skip_all)]
async fn get_split_footer_from_cache_or_fetch(
index_storage: Arc<dyn Storage>,
split_and_footer_offsets: &SplitIdAndFooterOffsets,
@@ -87,7 +88,7 @@ async fn get_split_footer_from_cache_or_fetch(
/// - A split footer cache given by `SearcherContext.split_footer_cache`.
/// - A fast fields cache given by `SearcherContext.storage_long_term_cache`.
/// - An ephemeral unbounded cache directory whose lifetime is tied to the returned `Index`.
#[instrument(skip(searcher_context, index_storage, tokenizer_manager))]
#[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))]
pub(crate) async fn open_index_with_caches(
searcher_context: &SearcherContext,
index_storage: Arc<dyn Storage>,
@@ -144,9 +145,9 @@ pub(crate) async fn open_index_with_caches(
/// * `term_dict_field_names` - A list of fields, where the whole dictionary needs to be loaded.
/// This is e.g. required for term aggregation, since we don't know in advance which terms are going
/// to be hit.
#[instrument(skip(searcher))]
#[instrument(skip_all)]
pub(crate) async fn warmup(searcher: &Searcher, warmup_info: &WarmupInfo) -> anyhow::Result<()> {
debug!(warmup_info=?warmup_info, "warmup");
debug!(warmup_info=?warmup_info);
let warm_up_terms_future = warm_up_terms(searcher, &warmup_info.terms_grouped_by_field)
.instrument(debug_span!("warm_up_terms"));
let warm_up_term_ranges_future =
@@ -326,7 +327,7 @@ async fn warm_up_fieldnorms(searcher: &Searcher, requires_scoring: bool) -> anyh
}

/// Apply a leaf search on a single split.
#[instrument(skip(searcher_context, search_request, storage, split, doc_mapper,))]
#[instrument(skip_all, fields(split_id = split.split_id))]
async fn leaf_search_single_split(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
@@ -372,7 +373,7 @@ async fn leaf_search_single_split(
warmup_info.merge(collector_warmup_info);

warmup(&searcher, &warmup_info).await?;
let span = info_span!("tantivy_search", split_id = %split.split_id);
let span = info_span!("tantivy_search");
let leaf_search_response = crate::run_cpu_intensive(move || {
let _span_guard = span.enter();
searcher.search(&query, &quickwit_collector)
@@ -431,13 +432,15 @@ pub(crate) fn rewrite_start_end_time_bounds(
/// [PartialHit](quickwit_proto::search::PartialHit) candidates. The root will be in
/// charge to consolidate, identify the actual final top hits to display, and
/// fetch the actual documents to convert the partial hits into actual Hits.
#[instrument(skip_all, fields(index = ?request.index_id_patterns))]
pub async fn leaf_search(
searcher_context: Arc<SearcherContext>,
request: &SearchRequest,
index_storage: Arc<dyn Storage>,
splits: &[SplitIdAndFooterOffsets],
doc_mapper: Arc<dyn DocMapper>,
) -> Result<LeafSearchResponse, SearchError> {
info!(splits_num = splits.len(), split_offsets = ?PrettySample::new(splits, 5));
let request = Arc::new(request.clone());
let leaf_search_single_split_futures: Vec<_> = splits
.iter()
@@ -513,7 +516,7 @@ pub async fn leaf_search(
}

/// Apply a leaf list terms on a single split.
#[instrument(skip(searcher_context, search_request, storage, split))]
#[instrument(skip_all, fields(split_id = split.split_id))]
async fn leaf_list_terms_single_split(
searcher_context: &SearcherContext,
search_request: &ListTermsRequest,
@@ -621,12 +624,14 @@ fn term_to_data(field: Field, field_type: &FieldType, field_value: &[u8]) -> Vec
}

/// `leaf` step of list terms.
#[instrument(skip_all, fields(index = ?request.index_id))]
pub async fn leaf_list_terms(
searcher_context: Arc<SearcherContext>,
request: &ListTermsRequest,
index_storage: Arc<dyn Storage>,
splits: &[SplitIdAndFooterOffsets],
) -> Result<LeafListTermsResponse, SearchError> {
info!(split_offsets = ?PrettySample::new(splits, 5));
let leaf_search_single_split_futures: Vec<_> = splits
.iter()
.map(|split| {
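
The pattern applied throughout `leaf.rs` above is: switch `#[instrument]` to `skip_all` so large arguments are no longer Debug-formatted into every span, then re-add the few values worth keeping as explicit span fields or as a single log event sampled with `quickwit_common::PrettySample` (the same helper imported in this hunk). A minimal sketch of that pattern; the `Split` struct and `leaf_search_sketch` function are illustrative stand-ins, not Quickwit's types:

```rust
use quickwit_common::PrettySample;
use tracing::{info, instrument};

#[derive(Debug)]
struct Split {
    split_id: String,
}

// `skip_all` drops every argument from the span; `fields(...)` re-adds only the
// cheap, useful ones.
#[instrument(skip_all, fields(index = %index_id))]
fn leaf_search_sketch(index_id: &str, splits: &[Split]) {
    // Log the number of splits plus a small sample instead of the whole list.
    info!(splits_num = splits.len(), split_offsets = ?PrettySample::new(splits, 5));
}
```
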
19 changes: 9 additions & 10 deletions quickwit/quickwit-search/src/root.rs
@@ -25,6 +25,7 @@ use futures::future::try_join_all;
use itertools::Itertools;
use quickwit_common::shared_consts::{DELETION_GRACE_PERIOD, SCROLL_BATCH_LEN};
use quickwit_common::uri::Uri;
use quickwit_common::PrettySample;
use quickwit_config::{build_doc_mapper, IndexConfig};
use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
use quickwit_doc_mapper::{DocMapper, DYNAMIC_FIELD_NAME};
@@ -44,7 +45,7 @@ use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResult
use tantivy::collector::Collector;
use tantivy::schema::{FieldType, Schema};
use tantivy::TantivyError;
use tracing::{debug, error, info_span, instrument};
use tracing::{debug, error, info, info_span, instrument};

use crate::cluster_client::ClusterClient;
use crate::collector::{make_merge_collector, QuickwitAggregations};
@@ -367,7 +368,7 @@ fn get_scroll_ttl_duration(search_request: &SearchRequest) -> crate::Result<Opti
Ok(Some(scroll_ttl))
}

#[instrument(skip(search_request, indexes_metas_for_leaf_search, cluster_client))]
#[instrument(skip_all)]
async fn search_partial_hits_phase_with_scroll(
searcher_context: &SearcherContext,
indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch,
@@ -431,7 +432,7 @@ async fn search_partial_hits_phase_with_scroll(
}
}

#[instrument(skip(search_request, indexes_metas_for_leaf_search, cluster_client))]
#[instrument(skip_all)]
pub(crate) async fn search_partial_hits_phase(
searcher_context: &SearcherContext,
indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch,
@@ -492,6 +493,7 @@ pub(crate) fn get_snippet_request(search_request: &SearchRequest) -> Option<Snip
})
}

#[instrument(skip_all, fields(partial_hits_num=partial_hits.len()))]
pub(crate) async fn fetch_docs_phase(
indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch,
partial_hits: &[PartialHit],
@@ -571,19 +573,15 @@ pub(crate) async fn fetch_docs_phase(
/// 2. Merges the search results.
/// 3. Sends fetch docs requests to multiple leaf nodes.
/// 4. Builds the response with docs and returns.
#[instrument(skip(
searcher_context,
indexes_metas_for_leaf_search,
search_request,
cluster_client
))]
#[instrument(skip_all)]
async fn root_search_aux(
searcher_context: &SearcherContext,
indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch,
search_request: SearchRequest,
split_metadatas: Vec<SplitMetadata>,
cluster_client: &ClusterClient,
) -> crate::Result<SearchResponse> {
info!(split_metadatas = ?PrettySample::new(&split_metadatas, 5));
let (first_phase_result, scroll_key_and_start_offset_opt): (
LeafSearchResponse,
Option<ScrollKeyAndStartOffset>,
@@ -671,13 +669,14 @@ fn finalize_aggregation_if_any(
/// 2. Merges the search results.
/// 3. Sends fetch docs requests to multiple leaf nodes.
/// 4. Builds the response with docs and returns.
#[instrument(skip(search_request, cluster_client, metastore))]
#[instrument(skip_all)]
pub async fn root_search(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
metastore: &dyn Metastore,
cluster_client: &ClusterClient,
) -> crate::Result<SearchResponse> {
info!(searcher_context = ?searcher_context, search_request = ?search_request);
let start_instant = tokio::time::Instant::now();
let indexes_metadata = metastore
.list_indexes_metadatas(ListIndexesQuery::IndexIdPatterns(
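
Similarly, in `root.rs` the spans no longer carry the request and context arguments; a single `info!` event records them with the `?` (Debug) sigil at the top of the function instead. A rough sketch of that shape; `RootRequest` and `root_search_sketch` are illustrative names, not Quickwit's:

```rust
use tracing::{info, instrument};

#[derive(Debug)]
struct RootRequest {
    index_id_patterns: Vec<String>,
    query: String,
}

// The span stays lean thanks to `skip_all`; the request is recorded once as an
// event rather than duplicated on every span in the call tree.
#[instrument(skip_all)]
fn root_search_sketch(request: &RootRequest) {
    info!(search_request = ?request);
}
```
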
5 changes: 4 additions & 1 deletion quickwit/quickwit-search/src/search_stream/leaf.rs
@@ -23,6 +23,7 @@ use std::marker::PhantomData;
use std::sync::Arc;

use futures::{FutureExt, StreamExt};
use quickwit_common::PrettySample;
use quickwit_doc_mapper::DocMapper;
use quickwit_proto::search::{
LeafSearchStreamResponse, OutputFormat, SearchRequest, SearchStreamRequest,
@@ -51,13 +52,15 @@ use crate::{Result, SearchError};
// to process stream in grpc_adapter.rs to change SearchError
// to tonic::Status as tonic::Status is required by the stream result
// signature defined by proto generated code.
#[instrument(skip_all, fields(index = request.index_id))]
pub async fn leaf_search_stream(
searcher_context: Arc<SearcherContext>,
request: SearchStreamRequest,
storage: Arc<dyn Storage>,
splits: Vec<SplitIdAndFooterOffsets>,
doc_mapper: Arc<dyn DocMapper>,
) -> UnboundedReceiverStream<crate::Result<LeafSearchStreamResponse>> {
info!(split_offsets = ?PrettySample::new(&splits, 5));
let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel();
let span = info_span!("leaf_search_stream",);
tokio::spawn(
@@ -105,7 +108,7 @@ async fn leaf_search_results_stream(
}

/// Apply a leaf search on a single split.
#[instrument(fields(split_id = %split.split_id), skip(searcher_context, split, doc_mapper, stream_request, storage))]
#[instrument(skip_all, fields(split_id = %split.split_id))]
async fn leaf_search_stream_single_split(
searcher_context: Arc<SearcherContext>,
split: SplitIdAndFooterOffsets,
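
For reference, the streaming shape visible in this hunk, where results flow through an unbounded channel from a spawned task carrying its own span, looks roughly like the sketch below; the task body is a placeholder, not Quickwit's per-split search logic:

```rust
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{info_span, Instrument};

fn leaf_stream_sketch() -> UnboundedReceiverStream<String> {
    let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel();
    let span = info_span!("leaf_search_stream");
    tokio::spawn(
        async move {
            // Each result is forwarded as soon as it is ready; dropping the
            // receiver on the other side ends the stream.
            for i in 0..3 {
                let _ = result_sender.send(format!("result {i}"));
            }
        }
        .instrument(span),
    );
    UnboundedReceiverStream::new(result_receiver)
}
```
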
5 changes: 0 additions & 5 deletions quickwit/quickwit-search/src/service.rs
@@ -38,7 +38,6 @@ use quickwit_storage::{MemorySizedCache, QuickwitCache, StorageCache, StorageRes
use tantivy::aggregation::AggregationLimits;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::info;

use crate::leaf_cache::LeafSearchCache;
use crate::root::{fetch_docs_phase, get_snippet_request};
@@ -178,7 +177,6 @@ impl SearchService for SearchServiceImpl {
let search_request = leaf_search_request
.search_request
.ok_or_else(|| SearchError::Internal("No search request.".to_string()))?;
info!(index=?search_request.index_id_patterns, splits=?leaf_search_request.split_offsets, "leaf_search");
let storage = self
.storage_resolver
.resolve(&Uri::from_well_formed(leaf_search_request.index_uri))
@@ -242,7 +240,6 @@ impl SearchService for SearchServiceImpl {
let stream_request = leaf_stream_request
.request
.ok_or_else(|| SearchError::Internal("No search request.".to_string()))?;
info!(index=?stream_request.index_id, splits=?leaf_stream_request.split_offsets, "leaf_search");
let storage = self
.storage_resolver
.resolve(&Uri::from_well_formed(leaf_stream_request.index_uri))
@@ -280,8 +277,6 @@ impl SearchService for SearchServiceImpl {
let search_request = leaf_search_request
.list_terms_request
.ok_or_else(|| SearchError::Internal("No search request.".to_string()))?;
info!(index=?search_request.index_id, splits=?leaf_search_request.split_offsets,
"leaf_search");
let storage = self
.storage_resolver
.resolve(&Uri::from_well_formed(leaf_search_request.index_uri))
