From 6373edb401493a69f39a013c4f83de27a16b64a3 Mon Sep 17 00:00:00 2001 From: Vladimir Kuznichenkov <5330267+kuzaxak@users.noreply.github.com> Date: Mon, 30 Sep 2024 05:06:17 +0300 Subject: [PATCH 1/5] Fix incompatible es java date format (#5462) * fix(es): add java date format support * Adding rest integration test * Bugfix: expressing date in rfc3339. unit test + integration test * fix: add week patterns * Add support for variable zone offset type ES supports 3 formats at the same time. To do the same we will use `First` format item with multiple options. * Replace regexp tokenizer with recursion parser In the current implementation, the RegexTokenizer uses regular expressions to tokenize the format string. However, regular expressions are not well-suited for parsing nested structures like nested brackets because they cannot match patterns with recursive depth. To address this, we'll write a custom recursive parser that can handle nested optional brackets. This parser will process the format string character by character, building the format items and managing the nesting of optional components. That change allowed me to cover `strict_date_optional_time` which has a nested depth according to the examples in [OS repo][2] Additional tests taken from [the doc][1] [1]: https://opensearch.org/docs/latest/field-types/supported-field-types/date/ [2]: https://github.com/opensearch-project/OpenSearch/blob/main/server/src/test/java/org/opensearch/common/time/DateFormattersTests.java * Move StrptimeParser and Java related parsers to a dedicated file This way it is easier to keep it manageable and clearly separate QW API DatParser logic and ES API related parser. Tests moved together with the code base. * Add additional test to cover initially reported issue This test covers that we actually can call ES API endpoint with named alias as a formatter. https://github.com/quickwit-oss/quickwit/issues/5460 --------- Co-authored-by: Eugene Tolbakov Co-authored-by: Paul Masurel --- quickwit/Cargo.lock | 1 - quickwit/quickwit-datetime/Cargo.toml | 1 - .../quickwit-datetime/src/date_time_format.rs | 153 +--- .../src/date_time_parsing.rs | 16 +- .../src/java_date_time_format.rs | 817 ++++++++++++++++++ quickwit/quickwit-datetime/src/lib.rs | 4 +- quickwit/quickwit-query/Cargo.toml | 1 + .../src/elastic_query_dsl/range_query.rs | 93 +- .../es_compatibility/0007-range_queries.yaml | 17 +- 9 files changed, 908 insertions(+), 195 deletions(-) create mode 100644 quickwit/quickwit-datetime/src/java_date_time_format.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 233fa27d139..3ce5526bd7a 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5894,7 +5894,6 @@ version = "0.8.0" dependencies = [ "anyhow", "itertools 0.13.0", - "ouroboros", "serde", "serde_json", "tantivy", diff --git a/quickwit/quickwit-datetime/Cargo.toml b/quickwit/quickwit-datetime/Cargo.toml index c30e6b029e1..004e959a348 100644 --- a/quickwit/quickwit-datetime/Cargo.toml +++ b/quickwit/quickwit-datetime/Cargo.toml @@ -13,7 +13,6 @@ license.workspace = true [dependencies] anyhow = { workspace = true } itertools = { workspace = true } -ouroboros = "0.18.0" serde = { workspace = true } serde_json = { workspace = true } tantivy = { workspace = true } diff --git a/quickwit/quickwit-datetime/src/date_time_format.rs b/quickwit/quickwit-datetime/src/date_time_format.rs index 42b282ef6db..1758e289113 100644 --- a/quickwit/quickwit-datetime/src/date_time_format.rs +++ b/quickwit/quickwit-datetime/src/date_time_format.rs @@ -20,138 +20,14 @@ use std::fmt::Display; use std::str::FromStr; -use ouroboros::self_referencing; use serde::de::Error; use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value as JsonValue; -use time::error::Format; use time::format_description::well_known::{Iso8601, Rfc2822, Rfc3339}; -use time::format_description::FormatItem; -use time::parsing::Parsed; -use time::{Month, OffsetDateTime, PrimitiveDateTime}; -use time_fmt::parse::time_format_item::parse_to_format_item; - -use crate::TantivyDateTime; - -/// A date time parser that holds the format specification `Vec`. -#[self_referencing] -pub struct StrptimeParser { - strptime_format: String, - with_timezone: bool, - #[borrows(strptime_format)] - #[covariant] - items: Vec>, -} - -impl FromStr for StrptimeParser { - type Err = String; - - fn from_str(strptime_format: &str) -> Result { - StrptimeParser::try_new( - strptime_format.to_string(), - strptime_format.to_lowercase().contains("%z"), - |strptime_format: &String| { - parse_to_format_item(strptime_format).map_err(|error| { - format!("invalid strptime format `{strptime_format}`: {error}") - }) - }, - ) - } -} - -impl StrptimeParser { - /// Parse a given date according to the datetime format specified during the StrptimeParser - /// creation. If the date format does not provide a specific a time, the time will be set to - /// 00:00:00. - fn parse_primitive_date_time(&self, date_time_str: &str) -> anyhow::Result { - let mut parsed = Parsed::new(); - if !parsed - .parse_items(date_time_str.as_bytes(), self.borrow_items())? - .is_empty() - { - anyhow::bail!( - "datetime string `{}` does not match strptime format `{}`", - date_time_str, - self.borrow_strptime_format() - ); - } - // The parsed datetime contains a date but seems to be missing "time". - // We complete it artificially with 00:00:00. - if parsed.hour_24().is_none() - && !(parsed.hour_12().is_some() && parsed.hour_12_is_pm().is_some()) - { - parsed.set_hour_24(0u8); - parsed.set_minute(0u8); - parsed.set_second(0u8); - } - if parsed.year().is_none() { - let now = OffsetDateTime::now_utc(); - let year = infer_year(parsed.month(), now.month(), now.year()); - parsed.set_year(year); - } - let date_time = parsed.try_into()?; - Ok(date_time) - } - - pub fn parse_date_time(&self, date_time_str: &str) -> Result { - if *self.borrow_with_timezone() { - OffsetDateTime::parse(date_time_str, self.borrow_items()).map_err(|err| err.to_string()) - } else { - self.parse_primitive_date_time(date_time_str) - .map(|date_time| date_time.assume_utc()) - .map_err(|err| err.to_string()) - } - } - - pub fn format_date_time(&self, date_time: &OffsetDateTime) -> Result { - date_time.format(self.borrow_items()) - } -} +use time::Month; -impl Clone for StrptimeParser { - fn clone(&self) -> Self { - // `self.format` is already known to be a valid format. - Self::from_str(self.borrow_strptime_format().as_str()).unwrap() - } -} - -impl PartialEq for StrptimeParser { - fn eq(&self, other: &Self) -> bool { - self.borrow_strptime_format() == other.borrow_strptime_format() - } -} - -impl Eq for StrptimeParser {} - -impl std::fmt::Debug for StrptimeParser { - fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter - .debug_struct("StrptimeParser") - .field("format", &self.borrow_strptime_format()) - .finish() - } -} - -impl std::hash::Hash for StrptimeParser { - fn hash(&self, state: &mut H) { - self.borrow_strptime_format().hash(state); - } -} - -// `Strftime` format special characters. -// These characters are taken from the parsing crate we use for compatibility. -const STRFTIME_FORMAT_MARKERS: [&str; 36] = [ - "%a", "%A", "%b", "%B", "%c", "%C", "%d", "%D", "%e", "%f", "%F", "%h", "%H", "%I", "%j", "%k", - "%l", "%m", "%M", "%n", "%p", "%P", "%r", "%R", "%S", "%t", "%T", "%U", "%w", "%W", "%x", "%X", - "%y", "%Y", "%z", "%Z", -]; - -// Checks if a format contains `strftime` special characters. -fn is_strftime_formatting(format_str: &str) -> bool { - STRFTIME_FORMAT_MARKERS - .iter() - .any(|marker| format_str.contains(marker)) -} +use crate::java_date_time_format::is_strftime_formatting; +use crate::{StrptimeParser, TantivyDateTime}; /// Specifies the datetime and unix timestamp formats to use when parsing date strings. #[derive(Clone, Debug, Eq, PartialEq, Hash, Default)] @@ -170,7 +46,7 @@ impl DateTimeInputFormat { DateTimeInputFormat::Iso8601 => "iso8601", DateTimeInputFormat::Rfc2822 => "rfc2822", DateTimeInputFormat::Rfc3339 => "rfc3339", - DateTimeInputFormat::Strptime(parser) => parser.borrow_strptime_format(), + DateTimeInputFormat::Strptime(parser) => parser.strptime_format.as_str(), DateTimeInputFormat::Timestamp => "unix_timestamp", } } @@ -198,7 +74,7 @@ impl FromStr for DateTimeInputFormat { format must contain at least one `strftime` special characters" )); } - DateTimeInputFormat::Strptime(StrptimeParser::from_str(date_time_format_str)?) + DateTimeInputFormat::Strptime(StrptimeParser::from_strptime(date_time_format_str)?) } }; Ok(date_time_format) @@ -241,7 +117,7 @@ impl DateTimeOutputFormat { DateTimeOutputFormat::Iso8601 => "iso8601", DateTimeOutputFormat::Rfc2822 => "rfc2822", DateTimeOutputFormat::Rfc3339 => "rfc3339", - DateTimeOutputFormat::Strptime(parser) => parser.borrow_strptime_format(), + DateTimeOutputFormat::Strptime(parser) => parser.strptime_format.as_str(), DateTimeOutputFormat::TimestampSecs => "unix_timestamp_secs", DateTimeOutputFormat::TimestampMillis => "unix_timestamp_millis", DateTimeOutputFormat::TimestampMicros => "unix_timestamp_micros", @@ -300,7 +176,7 @@ impl FromStr for DateTimeOutputFormat { format must contain at least one `strftime` special characters" )); } - DateTimeOutputFormat::Strptime(StrptimeParser::from_str(date_time_format_str)?) + DateTimeOutputFormat::Strptime(StrptimeParser::from_strptime(date_time_format_str)?) } }; Ok(date_time_format) @@ -341,7 +217,6 @@ pub(super) fn infer_year( #[cfg(test)] mod tests { - use time::macros::datetime; use time::Month; use super::*; @@ -462,20 +337,6 @@ mod tests { } } - #[test] - fn test_strictly_parse_datetime_format() { - let parser = StrptimeParser::from_str("%Y-%m-%d").unwrap(); - assert_eq!( - parser.parse_date_time("2021-01-01").unwrap(), - datetime!(2021-01-01 00:00:00 UTC) - ); - let error = parser.parse_date_time("2021-01-01TABC").unwrap_err(); - assert_eq!( - error, - "datetime string `2021-01-01TABC` does not match strptime format `%Y-%m-%d`" - ); - } - #[test] fn test_infer_year() { let inferred_year = infer_year(None, Month::January, 2024); diff --git a/quickwit/quickwit-datetime/src/date_time_parsing.rs b/quickwit/quickwit-datetime/src/date_time_parsing.rs index 14c1fa9be90..54e8d4b88bb 100644 --- a/quickwit/quickwit-datetime/src/date_time_parsing.rs +++ b/quickwit/quickwit-datetime/src/date_time_parsing.rs @@ -179,8 +179,6 @@ pub fn parse_timestamp(timestamp: i64) -> Result { #[cfg(test)] mod tests { - use std::str::FromStr; - use time::macros::datetime; use time::Month; @@ -262,7 +260,7 @@ mod tests { ), ]; for (fmt, date_time_str, expected) in test_data { - let parser = StrptimeParser::from_str(fmt).unwrap(); + let parser = StrptimeParser::from_strptime(fmt).unwrap(); let result = parser.parse_date_time(date_time_str); if let Err(error) = &result { panic!( @@ -276,14 +274,14 @@ mod tests { #[test] fn test_parse_date_without_time() { - let strptime_parser = StrptimeParser::from_str("%Y-%m-%d").unwrap(); + let strptime_parser = StrptimeParser::from_strptime("%Y-%m-%d").unwrap(); let date = strptime_parser.parse_date_time("2012-05-21").unwrap(); assert_eq!(date, datetime!(2012-05-21 00:00:00 UTC)); } #[test] fn test_parse_date_am_pm_hour_not_zeroed() { - let strptime_parser = StrptimeParser::from_str("%Y-%m-%d %I:%M:%S %p").unwrap(); + let strptime_parser = StrptimeParser::from_strptime("%Y-%m-%d %I:%M:%S %p").unwrap(); let date = strptime_parser .parse_date_time("2012-05-21 10:05:12 pm") .unwrap(); @@ -309,13 +307,13 @@ mod tests { DateTimeInputFormat::Rfc2822, DateTimeInputFormat::Rfc3339, DateTimeInputFormat::Strptime( - StrptimeParser::from_str("%Y-%m-%d %H:%M:%S").unwrap(), + StrptimeParser::from_strptime("%Y-%m-%d %H:%M:%S").unwrap(), ), DateTimeInputFormat::Strptime( - StrptimeParser::from_str("%Y/%m/%d %H:%M:%S").unwrap(), + StrptimeParser::from_strptime("%Y/%m/%d %H:%M:%S").unwrap(), ), DateTimeInputFormat::Strptime( - StrptimeParser::from_str("%Y/%m/%d %H:%M:%S %z").unwrap(), + StrptimeParser::from_strptime("%Y/%m/%d %H:%M:%S %z").unwrap(), ), DateTimeInputFormat::Timestamp, ], @@ -452,7 +450,7 @@ mod tests { DateTimeInputFormat::Iso8601, DateTimeInputFormat::Rfc3339, DateTimeInputFormat::Strptime( - StrptimeParser::from_str("%Y-%m-%d %H:%M:%S.%f").unwrap(), + StrptimeParser::from_strptime("%Y-%m-%d %H:%M:%S.%f").unwrap(), ), ], ) diff --git a/quickwit/quickwit-datetime/src/java_date_time_format.rs b/quickwit/quickwit-datetime/src/java_date_time_format.rs new file mode 100644 index 00000000000..1cc035c90f3 --- /dev/null +++ b/quickwit/quickwit-datetime/src/java_date_time_format.rs @@ -0,0 +1,817 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::HashMap; +use std::num::NonZeroU8; +use std::sync::OnceLock; + +use time::error::{Format, TryFromParsed}; +use time::format_description::modifier::{ + Day, Hour, Minute, Month as MonthModifier, Padding, Second, Subsecond, SubsecondDigits, + WeekNumber, WeekNumberRepr, Weekday, WeekdayRepr, Year, YearRepr, +}; +use time::format_description::{Component, OwnedFormatItem}; +use time::parsing::Parsed; +use time::{Month, OffsetDateTime, PrimitiveDateTime, UtcOffset}; +use time_fmt::parse::time_format_item::parse_to_format_item; + +use crate::date_time_format; + +const JAVA_DATE_FORMAT_TOKENS: &[&str] = &[ + "yyyy", + "xxxx", + "xx[xx]", + "SSSSSSSSS", // For nanoseconds + "SSSSSSS", // For microseconds + "SSSSSS", // For fractional seconds up to six digits + "SSSSS", + "SSSS", + "SSS", + "SS", + "ZZ", + "xx", + "ww", + "w[w]", + "yy", + "MM", + "dd", + "HH", + "hh", + "kk", + "mm", + "ss", + "aa", + "a", + "w", + "M", + "d", + "H", + "h", + "k", + "m", + "s", + "S", + "Z", + "e", +]; + +fn literal(s: &[u8]) -> OwnedFormatItem { + // builds a boxed slice from a slice + let boxed_slice: Box<[u8]> = s.to_vec().into_boxed_slice(); + OwnedFormatItem::Literal(boxed_slice) +} + +#[inline] +fn get_padding(ptn: &str) -> Padding { + if ptn.len() == 2 { + Padding::Zero + } else { + Padding::None + } +} + +fn build_zone_offset(_: &str) -> Option { + // 'Z' literal to represent UTC offset + let z_literal = OwnedFormatItem::Literal(Box::from(b"Z".as_ref())); + + // Offset in '+/-HH:MM' format + let offset_with_delimiter_items: Box<[OwnedFormatItem]> = vec![ + OwnedFormatItem::Component(Component::OffsetHour(Default::default())), + OwnedFormatItem::Literal(Box::from(b":".as_ref())), + OwnedFormatItem::Component(Component::OffsetMinute(Default::default())), + ] + .into_boxed_slice(); + let offset_with_delimiter_compound = OwnedFormatItem::Compound(offset_with_delimiter_items); + + // Offset in '+/-HHMM' format + let offset_items: Box<[OwnedFormatItem]> = vec![ + OwnedFormatItem::Component(Component::OffsetHour(Default::default())), + OwnedFormatItem::Component(Component::OffsetMinute(Default::default())), + ] + .into_boxed_slice(); + let offset_compound = OwnedFormatItem::Compound(offset_items); + + Some(OwnedFormatItem::First( + vec![z_literal, offset_with_delimiter_compound, offset_compound].into_boxed_slice(), + )) +} + +fn build_year_item(ptn: &str) -> Option { + let mut full_year = Year::default(); + full_year.repr = YearRepr::Full; + let full_year_component = OwnedFormatItem::Component(Component::Year(full_year)); + + let mut short_year = Year::default(); + short_year.repr = YearRepr::LastTwo; + let short_year_component = OwnedFormatItem::Component(Component::Year(short_year)); + + if ptn.len() == 4 { + Some(full_year_component) + } else if ptn.len() == 2 { + Some(short_year_component) + } else { + Some(OwnedFormatItem::First( + vec![full_year_component, short_year_component].into_boxed_slice(), + )) + } +} + +fn build_week_based_year_item(ptn: &str) -> Option { + // TODO no `Component` for that + build_year_item(ptn) +} + +fn build_month_item(ptn: &str) -> Option { + let mut month: MonthModifier = Default::default(); + month.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::Month(month))) +} + +fn build_day_item(ptn: &str) -> Option { + let mut day = Day::default(); + day.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::Day(day))) +} + +fn build_day_of_week_item(_: &str) -> Option { + let mut weekday = Weekday::default(); + weekday.repr = WeekdayRepr::Monday; + weekday.one_indexed = false; + Some(OwnedFormatItem::Component(Component::Weekday(weekday))) +} + +fn build_week_of_year_item(ptn: &str) -> Option { + let mut week_number = WeekNumber::default(); + week_number.repr = WeekNumberRepr::Monday; + week_number.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::WeekNumber( + week_number, + ))) +} + +fn build_hour_item(ptn: &str) -> Option { + let mut hour = Hour::default(); + hour.padding = get_padding(ptn); + hour.is_12_hour_clock = false; + Some(OwnedFormatItem::Component(Component::Hour(hour))) +} + +fn build_minute_item(ptn: &str) -> Option { + let mut minute: Minute = Default::default(); + minute.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::Minute(minute))) +} + +fn build_second_item(ptn: &str) -> Option { + let mut second: Second = Default::default(); + second.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::Second(second))) +} + +fn build_fraction_of_second_item(_ptn: &str) -> Option { + let mut subsecond: Subsecond = Default::default(); + subsecond.digits = SubsecondDigits::OneOrMore; + Some(OwnedFormatItem::Component(Component::Subsecond(subsecond))) +} + +fn parse_java_datetime_format_items_recursive( + chars: &mut std::iter::Peekable, +) -> Result, String> { + let mut items = Vec::new(); + + while let Some(&c) = chars.peek() { + match c { + '[' => { + chars.next(); + let optional_items = parse_java_datetime_format_items_recursive(chars)?; + items.push(OwnedFormatItem::Optional(Box::new( + OwnedFormatItem::Compound(optional_items.into_boxed_slice()), + ))); + } + ']' => { + chars.next(); + break; + } + '\'' => { + chars.next(); + let mut literal_str = String::new(); + while let Some(&next_c) = chars.peek() { + if next_c == '\'' { + chars.next(); + break; + } else { + literal_str.push(next_c); + chars.next(); + } + } + items.push(literal(literal_str.as_bytes())); + } + _ => { + if let Some(format_item) = match_java_date_format_token(chars)? { + items.push(format_item); + } else { + // Treat as a literal character + items.push(literal(c.to_string().as_bytes())); + chars.next(); + } + } + } + } + + Ok(items) +} + +// Elasticsearch/OpenSearch uses a set of preconfigured formats, more information could be found +// here https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html +fn match_java_date_format_token( + chars: &mut std::iter::Peekable, +) -> Result, String> { + if chars.peek().is_none() { + return Ok(None); + } + + let remaining: String = chars.clone().collect(); + + // Try to match the longest possible token + for token in JAVA_DATE_FORMAT_TOKENS { + if remaining.starts_with(token) { + for _ in 0..token.len() { + chars.next(); + } + + let format_item = match *token { + "yyyy" | "yy" => build_year_item(token), + "xxxx" | "xx[xx]" | "xx" => build_week_based_year_item(token), + "MM" | "M" => build_month_item(token), + "dd" | "d" => build_day_item(token), + "HH" | "H" => build_hour_item(token), + "mm" | "m" => build_minute_item(token), + "ss" | "s" => build_second_item(token), + "SSSSSSSSS" | "SSSSSSS" | "SSSSSS" | "SSSSS" | "SSSS" | "SSS" | "SS" | "S" => { + build_fraction_of_second_item(token) + } + "Z" => build_zone_offset(token), + "ww" | "w[w]" | "w" => build_week_of_year_item(token), + "e" => build_day_of_week_item(token), + _ => return Err(format!("Unrecognized token '{}'", token)), + }; + return Ok(format_item); + } + } + + Ok(None) +} + +// Check if the given date time format is a common alias and replace it with the +// Java date format it is mapped to, if any. +// If the java_datetime_format is not an alias, it is expected to be a +// java date time format and should be returned as is. +fn resolve_java_datetime_format_alias(java_datetime_format: &str) -> &str { + static JAVA_DATE_FORMAT_ALIASES: OnceLock> = + OnceLock::new(); + let java_datetime_format_map = JAVA_DATE_FORMAT_ALIASES.get_or_init(|| { + let mut m = HashMap::new(); + m.insert("date_optional_time", "yyyy-MM-dd['T'HH:mm:ss.SSSZ]"); + m.insert( + "strict_date_optional_time", + "yyyy[-MM[-dd['T'HH[:mm[:ss[.SSS[Z]]]]]]]", + ); + m.insert( + "strict_date_optional_time_nanos", + "yyyy[-MM[-dd['T'HH:mm:ss.SSSSSSZ]]]", + ); + m.insert("basic_date", "yyyyMMdd"); + + m.insert("strict_basic_week_date", "xxxx'W'wwe"); + m.insert("basic_week_date", "xx[xx]'W'wwe"); + + m.insert("strict_basic_week_date_time", "xxxx'W'wwe'T'HHmmss.SSSZ"); + m.insert("basic_week_date_time", "xx[xx]'W'wwe'T'HHmmss.SSSZ"); + + m.insert( + "strict_basic_week_date_time_no_millis", + "xxxx'W'wwe'T'HHmmssZ", + ); + m.insert("basic_week_date_time_no_millis", "xx[xx]'W'wwe'T'HHmmssZ"); + + m.insert("strict_week_date", "xxxx-'W'ww-e"); + m.insert("week_date", "xxxx-'W'w[w]-e"); + m + }); + java_datetime_format_map + .get(java_datetime_format) + .copied() + .unwrap_or(java_datetime_format) +} + +/// A date time parser that holds the format specification `Vec`. +#[derive(Clone)] +pub struct StrptimeParser { + pub(crate) strptime_format: String, + items: Box<[OwnedFormatItem]>, +} + +pub fn parse_java_datetime_format_items( + java_datetime_format: &str, +) -> Result, String> { + let mut chars = java_datetime_format.chars().peekable(); + let items = parse_java_datetime_format_items_recursive(&mut chars)?; + Ok(items.into_boxed_slice()) +} + +impl StrptimeParser { + /// Parse a date assume UTC if unspecified. + /// See `parse_date_time_with_default_timezone` for more details. + pub fn parse_date_time(&self, date_time_str: &str) -> Result { + self.parse_date_time_with_default_timezone(date_time_str, UtcOffset::UTC) + } + + /// Parse a date. If no timezone is specified we will assume the timezone passed as + /// `default_offset`. If the date is missing, it will be automatically set to 00:00:00. + pub fn parse_date_time_with_default_timezone( + &self, + date_time_str: &str, + default_offset: UtcOffset, + ) -> Result { + let mut parsed = Parsed::new(); + if !parsed + .parse_items(date_time_str.as_bytes(), &self.items) + .map_err(|err| err.to_string())? + .is_empty() + { + return Err(format!( + "datetime string `{}` does not match strptime format `{}`", + date_time_str, &self.strptime_format + )); + } + + // The parsed datetime contains a date but seems to be missing "time". + // We complete it artificially with 00:00:00. + if parsed.hour_24().is_none() + && !(parsed.hour_12().is_some() && parsed.hour_12_is_pm().is_some()) + { + parsed.set_hour_24(0u8); + parsed.set_minute(0u8); + parsed.set_second(0u8); + } + + if parsed.year().is_none() { + let now = OffsetDateTime::now_utc(); + let year = date_time_format::infer_year(parsed.month(), now.month(), now.year()); + parsed.set_year(year); + } + + if parsed.day().is_none() && parsed.monday_week_number().is_none() { + parsed.set_day(NonZeroU8::try_from(1u8).unwrap()); + } + + if parsed.month().is_none() && parsed.monday_week_number().is_none() { + parsed.set_month(Month::January); + } + + if parsed.offset_hour().is_some() { + let offset_datetime: OffsetDateTime = parsed + .try_into() + .map_err(|err: TryFromParsed| err.to_string())?; + return Ok(offset_datetime); + } + let primitive_date_time: PrimitiveDateTime = parsed + .try_into() + .map_err(|err: TryFromParsed| err.to_string())?; + Ok(primitive_date_time.assume_offset(default_offset)) + } + + pub fn format_date_time(&self, date_time: &OffsetDateTime) -> Result { + date_time.format(&self.items) + } + + pub fn from_strptime(strptime_format: &str) -> Result { + let items: Box<[OwnedFormatItem]> = parse_to_format_item(strptime_format) + .map_err(|err| format!("invalid strptime format `{strptime_format}`: {err}"))? + .into_iter() + .map(|item| item.into()) + .collect::>() + .into_boxed_slice(); + Ok(StrptimeParser::new(strptime_format.to_string(), items)) + } + + pub fn from_java_datetime_format(java_datetime_format: &str) -> Result { + let java_datetime_format_resolved = + resolve_java_datetime_format_alias(java_datetime_format); + let items: Box<[OwnedFormatItem]> = + parse_java_datetime_format_items(java_datetime_format_resolved)?; + Ok(StrptimeParser::new(java_datetime_format.to_string(), items)) + } + + fn new(strptime_format: String, items: Box<[OwnedFormatItem]>) -> Self { + StrptimeParser { + strptime_format, + items, + } + } +} + +impl PartialEq for StrptimeParser { + fn eq(&self, other: &Self) -> bool { + self.strptime_format == other.strptime_format + } +} + +impl Eq for StrptimeParser {} + +impl std::fmt::Debug for StrptimeParser { + fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter + .debug_struct("StrptimeParser") + .field("format", &self.strptime_format) + .finish() + } +} + +impl std::hash::Hash for StrptimeParser { + fn hash(&self, state: &mut H) { + self.strptime_format.hash(state); + } +} + +// `Strftime` format special characters. +// These characters are taken from the parsing crate we use for compatibility. +const STRFTIME_FORMAT_MARKERS: [&str; 36] = [ + "%a", "%A", "%b", "%B", "%c", "%C", "%d", "%D", "%e", "%f", "%F", "%h", "%H", "%I", "%j", "%k", + "%l", "%m", "%M", "%n", "%p", "%P", "%r", "%R", "%S", "%t", "%T", "%U", "%w", "%W", "%x", "%X", + "%y", "%Y", "%z", "%Z", +]; + +// Checks if a format contains `strftime` special characters. +pub fn is_strftime_formatting(format_str: &str) -> bool { + STRFTIME_FORMAT_MARKERS + .iter() + .any(|marker| format_str.contains(marker)) +} + +#[cfg(test)] +mod tests { + use time::macros::datetime; + + use super::*; + use crate::java_date_time_format::parse_java_datetime_format_items; + + #[test] + fn test_parse_datetime_format_missing_time() { + let parser = StrptimeParser::from_strptime("%Y-%m-%d").unwrap(); + assert_eq!( + parser.parse_date_time("2021-01-01").unwrap(), + datetime!(2021-01-01 00:00:00 UTC) + ); + } + + #[test] + fn test_parse_datetime_format_strict_on_trailing_data() { + let parser = StrptimeParser::from_strptime("%Y-%m-%d").unwrap(); + let error = parser.parse_date_time("2021-01-01TABC").unwrap_err(); + assert_eq!( + error, + "datetime string `2021-01-01TABC` does not match strptime format `%Y-%m-%d`" + ); + } + + #[test] + fn test_parse_strptime_with_timezone() { + let parser = StrptimeParser::from_strptime("%Y-%m-%dT%H:%M:%S %z").unwrap(); + let offset_datetime = parser + .parse_date_time("2021-01-01T11:00:03 +07:00") + .unwrap(); + assert_eq!(offset_datetime, datetime!(2021-01-01 11:00:03 +7)); + } + + #[track_caller] + fn test_parse_java_datetime_aux( + java_date_time_format: &str, + date_str: &str, + expected_datetime: OffsetDateTime, + ) { + let parser = StrptimeParser::from_java_datetime_format(java_date_time_format).unwrap(); + let datetime = parser.parse_date_time(date_str).unwrap(); + assert_eq!(datetime, expected_datetime); + } + + #[test] + fn test_parse_java_datetime_format() { + test_parse_java_datetime_aux("yyyyMMdd", "20210101", datetime!(2021-01-01 00:00:00 UTC)); + test_parse_java_datetime_aux( + "yyyy MM dd", + "2021 01 01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd", + "2021!01?01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd'T'HH:", + "2021!01?01T13:", + datetime!(2021-01-01 13:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd['T'[HH:]]", + "2021!01?01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd['T'[HH:]", + "2021!01?01T", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd['T'[HH:]]", + "2021!01?01T13:", + datetime!(2021-01-01 13:00:00 UTC), + ); + } + + #[test] + fn test_parse_java_missing_time() { + test_parse_java_datetime_aux( + "yyyy-MM-dd", + "2021-01-01", + datetime!(2021-01-01 00:00:00 UTC), + ); + } + + #[test] + fn test_parse_java_optional_missing_time() { + test_parse_java_datetime_aux( + "yyyy-MM-dd[ HH:mm:ss]", + "2021-01-01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy-MM-dd[ HH:mm:ss]", + "2021-01-01 12:34:56", + datetime!(2021-01-01 12:34:56 UTC), + ); + } + + #[test] + fn test_parse_java_datetime_format_aliases() { + test_parse_java_datetime_aux( + "date_optional_time", + "2021-01-01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "date_optional_time", + "2021-01-21T03:01:22.312+01:00", + datetime!(2021-01-21 03:01:22.312 +1), + ); + } + + #[test] + fn test_parse_java_week_formats() { + test_parse_java_datetime_aux( + "basic_week_date", + "2024W313", + datetime!(2024-08-01 0:00:00.0 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date", + "24W313", + datetime!(2024-08-01 0:00:00.0 +00:00:00), + ); + // // ❌ 'the 'year' component could not be parsed' + // test_parse_java_datetime_aux( + // "basic_week_date", + // "1W313", + // datetime!(2018-08-02 0:00:00.0 +00:00:00), + // ); + test_parse_java_datetime_aux( + "basic_week_date_time", + "2018W313T121212.1Z", + datetime!(2018-08-02 12:12:12.1 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time", + "2018W313T121212.123Z", + datetime!(2018-08-02 12:12:12.123 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time", + "2018W313T121212.123456789Z", + datetime!(2018-08-02 12:12:12.123456789 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time", + "2018W313T121212.123+0100", + datetime!(2018-08-02 12:12:12.123 +01:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time_no_millis", + "2018W313T121212Z", + datetime!(2018-08-02 12:12:12.0 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time_no_millis", + "2018W313T121212+0100", + datetime!(2018-08-02 12:12:12.0 +01:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time_no_millis", + "2018W313T121212+01:00", + datetime!(2018-08-02 12:12:12.0 +01:00:00), + ); + + test_parse_java_datetime_aux( + "week_date", + "2012-W48-6", + datetime!(2012-12-02 0:00:00.0 +00:00:00), + ); + + test_parse_java_datetime_aux( + "week_date", + "2012-W01-6", + datetime!(2012-01-08 0:00:00.0 +00:00:00), + ); + + test_parse_java_datetime_aux( + "week_date", + "2012-W1-6", + datetime!(2012-01-08 0:00:00.0 +00:00:00), + ); + } + + #[test] + fn test_parse_java_strict_week_formats() { + test_parse_java_datetime_aux( + "strict_basic_week_date", + "2024W313", + datetime!(2024-08-01 0:00:00.0 +00:00:00), + ); + + test_parse_java_datetime_aux( + "strict_week_date", + "2012-W48-6", + datetime!(2012-12-02 0:00:00.0 +00:00:00), + ); + + test_parse_java_datetime_aux( + "strict_week_date", + "2012-W01-6", + datetime!(2012-01-08 0:00:00.0 +00:00:00), + ); + } + + #[test] + fn test_parse_strict_date_optional_time() { + let parser = + StrptimeParser::from_java_datetime_format("strict_date_optional_time").unwrap(); + let dates = [ + "2019", + "2019-03", + "2019-03-23", + "2019-03-23T21:34", + "2019-03-23T21:34:46", + "2019-03-23T21:34:46.123Z", + "2019-03-23T21:35:46.123+00:00", + "2019-03-23T21:36:46.123+03:00", + "2019-03-23T21:37:46.123+0300", + ]; + let expected = [ + datetime!(2019-01-01 00:00:00 UTC), + datetime!(2019-03-01 00:00:00 UTC), + datetime!(2019-03-23 00:00:00 UTC), + datetime!(2019-03-23 21:34 UTC), + datetime!(2019-03-23 21:34:46 UTC), + datetime!(2019-03-23 21:34:46.123 UTC), + datetime!(2019-03-23 21:35:46.123 UTC), + datetime!(2019-03-23 21:36:46.123 +03:00:00), + datetime!(2019-03-23 21:37:46.123 +03:00:00), + ]; + for (date_str, &expected_dt) in dates.iter().zip(expected.iter()) { + let parsed_dt = parser + .parse_date_time(date_str) + .unwrap_or_else(|e| panic!("Failed to parse {}: {}", date_str, e)); + assert_eq!(parsed_dt, expected_dt); + } + } + + #[test] + fn test_parse_strict_date_optional_time_nanos() { + let parser = + StrptimeParser::from_java_datetime_format("strict_date_optional_time_nanos").unwrap(); + let dates = [ + "2019", + "2019-03", + "2019-03-23", + "2019-03-23T21:34:46.123456789Z", + "2019-03-23T21:35:46.123456789+00:00", + "2019-03-23T21:36:46.123456789+03:00", + "2019-03-23T21:37:46.123456789+0300", + ]; + let expected = [ + datetime!(2019-01-01 00:00:00 UTC), + datetime!(2019-03-01 00:00:00 UTC), + datetime!(2019-03-23 00:00:00 UTC), + datetime!(2019-03-23 21:34:46.123456789 UTC), + datetime!(2019-03-23 21:35:46.123456789 UTC), + datetime!(2019-03-23 21:36:46.123456789 +03:00:00), + datetime!(2019-03-23 21:37:46.123456789 +03:00:00), + ]; + for (date_str, &expected_dt) in dates.iter().zip(expected.iter()) { + let parsed_dt = parser + .parse_date_time(date_str) + .unwrap_or_else(|e| panic!("Failed to parse {}: {}", date_str, e)); + assert_eq!(parsed_dt, expected_dt); + } + } + + #[test] + fn test_parse_java_datetime_format_items() { + let format_str = "xx[xx]'W'wwe"; + let result = parse_java_datetime_format_items(format_str).unwrap(); + + // We expect the tokens to be parsed as: + // - 'xx[xx]' (week-based year) with optional length + // - 'W' (literal) + // - 'ww' (week of year) + // - 'e' (day of week) + + assert_eq!(result.len(), 4); + + // Verify each token + match &result[0] { + OwnedFormatItem::First(boxed_slice) => { + assert_eq!(boxed_slice.len(), 2); + match (&boxed_slice[0], &boxed_slice[1]) { + ( + OwnedFormatItem::Component(Component::Year(_)), + OwnedFormatItem::Component(Component::Year(_)), + ) => {} + unexpected => { + panic!("Expected two Year components, but found: {:?}", unexpected) + } + } + } + unexpected => panic!( + "Expected First with two Year components, but found: {:?}", + unexpected + ), + } + + match &result[1] { + OwnedFormatItem::Literal(lit) => assert_eq!(lit.as_ref(), b"W"), + unexpected => panic!("Expected literal 'W', but found: {:?}", unexpected), + } + + match &result[2] { + OwnedFormatItem::Component(Component::WeekNumber(_)) => {} + unexpected => panic!("Expected WeekNumber component, but found: {:?}", unexpected), + } + + match &result[3] { + OwnedFormatItem::Component(Component::Weekday(_)) => {} + unexpected => panic!("Expected Weekday component, but found: {:?}", unexpected), + } + } + + #[test] + fn test_parse_java_datetime_format_with_literals() { + let format = "yyyy'T'Z-HHuu"; + let parser = StrptimeParser::from_java_datetime_format(format).unwrap(); + + let test_cases = [ + ("2023TZ-14uu", datetime!(2023-01-01 14:00:00 UTC)), + ("2024TZ-05uu", datetime!(2024-01-01 05:00:00 UTC)), + ("2025TZ-23uu", datetime!(2025-01-01 23:00:00 UTC)), + ]; + + for (input, expected) in test_cases.iter() { + let result = parser.parse_date_time(input).unwrap(); + assert_eq!(result, *expected, "Failed to parse {}", input); + } + + // Test error case + let error_case = "2023-1430"; + assert!( + parser.parse_date_time(error_case).is_err(), + "Expected error for input: {}", + error_case + ); + } +} diff --git a/quickwit/quickwit-datetime/src/lib.rs b/quickwit/quickwit-datetime/src/lib.rs index eb4d8c940ba..03003641dcc 100644 --- a/quickwit/quickwit-datetime/src/lib.rs +++ b/quickwit/quickwit-datetime/src/lib.rs @@ -19,9 +19,11 @@ mod date_time_format; mod date_time_parsing; +pub mod java_date_time_format; -pub use date_time_format::{DateTimeInputFormat, DateTimeOutputFormat, StrptimeParser}; +pub use date_time_format::{DateTimeInputFormat, DateTimeOutputFormat}; pub use date_time_parsing::{ parse_date_time_str, parse_timestamp, parse_timestamp_float, parse_timestamp_int, }; +pub use java_date_time_format::StrptimeParser; pub use tantivy::DateTime as TantivyDateTime; diff --git a/quickwit/quickwit-query/Cargo.toml b/quickwit/quickwit-query/Cargo.toml index e1229da8e8b..bee650198c8 100644 --- a/quickwit/quickwit-query/Cargo.toml +++ b/quickwit/quickwit-query/Cargo.toml @@ -22,6 +22,7 @@ serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } tantivy = { workspace = true } +time = { workspace = true } thiserror = { workspace = true } whichlang = { workspace = true, optional = true } diff --git a/quickwit/quickwit-query/src/elastic_query_dsl/range_query.rs b/quickwit/quickwit-query/src/elastic_query_dsl/range_query.rs index 9e7d07e23da..337ec019e9d 100644 --- a/quickwit/quickwit-query/src/elastic_query_dsl/range_query.rs +++ b/quickwit/quickwit-query/src/elastic_query_dsl/range_query.rs @@ -18,10 +18,10 @@ // along with this program. If not, see . use std::ops::Bound; -use std::str::FromStr; use quickwit_datetime::StrptimeParser; use serde::Deserialize; +use time::format_description::well_known::Rfc3339; use crate::elastic_query_dsl::one_field_map::OneFieldMap; use crate::elastic_query_dsl::ConvertibleToQueryAst; @@ -59,10 +59,9 @@ impl ConvertibleToQueryAst for RangeQuery { boost, format, } = self.value; - let (gt, gte, lt, lte) = if let Some(JsonLiteral::String(fmt)) = format { - let parser = StrptimeParser::from_str(&fmt).map_err(|reason| { - anyhow::anyhow!("failed to create parser from : {}; reason: {}", fmt, reason) - })?; + let (gt, gte, lt, lte) = if let Some(JsonLiteral::String(java_date_format)) = format { + let parser = StrptimeParser::from_java_datetime_format(&java_date_format) + .map_err(|err| anyhow::anyhow!("failed to parse range query date format. {err}"))?; ( gt.map(|v| parse_and_convert(v, &parser)).transpose()?, gte.map(|v| parse_and_convert(v, &parser)).transpose()?, @@ -102,7 +101,8 @@ fn parse_and_convert(literal: JsonLiteral, parser: &StrptimeParser) -> anyhow::R let parsed_date_time = parser .parse_date_time(&date_time_str) .map_err(|reason| anyhow::anyhow!("Failed to parse date time: {}", reason))?; - Ok(JsonLiteral::String(parsed_date_time.to_string())) + let parsed_date_time_rfc3339 = parsed_date_time.format(&Rfc3339)?; + Ok(JsonLiteral::String(parsed_date_time_rfc3339)) } else { Ok(literal) } @@ -110,39 +110,62 @@ fn parse_and_convert(literal: JsonLiteral, parser: &StrptimeParser) -> anyhow::R #[cfg(test)] mod tests { - use std::str::FromStr; + use std::ops::Bound; - use quickwit_datetime::StrptimeParser; - - use crate::elastic_query_dsl::range_query::parse_and_convert; + use super::{RangeQuery as ElasticRangeQuery, RangeQueryParams as ElasticRangeQueryParams}; + use crate::elastic_query_dsl::ConvertibleToQueryAst; + use crate::query_ast::{QueryAst, RangeQuery}; use crate::JsonLiteral; #[test] - fn test_parse_and_convert() -> anyhow::Result<()> { - let parser = StrptimeParser::from_str("%Y-%m-%d %H:%M:%S").unwrap(); - - // valid datetime - let input = JsonLiteral::String("2022-12-30 05:45:00".to_string()); - let result = parse_and_convert(input, &parser)?; - assert_eq!( - result, - JsonLiteral::String("2022-12-30 5:45:00.0 +00:00:00".to_string()) - ); - - // invalid datetime - let input = JsonLiteral::String("invalid datetime".to_string()); - let result = parse_and_convert(input, &parser); - assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("Failed to parse date time")); - - // non_string(number) input - let input = JsonLiteral::Number(27.into()); - let result = parse_and_convert(input.clone(), &parser)?; - assert_eq!(result, input); + fn test_date_range_query_with_format() { + let range_query_params = ElasticRangeQueryParams { + gt: Some(JsonLiteral::String("2021-01-03T13:32:43".to_string())), + gte: None, + lt: None, + lte: None, + boost: None, + format: JsonLiteral::String("yyyy-MM-dd['T'HH:mm:ss]".to_string()).into(), + }; + let range_query: ElasticRangeQuery = ElasticRangeQuery { + field: "date".to_string(), + value: range_query_params, + }; + let range_query_ast = range_query.convert_to_query_ast().unwrap(); + assert!(matches!( + range_query_ast, + QueryAst::Range(RangeQuery { + field, + lower_bound: Bound::Excluded(lower_bound), + upper_bound: Bound::Unbounded, + }) + if field == "date" && lower_bound == JsonLiteral::String("2021-01-03T13:32:43Z".to_string()) + )); + } - Ok(()) + #[test] + fn test_date_range_query_with_strict_date_optional_time_format() { + let range_query_params = ElasticRangeQueryParams { + gt: None, + gte: None, + lt: None, + lte: Some(JsonLiteral::String("2024-09-28T10:22:55.797Z".to_string())), + boost: None, + format: JsonLiteral::String("strict_date_optional_time".to_string()).into(), + }; + let range_query: ElasticRangeQuery = ElasticRangeQuery { + field: "timestamp".to_string(), + value: range_query_params, + }; + let range_query_ast = range_query.convert_to_query_ast().unwrap(); + assert!(matches!( + range_query_ast, + QueryAst::Range(RangeQuery { + field, + lower_bound: Bound::Unbounded, + upper_bound: Bound::Included(upper_bound), + }) + if field == "timestamp" && upper_bound == JsonLiteral::String("2024-09-28T10:22:55.797Z".to_string()) + )); } } diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0007-range_queries.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0007-range_queries.yaml index 5337325c229..bbedea70e0d 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/0007-range_queries.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0007-range_queries.yaml @@ -243,5 +243,18 @@ expected: total: value: 68 relation: "eq" - - +--- +# Timestamp field with a custom format. +json: + query: + range: + created_at: + gte: "2015|02|01 T00:00:00.001999Z" + lte: "2015|02|01 T00:00:00.001999Z" + # Elasticsearch date format requires text to be escaped with single quotes + format: yyyy|MM|dd 'T'HH:mm:ss.SSSSSS'Z' +expected: + hits: + total: + value: 1 + relation: "eq" From 7b220756316ee793dd8aca97a7d6fb74a75dac09 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 1 Oct 2024 10:26:16 +0900 Subject: [PATCH 2/5] Removing permits from scaling up/down methods (#5463) --- .../src/ingest/ingest_controller.rs | 20 ++++++------- .../quickwit-control-plane/src/model/mod.rs | 12 ++------ .../src/model/shard_table.rs | 28 ++++++++----------- 3 files changed, 24 insertions(+), 36 deletions(-) diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 6cfbdcfc7ac..cc5f2b33fc5 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -676,10 +676,8 @@ impl IngestController { model: &mut ControlPlaneModel, progress: &Progress, ) -> MetastoreResult<()> { - const NUM_PERMITS: u64 = 1; - if !model - .acquire_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS) + .acquire_scaling_permits(&source_uid, ScalingMode::Up) .unwrap_or(false) { return Ok(()); @@ -698,7 +696,7 @@ impl IngestController { if successful_source_uids.is_empty() { // We did not manage to create the shard. // We can release our permit. - model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Up); warn!( index_uid=%source_uid.index_uid, source_id=%source_uid.source_id, @@ -722,7 +720,7 @@ impl IngestController { source_id=%source_uid.source_id, "scaling up number of shards to {new_num_open_shards} failed: {metastore_error:?}" ); - model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Up); Err(metastore_error) } } @@ -860,10 +858,12 @@ impl IngestController { model: &mut ControlPlaneModel, progress: &Progress, ) -> MetastoreResult<()> { - const NUM_PERMITS: u64 = 1; + if shard_stats.num_open_shards == 0 { + return Ok(()); + } if !model - .acquire_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS) + .acquire_scaling_permits(&source_uid, ScalingMode::Down) .unwrap_or(false) { return Ok(()); @@ -876,12 +876,12 @@ impl IngestController { "scaling down number of shards to {new_num_open_shards}" ); let Some((leader_id, shard_id)) = find_scale_down_candidate(&source_uid, model) else { - model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Down); return Ok(()); }; info!("scaling down shard {shard_id} from {leader_id}"); let Some(ingester) = self.ingester_pool.get(&leader_id) else { - model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Down); return Ok(()); }; let shard_pkeys = vec![ShardPKey { @@ -896,7 +896,7 @@ impl IngestController { .await { warn!("failed to scale down number of shards: {error}"); - model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Down); return Ok(()); } model.close_shards(&source_uid, &[shard_id]); diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index ca314233f6a..d4e02f67c2c 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -378,10 +378,9 @@ impl ControlPlaneModel { &mut self, source_uid: &SourceUid, scaling_mode: ScalingMode, - num_permits: u64, ) -> Option { self.shard_table - .acquire_scaling_permits(source_uid, scaling_mode, num_permits) + .acquire_scaling_permits(source_uid, scaling_mode) } pub fn drain_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { @@ -389,14 +388,9 @@ impl ControlPlaneModel { .drain_scaling_permits(source_uid, scaling_mode) } - pub fn release_scaling_permits( - &mut self, - source_uid: &SourceUid, - scaling_mode: ScalingMode, - num_permits: u64, - ) { + pub fn release_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { self.shard_table - .release_scaling_permits(source_uid, scaling_mode, num_permits) + .release_scaling_permits(source_uid, scaling_mode) } } diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index 29c579cddcd..00b440dec50 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -544,14 +544,13 @@ impl ShardTable { &mut self, source_uid: &SourceUid, scaling_mode: ScalingMode, - num_permits: u64, ) -> Option { let table_entry = self.table_entries.get_mut(source_uid)?; let scaling_rate_limiter = match scaling_mode { ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter, ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter, }; - Some(scaling_rate_limiter.acquire(num_permits)) + Some(scaling_rate_limiter.acquire(1)) } pub fn drain_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { @@ -564,18 +563,13 @@ impl ShardTable { } } - pub fn release_scaling_permits( - &mut self, - source_uid: &SourceUid, - scaling_mode: ScalingMode, - num_permits: u64, - ) { + pub fn release_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { if let Some(table_entry) = self.table_entries.get_mut(source_uid) { let scaling_rate_limiter = match scaling_mode { ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter, ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter, }; - scaling_rate_limiter.release(num_permits); + scaling_rate_limiter.release(1); } } } @@ -1058,7 +1052,7 @@ mod tests { source_id: source_id.clone(), }; assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Up, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Up) .is_none()); shard_table.add_source(&index_uid, &source_id); @@ -1071,7 +1065,7 @@ mod tests { .available_permits(); assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Up, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Up) .unwrap()); let new_available_permits = shard_table @@ -1096,7 +1090,7 @@ mod tests { source_id: source_id.clone(), }; assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Down, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Down) .is_none()); shard_table.add_source(&index_uid, &source_id); @@ -1109,7 +1103,7 @@ mod tests { .available_permits(); assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Down, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Down) .unwrap()); let new_available_permits = shard_table @@ -1143,10 +1137,10 @@ mod tests { .available_permits(); assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Up, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Up) .unwrap()); - shard_table.release_scaling_permits(&source_uid, ScalingMode::Up, 1); + shard_table.release_scaling_permits(&source_uid, ScalingMode::Up); let new_available_permits = shard_table .table_entries @@ -1179,10 +1173,10 @@ mod tests { .available_permits(); assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Down, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Down) .unwrap()); - shard_table.release_scaling_permits(&source_uid, ScalingMode::Down, 1); + shard_table.release_scaling_permits(&source_uid, ScalingMode::Down); let new_available_permits = shard_table .table_entries From 8758287fc0186fa38f1713744f086ae31f5c7aa4 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 1 Oct 2024 10:55:52 +0900 Subject: [PATCH 3/5] Adding fingerprint to task in cluster state. (#5464) Closes #5454 --- quickwit/quickwit-cluster/src/cluster.rs | 22 +++++++----- .../src/actors/indexing_pipeline.rs | 13 ++++++- .../src/actors/indexing_service.rs | 36 +++++++++---------- .../src/models/indexing_statistics.rs | 1 + 4 files changed, 45 insertions(+), 27 deletions(-) diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index 9305d6485d7..00d42985bda 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -521,7 +521,8 @@ fn indexing_task_to_chitchat_kv(indexing_task: &IndexingTask) -> (String, String let index_uid = indexing_task.index_uid(); let key = format!("{INDEXING_TASK_PREFIX}{}", indexing_task.pipeline_uid()); let shard_ids_str = shard_ids.iter().sorted().join(","); - let value = format!("{index_uid}:{source_id}:{shard_ids_str}"); + let fingerprint = indexing_task.params_fingerprint; + let value = format!("{index_uid}:{source_id}:{fingerprint}:{shard_ids_str}"); (key, value) } @@ -536,8 +537,12 @@ fn parse_shard_ids_str(shard_ids_str: &str) -> Vec { fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option { let pipeline_uid_str = key.strip_prefix(INDEXING_TASK_PREFIX)?; let pipeline_uid = PipelineUid::from_str(pipeline_uid_str).ok()?; - let (source_uid, shards_str) = value.rsplit_once(':')?; - let (index_uid, source_id) = source_uid.rsplit_once(':')?; + let mut field_iterator = value.rsplitn(4, ':'); + let shards_str = field_iterator.next()?; + let fingerprint_str = field_iterator.next()?; + let source_id = field_iterator.next()?; + let index_uid = field_iterator.next()?; + let params_fingerprint: u64 = fingerprint_str.parse().ok()?; let index_uid = index_uid.parse().ok()?; let shard_ids = parse_shard_ids_str(shards_str); Some(IndexingTask { @@ -545,7 +550,7 @@ fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option source_id: source_id.to_string(), pipeline_uid: Some(pipeline_uid), shard_ids, - params_fingerprint: 0, + params_fingerprint, }) } @@ -1143,11 +1148,11 @@ mod tests { let mut chitchat_guard = chitchat_handle.lock().await; chitchat_guard.self_node_state().set( format!("{INDEXING_TASK_PREFIX}01BX5ZZKBKACTAV9WEVGEMMVS0"), - "my_index:00000000000000000000000000:my_source:1,3".to_string(), + "my_index:00000000000000000000000000:my_source:41:1,3".to_string(), ); chitchat_guard.self_node_state().set( format!("{INDEXING_TASK_PREFIX}01BX5ZZKBKACTAV9WEVGEMMVS1"), - "my_index-00000000000000000000000000-my_source:3,5".to_string(), + "my_index-00000000000000000000000000-my_source:53:3,5".to_string(), ); } node.wait_for_ready_members(|members| members.len() == 1, Duration::from_secs(5)) @@ -1358,14 +1363,15 @@ mod tests { #[test] fn test_parse_chitchat_kv() { assert!( - chitchat_kv_to_indexing_task("invalidulid", "my_index:uid:my_source:1,3").is_none() + chitchat_kv_to_indexing_task("invalidulid", "my_index:uid:my_source:42:1,3").is_none() ); let task = super::chitchat_kv_to_indexing_task( "indexer.task:01BX5ZZKBKACTAV9WEVGEMMVS0", - "my_index:00000000000000000000000000:my_source:00000000000000000001,\ + "my_index:00000000000000000000000000:my_source:42:00000000000000000001,\ 00000000000000000003", ) .unwrap(); + assert_eq!(task.params_fingerprint, 42); assert_eq!( task.pipeline_uid(), PipelineUid::from_str("01BX5ZZKBKACTAV9WEVGEMMVS0").unwrap() diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 4087f2ed230..7ab58bb873f 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -120,6 +120,7 @@ pub struct IndexingPipeline { handles_opt: Option, // Killswitch used for the actors in the pipeline. This is not the supervisor killswitch. kill_switch: KillSwitch, + // The set of shard is something that can change dynamically without necessarily // requiring a respawn of the pipeline. // We keep the list of shards here however, to reassign them after a respawn. @@ -158,12 +159,16 @@ impl Actor for IndexingPipeline { impl IndexingPipeline { pub fn new(params: IndexingPipelineParams) -> Self { + let params_fingerprint = params.params_fingerprint; IndexingPipeline { params, previous_generations_statistics: Default::default(), handles_opt: None, kill_switch: KillSwitch::default(), - statistics: IndexingStatistics::default(), + statistics: IndexingStatistics { + params_fingerprint, + ..Default::default() + }, shard_ids: Default::default(), } } @@ -264,6 +269,7 @@ impl IndexingPipeline { .set_num_spawn_attempts(self.statistics.num_spawn_attempts); let pipeline_metrics_opt = handles.indexer.last_observation().pipeline_metrics_opt; self.statistics.pipeline_metrics_opt = pipeline_metrics_opt; + self.statistics.params_fingerprint = self.params.params_fingerprint; self.statistics.shard_ids.clone_from(&self.shard_ids); ctx.observe(self); } @@ -587,6 +593,7 @@ pub struct IndexingPipelineParams { pub source_storage_resolver: StorageResolver, pub ingester_pool: IngesterPool, pub queues_dir_path: PathBuf, + pub params_fingerprint: u64, pub event_broker: EventBroker, } @@ -716,6 +723,7 @@ mod tests { cooperative_indexing_permits: None, merge_planner_mailbox, event_broker: EventBroker::default(), + params_fingerprint: 42u64, }; let pipeline = IndexingPipeline::new(pipeline_params); let (_pipeline_mailbox, pipeline_handle) = universe.spawn_builder().spawn(pipeline); @@ -828,6 +836,7 @@ mod tests { cooperative_indexing_permits: None, merge_planner_mailbox, event_broker: Default::default(), + params_fingerprint: 42u64, }; let pipeline = IndexingPipeline::new(pipeline_params); let (_pipeline_mailbox, pipeline_handler) = universe.spawn_builder().spawn(pipeline); @@ -926,6 +935,7 @@ mod tests { cooperative_indexing_permits: None, merge_planner_mailbox: merge_planner_mailbox.clone(), event_broker: Default::default(), + params_fingerprint: 42u64, }; let indexing_pipeline = IndexingPipeline::new(indexing_pipeline_params); let (_indexing_pipeline_mailbox, indexing_pipeline_handler) = @@ -1051,6 +1061,7 @@ mod tests { max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, merge_planner_mailbox, + params_fingerprint: 42u64, event_broker: Default::default(), }; let pipeline = IndexingPipeline::new(pipeline_params); diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 757d434adca..df71cc92ea4 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -312,10 +312,12 @@ impl IndexingService { let max_concurrent_split_uploads_merge = (self.max_concurrent_split_uploads - max_concurrent_split_uploads_index).max(1); + let params_fingerprint = index_config.indexing_params_fingerprint(); let pipeline_params = IndexingPipelineParams { pipeline_id: indexing_pipeline_id.clone(), metastore: self.metastore.clone(), storage, + // Indexing-related parameters doc_mapper, indexing_directory, @@ -323,6 +325,7 @@ impl IndexingService { split_store, max_concurrent_split_uploads_index, cooperative_indexing_permits: self.cooperative_indexing_permits.clone(), + // Merge-related parameters merge_policy, max_concurrent_split_uploads_merge, @@ -333,6 +336,7 @@ impl IndexingService { ingester_pool: self.ingester_pool.clone(), queues_dir_path: self.queue_dir_path.clone(), source_storage_resolver: self.storage_resolver.clone(), + params_fingerprint, event_broker: self.event_broker.clone(), }; @@ -755,20 +759,14 @@ impl IndexingService { .indexing_pipelines .values() .map(|pipeline_handle| { - let shard_ids: Vec = pipeline_handle - .handle - .last_observation() - .shard_ids - .iter() - .cloned() - .collect(); - + let assignment = pipeline_handle.handle.last_observation(); + let shard_ids: Vec = assignment.shard_ids.iter().cloned().collect(); IndexingTask { index_uid: Some(pipeline_handle.indexing_pipeline_id.index_uid.clone()), source_id: pipeline_handle.indexing_pipeline_id.source_id.clone(), pipeline_uid: Some(pipeline_handle.indexing_pipeline_id.pipeline_uid), shard_ids, - params_fingerprint: 0, + params_fingerprint: assignment.params_fingerprint, } }) .collect(); @@ -1192,6 +1190,8 @@ mod tests { #[tokio::test] async fn test_indexing_service_apply_plan() { + const PARAMS_FINGERPRINT: u64 = 3865067856550546352u64; + quickwit_common::setup_logging_for_tests(); let transport = ChannelTransport::default(); let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) @@ -1251,14 +1251,14 @@ mod tests { source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(0u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, ]; indexing_service @@ -1297,28 +1297,28 @@ mod tests { source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(3u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(2u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(4u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, ]; indexing_service @@ -1359,21 +1359,21 @@ mod tests { source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(3u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(4u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, ]; indexing_service diff --git a/quickwit/quickwit-indexing/src/models/indexing_statistics.rs b/quickwit/quickwit-indexing/src/models/indexing_statistics.rs index 21f84e678da..68b44a9744b 100644 --- a/quickwit/quickwit-indexing/src/models/indexing_statistics.rs +++ b/quickwit/quickwit-indexing/src/models/indexing_statistics.rs @@ -56,6 +56,7 @@ pub struct IndexingStatistics { // List of shard ids. #[schema(value_type = Vec)] pub shard_ids: BTreeSet, + pub params_fingerprint: u64, } impl IndexingStatistics { From 2dcc696433e1e31c0c330ef07d0899cae2230989 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Tue, 1 Oct 2024 09:46:30 +0200 Subject: [PATCH 4/5] improve gc resilience and add metrics (#5420) * add search after to listing splits * don't stop gcing after first error * add metrics to garbage collector * add test for sort and fix bug in file metastore with multiple index_uids * add test for search after split --- .../src/garbage_collection.rs | 73 +++- .../quickwit-index-management/src/index.rs | 1 + quickwit/quickwit-index-management/src/lib.rs | 2 +- .../src/actors/garbage_collector.rs | 124 ++++-- quickwit/quickwit-janitor/src/metrics.rs | 36 +- .../file_backed/file_backed_index/mod.rs | 61 ++- .../src/metastore/file_backed/mod.rs | 16 +- .../quickwit-metastore/src/metastore/mod.rs | 42 ++- .../src/metastore/postgres/metastore.rs | 20 +- .../src/metastore/postgres/utils.rs | 19 +- .../src/tests/list_splits.rs | 353 ++++++++++++++++++ quickwit/quickwit-metastore/src/tests/mod.rs | 14 + 12 files changed, 670 insertions(+), 91 deletions(-) diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 4a24aedc012..74971077b3b 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -25,6 +25,7 @@ use std::time::Duration; use anyhow::Context; use futures::{Future, StreamExt}; use itertools::Itertools; +use quickwit_common::metrics::IntCounter; use quickwit_common::pretty::PrettySample; use quickwit_common::Progress; use quickwit_metastore::{ @@ -44,6 +45,26 @@ use tracing::{error, instrument}; /// The maximum number of splits that the GC should delete per attempt. const DELETE_SPLITS_BATCH_SIZE: usize = 10_000; +pub struct GcMetrics { + pub deleted_splits: IntCounter, + pub deleted_bytes: IntCounter, + pub failed_splits: IntCounter, +} + +trait RecordGcMetrics { + fn record(&self, num_delete_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize); +} + +impl RecordGcMetrics for Option { + fn record(&self, num_deleted_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize) { + if let Some(metrics) = self { + metrics.deleted_splits.inc_by(num_deleted_splits as u64); + metrics.deleted_bytes.inc_by(num_deleted_bytes); + metrics.failed_splits.inc_by(num_failed_splits as u64); + } + } +} + /// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from /// storage and metastore. #[derive(Error, Debug)] @@ -94,6 +115,7 @@ pub async fn run_garbage_collect( deletion_grace_period: Duration, dry_run: bool, progress_opt: Option<&Progress>, + metrics: Option, ) -> anyhow::Result { let grace_period_timestamp = OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64; @@ -170,6 +192,7 @@ pub async fn run_garbage_collect( metastore, indexes, progress_opt, + metrics, ) .await) } @@ -179,6 +202,7 @@ async fn delete_splits( storages: &HashMap>, metastore: MetastoreServiceClient, progress_opt: Option<&Progress>, + metrics: &Option, split_removal_info: &mut SplitRemovalInfo, ) -> Result<(), ()> { let mut delete_split_from_index_res_stream = @@ -219,9 +243,26 @@ async fn delete_splits( while let Some(delete_split_result) = delete_split_from_index_res_stream.next().await { match delete_split_result { Ok(entries) => { + let deleted_bytes = entries + .iter() + .map(|entry| entry.file_size_bytes.as_u64()) + .sum::(); + let deleted_splits_count = entries.len(); + + metrics.record(deleted_splits_count, deleted_bytes, 0); split_removal_info.removed_split_entries.extend(entries); } Err(delete_split_error) => { + let deleted_bytes = delete_split_error + .successes + .iter() + .map(|entry| entry.file_size_bytes.as_u64()) + .sum::(); + let deleted_splits_count = delete_split_error.successes.len(); + let failed_splits_count = delete_split_error.storage_failures.len() + + delete_split_error.metastore_failures.len(); + + metrics.record(deleted_splits_count, deleted_bytes, failed_splits_count); split_removal_info .removed_split_entries .extend(delete_split_error.successes); @@ -265,13 +306,14 @@ async fn list_splits_metadata( /// /// The aim of this is to spread the load out across a longer period /// rather than short, heavy bursts on the metastore and storage system itself. -#[instrument(skip(index_uids, storages, metastore, progress_opt), fields(num_indexes=%index_uids.len()))] +#[instrument(skip(index_uids, storages, metastore, progress_opt, metrics), fields(num_indexes=%index_uids.len()))] async fn delete_splits_marked_for_deletion_several_indexes( index_uids: Vec, updated_before_timestamp: i64, metastore: MetastoreServiceClient, storages: HashMap>, progress_opt: Option<&Progress>, + metrics: Option, ) -> SplitRemovalInfo { let mut split_removal_info = SplitRemovalInfo::default(); @@ -280,7 +322,7 @@ async fn delete_splits_marked_for_deletion_several_indexes( return split_removal_info; }; - let list_splits_query = list_splits_query + let mut list_splits_query = list_splits_query .with_split_state(SplitState::MarkedForDeletion) .with_update_timestamp_lte(updated_before_timestamp) .with_limit(DELETE_SPLITS_BATCH_SIZE) @@ -300,11 +342,13 @@ async fn delete_splits_marked_for_deletion_several_indexes( } }; - let num_splits_to_delete = splits_metadata_to_delete.len(); - - if num_splits_to_delete == 0 { + // set split after which to search for the next loop + let Some(last_split_metadata) = splits_metadata_to_delete.last() else { break; - } + }; + list_splits_query = list_splits_query.after_split(last_split_metadata); + + let num_splits_to_delete = splits_metadata_to_delete.len(); let splits_metadata_to_delete_per_index: HashMap> = splits_metadata_to_delete @@ -312,18 +356,20 @@ async fn delete_splits_marked_for_deletion_several_indexes( .map(|meta| (meta.index_uid.clone(), meta)) .into_group_map(); - let delete_split_res = delete_splits( + // ignore return we continue either way + let _: Result<(), ()> = delete_splits( splits_metadata_to_delete_per_index, &storages, metastore.clone(), progress_opt, + &metrics, &mut split_removal_info, ) .await; - if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || delete_split_res.is_err() { - // stop the gc if this was the last batch or we encountered an error - // (otherwise we might try deleting the same splits in an endless loop) + if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE { + // stop the gc if this was the last batch + // we are guaranteed to make progress due to .after_split() break; } } @@ -345,7 +391,7 @@ pub async fn delete_splits_from_storage_and_metastore( metastore: MetastoreServiceClient, splits: Vec, progress_opt: Option<&Progress>, -) -> anyhow::Result, DeleteSplitsError> { +) -> Result, DeleteSplitsError> { let mut split_infos: HashMap = HashMap::with_capacity(splits.len()); for split in splits { @@ -511,6 +557,7 @@ mod tests { Duration::from_secs(30), false, None, + None, ) .await .unwrap(); @@ -538,6 +585,7 @@ mod tests { Duration::from_secs(30), false, None, + None, ) .await .unwrap(); @@ -615,6 +663,7 @@ mod tests { Duration::from_secs(30), false, None, + None, ) .await .unwrap(); @@ -642,6 +691,7 @@ mod tests { Duration::from_secs(0), false, None, + None, ) .await .unwrap(); @@ -680,6 +730,7 @@ mod tests { Duration::from_secs(30), false, None, + None, ) .await .unwrap(); diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index 5d4dc5ec149..0fe5c77cc2b 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -373,6 +373,7 @@ impl IndexService { Duration::ZERO, dry_run, None, + None, ) .await?; diff --git a/quickwit/quickwit-index-management/src/lib.rs b/quickwit/quickwit-index-management/src/lib.rs index 93b6ee6d1c3..65a7ef861ce 100644 --- a/quickwit/quickwit-index-management/src/lib.rs +++ b/quickwit/quickwit-index-management/src/lib.rs @@ -20,5 +20,5 @@ mod garbage_collection; mod index; -pub use garbage_collection::run_garbage_collect; +pub use garbage_collection::{run_garbage_collect, GcMetrics}; pub use index::{clear_cache_directory, validate_storage_uri, IndexService, IndexServiceError}; diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index fbfdeb2b1e1..bb82e55d856 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -20,13 +20,13 @@ use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_trait::async_trait; use futures::{stream, StreamExt}; use quickwit_actors::{Actor, ActorContext, Handler}; use quickwit_common::shared_consts::split_deletion_grace_period; -use quickwit_index_management::run_garbage_collect; +use quickwit_index_management::{run_garbage_collect, GcMetrics}; use quickwit_metastore::ListIndexesMetadataResponseExt; use quickwit_proto::metastore::{ ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, @@ -36,6 +36,8 @@ use quickwit_storage::{Storage, StorageResolver}; use serde::Serialize; use tracing::{debug, error, info}; +use crate::metrics::JANITOR_METRICS; + const RUN_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes /// Staged files needs to be deleted if there was a failure. @@ -51,10 +53,10 @@ pub struct GarbageCollectorCounters { pub num_deleted_files: usize, /// The number of bytes deleted. pub num_deleted_bytes: usize, - /// The number of failed garbage collection run on an index. - pub num_failed_gc_run_on_index: usize, - /// The number of successful garbage collection run on an index. - pub num_successful_gc_run_on_index: usize, + /// The number of failed garbage collection run. + pub num_failed_gc_run: usize, + /// The number of successful garbage collection run. + pub num_successful_gc_run: usize, /// The number or failed storage resolution. pub num_failed_storage_resolution: usize, /// The number of splits that were unable to be removed. @@ -86,6 +88,8 @@ impl GarbageCollector { debug!("loading indexes from the metastore"); self.counters.num_passes += 1; + let start = Instant::now(); + let response = match self .metastore .list_indexes_metadata(ListIndexesMetadataRequest::all()) @@ -137,23 +141,43 @@ impl GarbageCollector { split_deletion_grace_period(), false, Some(ctx.progress()), + Some(GcMetrics { + deleted_splits: JANITOR_METRICS + .gc_deleted_splits + .with_label_values(["success"]) + .clone(), + deleted_bytes: JANITOR_METRICS.gc_deleted_bytes.clone(), + failed_splits: JANITOR_METRICS + .gc_deleted_splits + .with_label_values(["error"]) + .clone(), + }), ) .await; + let run_duration = start.elapsed().as_secs(); + JANITOR_METRICS.gc_seconds_total.inc_by(run_duration); + let deleted_file_entries = match gc_res { Ok(removal_info) => { - self.counters.num_successful_gc_run_on_index += 1; + self.counters.num_successful_gc_run += 1; + JANITOR_METRICS.gc_runs.with_label_values(["success"]).inc(); self.counters.num_failed_splits += removal_info.failed_splits.len(); removal_info.removed_split_entries } Err(error) => { - self.counters.num_failed_gc_run_on_index += 1; + self.counters.num_failed_gc_run += 1; + JANITOR_METRICS.gc_runs.with_label_values(["error"]).inc(); error!(error=?error, "failed to run garbage collection"); return; } }; if !deleted_file_entries.is_empty() { let num_deleted_splits = deleted_file_entries.len(); + let num_deleted_bytes = deleted_file_entries + .iter() + .map(|entry| entry.file_size_bytes.as_u64() as usize) + .sum::(); let deleted_files: HashSet<&Path> = deleted_file_entries .iter() .map(|deleted_entry| deleted_entry.file_name.as_path()) @@ -163,11 +187,8 @@ impl GarbageCollector { num_deleted_splits = num_deleted_splits, "Janitor deleted {:?} and {} other splits.", deleted_files, num_deleted_splits, ); - self.counters.num_deleted_files += deleted_file_entries.len(); - self.counters.num_deleted_bytes += deleted_file_entries - .iter() - .map(|entry| entry.file_size_bytes.as_u64() as usize) - .sum::(); + self.counters.num_deleted_files += num_deleted_splits; + self.counters.num_deleted_bytes += num_deleted_bytes; } } } @@ -348,6 +369,7 @@ mod tests { split_deletion_grace_period(), false, None, + None, ) .await; assert!(result.is_ok()); @@ -497,9 +519,9 @@ mod tests { assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 2); assert_eq!(counters.num_deleted_bytes, 40); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); // 30 secs later @@ -508,9 +530,9 @@ mod tests { assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 2); assert_eq!(counters.num_deleted_bytes, 40); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); // 60 secs later @@ -519,9 +541,9 @@ mod tests { assert_eq!(counters.num_passes, 2); assert_eq!(counters.num_deleted_files, 4); assert_eq!(counters.num_deleted_bytes, 80); - assert_eq!(counters.num_successful_gc_run_on_index, 2); + assert_eq!(counters.num_successful_gc_run, 2); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); universe.assert_quit().await; } @@ -585,9 +607,9 @@ mod tests { assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 0); assert_eq!(counters.num_deleted_bytes, 0); - assert_eq!(counters.num_successful_gc_run_on_index, 0); + assert_eq!(counters.num_successful_gc_run, 0); assert_eq!(counters.num_failed_storage_resolution, 1); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); universe.assert_quit().await; } @@ -608,7 +630,7 @@ mod tests { }); mock_metastore .expect_list_splits() - .times(2) + .times(3) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); assert_eq!(query.index_uids.len(), 2); @@ -616,24 +638,40 @@ mod tests { .contains(&query.index_uids[0].index_id.as_ref())); assert!(["test-index-1", "test-index-2"] .contains(&query.index_uids[1].index_id.as_ref())); - let splits = match query.split_states[0] { + let splits_ids_string: Vec = + (0..8000).map(|seq| format!("split-{seq:04}")).collect(); + let splits_ids: Vec<&str> = splits_ids_string + .iter() + .map(|string| string.as_str()) + .collect(); + let mut splits = match query.split_states[0] { SplitState::Staged => { let mut splits = make_splits("test-index-1", &["a"], SplitState::Staged); splits.append(&mut make_splits("test-index-2", &["a"], SplitState::Staged)); splits } SplitState::MarkedForDeletion => { + assert_eq!(query.limit, Some(10_000)); let mut splits = - make_splits("test-index-1", &["a", "b"], SplitState::MarkedForDeletion); + make_splits("test-index-1", &splits_ids, SplitState::MarkedForDeletion); splits.append(&mut make_splits( "test-index-2", - &["a", "b"], + &splits_ids, SplitState::MarkedForDeletion, )); splits } _ => panic!("only Staged and MarkedForDeletion expected."), }; + if let Some((index_uid, split_id)) = query.after_split { + splits.retain(|split| { + ( + &split.split_metadata.index_uid, + &split.split_metadata.split_id, + ) > (&index_uid, &split_id) + }); + } + splits.truncate(10_000); let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); Ok(ServiceStream::from(vec![Ok(splits)])) }); @@ -648,7 +686,7 @@ mod tests { }); mock_metastore .expect_delete_splits() - .times(2) + .times(3) .returning(|delete_splits_request| { let index_uid: IndexUid = delete_splits_request.index_uid().clone(); let split_ids = HashSet::<&str>::from_iter( @@ -657,14 +695,30 @@ mod tests { .iter() .map(|split_id| split_id.as_str()), ); - let expected_split_ids = HashSet::<&str>::from_iter(["a", "b"]); - - assert_eq!(split_ids, expected_split_ids); + if index_uid.index_id == "test-index-1" { + assert_eq!(split_ids.len(), 8000); + for seq in 0..8000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else if split_ids.len() == 2000 { + for seq in 0..2000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else if split_ids.len() == 6000 { + for seq in 2000..8000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else { + panic!(); + } // This should not cause the whole run to fail and return an error, // instead this should simply get logged and return the list of splits // which have successfully been deleted. - if index_uid.index_id == "test-index-2" { + if index_uid.index_id == "test-index-2" && split_ids.len() == 2000 { Err(MetastoreError::Db { message: "fail to delete".to_string(), }) @@ -682,12 +736,12 @@ mod tests { let counters = handle.process_pending_and_observe().await.state; assert_eq!(counters.num_passes, 1); - assert_eq!(counters.num_deleted_files, 2); - assert_eq!(counters.num_deleted_bytes, 40); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_deleted_files, 14000); + assert_eq!(counters.num_deleted_bytes, 20 * 14000); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); - assert_eq!(counters.num_failed_splits, 2); + assert_eq!(counters.num_failed_gc_run, 0); + assert_eq!(counters.num_failed_splits, 2000); universe.assert_quit().await; } } diff --git a/quickwit/quickwit-janitor/src/metrics.rs b/quickwit/quickwit-janitor/src/metrics.rs index d3392af7b3f..0f3760e6d87 100644 --- a/quickwit/quickwit-janitor/src/metrics.rs +++ b/quickwit/quickwit-janitor/src/metrics.rs @@ -18,10 +18,18 @@ // along with this program. If not, see . use once_cell::sync::Lazy; -use quickwit_common::metrics::{new_gauge_vec, IntGaugeVec}; +use quickwit_common::metrics::{ + new_counter, new_counter_vec, new_gauge_vec, IntCounter, IntCounterVec, IntGaugeVec, +}; pub struct JanitorMetrics { pub ongoing_num_delete_operations_total: IntGaugeVec<1>, + pub gc_deleted_splits: IntCounterVec<1>, + pub gc_deleted_bytes: IntCounter, + pub gc_runs: IntCounterVec<1>, + pub gc_seconds_total: IntCounter, + // TODO having a current run duration which is 0|undefined out of run, and returns `now - + // start_time` during a run would be nice } impl Default for JanitorMetrics { @@ -34,6 +42,32 @@ impl Default for JanitorMetrics { &[], ["index"], ), + gc_deleted_splits: new_counter_vec( + "gc_deleted_splits_total", + "Total number of splits deleted by the garbage collector.", + "quickwit_janitor", + &[], + ["result"], + ), + gc_deleted_bytes: new_counter( + "gc_deleted_bytes_total", + "Total number of bytes deleted by the garbage collector.", + "quickwit_janitor", + &[], + ), + gc_runs: new_counter_vec( + "gc_runs_total", + "Total number of garbage collector execition.", + "quickwit_janitor", + &[], + ["result"], + ), + gc_seconds_total: new_counter( + "gc_seconds_total", + "Total time spent running the garbage collector", + "quickwit_janitor", + &[], + ), } } } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 265697b0e81..e4d6799cefa 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -424,48 +424,32 @@ impl FileBackedIndex { /// Lists splits. pub(crate) fn list_splits(&self, query: &ListSplitsQuery) -> MetastoreResult> { - let limit = query.limit.unwrap_or(usize::MAX); - let offset = query.offset.unwrap_or_default(); - - let splits: Vec = match query.sort_by { - SortBy::Staleness => self - .splits + let limit = query + .limit + .map(|limit| limit + query.offset.unwrap_or_default()) + .unwrap_or(usize::MAX); + // skip is done at a higher layer in case other indexes give spltis that would go before + // ours + + let results = if query.sort_by == SortBy::None { + // internally sorted_unstable_by collect everything to an intermediary vec. When not + // sorting at all, skip that. + self.splits .values() .filter(|split| split_query_predicate(split, query)) - .sorted_unstable_by(|left_split, right_split| { - left_split - .split_metadata - .delete_opstamp - .cmp(&right_split.split_metadata.delete_opstamp) - .then_with(|| { - left_split - .publish_timestamp - .cmp(&right_split.publish_timestamp) - }) - }) - .skip(offset) .take(limit) .cloned() - .collect(), - SortBy::IndexUid => self - .splits - .values() - .filter(|split| split_query_predicate(split, query)) - .sorted_unstable_by_key(|split| &split.split_metadata.index_uid) - .skip(offset) - .take(limit) - .cloned() - .collect(), - SortBy::None => self - .splits + .collect() + } else { + self.splits .values() .filter(|split| split_query_predicate(split, query)) - .skip(offset) + .sorted_unstable_by(|lhs, rhs| query.sort_by.compare(lhs, rhs)) .take(limit) .cloned() - .collect(), + .collect() }; - Ok(splits) + Ok(results) } /// Deletes a split. @@ -762,6 +746,17 @@ fn split_query_predicate(split: &&Split, query: &ListSplitsQuery) -> bool { } } + if let Some((index_uid, split_id)) = &query.after_split { + if *index_uid > split.split_metadata.index_uid { + return false; + } + if *index_uid == split.split_metadata.index_uid + && *split_id >= split.split_metadata.split_id + { + return false; + } + } + true } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 729e10b6fe0..10bbd814949 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -380,7 +380,7 @@ impl FileBackedMetastore { /// No error is returned if any of the requested `index_uid` does not exist. async fn list_splits_inner(&self, request: ListSplitsRequest) -> MetastoreResult> { let list_splits_query = request.deserialize_list_splits_query()?; - let mut all_splits = Vec::new(); + let mut splits_per_index = Vec::with_capacity(list_splits_query.index_uids.len()); for index_uid in &list_splits_query.index_uids { let splits = match self .read(index_uid, |index| index.list_splits(&list_splits_query)) @@ -393,9 +393,19 @@ impl FileBackedMetastore { } Err(error) => return Err(error), }; - all_splits.extend(splits); + splits_per_index.push(splits); } - Ok(all_splits) + + let limit = list_splits_query.limit.unwrap_or(usize::MAX); + let offset = list_splits_query.offset.unwrap_or_default(); + + let merged_results = splits_per_index + .into_iter() + .kmerge_by(|lhs, rhs| list_splits_query.sort_by.compare(lhs, rhs).is_lt()) + .skip(offset) + .take(limit) + .collect(); + Ok(merged_results) } /// Helper used for testing to obtain the data associated with the given index. diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 06211e1f63a..56dc8b84abf 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -24,6 +24,7 @@ pub mod postgres; pub mod control_plane_metastore; +use std::cmp::Ordering; use std::ops::{Bound, RangeInclusive}; use async_trait::async_trait; @@ -632,6 +633,9 @@ pub struct ListSplitsQuery { /// Sorts the splits by staleness, i.e. by delete opstamp and publish timestamp in ascending /// order. pub sort_by: SortBy, + + /// Only return splits whose (index_uid, split_id) are lexicographically after this split + pub after_split: Option<(IndexUid, SplitId)>, } #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] @@ -641,6 +645,33 @@ pub enum SortBy { IndexUid, } +impl SortBy { + fn compare(&self, left_split: &Split, right_split: &Split) -> Ordering { + match self { + SortBy::None => Ordering::Equal, + SortBy::Staleness => left_split + .split_metadata + .delete_opstamp + .cmp(&right_split.split_metadata.delete_opstamp) + .then_with(|| { + left_split + .publish_timestamp + .cmp(&right_split.publish_timestamp) + }), + SortBy::IndexUid => left_split + .split_metadata + .index_uid + .cmp(&right_split.split_metadata.index_uid) + .then_with(|| { + left_split + .split_metadata + .split_id + .cmp(&right_split.split_metadata.split_id) + }), + } + } +} + #[allow(unused_attributes)] impl ListSplitsQuery { /// Creates a new [`ListSplitsQuery`] for the designated index. @@ -658,6 +689,7 @@ impl ListSplitsQuery { create_timestamp: Default::default(), mature: Bound::Unbounded, sort_by: SortBy::None, + after_split: None, } } @@ -680,6 +712,7 @@ impl ListSplitsQuery { create_timestamp: Default::default(), mature: Bound::Unbounded, sort_by: SortBy::None, + after_split: None, }) } @@ -850,11 +883,18 @@ impl ListSplitsQuery { self } - /// Sorts the splits by index_uid. + /// Sorts the splits by index_uid and split_id. pub fn sort_by_index_uid(mut self) -> Self { self.sort_by = SortBy::IndexUid; self } + + /// Only return splits whose (index_uid, split_id) are lexicographically after this split. + /// This is only useful if results are sorted by index_uid and split_id. + pub fn after_split(mut self, split_meta: &SplitMetadata) -> Self { + self.after_split = Some((split_meta.index_uid.clone(), split_meta.split_id.clone())); + self + } } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 008b611a36b..ce0d84468e5 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -2076,7 +2076,25 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC"# + r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC, "split_id" ASC"# + ) + ); + + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + + let query = + ListSplitsQuery::for_index(index_uid.clone()).after_split(&crate::SplitMetadata { + index_uid: index_uid.clone(), + split_id: "my_split".to_string(), + ..Default::default() + }); + append_query_filters(sql, &query); + + assert_eq!( + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("index_uid", "split_id") > ('{index_uid}', 'my_split')"# ) ); } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs index 6e850ae0fed..65ef9ce1df6 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs @@ -187,15 +187,24 @@ pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplits Expr::expr(val) }); + if let Some((index_uid, split_id)) = &query.after_split { + sql.cond_where( + Expr::tuple([ + Expr::col(Splits::IndexUid).into(), + Expr::col(Splits::SplitId).into(), + ]) + .gt(Expr::tuple([Expr::value(index_uid), Expr::value(split_id)])), + ); + } + match query.sort_by { SortBy::Staleness => { - sql.order_by( - (Splits::DeleteOpstamp, Splits::PublishTimestamp), - Order::Asc, - ); + sql.order_by(Splits::DeleteOpstamp, Order::Asc) + .order_by(Splits::PublishTimestamp, Order::Asc); } SortBy::IndexUid => { - sql.order_by(Splits::IndexUid, Order::Asc); + sql.order_by(Splits::IndexUid, Order::Asc) + .order_by(Splits::SplitId, Order::Asc); } SortBy::None => (), } diff --git a/quickwit/quickwit-metastore/src/tests/list_splits.rs b/quickwit/quickwit-metastore/src/tests/list_splits.rs index cd1cc1712f3..de9c43b7e01 100644 --- a/quickwit/quickwit-metastore/src/tests/list_splits.rs +++ b/quickwit/quickwit-metastore/src/tests/list_splits.rs @@ -1155,3 +1155,356 @@ pub async fn test_metastore_list_stale_splits< cleanup_index(&mut metastore, index_uid).await; } } + +pub async fn test_metastore_list_sorted_splits< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + + let split_id = append_random_suffix("test-list-sorted-splits-"); + let index_id_1 = append_random_suffix("test-list-sorted-splits-1"); + let index_uid_1 = IndexUid::new_with_random_ulid(&index_id_1); + let index_uri_1 = format!("ram:///indexes/{index_id_1}"); + let index_config_1 = IndexConfig::for_test(&index_id_1, &index_uri_1); + + let index_id_2 = append_random_suffix("test-list-sorted-splits-2"); + let index_uid_2 = IndexUid::new_with_random_ulid(&index_id_2); + let index_uri_2 = format!("ram:///indexes/{index_id_2}"); + let index_config_2 = IndexConfig::for_test(&index_id_2, &index_uri_2); + + let split_id_1 = format!("{split_id}--split-1"); + let split_metadata_1 = SplitMetadata { + split_id: split_id_1.clone(), + index_uid: index_uid_1.clone(), + delete_opstamp: 5, + ..Default::default() + }; + let split_id_2 = format!("{split_id}--split-2"); + let split_metadata_2 = SplitMetadata { + split_id: split_id_2.clone(), + index_uid: index_uid_2.clone(), + delete_opstamp: 3, + ..Default::default() + }; + let split_id_3 = format!("{split_id}--split-3"); + let split_metadata_3 = SplitMetadata { + split_id: split_id_3.clone(), + index_uid: index_uid_1.clone(), + delete_opstamp: 1, + ..Default::default() + }; + let split_id_4 = format!("{split_id}--split-4"); + let split_metadata_4 = SplitMetadata { + split_id: split_id_4.clone(), + index_uid: index_uid_2.clone(), + delete_opstamp: 0, + ..Default::default() + }; + let split_id_5 = format!("{split_id}--split-5"); + let split_metadata_5 = SplitMetadata { + split_id: split_id_5.clone(), + index_uid: index_uid_1.clone(), + delete_opstamp: 2, + ..Default::default() + }; + let split_id_6 = format!("{split_id}--split-6"); + let split_metadata_6 = SplitMetadata { + split_id: split_id_6.clone(), + index_uid: index_uid_2.clone(), + delete_opstamp: 4, + ..Default::default() + }; + + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_1).unwrap(); + let index_uid_1: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_2).unwrap(); + let index_uid_2: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + { + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_1.clone(), + vec![split_metadata_1, split_metadata_3, split_metadata_5], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_1.clone()), + staged_split_ids: vec![split_id_1.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mark_splits_for_deletion = + MarkSplitsForDeletionRequest::new(index_uid_1.clone(), vec![split_id_3.clone()]); + metastore + .mark_splits_for_deletion(mark_splits_for_deletion) + .await + .unwrap(); + + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_2.clone(), + vec![split_metadata_2, split_metadata_4, split_metadata_6], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_2.clone()), + staged_split_ids: vec![split_id_2.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mark_splits_for_deletion = + MarkSplitsForDeletionRequest::new(index_uid_2.clone(), vec![split_id_4.clone()]); + metastore + .mark_splits_for_deletion(mark_splits_for_deletion) + .await + .unwrap(); + } + + let query = + ListSplitsQuery::try_from_index_uids(vec![index_uid_1.clone(), index_uid_2.clone()]) + .unwrap() + .sort_by_staleness(); + let splits = metastore + .list_splits(ListSplitsRequest::try_from_list_splits_query(&query).unwrap()) + .await + .unwrap() + .collect_splits() + .await + .unwrap(); + // we don't use collect_split_ids because it sorts splits internally + let split_ids = splits + .iter() + .map(|split| split.split_id()) + .collect::>(); + assert_eq!( + split_ids, + &[ + &split_id_4, + &split_id_3, + &split_id_5, + &split_id_2, + &split_id_6, + &split_id_1, + ] + ); + + let query = + ListSplitsQuery::try_from_index_uids(vec![index_uid_1.clone(), index_uid_2.clone()]) + .unwrap() + .sort_by_index_uid(); + let splits = metastore + .list_splits(ListSplitsRequest::try_from_list_splits_query(&query).unwrap()) + .await + .unwrap() + .collect_splits() + .await + .unwrap(); + // we don't use collect_split_ids because it sorts splits internally + let split_ids = splits + .iter() + .map(|split| split.split_id()) + .collect::>(); + assert_eq!( + split_ids, + &[ + &split_id_1, + &split_id_3, + &split_id_5, + &split_id_2, + &split_id_4, + &split_id_6, + ] + ); + + cleanup_index(&mut metastore, index_uid_1.clone()).await; + cleanup_index(&mut metastore, index_uid_2.clone()).await; +} + +pub async fn test_metastore_list_after_split< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + + let split_id = append_random_suffix("test-list-sorted-splits-"); + let index_id_1 = append_random_suffix("test-list-sorted-splits-1"); + let index_uri_1 = format!("ram:///indexes/{index_id_1}"); + let index_config_1 = IndexConfig::for_test(&index_id_1, &index_uri_1); + + let index_id_2 = append_random_suffix("test-list-sorted-splits-2"); + let index_uri_2 = format!("ram:///indexes/{index_id_2}"); + let index_config_2 = IndexConfig::for_test(&index_id_2, &index_uri_2); + + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_1).unwrap(); + let index_uid_1: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_2).unwrap(); + let index_uid_2: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + let split_id_1 = format!("{split_id}--split-1"); + let split_metadata_1 = SplitMetadata { + split_id: split_id_1.clone(), + index_uid: index_uid_1.clone(), + ..Default::default() + }; + let split_id_2 = format!("{split_id}--split-2"); + let split_metadata_2 = SplitMetadata { + split_id: split_id_2.clone(), + index_uid: index_uid_2.clone(), + ..Default::default() + }; + let split_id_3 = format!("{split_id}--split-3"); + let split_metadata_3 = SplitMetadata { + split_id: split_id_3.clone(), + index_uid: index_uid_1.clone(), + ..Default::default() + }; + let split_id_4 = format!("{split_id}--split-4"); + let split_metadata_4 = SplitMetadata { + split_id: split_id_4.clone(), + index_uid: index_uid_2.clone(), + ..Default::default() + }; + let split_id_5 = format!("{split_id}--split-5"); + let split_metadata_5 = SplitMetadata { + split_id: split_id_5.clone(), + index_uid: index_uid_1.clone(), + ..Default::default() + }; + let split_id_6 = format!("{split_id}--split-6"); + let split_metadata_6 = SplitMetadata { + split_id: split_id_6.clone(), + index_uid: index_uid_2.clone(), + ..Default::default() + }; + + { + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_1.clone(), + vec![ + split_metadata_1.clone(), + split_metadata_3.clone(), + split_metadata_5.clone(), + ], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_1.clone()), + staged_split_ids: vec![split_id_1.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mark_splits_for_deletion = + MarkSplitsForDeletionRequest::new(index_uid_1.clone(), vec![split_id_3.clone()]); + metastore + .mark_splits_for_deletion(mark_splits_for_deletion) + .await + .unwrap(); + + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_2.clone(), + vec![ + split_metadata_2.clone(), + split_metadata_4.clone(), + split_metadata_6.clone(), + ], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_2.clone()), + staged_split_ids: vec![split_id_2.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mark_splits_for_deletion = + MarkSplitsForDeletionRequest::new(index_uid_2.clone(), vec![split_id_4.clone()]); + metastore + .mark_splits_for_deletion(mark_splits_for_deletion) + .await + .unwrap(); + } + + let expected_all = [ + &split_metadata_1, + &split_metadata_3, + &split_metadata_5, + &split_metadata_2, + &split_metadata_4, + &split_metadata_6, + ]; + + for i in 0..expected_all.len() { + let after = expected_all[i]; + let expected_res = expected_all[(i + 1)..] + .iter() + .map(|split| (&split.index_uid, &split.split_id)) + .collect::>(); + + let query = + ListSplitsQuery::try_from_index_uids(vec![index_uid_1.clone(), index_uid_2.clone()]) + .unwrap() + .sort_by_index_uid() + .after_split(after); + let splits = metastore + .list_splits(ListSplitsRequest::try_from_list_splits_query(&query).unwrap()) + .await + .unwrap() + .collect_splits() + .await + .unwrap(); + // we don't use collect_split_ids because it sorts splits internally + let split_ids = splits + .iter() + .map(|split| { + ( + &split.split_metadata.index_uid, + &split.split_metadata.split_id, + ) + }) + .collect::>(); + assert_eq!(split_ids, expected_res,); + } + + cleanup_index(&mut metastore, index_uid_1.clone()).await; + cleanup_index(&mut metastore, index_uid_2.clone()).await; +} diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 3e0add028df..7699c3eb11f 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -431,6 +431,20 @@ macro_rules! metastore_test_suite { $crate::tests::list_splits::test_metastore_list_stale_splits::<$metastore_type>().await; } + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_sorted_splits() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::list_splits::test_metastore_list_sorted_splits::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_after_split() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::list_splits::test_metastore_list_after_split::<$metastore_type>().await; + } + #[tokio::test] #[serial_test::file_serial] async fn test_metastore_update_splits_delete_opstamp() { From 060dfd8929effe7d9dfb1ddb4925d440c2ffce08 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Wed, 2 Oct 2024 14:58:25 +0800 Subject: [PATCH 5/5] document searcher config request_timeout_secs (#5452) * document searcher config request_timeout_secs * add some more details to the docs * update url --- docs/configuration/node-config.md | 2 +- .../tutorials/trace-analytics-with-grafana.md | 14 ++++++++------ docs/reference/metrics.md | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/docs/configuration/node-config.md b/docs/configuration/node-config.md index f99558c368d..d736f03c585 100644 --- a/docs/configuration/node-config.md +++ b/docs/configuration/node-config.md @@ -199,7 +199,7 @@ This section contains the configuration options for a Searcher. | `max_num_concurrent_split_searches` | Maximum number of concurrent split search requests running on a Searcher. | `100` | | `max_num_concurrent_split_streams` | Maximum number of concurrent split stream requests running on a Searcher. | `100` | | `split_cache` | Searcher split cache configuration options defined in the section below. Cache disabled if unspecified. | | - +| `request_timeout_secs` | The time before a search request is cancelled. This should match the timeout of the stack calling into quickwit if there is one set. | `30` | ### Searcher split cache configuration diff --git a/docs/get-started/tutorials/trace-analytics-with-grafana.md b/docs/get-started/tutorials/trace-analytics-with-grafana.md index ea0f55fb8de..6d099b6ee0d 100644 --- a/docs/get-started/tutorials/trace-analytics-with-grafana.md +++ b/docs/get-started/tutorials/trace-analytics-with-grafana.md @@ -12,7 +12,9 @@ You only need a few minutes to get Grafana working with Quickwit and build meani ## Create a Docker Compose recipe -Let's add a [Quickwit instance](../installation.md) with the OTLP service enabled. +First, create a `docker-compose.yml` file. This file will define the services needed to run Quickwit with OpenTelemetry and Grafana with the Quickwit Datasource plugin. + +Below is the complete Docker Compose configuration: ```yaml version: '3.0' @@ -25,23 +27,21 @@ services: ports: - 7280:7280 command: ["run"] -``` -Then we create a [Grafana](https://grafana.com/docs/grafana/latest/setup-grafana/installation/docker/#run-grafana-via-docker-compose) service with the [Quickwit Datasource](https://github.com/quickwit-oss/quickwit-datasource) plugin. - -```yaml grafana: image: grafana/grafana-oss container_name: grafana ports: - "${MAP_HOST_GRAFANA:-127.0.0.1}:3000:3000" environment: - GF_INSTALL_PLUGINS: https://github.com/quickwit-oss/quickwit-datasource/releases/download/v0.3.1/quickwit-quickwit-datasource-0.3.1.zip;quickwit-quickwit-datasource + GF_INSTALL_PLUGINS: https://github.com/quickwit-oss/quickwit-datasource/releases/download/v0.4.6/quickwit-quickwit-datasource-0.4.6.zip;quickwit-quickwit-datasource GF_AUTH_DISABLE_LOGIN_FORM: "true" GF_AUTH_ANONYMOUS_ENABLED: "true" GF_AUTH_ANONYMOUS_ORG_ROLE: Admin ``` +The default Grafana port is 3000. If this port is already taken, you can modify the port mapping, for example, changing 3000:3000 to 3100:3000 or any other available port. + Save and run the recipe: ```bash @@ -99,3 +99,5 @@ Quickwit sends itself its own traces, so you should already have data to display Here's what your first dashboard can look like : ![Quickwit Panel in Grafana Dashboard](../../assets/images/screenshot-grafana-tutorial-dashboard.png) + + diff --git a/docs/reference/metrics.md b/docs/reference/metrics.md index 315b6e3fc9c..77c8b7bd397 100644 --- a/docs/reference/metrics.md +++ b/docs/reference/metrics.md @@ -3,7 +3,7 @@ title: Metrics sidebar_position: 70 --- -Quickwit exposes some key metrics via [Prometheus](https://prometheus.io/). You can use any front-end that supports Prometheus to examine the behavior of Quickwit visually. +Quickwit exposes key metrics in the [Prometheus](https://prometheus.io/) format on the `/metrics` endpoint. You can use any front-end that supports Prometheus to examine the behavior of Quickwit visually. ## Cache Metrics