diff --git a/docs/configuration/node-config.md b/docs/configuration/node-config.md index f99558c368d..d736f03c585 100644 --- a/docs/configuration/node-config.md +++ b/docs/configuration/node-config.md @@ -199,7 +199,7 @@ This section contains the configuration options for a Searcher. | `max_num_concurrent_split_searches` | Maximum number of concurrent split search requests running on a Searcher. | `100` | | `max_num_concurrent_split_streams` | Maximum number of concurrent split stream requests running on a Searcher. | `100` | | `split_cache` | Searcher split cache configuration options defined in the section below. Cache disabled if unspecified. | | - +| `request_timeout_secs` | The time before a search request is cancelled. If the stack calling into Quickwit sets its own timeout, this value should match it. | `30` | ### Searcher split cache configuration diff --git a/docs/get-started/tutorials/trace-analytics-with-grafana.md b/docs/get-started/tutorials/trace-analytics-with-grafana.md index ea0f55fb8de..6d099b6ee0d 100644 --- a/docs/get-started/tutorials/trace-analytics-with-grafana.md +++ b/docs/get-started/tutorials/trace-analytics-with-grafana.md @@ -12,7 +12,9 @@ You only need a few minutes to get Grafana working with Quickwit and build meani ## Create a Docker Compose recipe -Let's add a [Quickwit instance](../installation.md) with the OTLP service enabled. +First, create a `docker-compose.yml` file. This file will define the services needed to run Quickwit with OpenTelemetry and Grafana with the Quickwit Datasource plugin. + +Below is the complete Docker Compose configuration: ```yaml version: '3.0' @@ -25,23 +27,21 @@ services: quickwit: image: quickwit/quickwit:${QW_VERSION:-0.8.1} volumes: - ./qwdata:/quickwit/qwdata ports: - 7280:7280 command: ["run"] -``` -Then we create a [Grafana](https://grafana.com/docs/grafana/latest/setup-grafana/installation/docker/#run-grafana-via-docker-compose) service with the [Quickwit Datasource](https://github.com/quickwit-oss/quickwit-datasource) plugin. - -```yaml grafana: image: grafana/grafana-oss container_name: grafana ports: - "${MAP_HOST_GRAFANA:-127.0.0.1}:3000:3000" environment: - GF_INSTALL_PLUGINS: https://github.com/quickwit-oss/quickwit-datasource/releases/download/v0.3.1/quickwit-quickwit-datasource-0.3.1.zip;quickwit-quickwit-datasource + GF_INSTALL_PLUGINS: https://github.com/quickwit-oss/quickwit-datasource/releases/download/v0.4.6/quickwit-quickwit-datasource-0.4.6.zip;quickwit-quickwit-datasource GF_AUTH_DISABLE_LOGIN_FORM: "true" GF_AUTH_ANONYMOUS_ENABLED: "true" GF_AUTH_ANONYMOUS_ORG_ROLE: Admin ``` +The default Grafana port is 3000. If this port is already taken, you can modify the port mapping, for example, changing `3000:3000` to `3100:3000` or any other available port. + Save and run the recipe: ```bash @@ -99,3 +99,5 @@ Quickwit sends itself its own traces, so you should already have data to display Here's what your first dashboard can look like : ![Quickwit Panel in Grafana Dashboard](../../assets/images/screenshot-grafana-tutorial-dashboard.png) + + diff --git a/docs/reference/metrics.md b/docs/reference/metrics.md index 315b6e3fc9c..77c8b7bd397 100644 --- a/docs/reference/metrics.md +++ b/docs/reference/metrics.md @@ -3,7 +3,7 @@ title: Metrics sidebar_position: 70 --- -Quickwit exposes some key metrics via [Prometheus](https://prometheus.io/). You can use any front-end that supports Prometheus to examine the behavior of Quickwit visually. +Quickwit exposes key metrics in the [Prometheus](https://prometheus.io/) format on the `/metrics` endpoint. 
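For example, assuming a local node listening on the default REST port 7280, you can fetch the raw metrics directly: ```bash curl http://localhost:7280/metrics ``` 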
You can use any front-end that supports Prometheus to examine the behavior of Quickwit visually. ## Cache Metrics diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 233fa27d139..3ce5526bd7a 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5894,7 +5894,6 @@ version = "0.8.0" dependencies = [ "anyhow", "itertools 0.13.0", - "ouroboros", "serde", "serde_json", "tantivy", diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index 9305d6485d7..00d42985bda 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -521,7 +521,8 @@ fn indexing_task_to_chitchat_kv(indexing_task: &IndexingTask) -> (String, String let index_uid = indexing_task.index_uid(); let key = format!("{INDEXING_TASK_PREFIX}{}", indexing_task.pipeline_uid()); let shard_ids_str = shard_ids.iter().sorted().join(","); - let value = format!("{index_uid}:{source_id}:{shard_ids_str}"); + let fingerprint = indexing_task.params_fingerprint; + let value = format!("{index_uid}:{source_id}:{fingerprint}:{shard_ids_str}"); (key, value) } @@ -536,8 +537,12 @@ fn parse_shard_ids_str(shard_ids_str: &str) -> Vec<ShardId> { fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option<IndexingTask> { let pipeline_uid_str = key.strip_prefix(INDEXING_TASK_PREFIX)?; let pipeline_uid = PipelineUid::from_str(pipeline_uid_str).ok()?; - let (source_uid, shards_str) = value.rsplit_once(':')?; - let (index_uid, source_id) = source_uid.rsplit_once(':')?; + let mut field_iterator = value.rsplitn(4, ':'); + let shards_str = field_iterator.next()?; + let fingerprint_str = field_iterator.next()?; + let source_id = field_iterator.next()?; + let index_uid = field_iterator.next()?; + let params_fingerprint: u64 = fingerprint_str.parse().ok()?; let index_uid = index_uid.parse().ok()?; let shard_ids = parse_shard_ids_str(shards_str); Some(IndexingTask { @@ -545,7 +550,7 @@ fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option<IndexingTask> source_id: source_id.to_string(), pipeline_uid: Some(pipeline_uid), shard_ids, - params_fingerprint: 0, + params_fingerprint, }) } @@ -1143,11 +1148,11 @@ mod tests { let mut chitchat_guard = chitchat_handle.lock().await; chitchat_guard.self_node_state().set( format!("{INDEXING_TASK_PREFIX}01BX5ZZKBKACTAV9WEVGEMMVS0"), - "my_index:00000000000000000000000000:my_source:1,3".to_string(), + "my_index:00000000000000000000000000:my_source:41:1,3".to_string(), ); chitchat_guard.self_node_state().set( format!("{INDEXING_TASK_PREFIX}01BX5ZZKBKACTAV9WEVGEMMVS1"), - "my_index-00000000000000000000000000-my_source:3,5".to_string(), + "my_index-00000000000000000000000000-my_source:53:3,5".to_string(), ); } node.wait_for_ready_members(|members| members.len() == 1, Duration::from_secs(5)) @@ -1358,14 +1363,15 @@ mod tests { #[test] fn test_parse_chitchat_kv() { assert!( - chitchat_kv_to_indexing_task("invalidulid", "my_index:uid:my_source:1,3").is_none() + chitchat_kv_to_indexing_task("invalidulid", "my_index:uid:my_source:42:1,3").is_none() ); let task = super::chitchat_kv_to_indexing_task( "indexer.task:01BX5ZZKBKACTAV9WEVGEMMVS0", - "my_index:00000000000000000000000000:my_source:00000000000000000001,\ + "my_index:00000000000000000000000000:my_source:42:00000000000000000001,\ 00000000000000000003", ) .unwrap(); + assert_eq!(task.params_fingerprint, 42); assert_eq!( task.pipeline_uid(), PipelineUid::from_str("01BX5ZZKBKACTAV9WEVGEMMVS0").unwrap() ) diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs 
b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 6cfbdcfc7ac..cc5f2b33fc5 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -676,10 +676,8 @@ impl IngestController { model: &mut ControlPlaneModel, progress: &Progress, ) -> MetastoreResult<()> { - const NUM_PERMITS: u64 = 1; - if !model - .acquire_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS) + .acquire_scaling_permits(&source_uid, ScalingMode::Up) .unwrap_or(false) { return Ok(()); } @@ -698,7 +696,7 @@ impl IngestController { if successful_source_uids.is_empty() { // We did not manage to create the shard. // We can release our permit. - model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Up); warn!( index_uid=%source_uid.index_uid, source_id=%source_uid.source_id, @@ -722,7 +720,7 @@ impl IngestController { source_id=%source_uid.source_id, "scaling up number of shards to {new_num_open_shards} failed: {metastore_error:?}" ); - model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Up); Err(metastore_error) } } @@ -860,10 +858,12 @@ impl IngestController { model: &mut ControlPlaneModel, progress: &Progress, ) -> MetastoreResult<()> { - const NUM_PERMITS: u64 = 1; + if shard_stats.num_open_shards == 0 { + return Ok(()); + } if !model - .acquire_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS) + .acquire_scaling_permits(&source_uid, ScalingMode::Down) .unwrap_or(false) { return Ok(()); } @@ -876,12 +876,12 @@ impl IngestController { "scaling down number of shards to {new_num_open_shards}" ); let Some((leader_id, shard_id)) = find_scale_down_candidate(&source_uid, model) else { - model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Down); return Ok(()); }; info!("scaling down shard {shard_id} from {leader_id}"); let Some(ingester) = self.ingester_pool.get(&leader_id) else { - model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Down); return Ok(()); }; let shard_pkeys = vec![ShardPKey { @@ -896,7 +896,7 @@ impl IngestController { .await { warn!("failed to scale down number of shards: {error}"); - model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Down); return Ok(()); } model.close_shards(&source_uid, &[shard_id]); diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index ca314233f6a..d4e02f67c2c 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -378,10 +378,9 @@ impl ControlPlaneModel { &mut self, source_uid: &SourceUid, scaling_mode: ScalingMode, - num_permits: u64, ) -> Option<bool> { self.shard_table - .acquire_scaling_permits(source_uid, scaling_mode, num_permits) + .acquire_scaling_permits(source_uid, scaling_mode) } pub fn drain_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { self.shard_table .drain_scaling_permits(source_uid, scaling_mode) } - pub fn release_scaling_permits( - &mut self, - source_uid: &SourceUid, - scaling_mode: ScalingMode, - num_permits: u64, - ) { + pub fn release_scaling_permits(&mut self, 
source_uid: &SourceUid, scaling_mode: ScalingMode) { self.shard_table - .release_scaling_permits(source_uid, scaling_mode, num_permits) + .release_scaling_permits(source_uid, scaling_mode) } } diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index 29c579cddcd..00b440dec50 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -544,14 +544,13 @@ impl ShardTable { &mut self, source_uid: &SourceUid, scaling_mode: ScalingMode, - num_permits: u64, ) -> Option<bool> { let table_entry = self.table_entries.get_mut(source_uid)?; let scaling_rate_limiter = match scaling_mode { ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter, ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter, }; - Some(scaling_rate_limiter.acquire(num_permits)) + Some(scaling_rate_limiter.acquire(1)) } pub fn drain_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { @@ -564,18 +563,13 @@ impl ShardTable { } } - pub fn release_scaling_permits( - &mut self, - source_uid: &SourceUid, - scaling_mode: ScalingMode, - num_permits: u64, - ) { + pub fn release_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { if let Some(table_entry) = self.table_entries.get_mut(source_uid) { let scaling_rate_limiter = match scaling_mode { ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter, ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter, }; - scaling_rate_limiter.release(num_permits); + scaling_rate_limiter.release(1); } } } @@ -1058,7 +1052,7 @@ mod tests { source_id: source_id.clone(), }; assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Up, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Up) .is_none()); shard_table.add_source(&index_uid, &source_id); @@ -1071,7 +1065,7 @@ mod tests { .available_permits(); assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Up, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Up) .unwrap()); let new_available_permits = shard_table @@ -1096,7 +1090,7 @@ mod tests { source_id: source_id.clone(), }; assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Down, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Down) .is_none()); shard_table.add_source(&index_uid, &source_id); @@ -1109,7 +1103,7 @@ mod tests { .available_permits(); assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Down, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Down) .unwrap()); let new_available_permits = shard_table @@ -1143,10 +1137,10 @@ mod tests { .available_permits(); assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Up, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Up) .unwrap()); - shard_table.release_scaling_permits(&source_uid, ScalingMode::Up, 1); + shard_table.release_scaling_permits(&source_uid, ScalingMode::Up); let new_available_permits = shard_table .table_entries @@ -1179,10 +1173,10 @@ mod tests { .available_permits(); assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Down, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Down) .unwrap()); - shard_table.release_scaling_permits(&source_uid, ScalingMode::Down, 1); + shard_table.release_scaling_permits(&source_uid, ScalingMode::Down); let new_available_permits = shard_table .table_entries diff --git a/quickwit/quickwit-datetime/Cargo.toml 
b/quickwit/quickwit-datetime/Cargo.toml index c30e6b029e1..004e959a348 100644 --- a/quickwit/quickwit-datetime/Cargo.toml +++ b/quickwit/quickwit-datetime/Cargo.toml @@ -13,7 +13,6 @@ license.workspace = true [dependencies] anyhow = { workspace = true } itertools = { workspace = true } -ouroboros = "0.18.0" serde = { workspace = true } serde_json = { workspace = true } tantivy = { workspace = true } diff --git a/quickwit/quickwit-datetime/src/date_time_format.rs b/quickwit/quickwit-datetime/src/date_time_format.rs index 42b282ef6db..1758e289113 100644 --- a/quickwit/quickwit-datetime/src/date_time_format.rs +++ b/quickwit/quickwit-datetime/src/date_time_format.rs @@ -20,138 +20,14 @@ use std::fmt::Display; use std::str::FromStr; -use ouroboros::self_referencing; use serde::de::Error; use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value as JsonValue; -use time::error::Format; use time::format_description::well_known::{Iso8601, Rfc2822, Rfc3339}; -use time::format_description::FormatItem; -use time::parsing::Parsed; -use time::{Month, OffsetDateTime, PrimitiveDateTime}; -use time_fmt::parse::time_format_item::parse_to_format_item; - -use crate::TantivyDateTime; - -/// A date time parser that holds the format specification `Vec<FormatItem>`. -#[self_referencing] -pub struct StrptimeParser { - strptime_format: String, - with_timezone: bool, - #[borrows(strptime_format)] - #[covariant] - items: Vec<FormatItem<'this>>, -} - -impl FromStr for StrptimeParser { - type Err = String; - - fn from_str(strptime_format: &str) -> Result<Self, Self::Err> { - StrptimeParser::try_new( - strptime_format.to_string(), - strptime_format.to_lowercase().contains("%z"), - |strptime_format: &String| { - parse_to_format_item(strptime_format).map_err(|error| { - format!("invalid strptime format `{strptime_format}`: {error}") - }) - }, - ) - } -} - -impl StrptimeParser { - /// Parse a given date according to the datetime format specified during the StrptimeParser - /// creation. If the date format does not provide a specific a time, the time will be set to - /// 00:00:00. - fn parse_primitive_date_time(&self, date_time_str: &str) -> anyhow::Result<PrimitiveDateTime> { - let mut parsed = Parsed::new(); - if !parsed - .parse_items(date_time_str.as_bytes(), self.borrow_items())? - .is_empty() - { - anyhow::bail!( - "datetime string `{}` does not match strptime format `{}`", - date_time_str, - self.borrow_strptime_format() - ); - } - // The parsed datetime contains a date but seems to be missing "time". - // We complete it artificially with 00:00:00. - if parsed.hour_24().is_none() - && !(parsed.hour_12().is_some() && parsed.hour_12_is_pm().is_some()) - { - parsed.set_hour_24(0u8); - parsed.set_minute(0u8); - parsed.set_second(0u8); - } - if parsed.year().is_none() { - let now = OffsetDateTime::now_utc(); - let year = infer_year(parsed.month(), now.month(), now.year()); - parsed.set_year(year); - } - let date_time = parsed.try_into()?; - Ok(date_time) - } - - pub fn parse_date_time(&self, date_time_str: &str) -> Result<OffsetDateTime, String> { - if *self.borrow_with_timezone() { - OffsetDateTime::parse(date_time_str, self.borrow_items()).map_err(|err| err.to_string()) - } else { - self.parse_primitive_date_time(date_time_str) - .map(|date_time| date_time.assume_utc()) - .map_err(|err| err.to_string()) - } - } - - pub fn format_date_time(&self, date_time: &OffsetDateTime) -> Result<String, Format> { - date_time.format(self.borrow_items()) - } -} +use time::Month; -impl Clone for StrptimeParser { - fn clone(&self) -> Self { - // `self.format` is already known to be a valid format. 
- Self::from_str(self.borrow_strptime_format().as_str()).unwrap() - } -} - -impl PartialEq for StrptimeParser { - fn eq(&self, other: &Self) -> bool { - self.borrow_strptime_format() == other.borrow_strptime_format() - } -} - -impl Eq for StrptimeParser {} - -impl std::fmt::Debug for StrptimeParser { - fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter - .debug_struct("StrptimeParser") - .field("format", &self.borrow_strptime_format()) - .finish() - } -} - -impl std::hash::Hash for StrptimeParser { - fn hash<H: std::hash::Hasher>(&self, state: &mut H) { - self.borrow_strptime_format().hash(state); - } -} - -// `Strftime` format special characters. -// These characters are taken from the parsing crate we use for compatibility. -const STRFTIME_FORMAT_MARKERS: [&str; 36] = [ - "%a", "%A", "%b", "%B", "%c", "%C", "%d", "%D", "%e", "%f", "%F", "%h", "%H", "%I", "%j", "%k", - "%l", "%m", "%M", "%n", "%p", "%P", "%r", "%R", "%S", "%t", "%T", "%U", "%w", "%W", "%x", "%X", - "%y", "%Y", "%z", "%Z", -]; - -// Checks if a format contains `strftime` special characters. -fn is_strftime_formatting(format_str: &str) -> bool { - STRFTIME_FORMAT_MARKERS - .iter() - .any(|marker| format_str.contains(marker)) -} +use crate::java_date_time_format::is_strftime_formatting; +use crate::{StrptimeParser, TantivyDateTime}; /// Specifies the datetime and unix timestamp formats to use when parsing date strings. #[derive(Clone, Debug, Eq, PartialEq, Hash, Default)] @@ -170,7 +46,7 @@ impl DateTimeInputFormat { DateTimeInputFormat::Iso8601 => "iso8601", DateTimeInputFormat::Rfc2822 => "rfc2822", DateTimeInputFormat::Rfc3339 => "rfc3339", - DateTimeInputFormat::Strptime(parser) => parser.borrow_strptime_format(), + DateTimeInputFormat::Strptime(parser) => parser.strptime_format.as_str(), DateTimeInputFormat::Timestamp => "unix_timestamp", } } @@ -198,7 +74,7 @@ impl FromStr for DateTimeInputFormat { format must contain at least one `strftime` special characters" )); } - DateTimeInputFormat::Strptime(StrptimeParser::from_str(date_time_format_str)?) + DateTimeInputFormat::Strptime(StrptimeParser::from_strptime(date_time_format_str)?) } }; Ok(date_time_format) } @@ -241,7 +117,7 @@ impl DateTimeOutputFormat { DateTimeOutputFormat::Iso8601 => "iso8601", DateTimeOutputFormat::Rfc2822 => "rfc2822", DateTimeOutputFormat::Rfc3339 => "rfc3339", - DateTimeOutputFormat::Strptime(parser) => parser.borrow_strptime_format(), + DateTimeOutputFormat::Strptime(parser) => parser.strptime_format.as_str(), DateTimeOutputFormat::TimestampSecs => "unix_timestamp_secs", DateTimeOutputFormat::TimestampMillis => "unix_timestamp_millis", DateTimeOutputFormat::TimestampMicros => "unix_timestamp_micros", @@ -300,7 +176,7 @@ impl FromStr for DateTimeOutputFormat { format must contain at least one `strftime` special characters" )); } - DateTimeOutputFormat::Strptime(StrptimeParser::from_str(date_time_format_str)?) + DateTimeOutputFormat::Strptime(StrptimeParser::from_strptime(date_time_format_str)?) 
} }; Ok(date_time_format) } } @@ -341,7 +217,6 @@ pub(super) fn infer_year( #[cfg(test)] mod tests { - use time::macros::datetime; use time::Month; use super::*; @@ -462,20 +337,6 @@ mod tests { } } - #[test] - fn test_strictly_parse_datetime_format() { - let parser = StrptimeParser::from_str("%Y-%m-%d").unwrap(); - assert_eq!( - parser.parse_date_time("2021-01-01").unwrap(), - datetime!(2021-01-01 00:00:00 UTC) - ); - let error = parser.parse_date_time("2021-01-01TABC").unwrap_err(); - assert_eq!( - error, - "datetime string `2021-01-01TABC` does not match strptime format `%Y-%m-%d`" - ); - } - #[test] fn test_infer_year() { let inferred_year = infer_year(None, Month::January, 2024); diff --git a/quickwit/quickwit-datetime/src/date_time_parsing.rs b/quickwit/quickwit-datetime/src/date_time_parsing.rs index 14c1fa9be90..54e8d4b88bb 100644 --- a/quickwit/quickwit-datetime/src/date_time_parsing.rs +++ b/quickwit/quickwit-datetime/src/date_time_parsing.rs @@ -179,8 +179,6 @@ pub fn parse_timestamp(timestamp: i64) -> Result<TantivyDateTime, String> { #[cfg(test)] mod tests { - use std::str::FromStr; - use time::macros::datetime; use time::Month; @@ -262,7 +260,7 @@ mod tests { ), ]; for (fmt, date_time_str, expected) in test_data { - let parser = StrptimeParser::from_str(fmt).unwrap(); + let parser = StrptimeParser::from_strptime(fmt).unwrap(); let result = parser.parse_date_time(date_time_str); if let Err(error) = &result { panic!( @@ -276,14 +274,14 @@ mod tests { #[test] fn test_parse_date_without_time() { - let strptime_parser = StrptimeParser::from_str("%Y-%m-%d").unwrap(); + let strptime_parser = StrptimeParser::from_strptime("%Y-%m-%d").unwrap(); let date = strptime_parser.parse_date_time("2012-05-21").unwrap(); assert_eq!(date, datetime!(2012-05-21 00:00:00 UTC)); } #[test] fn test_parse_date_am_pm_hour_not_zeroed() { - let strptime_parser = StrptimeParser::from_str("%Y-%m-%d %I:%M:%S %p").unwrap(); + let strptime_parser = StrptimeParser::from_strptime("%Y-%m-%d %I:%M:%S %p").unwrap(); let date = strptime_parser .parse_date_time("2012-05-21 10:05:12 pm") .unwrap(); @@ -309,13 +307,13 @@ mod tests { DateTimeInputFormat::Rfc2822, DateTimeInputFormat::Rfc3339, DateTimeInputFormat::Strptime( - StrptimeParser::from_str("%Y-%m-%d %H:%M:%S").unwrap(), + StrptimeParser::from_strptime("%Y-%m-%d %H:%M:%S").unwrap(), ), DateTimeInputFormat::Strptime( - StrptimeParser::from_str("%Y/%m/%d %H:%M:%S").unwrap(), + StrptimeParser::from_strptime("%Y/%m/%d %H:%M:%S").unwrap(), ), DateTimeInputFormat::Strptime( - StrptimeParser::from_str("%Y/%m/%d %H:%M:%S %z").unwrap(), + StrptimeParser::from_strptime("%Y/%m/%d %H:%M:%S %z").unwrap(), ), DateTimeInputFormat::Timestamp, ], @@ -452,7 +450,7 @@ mod tests { DateTimeInputFormat::Iso8601, DateTimeInputFormat::Rfc3339, DateTimeInputFormat::Strptime( - StrptimeParser::from_str("%Y-%m-%d %H:%M:%S.%f").unwrap(), + StrptimeParser::from_strptime("%Y-%m-%d %H:%M:%S.%f").unwrap(), ), ], ) diff --git a/quickwit/quickwit-datetime/src/java_date_time_format.rs b/quickwit/quickwit-datetime/src/java_date_time_format.rs new file mode 100644 index 00000000000..1cc035c90f3 --- /dev/null +++ b/quickwit/quickwit-datetime/src/java_date_time_format.rs @@ -0,0 +1,817 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. 
+// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +use std::collections::HashMap; +use std::num::NonZeroU8; +use std::sync::OnceLock; + +use time::error::{Format, TryFromParsed}; +use time::format_description::modifier::{ + Day, Hour, Minute, Month as MonthModifier, Padding, Second, Subsecond, SubsecondDigits, + WeekNumber, WeekNumberRepr, Weekday, WeekdayRepr, Year, YearRepr, +}; +use time::format_description::{Component, OwnedFormatItem}; +use time::parsing::Parsed; +use time::{Month, OffsetDateTime, PrimitiveDateTime, UtcOffset}; +use time_fmt::parse::time_format_item::parse_to_format_item; + +use crate::date_time_format; + +const JAVA_DATE_FORMAT_TOKENS: &[&str] = &[ + "yyyy", + "xxxx", + "xx[xx]", + "SSSSSSSSS", // For nanoseconds + "SSSSSSS", // For microseconds + "SSSSSS", // For fractional seconds up to six digits + "SSSSS", + "SSSS", + "SSS", + "SS", + "ZZ", + "xx", + "ww", + "w[w]", + "yy", + "MM", + "dd", + "HH", + "hh", + "kk", + "mm", + "ss", + "aa", + "a", + "w", + "M", + "d", + "H", + "h", + "k", + "m", + "s", + "S", + "Z", + "e", +]; + +fn literal(s: &[u8]) -> OwnedFormatItem { + // builds a boxed slice from a slice + let boxed_slice: Box<[u8]> = s.to_vec().into_boxed_slice(); + OwnedFormatItem::Literal(boxed_slice) +} + +#[inline] +fn get_padding(ptn: &str) -> Padding { + if ptn.len() == 2 { + Padding::Zero + } else { + Padding::None + } +} + +fn build_zone_offset(_: &str) -> Option<OwnedFormatItem> { + // 'Z' literal to represent UTC offset + let z_literal = OwnedFormatItem::Literal(Box::from(b"Z".as_ref())); + + // Offset in '+/-HH:MM' format + let offset_with_delimiter_items: Box<[OwnedFormatItem]> = vec![ + OwnedFormatItem::Component(Component::OffsetHour(Default::default())), + OwnedFormatItem::Literal(Box::from(b":".as_ref())), + OwnedFormatItem::Component(Component::OffsetMinute(Default::default())), + ] + .into_boxed_slice(); + let offset_with_delimiter_compound = OwnedFormatItem::Compound(offset_with_delimiter_items); + + // Offset in '+/-HHMM' format + let offset_items: Box<[OwnedFormatItem]> = vec![ + OwnedFormatItem::Component(Component::OffsetHour(Default::default())), + OwnedFormatItem::Component(Component::OffsetMinute(Default::default())), + ] + .into_boxed_slice(); + let offset_compound = OwnedFormatItem::Compound(offset_items); + + Some(OwnedFormatItem::First( + vec![z_literal, offset_with_delimiter_compound, offset_compound].into_boxed_slice(), + )) +} + +fn build_year_item(ptn: &str) -> Option<OwnedFormatItem> { + let mut full_year = Year::default(); + full_year.repr = YearRepr::Full; + let full_year_component = OwnedFormatItem::Component(Component::Year(full_year)); + + let mut short_year = Year::default(); + short_year.repr = YearRepr::LastTwo; + let short_year_component = OwnedFormatItem::Component(Component::Year(short_year)); + + if ptn.len() == 4 { + Some(full_year_component) + } else if ptn.len() == 2 { + Some(short_year_component) + } else { + Some(OwnedFormatItem::First( 
vec![full_year_component, short_year_component].into_boxed_slice(), )) } } + +fn build_week_based_year_item(ptn: &str) -> Option<OwnedFormatItem> { + // TODO no `Component` for that + build_year_item(ptn) +} + +fn build_month_item(ptn: &str) -> Option<OwnedFormatItem> { + let mut month: MonthModifier = Default::default(); + month.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::Month(month))) +} + +fn build_day_item(ptn: &str) -> Option<OwnedFormatItem> { + let mut day = Day::default(); + day.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::Day(day))) +} + +fn build_day_of_week_item(_: &str) -> Option<OwnedFormatItem> { + let mut weekday = Weekday::default(); + weekday.repr = WeekdayRepr::Monday; + weekday.one_indexed = false; + Some(OwnedFormatItem::Component(Component::Weekday(weekday))) +} + +fn build_week_of_year_item(ptn: &str) -> Option<OwnedFormatItem> { + let mut week_number = WeekNumber::default(); + week_number.repr = WeekNumberRepr::Monday; + week_number.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::WeekNumber( week_number, ))) +} + +fn build_hour_item(ptn: &str) -> Option<OwnedFormatItem> { + let mut hour = Hour::default(); + hour.padding = get_padding(ptn); + hour.is_12_hour_clock = false; + Some(OwnedFormatItem::Component(Component::Hour(hour))) +} + +fn build_minute_item(ptn: &str) -> Option<OwnedFormatItem> { + let mut minute: Minute = Default::default(); + minute.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::Minute(minute))) +} + +fn build_second_item(ptn: &str) -> Option<OwnedFormatItem> { + let mut second: Second = Default::default(); + second.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::Second(second))) +} + +fn build_fraction_of_second_item(_ptn: &str) -> Option<OwnedFormatItem> { + let mut subsecond: Subsecond = Default::default(); + subsecond.digits = SubsecondDigits::OneOrMore; + Some(OwnedFormatItem::Component(Component::Subsecond(subsecond))) +} + +fn parse_java_datetime_format_items_recursive( + chars: &mut std::iter::Peekable<std::str::Chars<'_>>, +) -> Result<Vec<OwnedFormatItem>, String> { + let mut items = Vec::new(); + + while let Some(&c) = chars.peek() { + match c { + '[' => { + chars.next(); + let optional_items = parse_java_datetime_format_items_recursive(chars)?; + items.push(OwnedFormatItem::Optional(Box::new( + OwnedFormatItem::Compound(optional_items.into_boxed_slice()), + ))); + } + ']' => { + chars.next(); + break; + } + '\'' => { + chars.next(); + let mut literal_str = String::new(); + while let Some(&next_c) = chars.peek() { + if next_c == '\'' { + chars.next(); + break; + } else { + literal_str.push(next_c); + chars.next(); + } + } + items.push(literal(literal_str.as_bytes())); + } + _ => { + if let Some(format_item) = match_java_date_format_token(chars)? 
{ items.push(format_item); + } else { + // Treat as a literal character + items.push(literal(c.to_string().as_bytes())); + chars.next(); + } + } + } + } + + Ok(items) +} + +// Elasticsearch/OpenSearch uses a set of preconfigured formats; more information can be found +// at https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html +fn match_java_date_format_token( + chars: &mut std::iter::Peekable<std::str::Chars<'_>>, +) -> Result<Option<OwnedFormatItem>, String> { + if chars.peek().is_none() { + return Ok(None); + } + + let remaining: String = chars.clone().collect(); + + // Try to match the longest possible token + for token in JAVA_DATE_FORMAT_TOKENS { + if remaining.starts_with(token) { + for _ in 0..token.len() { + chars.next(); + } + + let format_item = match *token { + "yyyy" | "yy" => build_year_item(token), + "xxxx" | "xx[xx]" | "xx" => build_week_based_year_item(token), + "MM" | "M" => build_month_item(token), + "dd" | "d" => build_day_item(token), + "HH" | "H" => build_hour_item(token), + "mm" | "m" => build_minute_item(token), + "ss" | "s" => build_second_item(token), + "SSSSSSSSS" | "SSSSSSS" | "SSSSSS" | "SSSSS" | "SSSS" | "SSS" | "SS" | "S" => { + build_fraction_of_second_item(token) + } + "Z" => build_zone_offset(token), + "ww" | "w[w]" | "w" => build_week_of_year_item(token), + "e" => build_day_of_week_item(token), + _ => return Err(format!("Unrecognized token '{}'", token)), + }; + return Ok(format_item); + } + } + + Ok(None) +} + +// Check if the given date time format is a common alias and replace it with the +// Java date format it is mapped to, if any. +// If the java_datetime_format is not an alias, it is expected to be a +// Java datetime format and is returned as is. +fn resolve_java_datetime_format_alias(java_datetime_format: &str) -> &str { + static JAVA_DATE_FORMAT_ALIASES: OnceLock<HashMap<&'static str, &'static str>> = + OnceLock::new(); + let java_datetime_format_map = JAVA_DATE_FORMAT_ALIASES.get_or_init(|| { + let mut m = HashMap::new(); + m.insert("date_optional_time", "yyyy-MM-dd['T'HH:mm:ss.SSSZ]"); + m.insert( + "strict_date_optional_time", + "yyyy[-MM[-dd['T'HH[:mm[:ss[.SSS[Z]]]]]]]", + ); + m.insert( + "strict_date_optional_time_nanos", + "yyyy[-MM[-dd['T'HH:mm:ss.SSSSSSZ]]]", + ); + m.insert("basic_date", "yyyyMMdd"); + + m.insert("strict_basic_week_date", "xxxx'W'wwe"); + m.insert("basic_week_date", "xx[xx]'W'wwe"); + + m.insert("strict_basic_week_date_time", "xxxx'W'wwe'T'HHmmss.SSSZ"); + m.insert("basic_week_date_time", "xx[xx]'W'wwe'T'HHmmss.SSSZ"); + + m.insert( + "strict_basic_week_date_time_no_millis", + "xxxx'W'wwe'T'HHmmssZ", + ); + m.insert("basic_week_date_time_no_millis", "xx[xx]'W'wwe'T'HHmmssZ"); + + m.insert("strict_week_date", "xxxx-'W'ww-e"); + m.insert("week_date", "xxxx-'W'w[w]-e"); + m + }); + java_datetime_format_map + .get(java_datetime_format) + .copied() + .unwrap_or(java_datetime_format) +} + +/// A date time parser that holds the format specification `Vec<OwnedFormatItem>`. +#[derive(Clone)] +pub struct StrptimeParser { + pub(crate) strptime_format: String, + items: Box<[OwnedFormatItem]>, +} + +pub fn parse_java_datetime_format_items( + java_datetime_format: &str, +) -> Result<Box<[OwnedFormatItem]>, String> { + let mut chars = java_datetime_format.chars().peekable(); + let items = parse_java_datetime_format_items_recursive(&mut chars)?; + Ok(items.into_boxed_slice()) +} + +impl StrptimeParser { + /// Parse a date, assuming UTC if unspecified. + /// See `parse_date_time_with_default_timezone` for more details. 
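+ /// For example (mirroring the tests below), a parser built with + /// `StrptimeParser::from_strptime("%Y-%m-%d")` parses `"2021-01-01"` into + /// `2021-01-01 00:00:00 UTC`. 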
+ pub fn parse_date_time(&self, date_time_str: &str) -> Result<OffsetDateTime, String> { + self.parse_date_time_with_default_timezone(date_time_str, UtcOffset::UTC) + } + + /// Parse a date. If no timezone is specified, we assume the timezone passed as + /// `default_offset`. If the time is missing, it is set to 00:00:00. + pub fn parse_date_time_with_default_timezone( + &self, + date_time_str: &str, + default_offset: UtcOffset, + ) -> Result<OffsetDateTime, String> { + let mut parsed = Parsed::new(); + if !parsed + .parse_items(date_time_str.as_bytes(), &self.items) + .map_err(|err| err.to_string())? + .is_empty() + { + return Err(format!( + "datetime string `{}` does not match strptime format `{}`", + date_time_str, &self.strptime_format + )); + } + + // The parsed datetime contains a date but seems to be missing "time". + // We complete it artificially with 00:00:00. + if parsed.hour_24().is_none() + && !(parsed.hour_12().is_some() && parsed.hour_12_is_pm().is_some()) + { + parsed.set_hour_24(0u8); + parsed.set_minute(0u8); + parsed.set_second(0u8); + } + + if parsed.year().is_none() { + let now = OffsetDateTime::now_utc(); + let year = date_time_format::infer_year(parsed.month(), now.month(), now.year()); + parsed.set_year(year); + } + + if parsed.day().is_none() && parsed.monday_week_number().is_none() { + parsed.set_day(NonZeroU8::try_from(1u8).unwrap()); + } + + if parsed.month().is_none() && parsed.monday_week_number().is_none() { + parsed.set_month(Month::January); + } + + if parsed.offset_hour().is_some() { + let offset_datetime: OffsetDateTime = parsed + .try_into() + .map_err(|err: TryFromParsed| err.to_string())?; + return Ok(offset_datetime); + } + let primitive_date_time: PrimitiveDateTime = parsed + .try_into() + .map_err(|err: TryFromParsed| err.to_string())?; + Ok(primitive_date_time.assume_offset(default_offset)) + } + + pub fn format_date_time(&self, date_time: &OffsetDateTime) -> Result<String, Format> { + date_time.format(&self.items) + } + + pub fn from_strptime(strptime_format: &str) -> Result<Self, String> { + let items: Box<[OwnedFormatItem]> = parse_to_format_item(strptime_format) + .map_err(|err| format!("invalid strptime format `{strptime_format}`: {err}"))? + .into_iter() + .map(|item| item.into()) + .collect::<Vec<_>>() + .into_boxed_slice(); + Ok(StrptimeParser::new(strptime_format.to_string(), items)) + } + + pub fn from_java_datetime_format(java_datetime_format: &str) -> Result<Self, String> { + let java_datetime_format_resolved = + resolve_java_datetime_format_alias(java_datetime_format); + let items: Box<[OwnedFormatItem]> = + parse_java_datetime_format_items(java_datetime_format_resolved)?; + Ok(StrptimeParser::new(java_datetime_format.to_string(), items)) + } + + fn new(strptime_format: String, items: Box<[OwnedFormatItem]>) -> Self { + StrptimeParser { + strptime_format, + items, + } + } +} + +impl PartialEq for StrptimeParser { + fn eq(&self, other: &Self) -> bool { + self.strptime_format == other.strptime_format + } +} + +impl Eq for StrptimeParser {} + +impl std::fmt::Debug for StrptimeParser { + fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter + .debug_struct("StrptimeParser") + .field("format", &self.strptime_format) + .finish() + } +} + +impl std::hash::Hash for StrptimeParser { + fn hash<H: std::hash::Hasher>(&self, state: &mut H) { + self.strptime_format.hash(state); + } +} + +// `Strftime` format special characters. +// These characters are taken from the parsing crate we use for compatibility. 
+const STRFTIME_FORMAT_MARKERS: [&str; 36] = [ + "%a", "%A", "%b", "%B", "%c", "%C", "%d", "%D", "%e", "%f", "%F", "%h", "%H", "%I", "%j", "%k", + "%l", "%m", "%M", "%n", "%p", "%P", "%r", "%R", "%S", "%t", "%T", "%U", "%w", "%W", "%x", "%X", + "%y", "%Y", "%z", "%Z", +]; + +// Checks if a format contains `strftime` special characters. +pub fn is_strftime_formatting(format_str: &str) -> bool { + STRFTIME_FORMAT_MARKERS + .iter() + .any(|marker| format_str.contains(marker)) +} + +#[cfg(test)] +mod tests { + use time::macros::datetime; + + use super::*; + use crate::java_date_time_format::parse_java_datetime_format_items; + + #[test] + fn test_parse_datetime_format_missing_time() { + let parser = StrptimeParser::from_strptime("%Y-%m-%d").unwrap(); + assert_eq!( + parser.parse_date_time("2021-01-01").unwrap(), + datetime!(2021-01-01 00:00:00 UTC) + ); + } + + #[test] + fn test_parse_datetime_format_strict_on_trailing_data() { + let parser = StrptimeParser::from_strptime("%Y-%m-%d").unwrap(); + let error = parser.parse_date_time("2021-01-01TABC").unwrap_err(); + assert_eq!( + error, + "datetime string `2021-01-01TABC` does not match strptime format `%Y-%m-%d`" + ); + } + + #[test] + fn test_parse_strptime_with_timezone() { + let parser = StrptimeParser::from_strptime("%Y-%m-%dT%H:%M:%S %z").unwrap(); + let offset_datetime = parser + .parse_date_time("2021-01-01T11:00:03 +07:00") + .unwrap(); + assert_eq!(offset_datetime, datetime!(2021-01-01 11:00:03 +7)); + } + + #[track_caller] + fn test_parse_java_datetime_aux( + java_date_time_format: &str, + date_str: &str, + expected_datetime: OffsetDateTime, + ) { + let parser = StrptimeParser::from_java_datetime_format(java_date_time_format).unwrap(); + let datetime = parser.parse_date_time(date_str).unwrap(); + assert_eq!(datetime, expected_datetime); + } + + #[test] + fn test_parse_java_datetime_format() { + test_parse_java_datetime_aux("yyyyMMdd", "20210101", datetime!(2021-01-01 00:00:00 UTC)); + test_parse_java_datetime_aux( + "yyyy MM dd", + "2021 01 01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd", + "2021!01?01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd'T'HH:", + "2021!01?01T13:", + datetime!(2021-01-01 13:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd['T'[HH:]]", + "2021!01?01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd['T'[HH:]", + "2021!01?01T", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd['T'[HH:]]", + "2021!01?01T13:", + datetime!(2021-01-01 13:00:00 UTC), + ); + } + + #[test] + fn test_parse_java_missing_time() { + test_parse_java_datetime_aux( + "yyyy-MM-dd", + "2021-01-01", + datetime!(2021-01-01 00:00:00 UTC), + ); + } + + #[test] + fn test_parse_java_optional_missing_time() { + test_parse_java_datetime_aux( + "yyyy-MM-dd[ HH:mm:ss]", + "2021-01-01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy-MM-dd[ HH:mm:ss]", + "2021-01-01 12:34:56", + datetime!(2021-01-01 12:34:56 UTC), + ); + } + + #[test] + fn test_parse_java_datetime_format_aliases() { + test_parse_java_datetime_aux( + "date_optional_time", + "2021-01-01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "date_optional_time", + "2021-01-21T03:01:22.312+01:00", + datetime!(2021-01-21 03:01:22.312 +1), + ); + } + + #[test] + fn test_parse_java_week_formats() { + test_parse_java_datetime_aux( + "basic_week_date", + 
"2024W313", + datetime!(2024-08-01 0:00:00.0 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date", + "24W313", + datetime!(2024-08-01 0:00:00.0 +00:00:00), + ); + // // ❌ 'the 'year' component could not be parsed' + // test_parse_java_datetime_aux( + // "basic_week_date", + // "1W313", + // datetime!(2018-08-02 0:00:00.0 +00:00:00), + // ); + test_parse_java_datetime_aux( + "basic_week_date_time", + "2018W313T121212.1Z", + datetime!(2018-08-02 12:12:12.1 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time", + "2018W313T121212.123Z", + datetime!(2018-08-02 12:12:12.123 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time", + "2018W313T121212.123456789Z", + datetime!(2018-08-02 12:12:12.123456789 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time", + "2018W313T121212.123+0100", + datetime!(2018-08-02 12:12:12.123 +01:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time_no_millis", + "2018W313T121212Z", + datetime!(2018-08-02 12:12:12.0 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time_no_millis", + "2018W313T121212+0100", + datetime!(2018-08-02 12:12:12.0 +01:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time_no_millis", + "2018W313T121212+01:00", + datetime!(2018-08-02 12:12:12.0 +01:00:00), + ); + + test_parse_java_datetime_aux( + "week_date", + "2012-W48-6", + datetime!(2012-12-02 0:00:00.0 +00:00:00), + ); + + test_parse_java_datetime_aux( + "week_date", + "2012-W01-6", + datetime!(2012-01-08 0:00:00.0 +00:00:00), + ); + + test_parse_java_datetime_aux( + "week_date", + "2012-W1-6", + datetime!(2012-01-08 0:00:00.0 +00:00:00), + ); + } + + #[test] + fn test_parse_java_strict_week_formats() { + test_parse_java_datetime_aux( + "strict_basic_week_date", + "2024W313", + datetime!(2024-08-01 0:00:00.0 +00:00:00), + ); + + test_parse_java_datetime_aux( + "strict_week_date", + "2012-W48-6", + datetime!(2012-12-02 0:00:00.0 +00:00:00), + ); + + test_parse_java_datetime_aux( + "strict_week_date", + "2012-W01-6", + datetime!(2012-01-08 0:00:00.0 +00:00:00), + ); + } + + #[test] + fn test_parse_strict_date_optional_time() { + let parser = + StrptimeParser::from_java_datetime_format("strict_date_optional_time").unwrap(); + let dates = [ + "2019", + "2019-03", + "2019-03-23", + "2019-03-23T21:34", + "2019-03-23T21:34:46", + "2019-03-23T21:34:46.123Z", + "2019-03-23T21:35:46.123+00:00", + "2019-03-23T21:36:46.123+03:00", + "2019-03-23T21:37:46.123+0300", + ]; + let expected = [ + datetime!(2019-01-01 00:00:00 UTC), + datetime!(2019-03-01 00:00:00 UTC), + datetime!(2019-03-23 00:00:00 UTC), + datetime!(2019-03-23 21:34 UTC), + datetime!(2019-03-23 21:34:46 UTC), + datetime!(2019-03-23 21:34:46.123 UTC), + datetime!(2019-03-23 21:35:46.123 UTC), + datetime!(2019-03-23 21:36:46.123 +03:00:00), + datetime!(2019-03-23 21:37:46.123 +03:00:00), + ]; + for (date_str, &expected_dt) in dates.iter().zip(expected.iter()) { + let parsed_dt = parser + .parse_date_time(date_str) + .unwrap_or_else(|e| panic!("Failed to parse {}: {}", date_str, e)); + assert_eq!(parsed_dt, expected_dt); + } + } + + #[test] + fn test_parse_strict_date_optional_time_nanos() { + let parser = + StrptimeParser::from_java_datetime_format("strict_date_optional_time_nanos").unwrap(); + let dates = [ + "2019", + "2019-03", + "2019-03-23", + "2019-03-23T21:34:46.123456789Z", + "2019-03-23T21:35:46.123456789+00:00", + "2019-03-23T21:36:46.123456789+03:00", + "2019-03-23T21:37:46.123456789+0300", + ]; + let 
expected = [ + datetime!(2019-01-01 00:00:00 UTC), + datetime!(2019-03-01 00:00:00 UTC), + datetime!(2019-03-23 00:00:00 UTC), + datetime!(2019-03-23 21:34:46.123456789 UTC), + datetime!(2019-03-23 21:35:46.123456789 UTC), + datetime!(2019-03-23 21:36:46.123456789 +03:00:00), + datetime!(2019-03-23 21:37:46.123456789 +03:00:00), + ]; + for (date_str, &expected_dt) in dates.iter().zip(expected.iter()) { + let parsed_dt = parser + .parse_date_time(date_str) + .unwrap_or_else(|e| panic!("Failed to parse {}: {}", date_str, e)); + assert_eq!(parsed_dt, expected_dt); + } + } + + #[test] + fn test_parse_java_datetime_format_items() { + let format_str = "xx[xx]'W'wwe"; + let result = parse_java_datetime_format_items(format_str).unwrap(); + + // We expect the tokens to be parsed as: + // - 'xx[xx]' (week-based year) with optional length + // - 'W' (literal) + // - 'ww' (week of year) + // - 'e' (day of week) + + assert_eq!(result.len(), 4); + + // Verify each token + match &result[0] { + OwnedFormatItem::First(boxed_slice) => { + assert_eq!(boxed_slice.len(), 2); + match (&boxed_slice[0], &boxed_slice[1]) { + ( + OwnedFormatItem::Component(Component::Year(_)), + OwnedFormatItem::Component(Component::Year(_)), + ) => {} + unexpected => { + panic!("Expected two Year components, but found: {:?}", unexpected) + } + } + } + unexpected => panic!( + "Expected First with two Year components, but found: {:?}", + unexpected + ), + } + + match &result[1] { + OwnedFormatItem::Literal(lit) => assert_eq!(lit.as_ref(), b"W"), + unexpected => panic!("Expected literal 'W', but found: {:?}", unexpected), + } + + match &result[2] { + OwnedFormatItem::Component(Component::WeekNumber(_)) => {} + unexpected => panic!("Expected WeekNumber component, but found: {:?}", unexpected), + } + + match &result[3] { + OwnedFormatItem::Component(Component::Weekday(_)) => {} + unexpected => panic!("Expected Weekday component, but found: {:?}", unexpected), + } + } + + #[test] + fn test_parse_java_datetime_format_with_literals() { + let format = "yyyy'T'Z-HHuu"; + let parser = StrptimeParser::from_java_datetime_format(format).unwrap(); + + let test_cases = [ + ("2023TZ-14uu", datetime!(2023-01-01 14:00:00 UTC)), + ("2024TZ-05uu", datetime!(2024-01-01 05:00:00 UTC)), + ("2025TZ-23uu", datetime!(2025-01-01 23:00:00 UTC)), + ]; + + for (input, expected) in test_cases.iter() { + let result = parser.parse_date_time(input).unwrap(); + assert_eq!(result, *expected, "Failed to parse {}", input); + } + + // Test error case + let error_case = "2023-1430"; + assert!( + parser.parse_date_time(error_case).is_err(), + "Expected error for input: {}", + error_case + ); + } +} diff --git a/quickwit/quickwit-datetime/src/lib.rs b/quickwit/quickwit-datetime/src/lib.rs index eb4d8c940ba..03003641dcc 100644 --- a/quickwit/quickwit-datetime/src/lib.rs +++ b/quickwit/quickwit-datetime/src/lib.rs @@ -19,9 +19,11 @@ mod date_time_format; mod date_time_parsing; +pub mod java_date_time_format; -pub use date_time_format::{DateTimeInputFormat, DateTimeOutputFormat, StrptimeParser}; +pub use date_time_format::{DateTimeInputFormat, DateTimeOutputFormat}; pub use date_time_parsing::{ parse_date_time_str, parse_timestamp, parse_timestamp_float, parse_timestamp_int, }; +pub use java_date_time_format::StrptimeParser; pub use tantivy::DateTime as TantivyDateTime; diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 4a24aedc012..74971077b3b 100644 --- 
a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -25,6 +25,7 @@ use std::time::Duration; use anyhow::Context; use futures::{Future, StreamExt}; use itertools::Itertools; +use quickwit_common::metrics::IntCounter; use quickwit_common::pretty::PrettySample; use quickwit_common::Progress; use quickwit_metastore::{ @@ -44,6 +45,26 @@ use tracing::{error, instrument}; /// The maximum number of splits that the GC should delete per attempt. const DELETE_SPLITS_BATCH_SIZE: usize = 10_000; +pub struct GcMetrics { + pub deleted_splits: IntCounter, + pub deleted_bytes: IntCounter, + pub failed_splits: IntCounter, +} + +trait RecordGcMetrics { + fn record(&self, num_deleted_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize); +} + +impl RecordGcMetrics for Option<GcMetrics> { + fn record(&self, num_deleted_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize) { + if let Some(metrics) = self { + metrics.deleted_splits.inc_by(num_deleted_splits as u64); + metrics.deleted_bytes.inc_by(num_deleted_bytes); + metrics.failed_splits.inc_by(num_failed_splits as u64); + } + } +} + /// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from /// storage and metastore. #[derive(Error, Debug)] @@ -94,6 +115,7 @@ pub async fn run_garbage_collect( deletion_grace_period: Duration, dry_run: bool, progress_opt: Option<&Progress>, + metrics: Option<GcMetrics>, ) -> anyhow::Result<SplitRemovalInfo> { let grace_period_timestamp = OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64; @@ -170,6 +192,7 @@ pub async fn run_garbage_collect( metastore, indexes, progress_opt, + metrics, ) .await) } @@ -179,6 +202,7 @@ async fn delete_splits( storages: &HashMap<IndexUid, Arc<dyn Storage>>, metastore: MetastoreServiceClient, progress_opt: Option<&Progress>, + metrics: &Option<GcMetrics>, split_removal_info: &mut SplitRemovalInfo, ) -> Result<(), ()> { let mut delete_split_from_index_res_stream = @@ -219,9 +243,26 @@ async fn delete_splits( while let Some(delete_split_result) = delete_split_from_index_res_stream.next().await { match delete_split_result { Ok(entries) => { + let deleted_bytes = entries + .iter() + .map(|entry| entry.file_size_bytes.as_u64()) + .sum::<u64>(); + let deleted_splits_count = entries.len(); + + metrics.record(deleted_splits_count, deleted_bytes, 0); split_removal_info.removed_split_entries.extend(entries); } Err(delete_split_error) => { + let deleted_bytes = delete_split_error + .successes + .iter() + .map(|entry| entry.file_size_bytes.as_u64()) + .sum::<u64>(); + let deleted_splits_count = delete_split_error.successes.len(); + let failed_splits_count = delete_split_error.storage_failures.len() + + delete_split_error.metastore_failures.len(); + + metrics.record(deleted_splits_count, deleted_bytes, failed_splits_count); split_removal_info .removed_split_entries .extend(delete_split_error.successes); @@ -265,13 +306,14 @@ async fn list_splits_metadata( /// /// The aim of this is to spread the load out across a longer period /// rather than short, heavy bursts on the metastore and storage system itself. 
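+/// For example, with `DELETE_SPLITS_BATCH_SIZE` at 10_000, an index with 25_000 splits +/// marked for deletion is processed in three batches (10_000, 10_000, then 5_000 splits), +/// each batch resuming after the last split of the previous one. 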
-#[instrument(skip(index_uids, storages, metastore, progress_opt), fields(num_indexes=%index_uids.len()))] +#[instrument(skip(index_uids, storages, metastore, progress_opt, metrics), fields(num_indexes=%index_uids.len()))] async fn delete_splits_marked_for_deletion_several_indexes( index_uids: Vec<IndexUid>, updated_before_timestamp: i64, metastore: MetastoreServiceClient, storages: HashMap<IndexUid, Arc<dyn Storage>>, progress_opt: Option<&Progress>, + metrics: Option<GcMetrics>, ) -> SplitRemovalInfo { let mut split_removal_info = SplitRemovalInfo::default(); @@ -280,7 +322,7 @@ async fn delete_splits_marked_for_deletion_several_indexes( return split_removal_info; }; - let list_splits_query = list_splits_query + let mut list_splits_query = list_splits_query .with_split_state(SplitState::MarkedForDeletion) .with_update_timestamp_lte(updated_before_timestamp) .with_limit(DELETE_SPLITS_BATCH_SIZE) @@ -300,11 +342,13 @@ async fn delete_splits_marked_for_deletion_several_indexes( } }; - let num_splits_to_delete = splits_metadata_to_delete.len(); - - if num_splits_to_delete == 0 { + // remember the split after which the next loop iteration should resume + let Some(last_split_metadata) = splits_metadata_to_delete.last() else { break; - } + }; + list_splits_query = list_splits_query.after_split(last_split_metadata); + + let num_splits_to_delete = splits_metadata_to_delete.len(); let splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>> = splits_metadata_to_delete .into_iter() .map(|meta| (meta.index_uid.clone(), meta)) .into_group_map(); - let delete_split_res = delete_splits( + // ignore the result; we continue either way + let _: Result<(), ()> = delete_splits( splits_metadata_to_delete_per_index, &storages, metastore.clone(), progress_opt, + &metrics, &mut split_removal_info, ) .await; - if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || delete_split_res.is_err() { - // stop the gc if this was the last batch or we encountered an error - // (otherwise we might try deleting the same splits in an endless loop) + if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE { + // stop the gc if this was the last batch + // we are guaranteed to make progress due to .after_split() break; } } @@ -345,7 +391,7 @@ pub async fn delete_splits_from_storage_and_metastore( metastore: MetastoreServiceClient, splits: Vec<SplitMetadata>, progress_opt: Option<&Progress>, -) -> anyhow::Result<Vec<SplitInfo>, DeleteSplitsError> { +) -> Result<Vec<SplitInfo>, DeleteSplitsError> { let mut split_infos: HashMap<PathBuf, SplitInfo> = HashMap::with_capacity(splits.len()); for split in splits { @@ -511,6 +557,7 @@ mod tests { Duration::from_secs(30), false, None, + None, ) .await .unwrap(); @@ -538,6 +585,7 @@ mod tests { Duration::from_secs(30), false, None, + None, ) .await .unwrap(); @@ -615,6 +663,7 @@ mod tests { Duration::from_secs(30), false, None, + None, ) .await .unwrap(); @@ -642,6 +691,7 @@ mod tests { Duration::from_secs(0), false, None, + None, ) .await .unwrap(); @@ -680,6 +730,7 @@ mod tests { Duration::from_secs(30), false, None, + None, ) .await .unwrap(); diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index 5d4dc5ec149..0fe5c77cc2b 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -373,6 +373,7 @@ impl IndexService { Duration::ZERO, dry_run, None, + None, ) .await?; diff --git a/quickwit/quickwit-index-management/src/lib.rs b/quickwit/quickwit-index-management/src/lib.rs index 93b6ee6d1c3..65a7ef861ce 100644 --- 
a/quickwit/quickwit-index-management/src/lib.rs +++ b/quickwit/quickwit-index-management/src/lib.rs @@ -20,5 +20,5 @@ mod garbage_collection; mod index; -pub use garbage_collection::run_garbage_collect; +pub use garbage_collection::{run_garbage_collect, GcMetrics}; pub use index::{clear_cache_directory, validate_storage_uri, IndexService, IndexServiceError}; diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 4087f2ed230..7ab58bb873f 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -120,6 +120,7 @@ pub struct IndexingPipeline { handles_opt: Option<IndexingPipelineHandles>, // Killswitch used for the actors in the pipeline. This is not the supervisor killswitch. kill_switch: KillSwitch, + // The set of shard is something that can change dynamically without necessarily // requiring a respawn of the pipeline. // We keep the list of shards here however, to reassign them after a respawn. @@ -158,12 +159,16 @@ impl Actor for IndexingPipeline { impl IndexingPipeline { pub fn new(params: IndexingPipelineParams) -> Self { + let params_fingerprint = params.params_fingerprint; IndexingPipeline { params, previous_generations_statistics: Default::default(), handles_opt: None, kill_switch: KillSwitch::default(), - statistics: IndexingStatistics::default(), + statistics: IndexingStatistics { + params_fingerprint, + ..Default::default() + }, shard_ids: Default::default(), } } @@ -264,6 +269,7 @@ impl IndexingPipeline { .set_num_spawn_attempts(self.statistics.num_spawn_attempts); let pipeline_metrics_opt = handles.indexer.last_observation().pipeline_metrics_opt; self.statistics.pipeline_metrics_opt = pipeline_metrics_opt; + self.statistics.params_fingerprint = self.params.params_fingerprint; self.statistics.shard_ids.clone_from(&self.shard_ids); ctx.observe(self); } @@ -587,6 +593,7 @@ pub struct IndexingPipelineParams { pub source_storage_resolver: StorageResolver, pub ingester_pool: IngesterPool, pub queues_dir_path: PathBuf, + pub params_fingerprint: u64, pub event_broker: EventBroker, } @@ -716,6 +723,7 @@ mod tests { cooperative_indexing_permits: None, merge_planner_mailbox, event_broker: EventBroker::default(), + params_fingerprint: 42u64, }; let pipeline = IndexingPipeline::new(pipeline_params); let (_pipeline_mailbox, pipeline_handle) = universe.spawn_builder().spawn(pipeline); @@ -828,6 +836,7 @@ mod tests { cooperative_indexing_permits: None, merge_planner_mailbox, event_broker: Default::default(), + params_fingerprint: 42u64, }; let pipeline = IndexingPipeline::new(pipeline_params); let (_pipeline_mailbox, pipeline_handler) = universe.spawn_builder().spawn(pipeline); @@ -926,6 +935,7 @@ mod tests { cooperative_indexing_permits: None, merge_planner_mailbox: merge_planner_mailbox.clone(), event_broker: Default::default(), + params_fingerprint: 42u64, }; let indexing_pipeline = IndexingPipeline::new(indexing_pipeline_params); let (_indexing_pipeline_mailbox, indexing_pipeline_handler) = @@ -1051,6 +1061,7 @@ mod tests { max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, merge_planner_mailbox, + params_fingerprint: 42u64, event_broker: Default::default(), }; let pipeline = IndexingPipeline::new(pipeline_params); diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 757d434adca..df71cc92ea4 100644 --- 
a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -312,10 +312,12 @@ impl IndexingService { let max_concurrent_split_uploads_merge = (self.max_concurrent_split_uploads - max_concurrent_split_uploads_index).max(1); + let params_fingerprint = index_config.indexing_params_fingerprint(); let pipeline_params = IndexingPipelineParams { pipeline_id: indexing_pipeline_id.clone(), metastore: self.metastore.clone(), storage, + // Indexing-related parameters doc_mapper, indexing_directory, @@ -323,6 +325,7 @@ impl IndexingService { split_store, max_concurrent_split_uploads_index, cooperative_indexing_permits: self.cooperative_indexing_permits.clone(), + // Merge-related parameters merge_policy, max_concurrent_split_uploads_merge, @@ -333,6 +336,7 @@ impl IndexingService { ingester_pool: self.ingester_pool.clone(), queues_dir_path: self.queue_dir_path.clone(), source_storage_resolver: self.storage_resolver.clone(), + params_fingerprint, event_broker: self.event_broker.clone(), }; @@ -755,20 +759,14 @@ impl IndexingService { .indexing_pipelines .values() .map(|pipeline_handle| { - let shard_ids: Vec = pipeline_handle - .handle - .last_observation() - .shard_ids - .iter() - .cloned() - .collect(); - + let assignment = pipeline_handle.handle.last_observation(); + let shard_ids: Vec = assignment.shard_ids.iter().cloned().collect(); IndexingTask { index_uid: Some(pipeline_handle.indexing_pipeline_id.index_uid.clone()), source_id: pipeline_handle.indexing_pipeline_id.source_id.clone(), pipeline_uid: Some(pipeline_handle.indexing_pipeline_id.pipeline_uid), shard_ids, - params_fingerprint: 0, + params_fingerprint: assignment.params_fingerprint, } }) .collect(); @@ -1192,6 +1190,8 @@ mod tests { #[tokio::test] async fn test_indexing_service_apply_plan() { + const PARAMS_FINGERPRINT: u64 = 3865067856550546352u64; + quickwit_common::setup_logging_for_tests(); let transport = ChannelTransport::default(); let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) @@ -1251,14 +1251,14 @@ mod tests { source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(0u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, ]; indexing_service @@ -1297,28 +1297,28 @@ mod tests { source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(3u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(2u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(4u128)), - params_fingerprint: 0, + 
params_fingerprint: PARAMS_FINGERPRINT, }, ]; indexing_service @@ -1359,21 +1359,21 @@ mod tests { source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(3u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(4u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, ]; indexing_service diff --git a/quickwit/quickwit-indexing/src/models/indexing_statistics.rs b/quickwit/quickwit-indexing/src/models/indexing_statistics.rs index 21f84e678da..68b44a9744b 100644 --- a/quickwit/quickwit-indexing/src/models/indexing_statistics.rs +++ b/quickwit/quickwit-indexing/src/models/indexing_statistics.rs @@ -56,6 +56,7 @@ pub struct IndexingStatistics { // List of shard ids. #[schema(value_type = Vec<String>)] pub shard_ids: BTreeSet<ShardId>, + pub params_fingerprint: u64, } impl IndexingStatistics { diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index fbfdeb2b1e1..bb82e55d856 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -20,13 +20,13 @@ use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_trait::async_trait; use futures::{stream, StreamExt}; use quickwit_actors::{Actor, ActorContext, Handler}; use quickwit_common::shared_consts::split_deletion_grace_period; -use quickwit_index_management::run_garbage_collect; +use quickwit_index_management::{run_garbage_collect, GcMetrics}; use quickwit_metastore::ListIndexesMetadataResponseExt; use quickwit_proto::metastore::{ ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, @@ -36,6 +36,8 @@ use quickwit_storage::{Storage, StorageResolver}; use serde::Serialize; use tracing::{debug, error, info}; +use crate::metrics::JANITOR_METRICS; + const RUN_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes /// Staged files needs to be deleted if there was a failure. @@ -51,10 +53,10 @@ pub struct GarbageCollectorCounters { pub num_deleted_files: usize, /// The number of bytes deleted. pub num_deleted_bytes: usize, - /// The number of failed garbage collection run on an index. - pub num_failed_gc_run_on_index: usize, - /// The number of successful garbage collection run on an index. - pub num_successful_gc_run_on_index: usize, + /// The number of failed garbage collection runs. + pub num_failed_gc_run: usize, + /// The number of successful garbage collection runs. + pub num_successful_gc_run: usize, /// The number or failed storage resolution. pub num_failed_storage_resolution: usize, /// The number of splits that were unable to be removed.
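The hunks below thread the new `GcMetrics` handles through the janitor's GC loop and time each run. As a rough sketch of what assembling such a value could look like outside the janitor, assuming `GcMetrics` exposes the three public `IntCounter` fields constructed below and reusing the `new_counter`/`new_counter_vec` helpers from `quickwit_common::metrics` (the metric names and namespace here are illustrative, not real Quickwit metrics):

```rust
use quickwit_common::metrics::{new_counter, new_counter_vec};
use quickwit_index_management::GcMetrics;

// Hypothetical helper: build a `GcMetrics` from ad-hoc counters. Success and
// failure share a single counter vec, distinguished by the `result` label,
// mirroring how the janitor derives its value from `JANITOR_METRICS` below.
fn custom_gc_metrics() -> GcMetrics {
    let deleted_splits = new_counter_vec(
        "my_gc_deleted_splits_total", // illustrative metric name
        "Total number of splits deleted by an ad-hoc GC run.",
        "my_namespace", // illustrative namespace
        &[],
        ["result"],
    );
    GcMetrics {
        deleted_splits: deleted_splits.with_label_values(["success"]).clone(),
        failed_splits: deleted_splits.with_label_values(["error"]).clone(),
        deleted_bytes: new_counter(
            "my_gc_deleted_bytes_total", // illustrative metric name
            "Total number of bytes deleted by an ad-hoc GC run.",
            "my_namespace",
            &[],
        ),
    }
}
```

Passing `Some(custom_gc_metrics())` (or `None`) as the new trailing argument of `run_garbage_collect` is what the repeated `None,` additions at the call sites above account for.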
@@ -86,6 +88,8 @@ impl GarbageCollector { debug!("loading indexes from the metastore"); self.counters.num_passes += 1; + let start = Instant::now(); + let response = match self .metastore .list_indexes_metadata(ListIndexesMetadataRequest::all()) @@ -137,23 +141,43 @@ impl GarbageCollector { split_deletion_grace_period(), false, Some(ctx.progress()), + Some(GcMetrics { + deleted_splits: JANITOR_METRICS + .gc_deleted_splits + .with_label_values(["success"]) + .clone(), + deleted_bytes: JANITOR_METRICS.gc_deleted_bytes.clone(), + failed_splits: JANITOR_METRICS + .gc_deleted_splits + .with_label_values(["error"]) + .clone(), + }), ) .await; + let run_duration = start.elapsed().as_secs(); + JANITOR_METRICS.gc_seconds_total.inc_by(run_duration); + let deleted_file_entries = match gc_res { Ok(removal_info) => { - self.counters.num_successful_gc_run_on_index += 1; + self.counters.num_successful_gc_run += 1; + JANITOR_METRICS.gc_runs.with_label_values(["success"]).inc(); self.counters.num_failed_splits += removal_info.failed_splits.len(); removal_info.removed_split_entries } Err(error) => { - self.counters.num_failed_gc_run_on_index += 1; + self.counters.num_failed_gc_run += 1; + JANITOR_METRICS.gc_runs.with_label_values(["error"]).inc(); error!(error=?error, "failed to run garbage collection"); return; } }; if !deleted_file_entries.is_empty() { let num_deleted_splits = deleted_file_entries.len(); + let num_deleted_bytes = deleted_file_entries + .iter() + .map(|entry| entry.file_size_bytes.as_u64() as usize) + .sum::<usize>(); let deleted_files: HashSet<&Path> = deleted_file_entries .iter() .map(|deleted_entry| deleted_entry.file_name.as_path()) @@ -163,11 +187,8 @@ impl GarbageCollector { num_deleted_splits = num_deleted_splits, "Janitor deleted {:?} and {} other splits.", deleted_files, num_deleted_splits, ); - self.counters.num_deleted_files += deleted_file_entries.len(); - self.counters.num_deleted_bytes += deleted_file_entries - .iter() - .map(|entry| entry.file_size_bytes.as_u64() as usize) - .sum::<usize>(); + self.counters.num_deleted_files += num_deleted_splits; + self.counters.num_deleted_bytes += num_deleted_bytes; } } } @@ -348,6 +369,7 @@ mod tests { split_deletion_grace_period(), false, None, + None, ) .await; assert!(result.is_ok()); @@ -497,9 +519,9 @@ mod tests { assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 2); assert_eq!(counters.num_deleted_bytes, 40); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); // 30 secs later @@ -508,9 +530,9 @@ assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 2); assert_eq!(counters.num_deleted_bytes, 40); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); // 60 secs later @@ -519,9 +541,9 @@ assert_eq!(counters.num_passes, 2); assert_eq!(counters.num_deleted_files, 4); assert_eq!(counters.num_deleted_bytes, 80); - assert_eq!(counters.num_successful_gc_run_on_index, 2); + assert_eq!(counters.num_successful_gc_run, 2); assert_eq!(counters.num_failed_storage_resolution, 0); -
assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); universe.assert_quit().await; } @@ -585,9 +607,9 @@ assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 0); assert_eq!(counters.num_deleted_bytes, 0); - assert_eq!(counters.num_successful_gc_run_on_index, 0); + assert_eq!(counters.num_successful_gc_run, 0); assert_eq!(counters.num_failed_storage_resolution, 1); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); universe.assert_quit().await; } @@ -608,7 +630,7 @@ }); mock_metastore .expect_list_splits() - .times(2) + .times(3) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); assert_eq!(query.index_uids.len(), 2); assert!(["test-index-1", "test-index-2"] .contains(&query.index_uids[0].index_id.as_ref())); assert!(["test-index-1", "test-index-2"] .contains(&query.index_uids[1].index_id.as_ref())); - let splits = match query.split_states[0] { + let splits_ids_string: Vec<String> = + (0..8000).map(|seq| format!("split-{seq:04}")).collect(); + let splits_ids: Vec<&str> = splits_ids_string + .iter() + .map(|string| string.as_str()) + .collect(); + let mut splits = match query.split_states[0] { SplitState::Staged => { let mut splits = make_splits("test-index-1", &["a"], SplitState::Staged); splits.append(&mut make_splits("test-index-2", &["a"], SplitState::Staged)); splits } SplitState::MarkedForDeletion => { + assert_eq!(query.limit, Some(10_000)); let mut splits = - make_splits("test-index-1", &["a", "b"], SplitState::MarkedForDeletion); + make_splits("test-index-1", &splits_ids, SplitState::MarkedForDeletion); splits.append(&mut make_splits( "test-index-2", - &["a", "b"], + &splits_ids, SplitState::MarkedForDeletion, )); splits } _ => panic!("only Staged and MarkedForDeletion expected."), }; + if let Some((index_uid, split_id)) = query.after_split { + splits.retain(|split| { + ( + &split.split_metadata.index_uid, + &split.split_metadata.split_id, + ) > (&index_uid, &split_id) + }); + } + splits.truncate(10_000); let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); Ok(ServiceStream::from(vec![Ok(splits)])) }); @@ -648,7 +686,7 @@ }); mock_metastore .expect_delete_splits() - .times(2) + .times(3) .returning(|delete_splits_request| { let index_uid: IndexUid = delete_splits_request.index_uid().clone(); let split_ids = HashSet::<&str>::from_iter( delete_splits_request .split_ids .iter() .map(|split_id| split_id.as_str()), ); - let expected_split_ids = HashSet::<&str>::from_iter(["a", "b"]); - - assert_eq!(split_ids, expected_split_ids); + if index_uid.index_id == "test-index-1" { + assert_eq!(split_ids.len(), 8000); + for seq in 0..8000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else if split_ids.len() == 2000 { + for seq in 0..2000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else if split_ids.len() == 6000 { + for seq in 2000..8000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else { + panic!(); + } // This should not cause the whole run to fail and return an error, // instead this should simply get logged and return the list of splits // which have successfully been deleted.
- if index_uid.index_id == "test-index-2" { + if index_uid.index_id == "test-index-2" && split_ids.len() == 2000 { Err(MetastoreError::Db { message: "fail to delete".to_string(), }) @@ -682,12 +736,12 @@ let counters = handle.process_pending_and_observe().await.state; assert_eq!(counters.num_passes, 1); - assert_eq!(counters.num_deleted_files, 2); - assert_eq!(counters.num_deleted_bytes, 40); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_deleted_files, 14000); + assert_eq!(counters.num_deleted_bytes, 20 * 14000); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); - assert_eq!(counters.num_failed_splits, 2); + assert_eq!(counters.num_failed_gc_run, 0); + assert_eq!(counters.num_failed_splits, 2000); universe.assert_quit().await; } } diff --git a/quickwit/quickwit-janitor/src/metrics.rs b/quickwit/quickwit-janitor/src/metrics.rs index d3392af7b3f..0f3760e6d87 100644 --- a/quickwit/quickwit-janitor/src/metrics.rs +++ b/quickwit/quickwit-janitor/src/metrics.rs @@ -18,10 +18,18 @@ // along with this program. If not, see <http://www.gnu.org/licenses/>. use once_cell::sync::Lazy; -use quickwit_common::metrics::{new_gauge_vec, IntGaugeVec}; +use quickwit_common::metrics::{ + new_counter, new_counter_vec, new_gauge_vec, IntCounter, IntCounterVec, IntGaugeVec, +}; pub struct JanitorMetrics { pub ongoing_num_delete_operations_total: IntGaugeVec<1>, + pub gc_deleted_splits: IntCounterVec<1>, + pub gc_deleted_bytes: IntCounter, + pub gc_runs: IntCounterVec<1>, + pub gc_seconds_total: IntCounter, + // TODO: having a current run duration which is 0/undefined outside of a run, and returns + // `now - start_time` during a run, would be nice } impl Default for JanitorMetrics { @@ -34,6 +42,32 @@ impl Default for JanitorMetrics { &[], ["index"], ), + gc_deleted_splits: new_counter_vec( + "gc_deleted_splits_total", + "Total number of splits deleted by the garbage collector.", + "quickwit_janitor", + &[], + ["result"], + ), + gc_deleted_bytes: new_counter( + "gc_deleted_bytes_total", + "Total number of bytes deleted by the garbage collector.", + "quickwit_janitor", + &[], + ), + gc_runs: new_counter_vec( + "gc_runs_total", + "Total number of garbage collector executions.", + "quickwit_janitor", + &[], + ["result"], + ), + gc_seconds_total: new_counter( + "gc_seconds_total", + "Total time spent running the garbage collector.", + "quickwit_janitor", + &[], + ), } } } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 265697b0e81..e4d6799cefa 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -424,48 +424,32 @@ impl FileBackedIndex { /// Lists splits. pub(crate) fn list_splits(&self, query: &ListSplitsQuery) -> MetastoreResult<Vec<Split>> { - let limit = query.limit.unwrap_or(usize::MAX); - let offset = query.offset.unwrap_or_default(); - - let splits: Vec<Split> = match query.sort_by { - SortBy::Staleness => self - .splits + let limit = query + .limit + .map(|limit| limit + query.offset.unwrap_or_default()) + .unwrap_or(usize::MAX); + // the skip is done at a higher layer in case other indexes give splits that would go before + // ours + + let results = if query.sort_by == SortBy::None { + // internally, sorted_unstable_by collects everything into an intermediate vec.
When not + // sorting at all, skip that. self.splits .values() .filter(|split| split_query_predicate(split, query)) - .sorted_unstable_by(|left_split, right_split| { - left_split - .split_metadata - .delete_opstamp - .cmp(&right_split.split_metadata.delete_opstamp) - .then_with(|| { - left_split - .publish_timestamp - .cmp(&right_split.publish_timestamp) - }) - }) - .skip(offset) .take(limit) .cloned() - .collect(), - SortBy::IndexUid => self - .splits - .values() - .filter(|split| split_query_predicate(split, query)) - .sorted_unstable_by_key(|split| &split.split_metadata.index_uid) - .skip(offset) - .take(limit) - .cloned() - .collect(), - SortBy::None => self - .splits + .collect() + } else { + self.splits .values() .filter(|split| split_query_predicate(split, query)) - .skip(offset) + .sorted_unstable_by(|lhs, rhs| query.sort_by.compare(lhs, rhs)) .take(limit) .cloned() - .collect(), + .collect() }; - Ok(splits) + Ok(results) } /// Deletes a split. @@ -762,6 +746,17 @@ fn split_query_predicate(split: &&Split, query: &ListSplitsQuery) -> bool { } } + if let Some((index_uid, split_id)) = &query.after_split { + if *index_uid > split.split_metadata.index_uid { + return false; + } + if *index_uid == split.split_metadata.index_uid + && *split_id >= split.split_metadata.split_id + { + return false; + } + } + true } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 729e10b6fe0..10bbd814949 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -380,7 +380,7 @@ impl FileBackedMetastore { /// No error is returned if any of the requested `index_uid` does not exist. async fn list_splits_inner(&self, request: ListSplitsRequest) -> MetastoreResult<Vec<Split>> { let list_splits_query = request.deserialize_list_splits_query()?; - let mut all_splits = Vec::new(); + let mut splits_per_index = Vec::with_capacity(list_splits_query.index_uids.len()); for index_uid in &list_splits_query.index_uids { let splits = match self .read(index_uid, |index| index.list_splits(&list_splits_query)) .await { @@ -393,9 +393,19 @@ } Err(error) => return Err(error), }; - all_splits.extend(splits); + splits_per_index.push(splits); } - Ok(all_splits) + + let limit = list_splits_query.limit.unwrap_or(usize::MAX); + let offset = list_splits_query.offset.unwrap_or_default(); + + let merged_results = splits_per_index + .into_iter() + .kmerge_by(|lhs, rhs| list_splits_query.sort_by.compare(lhs, rhs).is_lt()) + .skip(offset) + .take(limit) + .collect(); + Ok(merged_results) } /// Helper used for testing to obtain the data associated with the given index. diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 06211e1f63a..56dc8b84abf 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -24,6 +24,7 @@ pub mod postgres; pub mod control_plane_metastore; +use std::cmp::Ordering; use std::ops::{Bound, RangeInclusive}; use async_trait::async_trait; @@ -632,6 +633,9 @@ pub struct ListSplitsQuery { /// Sorts the splits by staleness, i.e. by delete opstamp and publish timestamp in ascending /// order.
pub sort_by: SortBy, + + /// Only return splits whose (index_uid, split_id) is lexicographically after this split. + pub after_split: Option<(IndexUid, SplitId)>, } #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] @@ -641,6 +645,33 @@ pub enum SortBy { IndexUid, } +impl SortBy { + fn compare(&self, left_split: &Split, right_split: &Split) -> Ordering { + match self { + SortBy::None => Ordering::Equal, + SortBy::Staleness => left_split + .split_metadata + .delete_opstamp + .cmp(&right_split.split_metadata.delete_opstamp) + .then_with(|| { + left_split + .publish_timestamp + .cmp(&right_split.publish_timestamp) + }), + SortBy::IndexUid => left_split + .split_metadata + .index_uid + .cmp(&right_split.split_metadata.index_uid) + .then_with(|| { + left_split + .split_metadata + .split_id + .cmp(&right_split.split_metadata.split_id) + }), + } + } +} + #[allow(unused_attributes)] impl ListSplitsQuery { /// Creates a new [`ListSplitsQuery`] for the designated index. @@ -658,6 +689,7 @@ create_timestamp: Default::default(), mature: Bound::Unbounded, sort_by: SortBy::None, + after_split: None, } } @@ -680,6 +712,7 @@ create_timestamp: Default::default(), mature: Bound::Unbounded, sort_by: SortBy::None, + after_split: None, }) } @@ -850,11 +883,18 @@ self } - /// Sorts the splits by index_uid. + /// Sorts the splits by index_uid and split_id. pub fn sort_by_index_uid(mut self) -> Self { self.sort_by = SortBy::IndexUid; self } + + /// Only return splits whose (index_uid, split_id) is lexicographically after this split. + /// This is only useful if results are sorted by index_uid and split_id. + pub fn after_split(mut self, split_meta: &SplitMetadata) -> Self { + self.after_split = Some((split_meta.index_uid.clone(), split_meta.split_id.clone())); + self + } } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 008b611a36b..ce0d84468e5 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -2076,7 +2076,25 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC"# + r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC, "split_id" ASC"# ) ); + + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + + let query = + ListSplitsQuery::for_index(index_uid.clone()).after_split(&crate::SplitMetadata { + index_uid: index_uid.clone(), + split_id: "my_split".to_string(), + ..Default::default() + }); + append_query_filters(sql, &query); + + assert_eq!( + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("index_uid", "split_id") > ('{index_uid}', 'my_split')"# + ) + ); } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs index 6e850ae0fed..65ef9ce1df6 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs @@ -187,15 +187,24 @@ pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplits Expr::expr(val) }); + if let
Some((index_uid, split_id)) = &query.after_split { + sql.cond_where( + Expr::tuple([ + Expr::col(Splits::IndexUid).into(), + Expr::col(Splits::SplitId).into(), + ]) + .gt(Expr::tuple([Expr::value(index_uid), Expr::value(split_id)])), + ); + } + match query.sort_by { SortBy::Staleness => { - sql.order_by( - (Splits::DeleteOpstamp, Splits::PublishTimestamp), - Order::Asc, - ); + sql.order_by(Splits::DeleteOpstamp, Order::Asc) + .order_by(Splits::PublishTimestamp, Order::Asc); } SortBy::IndexUid => { - sql.order_by(Splits::IndexUid, Order::Asc); + sql.order_by(Splits::IndexUid, Order::Asc) + .order_by(Splits::SplitId, Order::Asc); } SortBy::None => (), } diff --git a/quickwit/quickwit-metastore/src/tests/list_splits.rs b/quickwit/quickwit-metastore/src/tests/list_splits.rs index cd1cc1712f3..de9c43b7e01 100644 --- a/quickwit/quickwit-metastore/src/tests/list_splits.rs +++ b/quickwit/quickwit-metastore/src/tests/list_splits.rs @@ -1155,3 +1155,356 @@ pub async fn test_metastore_list_stale_splits< cleanup_index(&mut metastore, index_uid).await; } } + +pub async fn test_metastore_list_sorted_splits< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + + let split_id = append_random_suffix("test-list-sorted-splits-"); + let index_id_1 = append_random_suffix("test-list-sorted-splits-1"); + let index_uid_1 = IndexUid::new_with_random_ulid(&index_id_1); + let index_uri_1 = format!("ram:///indexes/{index_id_1}"); + let index_config_1 = IndexConfig::for_test(&index_id_1, &index_uri_1); + + let index_id_2 = append_random_suffix("test-list-sorted-splits-2"); + let index_uid_2 = IndexUid::new_with_random_ulid(&index_id_2); + let index_uri_2 = format!("ram:///indexes/{index_id_2}"); + let index_config_2 = IndexConfig::for_test(&index_id_2, &index_uri_2); + + let split_id_1 = format!("{split_id}--split-1"); + let split_metadata_1 = SplitMetadata { + split_id: split_id_1.clone(), + index_uid: index_uid_1.clone(), + delete_opstamp: 5, + ..Default::default() + }; + let split_id_2 = format!("{split_id}--split-2"); + let split_metadata_2 = SplitMetadata { + split_id: split_id_2.clone(), + index_uid: index_uid_2.clone(), + delete_opstamp: 3, + ..Default::default() + }; + let split_id_3 = format!("{split_id}--split-3"); + let split_metadata_3 = SplitMetadata { + split_id: split_id_3.clone(), + index_uid: index_uid_1.clone(), + delete_opstamp: 1, + ..Default::default() + }; + let split_id_4 = format!("{split_id}--split-4"); + let split_metadata_4 = SplitMetadata { + split_id: split_id_4.clone(), + index_uid: index_uid_2.clone(), + delete_opstamp: 0, + ..Default::default() + }; + let split_id_5 = format!("{split_id}--split-5"); + let split_metadata_5 = SplitMetadata { + split_id: split_id_5.clone(), + index_uid: index_uid_1.clone(), + delete_opstamp: 2, + ..Default::default() + }; + let split_id_6 = format!("{split_id}--split-6"); + let split_metadata_6 = SplitMetadata { + split_id: split_id_6.clone(), + index_uid: index_uid_2.clone(), + delete_opstamp: 4, + ..Default::default() + }; + + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_1).unwrap(); + let index_uid_1: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_2).unwrap(); + let index_uid_2: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + { + let 
stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_1.clone(), + vec![split_metadata_1, split_metadata_3, split_metadata_5], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_1.clone()), + staged_split_ids: vec![split_id_1.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mark_splits_for_deletion = + MarkSplitsForDeletionRequest::new(index_uid_1.clone(), vec![split_id_3.clone()]); + metastore + .mark_splits_for_deletion(mark_splits_for_deletion) + .await + .unwrap(); + + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_2.clone(), + vec![split_metadata_2, split_metadata_4, split_metadata_6], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_2.clone()), + staged_split_ids: vec![split_id_2.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mark_splits_for_deletion = + MarkSplitsForDeletionRequest::new(index_uid_2.clone(), vec![split_id_4.clone()]); + metastore + .mark_splits_for_deletion(mark_splits_for_deletion) + .await + .unwrap(); + } + + let query = + ListSplitsQuery::try_from_index_uids(vec![index_uid_1.clone(), index_uid_2.clone()]) + .unwrap() + .sort_by_staleness(); + let splits = metastore + .list_splits(ListSplitsRequest::try_from_list_splits_query(&query).unwrap()) + .await + .unwrap() + .collect_splits() + .await + .unwrap(); + // we don't use collect_split_ids because it sorts splits internally + let split_ids = splits + .iter() + .map(|split| split.split_id()) + .collect::>(); + assert_eq!( + split_ids, + &[ + &split_id_4, + &split_id_3, + &split_id_5, + &split_id_2, + &split_id_6, + &split_id_1, + ] + ); + + let query = + ListSplitsQuery::try_from_index_uids(vec![index_uid_1.clone(), index_uid_2.clone()]) + .unwrap() + .sort_by_index_uid(); + let splits = metastore + .list_splits(ListSplitsRequest::try_from_list_splits_query(&query).unwrap()) + .await + .unwrap() + .collect_splits() + .await + .unwrap(); + // we don't use collect_split_ids because it sorts splits internally + let split_ids = splits + .iter() + .map(|split| split.split_id()) + .collect::>(); + assert_eq!( + split_ids, + &[ + &split_id_1, + &split_id_3, + &split_id_5, + &split_id_2, + &split_id_4, + &split_id_6, + ] + ); + + cleanup_index(&mut metastore, index_uid_1.clone()).await; + cleanup_index(&mut metastore, index_uid_2.clone()).await; +} + +pub async fn test_metastore_list_after_split< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + + let split_id = append_random_suffix("test-list-sorted-splits-"); + let index_id_1 = append_random_suffix("test-list-sorted-splits-1"); + let index_uri_1 = format!("ram:///indexes/{index_id_1}"); + let index_config_1 = IndexConfig::for_test(&index_id_1, &index_uri_1); + + let index_id_2 = append_random_suffix("test-list-sorted-splits-2"); + let index_uri_2 = format!("ram:///indexes/{index_id_2}"); + let index_config_2 = IndexConfig::for_test(&index_id_2, &index_uri_2); + + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_1).unwrap(); + let index_uid_1: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + 
.index_uid() + .clone(); + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_2).unwrap(); + let index_uid_2: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + let split_id_1 = format!("{split_id}--split-1"); + let split_metadata_1 = SplitMetadata { + split_id: split_id_1.clone(), + index_uid: index_uid_1.clone(), + ..Default::default() + }; + let split_id_2 = format!("{split_id}--split-2"); + let split_metadata_2 = SplitMetadata { + split_id: split_id_2.clone(), + index_uid: index_uid_2.clone(), + ..Default::default() + }; + let split_id_3 = format!("{split_id}--split-3"); + let split_metadata_3 = SplitMetadata { + split_id: split_id_3.clone(), + index_uid: index_uid_1.clone(), + ..Default::default() + }; + let split_id_4 = format!("{split_id}--split-4"); + let split_metadata_4 = SplitMetadata { + split_id: split_id_4.clone(), + index_uid: index_uid_2.clone(), + ..Default::default() + }; + let split_id_5 = format!("{split_id}--split-5"); + let split_metadata_5 = SplitMetadata { + split_id: split_id_5.clone(), + index_uid: index_uid_1.clone(), + ..Default::default() + }; + let split_id_6 = format!("{split_id}--split-6"); + let split_metadata_6 = SplitMetadata { + split_id: split_id_6.clone(), + index_uid: index_uid_2.clone(), + ..Default::default() + }; + + { + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_1.clone(), + vec![ + split_metadata_1.clone(), + split_metadata_3.clone(), + split_metadata_5.clone(), + ], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_1.clone()), + staged_split_ids: vec![split_id_1.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mark_splits_for_deletion = + MarkSplitsForDeletionRequest::new(index_uid_1.clone(), vec![split_id_3.clone()]); + metastore + .mark_splits_for_deletion(mark_splits_for_deletion) + .await + .unwrap(); + + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_2.clone(), + vec![ + split_metadata_2.clone(), + split_metadata_4.clone(), + split_metadata_6.clone(), + ], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_2.clone()), + staged_split_ids: vec![split_id_2.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mark_splits_for_deletion = + MarkSplitsForDeletionRequest::new(index_uid_2.clone(), vec![split_id_4.clone()]); + metastore + .mark_splits_for_deletion(mark_splits_for_deletion) + .await + .unwrap(); + } + + let expected_all = [ + &split_metadata_1, + &split_metadata_3, + &split_metadata_5, + &split_metadata_2, + &split_metadata_4, + &split_metadata_6, + ]; + + for i in 0..expected_all.len() { + let after = expected_all[i]; + let expected_res = expected_all[(i + 1)..] 
+ .iter() + .map(|split| (&split.index_uid, &split.split_id)) + .collect::>(); + + let query = + ListSplitsQuery::try_from_index_uids(vec![index_uid_1.clone(), index_uid_2.clone()]) + .unwrap() + .sort_by_index_uid() + .after_split(after); + let splits = metastore + .list_splits(ListSplitsRequest::try_from_list_splits_query(&query).unwrap()) + .await + .unwrap() + .collect_splits() + .await + .unwrap(); + // we don't use collect_split_ids because it sorts splits internally + let split_ids = splits + .iter() + .map(|split| { + ( + &split.split_metadata.index_uid, + &split.split_metadata.split_id, + ) + }) + .collect::>(); + assert_eq!(split_ids, expected_res,); + } + + cleanup_index(&mut metastore, index_uid_1.clone()).await; + cleanup_index(&mut metastore, index_uid_2.clone()).await; +} diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 3e0add028df..7699c3eb11f 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -431,6 +431,20 @@ macro_rules! metastore_test_suite { $crate::tests::list_splits::test_metastore_list_stale_splits::<$metastore_type>().await; } + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_sorted_splits() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::list_splits::test_metastore_list_sorted_splits::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_after_split() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::list_splits::test_metastore_list_after_split::<$metastore_type>().await; + } + #[tokio::test] #[serial_test::file_serial] async fn test_metastore_update_splits_delete_opstamp() { diff --git a/quickwit/quickwit-query/Cargo.toml b/quickwit/quickwit-query/Cargo.toml index e1229da8e8b..bee650198c8 100644 --- a/quickwit/quickwit-query/Cargo.toml +++ b/quickwit/quickwit-query/Cargo.toml @@ -22,6 +22,7 @@ serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } tantivy = { workspace = true } +time = { workspace = true } thiserror = { workspace = true } whichlang = { workspace = true, optional = true } diff --git a/quickwit/quickwit-query/src/elastic_query_dsl/range_query.rs b/quickwit/quickwit-query/src/elastic_query_dsl/range_query.rs index 9e7d07e23da..337ec019e9d 100644 --- a/quickwit/quickwit-query/src/elastic_query_dsl/range_query.rs +++ b/quickwit/quickwit-query/src/elastic_query_dsl/range_query.rs @@ -18,10 +18,10 @@ // along with this program. If not, see . use std::ops::Bound; -use std::str::FromStr; use quickwit_datetime::StrptimeParser; use serde::Deserialize; +use time::format_description::well_known::Rfc3339; use crate::elastic_query_dsl::one_field_map::OneFieldMap; use crate::elastic_query_dsl::ConvertibleToQueryAst; @@ -59,10 +59,9 @@ impl ConvertibleToQueryAst for RangeQuery { boost, format, } = self.value; - let (gt, gte, lt, lte) = if let Some(JsonLiteral::String(fmt)) = format { - let parser = StrptimeParser::from_str(&fmt).map_err(|reason| { - anyhow::anyhow!("failed to create parser from : {}; reason: {}", fmt, reason) - })?; + let (gt, gte, lt, lte) = if let Some(JsonLiteral::String(java_date_format)) = format { + let parser = StrptimeParser::from_java_datetime_format(&java_date_format) + .map_err(|err| anyhow::anyhow!("failed to parse range query date format. 
{err}"))?; ( gt.map(|v| parse_and_convert(v, &parser)).transpose()?, gte.map(|v| parse_and_convert(v, &parser)).transpose()?, @@ -102,7 +101,8 @@ fn parse_and_convert(literal: JsonLiteral, parser: &StrptimeParser) -> anyhow::R let parsed_date_time = parser .parse_date_time(&date_time_str) .map_err(|reason| anyhow::anyhow!("Failed to parse date time: {}", reason))?; - Ok(JsonLiteral::String(parsed_date_time.to_string())) + let parsed_date_time_rfc3339 = parsed_date_time.format(&Rfc3339)?; + Ok(JsonLiteral::String(parsed_date_time_rfc3339)) } else { Ok(literal) } @@ -110,39 +110,62 @@ fn parse_and_convert(literal: JsonLiteral, parser: &StrptimeParser) -> anyhow::R #[cfg(test)] mod tests { - use std::str::FromStr; + use std::ops::Bound; - use quickwit_datetime::StrptimeParser; - - use crate::elastic_query_dsl::range_query::parse_and_convert; + use super::{RangeQuery as ElasticRangeQuery, RangeQueryParams as ElasticRangeQueryParams}; + use crate::elastic_query_dsl::ConvertibleToQueryAst; + use crate::query_ast::{QueryAst, RangeQuery}; use crate::JsonLiteral; #[test] - fn test_parse_and_convert() -> anyhow::Result<()> { - let parser = StrptimeParser::from_str("%Y-%m-%d %H:%M:%S").unwrap(); - - // valid datetime - let input = JsonLiteral::String("2022-12-30 05:45:00".to_string()); - let result = parse_and_convert(input, &parser)?; - assert_eq!( - result, - JsonLiteral::String("2022-12-30 5:45:00.0 +00:00:00".to_string()) - ); - - // invalid datetime - let input = JsonLiteral::String("invalid datetime".to_string()); - let result = parse_and_convert(input, &parser); - assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("Failed to parse date time")); - - // non_string(number) input - let input = JsonLiteral::Number(27.into()); - let result = parse_and_convert(input.clone(), &parser)?; - assert_eq!(result, input); + fn test_date_range_query_with_format() { + let range_query_params = ElasticRangeQueryParams { + gt: Some(JsonLiteral::String("2021-01-03T13:32:43".to_string())), + gte: None, + lt: None, + lte: None, + boost: None, + format: JsonLiteral::String("yyyy-MM-dd['T'HH:mm:ss]".to_string()).into(), + }; + let range_query: ElasticRangeQuery = ElasticRangeQuery { + field: "date".to_string(), + value: range_query_params, + }; + let range_query_ast = range_query.convert_to_query_ast().unwrap(); + assert!(matches!( + range_query_ast, + QueryAst::Range(RangeQuery { + field, + lower_bound: Bound::Excluded(lower_bound), + upper_bound: Bound::Unbounded, + }) + if field == "date" && lower_bound == JsonLiteral::String("2021-01-03T13:32:43Z".to_string()) + )); + } - Ok(()) + #[test] + fn test_date_range_query_with_strict_date_optional_time_format() { + let range_query_params = ElasticRangeQueryParams { + gt: None, + gte: None, + lt: None, + lte: Some(JsonLiteral::String("2024-09-28T10:22:55.797Z".to_string())), + boost: None, + format: JsonLiteral::String("strict_date_optional_time".to_string()).into(), + }; + let range_query: ElasticRangeQuery = ElasticRangeQuery { + field: "timestamp".to_string(), + value: range_query_params, + }; + let range_query_ast = range_query.convert_to_query_ast().unwrap(); + assert!(matches!( + range_query_ast, + QueryAst::Range(RangeQuery { + field, + lower_bound: Bound::Unbounded, + upper_bound: Bound::Included(upper_bound), + }) + if field == "timestamp" && upper_bound == JsonLiteral::String("2024-09-28T10:22:55.797Z".to_string()) + )); } } diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0007-range_queries.yaml 
b/quickwit/rest-api-tests/scenarii/es_compatibility/0007-range_queries.yaml index 5337325c229..bbedea70e0d 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/0007-range_queries.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0007-range_queries.yaml @@ -243,5 +243,18 @@ expected: total: value: 68 relation: "eq" - - +--- +# Timestamp field with a custom format. +json: + query: + range: + created_at: + gte: "2015|02|01 T00:00:00.001999Z" + lte: "2015|02|01 T00:00:00.001999Z" + # Elasticsearch date format requires text to be escaped with single quotes + format: yyyy|MM|dd 'T'HH:mm:ss.SSSSSS'Z' +expected: + hits: + total: + value: 1 + relation: "eq"
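For reference, the conversion exercised by these scenarios can be sketched in a few lines of Rust, assuming only the API surface visible in this patch (`StrptimeParser::from_java_datetime_format`, `parse_date_time`, and RFC 3339 serialization through the `time` crate); the `to_rfc3339` helper is illustrative and not part of the codebase:

```rust
use anyhow::anyhow;
use quickwit_datetime::StrptimeParser;
use time::format_description::well_known::Rfc3339;

// Illustrative helper: parse a value with an Elasticsearch-style (Java) date
// format and normalize it to RFC 3339, the same steps `parse_and_convert`
// performs on range query bounds.
fn to_rfc3339(java_date_format: &str, date_time_str: &str) -> anyhow::Result<String> {
    // Literal text in the format must be single-quoted, as in the YAML case
    // above: yyyy|MM|dd 'T'HH:mm:ss.SSSSSS'Z'
    let parser = StrptimeParser::from_java_datetime_format(java_date_format)
        .map_err(|err| anyhow!("failed to parse date format: {err}"))?;
    let date_time = parser
        .parse_date_time(date_time_str)
        .map_err(|err| anyhow!("failed to parse date time: {err}"))?;
    Ok(date_time.format(&Rfc3339)?)
}
```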