diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json
index 80da09968a3..0b881f85a13 100644
--- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json
+++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json
@@ -64,7 +64,12 @@
         "fast_field_cache_capacity": "10G",
         "split_footer_cache_capacity": "1G",
         "max_num_concurrent_split_streams": 120,
-        "max_num_concurrent_split_searches": 150
+        "max_num_concurrent_split_searches": 150,
+        "storage_timeout_policy": {
+            "min_throughtput_bytes_per_secs": 100000,
+            "timeout_millis": 2000,
+            "max_num_retries": 2
+        }
     },
     "jaeger": {
         "enable_endpoint": true,
diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml
index 34e36a96109..97b44376a75 100644
--- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml
+++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml
@@ -56,6 +56,11 @@ split_footer_cache_capacity = "1G"
 max_num_concurrent_split_streams = 120
 max_num_concurrent_split_searches = 150
 
+[searcher.storage_timeout_policy]
+min_throughtput_bytes_per_secs = 100000
+timeout_millis = 2000
+max_num_retries = 2
+
 [jaeger]
 enable_endpoint = true
 lookback_period_hours = 24
diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml
index 1c5757a54ce..c23ec0c0e3f 100644
--- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml
+++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml
@@ -59,6 +59,10 @@ searcher:
   split_footer_cache_capacity: 1G
   max_num_concurrent_split_streams: 120
   max_num_concurrent_split_searches: 150
+  storage_timeout_policy:
+    min_throughtput_bytes_per_secs: 100000
+    timeout_millis: 2000
+    max_num_retries: 2
 
 jaeger:
   enable_endpoint: true
diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs
index 5e256793fcd..2a2a6d4be60 100644
--- a/quickwit/quickwit-config/src/lib.rs
+++ b/quickwit/quickwit-config/src/lib.rs
@@ -75,7 +75,7 @@ pub use crate::metastore_config::{
 };
 pub use crate::node_config::{
     IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, SplitCacheLimits,
-    DEFAULT_QW_CONFIG_PATH,
+    StorageTimeoutPolicy, DEFAULT_QW_CONFIG_PATH,
 };
 use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig};
 pub use crate::storage_config::{
diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs
index 569dd94c55a..3eef1f10428 100644
--- a/quickwit/quickwit-config/src/node_config/mod.rs
+++ b/quickwit/quickwit-config/src/node_config/mod.rs
@@ -223,6 +223,42 @@ pub struct SearcherConfig {
     pub split_cache: Option<SplitCacheLimits>,
     #[serde(default = "SearcherConfig::default_request_timeout_secs")]
     request_timeout_secs: NonZeroU64,
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub storage_timeout_policy: Option<StorageTimeoutPolicy>,
+}
+
+/// Configuration controlling how quickly a searcher should time out a `get_slice`
+/// request in order to retry it.
+///
+/// [Amazon's best practices](https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/timeouts-and-retries-for-latency-sensitive-applications.html)
+/// suggest that to ensure low latency, it is best to:
+/// - retry small GET requests after 2s
+/// - retry large GET requests when the throughput is below some percentile.
+///
+/// This policy is inspired by this guidance. It does not track instantaneous throughput, but
+/// computes an overall timeout using the following formula:
+/// `timeout_offset + num_bytes_get_request / min_throughtput`
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct StorageTimeoutPolicy {
+    pub min_throughtput_bytes_per_secs: u64,
+    pub timeout_millis: u64,
+    // Disclaimer: this is a number of retries, so the overall max number of
+    // attempts is `max_num_retries + 1`.
+    pub max_num_retries: usize,
+}
+
+impl StorageTimeoutPolicy {
+    pub fn compute_timeout(&self, num_bytes: usize) -> impl Iterator<Item = Duration> {
+        let min_download_time_secs: f64 = if self.min_throughtput_bytes_per_secs == 0 {
+            0.0f64
+        } else {
+            num_bytes as f64 / self.min_throughtput_bytes_per_secs as f64
+        };
+        let timeout = Duration::from_millis(self.timeout_millis)
+            + Duration::from_secs_f64(min_download_time_secs);
+        std::iter::repeat(timeout).take(self.max_num_retries + 1)
+    }
 }
 
 impl Default for SearcherConfig {
@@ -237,6 +273,7 @@ impl Default for SearcherConfig {
             aggregation_bucket_limit: 65000,
             split_cache: None,
             request_timeout_secs: Self::default_request_timeout_secs(),
+            storage_timeout_policy: None,
         }
     }
 }
diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs
index 806a7abd520..8a1337636cf 100644
--- a/quickwit/quickwit-config/src/node_config/serialize.rs
+++ b/quickwit/quickwit-config/src/node_config/serialize.rs
@@ -612,6 +612,11 @@ mod tests {
                 max_num_concurrent_split_streams: 120,
                 split_cache: None,
                 request_timeout_secs: NonZeroU64::new(30).unwrap(),
+                storage_timeout_policy: Some(crate::StorageTimeoutPolicy {
+                    min_throughtput_bytes_per_secs: 100_000,
+                    timeout_millis: 2_000,
+                    max_num_retries: 2
+                })
             }
         );
         assert_eq!(
diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs
index 7ee3617e102..819015b6d26 100644
--- a/quickwit/quickwit-search/src/leaf.rs
+++ b/quickwit/quickwit-search/src/leaf.rs
@@ -36,7 +36,7 @@ use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQ
 use quickwit_query::tokenizers::TokenizerManager;
 use quickwit_storage::{
     wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage,
-    StorageResolver,
+    StorageResolver, TimeoutAndRetryStorage,
 };
 use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
 use tantivy::aggregation::AggregationLimitsGuard;
@@ -135,13 +135,34 @@ pub(crate) async fn open_index_with_caches(
     tokenizer_manager: Option<&TokenizerManager>,
     ephemeral_unbounded_cache: bool,
 ) -> anyhow::Result<Index> {
-    let (hotcache_bytes, bundle_storage) =
-        open_split_bundle(searcher_context, index_storage, split_and_footer_offsets).await?;
+    // If configured in the searcher config, wrap the index storage in a proxy that
+    // retries `get_slice` requests when they take too long.
+    //
+    // The goal here is to ensure low latency.
+
+    let index_storage_with_retry_on_timeout = if let Some(storage_timeout_policy) =
+        &searcher_context.searcher_config.storage_timeout_policy
+    {
+        Arc::new(TimeoutAndRetryStorage::new(
+            index_storage,
+            storage_timeout_policy.clone(),
+        ))
+    } else {
+        index_storage
+    };
+
+    let (hotcache_bytes, bundle_storage) = open_split_bundle(
+        searcher_context,
+        index_storage_with_retry_on_timeout,
+        split_and_footer_offsets,
+    )
+    .await?;
     let bundle_storage_with_cache = wrap_storage_with_cache(
         searcher_context.fast_fields_cache.clone(),
         Arc::new(bundle_storage),
     );
+
     let directory = StorageDirectory::new(bundle_storage_with_cache);
     let hot_directory = if ephemeral_unbounded_cache {
diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs
index 02a2e2389b6..c7b2e80eef8 100644
--- a/quickwit/quickwit-storage/src/lib.rs
+++ b/quickwit/quickwit-storage/src/lib.rs
@@ -35,6 +35,7 @@ mod debouncer;
 mod file_descriptor_cache;
 mod metrics;
 mod storage;
+mod timeout_and_retry_storage;
 
 pub use debouncer::AsyncDebouncer;
 pub(crate) use debouncer::DebouncedStorage;
@@ -92,6 +93,7 @@ pub use self::test_suite::{
     storage_test_multi_part_upload, storage_test_single_part_upload, storage_test_suite,
     test_write_and_bulk_delete,
 };
+pub use self::timeout_and_retry_storage::TimeoutAndRetryStorage;
 pub use crate::error::{
     BulkDeleteError, DeleteFailure, StorageError, StorageErrorKind, StorageResolverError,
     StorageResult,
diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs
index 42ca287461b..0e1547a340f 100644
--- a/quickwit/quickwit-storage/src/metrics.rs
+++ b/quickwit/quickwit-storage/src/metrics.rs
@@ -21,7 +21,7 @@
 use once_cell::sync::Lazy;
 use quickwit_common::metrics::{
-    new_counter, new_counter_with_labels, new_gauge, IntCounter, IntGauge,
+    new_counter, new_counter_vec, new_counter_with_labels, new_gauge, IntCounter, IntGauge,
 };
 
 /// Counters associated to storage operations.
@@ -32,6 +32,8 @@ pub struct StorageMetrics {
     pub fast_field_cache: CacheMetrics,
     pub split_footer_cache: CacheMetrics,
     pub searcher_split_cache: CacheMetrics,
+    pub get_slice_timeout_successes: [IntCounter; 3],
+    pub get_slice_timeout_all_timeouts: IntCounter,
     pub object_storage_get_total: IntCounter,
     pub object_storage_put_total: IntCounter,
     pub object_storage_put_parts: IntCounter,
@@ -41,6 +43,21 @@ impl Default for StorageMetrics {
     fn default() -> Self {
+        let get_slice_timeout_outcome_total_vec = new_counter_vec(
+            "get_slice_timeout_outcome",
+            "Outcome of get_slice operations. success_after_1_timeout means the operation \
+             succeeded after a retry caused by a timeout.",
+            "storage",
+            &[],
+            ["outcome"],
+        );
+        let get_slice_timeout_successes = [
+            get_slice_timeout_outcome_total_vec.with_label_values(["success_after_0_timeout"]),
+            get_slice_timeout_outcome_total_vec.with_label_values(["success_after_1_timeout"]),
+            get_slice_timeout_outcome_total_vec.with_label_values(["success_after_2+_timeout"]),
+        ];
+        let get_slice_timeout_all_timeouts =
+            get_slice_timeout_outcome_total_vec.with_label_values(["all_timeouts"]);
         StorageMetrics {
             fast_field_cache: CacheMetrics::for_component("fastfields"),
             fd_cache_metrics: CacheMetrics::for_component("fd"),
@@ -48,7 +65,8 @@
             searcher_split_cache: CacheMetrics::for_component("searcher_split"),
             shortlived_cache: CacheMetrics::for_component("shortlived"),
             split_footer_cache: CacheMetrics::for_component("splitfooter"),
-
+            get_slice_timeout_successes,
+            get_slice_timeout_all_timeouts,
             object_storage_get_total: new_counter(
                 "object_storage_gets_total",
                 "Number of objects fetched.",
diff --git a/quickwit/quickwit-storage/src/timeout_and_retry_storage.rs b/quickwit/quickwit-storage/src/timeout_and_retry_storage.rs
new file mode 100644
index 00000000000..6968682d467
--- /dev/null
+++ b/quickwit/quickwit-storage/src/timeout_and_retry_storage.rs
@@ -0,0 +1,285 @@
+// Copyright (C) 2024 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::ops::Range;
+use std::path::Path;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use quickwit_common::uri::Uri;
+use quickwit_common::{rate_limited_info, rate_limited_warn};
+use quickwit_config::StorageTimeoutPolicy;
+use tantivy::directory::OwnedBytes;
+use tokio::io::AsyncRead;
+
+use crate::storage::SendableAsync;
+use crate::{BulkDeleteError, PutPayload, Storage, StorageErrorKind, StorageResult};
+
+/// Storage proxy that retries a request if the underlying storage
+/// takes too long to answer it.
+///
+/// This is useful in order to ensure low latency on S3.
+/// Retrying aggressively is recommended for S3.
+/// <https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/timeouts-and-retries-for-latency-sensitive-applications.html>
+#[derive(Clone, Debug)]
+pub struct TimeoutAndRetryStorage {
+    underlying: Arc<dyn Storage>,
+    storage_timeout_policy: StorageTimeoutPolicy,
+}
+
+impl TimeoutAndRetryStorage {
+    /// Creates a new `TimeoutAndRetryStorage`.
+    ///
+    /// See [StorageTimeoutPolicy] for more information.
+    pub fn new(storage: Arc<dyn Storage>, storage_timeout_policy: StorageTimeoutPolicy) -> Self {
+        TimeoutAndRetryStorage {
+            underlying: storage,
+            storage_timeout_policy,
+        }
+    }
+}
+
+#[async_trait]
+impl Storage for TimeoutAndRetryStorage {
+    async fn check_connectivity(&self) -> anyhow::Result<()> {
+        self.underlying.check_connectivity().await
+    }
+
+    async fn put(&self, path: &Path, payload: Box<dyn PutPayload>) -> StorageResult<()> {
+        self.underlying.put(path, payload).await
+    }
+
+    fn copy_to<'life0, 'life1, 'life2, 'async_trait>(
+        &'life0 self,
+        path: &'life1 Path,
+        output: &'life2 mut dyn SendableAsync,
+    ) -> ::core::pin::Pin<
+        Box<
+            dyn ::core::future::Future<Output = StorageResult<()>>
+                + ::core::marker::Send
+                + 'async_trait,
+        >,
+    >
+    where
+        'life0: 'async_trait,
+        'life1: 'async_trait,
+        'life2: 'async_trait,
+        Self: 'async_trait,
+    {
+        self.underlying.copy_to(path, output)
+    }
+
+    async fn copy_to_file(&self, path: &Path, output_path: &Path) -> StorageResult<u64> {
+        self.underlying.copy_to_file(path, output_path).await
+    }
+
+    /// Downloads a slice of a file from the storage and returns it as an in-memory buffer.
+    async fn get_slice(&self, path: &Path, range: Range<usize>) -> StorageResult<OwnedBytes> {
+        let num_bytes = range.len();
+        for (attempt_id, timeout_duration) in self
+            .storage_timeout_policy
+            .compute_timeout(num_bytes)
+            .enumerate()
+        {
+            let get_slice_fut = self.underlying.get_slice(path, range.clone());
+            // TODO test avoid aborting timed out requests. #5468
+            match tokio::time::timeout(timeout_duration, get_slice_fut).await {
+                Ok(result) => {
+                    crate::STORAGE_METRICS
+                        .get_slice_timeout_successes
+                        .get(attempt_id)
+                        .or(crate::STORAGE_METRICS.get_slice_timeout_successes.last())
+                        .unwrap()
+                        .inc();
+                    return result;
+                }
+                Err(_elapsed) => {
+                    rate_limited_info!(limit_per_min=60, num_bytes=num_bytes, path=%path.display(), timeout_secs=timeout_duration.as_secs_f32(), "get timeout elapsed");
+                    continue;
+                }
+            }
+        }
+        rate_limited_warn!(limit_per_min=60, num_bytes=num_bytes, path=%path.display(), "all get_slice attempts timed out");
+        crate::STORAGE_METRICS.get_slice_timeout_all_timeouts.inc();
+        return Err(
+            StorageErrorKind::Timeout.with_error(anyhow::anyhow!("internal timeout on get_slice"))
+        );
+    }
+
+    async fn get_slice_stream(
+        &self,
+        path: &Path,
+        range: Range<usize>,
+    ) -> StorageResult<Box<dyn AsyncRead + Send + Unpin>> {
+        self.underlying.get_slice_stream(path, range).await
+    }
+
+    async fn get_all(&self, path: &Path) -> StorageResult<OwnedBytes> {
+        self.underlying.get_all(path).await
+    }
+
+    async fn delete(&self, path: &Path) -> StorageResult<()> {
+        self.underlying.delete(path).await
+    }
+
+    async fn bulk_delete<'a>(&self, paths: &[&'a Path]) -> Result<(), BulkDeleteError> {
+        self.underlying.bulk_delete(paths).await
+    }
+
+    async fn exists(&self, path: &Path) -> StorageResult<bool> {
+        self.underlying.exists(path).await
+    }
+
+    async fn file_num_bytes(&self, path: &Path) -> StorageResult<u64> {
+        self.underlying.file_num_bytes(path).await
+    }
+
+    fn uri(&self) -> &Uri {
+        self.underlying.uri()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use std::sync::Mutex;
+    use std::time::Duration;
+
+    use tokio::time::Instant;
+
+    use super::*;
+
+    #[derive(Debug)]
+    struct StorageWithDelay {
+        delays: Mutex<Vec<Duration>>,
+    }
+
+    impl StorageWithDelay {
+        pub fn new(mut delays: Vec<Duration>) -> StorageWithDelay {
+            delays.reverse();
+            StorageWithDelay {
+                delays: Mutex::new(delays),
+            }
+        }
+    }
+
+    #[async_trait]
+    impl Storage for StorageWithDelay {
+        fn uri(&self) -> &Uri {
+            todo!();
+        }
+
+        async fn check_connectivity(&self) -> anyhow::Result<()> {
+            todo!()
+        }
+        async fn put(&self, _path: &Path, _payload: Box<dyn PutPayload>) -> StorageResult<()> {
+            todo!();
+        }
+        fn copy_to<'life0, 'life1, 'life2, 'async_trait>(
+            &'life0 self,
+            _path: &'life1 Path,
+            _output: &'life2 mut dyn SendableAsync,
+        ) -> ::core::pin::Pin<
+            Box<
+                dyn ::core::future::Future<Output = StorageResult<()>>
+                    + ::core::marker::Send
+                    + 'async_trait,
+            >,
+        >
+        where
+            'life0: 'async_trait,
+            'life1: 'async_trait,
+            'life2: 'async_trait,
+            Self: 'async_trait,
+        {
+            todo!();
+        }
+
+        async fn get_slice(&self, _path: &Path, range: Range<usize>) -> StorageResult<OwnedBytes> {
+            let duration_opt = self.delays.lock().unwrap().pop();
+            let Some(delay) = duration_opt else {
+                return Err(
+                    StorageErrorKind::Internal.with_error(anyhow::anyhow!("internal error"))
+                );
+            };
+            tokio::time::sleep(delay).await;
+            let buf = vec![0u8; range.len()];
+            Ok(OwnedBytes::new(buf))
+        }
+        async fn get_slice_stream(
+            &self,
+            _path: &Path,
+            _range: Range<usize>,
+        ) -> StorageResult<Box<dyn AsyncRead + Send + Unpin>> {
+            todo!()
+        }
+        async fn get_all(&self, _path: &Path) -> StorageResult<OwnedBytes> {
+            todo!();
+        }
+        async fn delete(&self, _path: &Path) -> StorageResult<()> {
+            todo!();
+        }
+        async fn bulk_delete<'a>(&self, _paths: &[&'a Path]) -> Result<(), BulkDeleteError> {
+            todo!();
+        }
+        async fn exists(&self, _path: &Path) -> StorageResult<bool> {
+            todo!()
+        }
+        async fn file_num_bytes(&self, _path: &Path) -> StorageResult<u64> {
+            todo!();
+        }
+    }
+
+    #[tokio::test]
+    async fn test_timeout_and_retry_storage() {
+        tokio::time::pause();
+
+        let timeout_policy = StorageTimeoutPolicy {
+            min_throughtput_bytes_per_secs: 100_000,
+            timeout_millis: 2_000,
+            max_num_retries: 1,
+        };
+
+        let path = Path::new("foo/bar");
+
+        {
+            // Both attempts are slower than the timeout: the overall call fails with a timeout.
+            let now = Instant::now();
+            let storage_with_delay =
+                StorageWithDelay::new(vec![Duration::from_secs(5), Duration::from_secs(3)]);
+            let storage =
+                TimeoutAndRetryStorage::new(Arc::new(storage_with_delay), timeout_policy.clone());
+            assert_eq!(
+                storage.get_slice(path, 10..100).await.unwrap_err().kind,
+                StorageErrorKind::Timeout
+            );
+            let elapsed = now.elapsed().as_millis();
+            assert!(elapsed.abs_diff(2 * 2_000) < 100);
+        }
+        {
+            // The first attempt times out, but the retry is fast enough to succeed.
+            let now = Instant::now();
+            let storage_with_delay =
+                StorageWithDelay::new(vec![Duration::from_secs(5), Duration::from_secs(1)]);
+            let storage = TimeoutAndRetryStorage::new(Arc::new(storage_with_delay), timeout_policy);
+            assert!(storage.get_slice(path, 10..100).await.is_ok());
+            let elapsed = now.elapsed().as_millis();
+            assert!(elapsed.abs_diff(2_000 + 1_000) < 100);
+        }
+    }
+}
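
The sketch below is not part of the patch; it is only meant to illustrate how the formula documented on `StorageTimeoutPolicy` (`timeout_millis + num_bytes / min_throughtput_bytes_per_secs`, repeated `max_num_retries + 1` times) plays out for the sample values used in the test configs above. The `timeout_schedule` helper is a hypothetical name introduced for the example.

use std::time::Duration;

// Illustrative sketch only: mirrors the timeout formula documented on
// `StorageTimeoutPolicy`; it is not code from the patch.
fn timeout_schedule(
    min_throughtput_bytes_per_secs: u64,
    timeout_millis: u64,
    max_num_retries: usize,
    num_bytes: usize,
) -> Vec<Duration> {
    let min_download_time_secs = if min_throughtput_bytes_per_secs == 0 {
        0.0
    } else {
        num_bytes as f64 / min_throughtput_bytes_per_secs as f64
    };
    let timeout =
        Duration::from_millis(timeout_millis) + Duration::from_secs_f64(min_download_time_secs);
    std::iter::repeat(timeout).take(max_num_retries + 1).collect()
}

fn main() {
    // Sample values from the test configs: 100 kB/s throughput floor, 2s offset, 2 retries.
    // A 1 MiB `get_slice` therefore gets 3 attempts of ~12.5s each
    // (2s + 1_048_576 / 100_000 s) before the call gives up with a timeout error.
    let schedule = timeout_schedule(100_000, 2_000, 2, 1024 * 1024);
    assert_eq!(schedule.len(), 3);
    println!("{schedule:?}");
}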