diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs index bbd416cfd7f..0466bcdec61 100644 --- a/quickwit/quickwit-cli/tests/cli.rs +++ b/quickwit/quickwit-cli/tests/cli.rs @@ -574,7 +574,8 @@ async fn test_cmd_update_index() { index_metadata.index_config.retention_policy_opt, Some(RetentionPolicy { retention_period: String::from("1 week"), - evaluation_schedule: String::from("daily") + evaluation_schedule: String::from("daily"), + timestamp_type: Default::default(), }) ); diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index b4c6b69d7b1..e6e7adb3766 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -206,6 +206,20 @@ pub struct SearchSettings { pub default_search_fields: Vec, } +#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize, Default, utoipa::ToSchema)] +#[serde(rename_all = "lowercase")] +pub enum RetentionTimestampType { + #[default] + Primary, + Secondary, +} + +impl RetentionTimestampType { + pub fn is_primary(&self) -> bool { + matches!(self, RetentionTimestampType::Primary) + } +} + #[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)] #[serde(deny_unknown_fields)] pub struct RetentionPolicy { @@ -220,6 +234,11 @@ pub struct RetentionPolicy { #[serde(default = "RetentionPolicy::default_schedule")] #[serde(rename = "schedule")] pub evaluation_schedule: String, + + /// The target timestamp field to use for retention evaluation. When the + /// range is not in the split's metadata, the split is never deleted. + #[serde(default, skip_serializing_if = "RetentionTimestampType::is_primary")] + pub timestamp_type: RetentionTimestampType, } impl RetentionPolicy { @@ -467,6 +486,7 @@ impl crate::TestableForRegression for IndexConfig { message_mapping, ], timestamp_field: Some("timestamp".to_string()), + secondary_timestamp_field: None, tag_fields: BTreeSet::from_iter(["tenant_id".to_string(), "log_level".to_string()]), partition_key: Some("tenant_id".to_string()), max_num_partitions: NonZeroU32::new(100).unwrap(), @@ -502,6 +522,7 @@ impl crate::TestableForRegression for IndexConfig { let retention_policy_opt = Some(RetentionPolicy { retention_period: "90 days".to_string(), evaluation_schedule: "daily".to_string(), + timestamp_type: RetentionTimestampType::Primary, }); IndexConfig { index_id: "my-index".to_string(), @@ -674,6 +695,7 @@ mod tests { let expected_retention_policy = RetentionPolicy { retention_period: "90 days".to_string(), evaluation_schedule: "daily".to_string(), + timestamp_type: RetentionTimestampType::Primary, }; assert_eq!( index_config.retention_policy_opt.unwrap(), @@ -853,6 +875,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "90 days".to_string(), evaluation_schedule: "hourly".to_string(), + timestamp_type: RetentionTimestampType::Primary, }; let retention_policy_yaml = serde_yaml::to_string(&retention_policy).unwrap(); assert_eq!( @@ -873,6 +896,22 @@ mod tests { let expected_retention_policy = RetentionPolicy { retention_period: "90 days".to_string(), evaluation_schedule: "hourly".to_string(), + timestamp_type: RetentionTimestampType::Primary, + }; + assert_eq!(retention_policy, expected_retention_policy); + } + { + let retention_policy_yaml = r#" + period: 90 days + schedule: daily + "#; + let retention_policy = + serde_yaml::from_str::(retention_policy_yaml).unwrap(); + + let expected_retention_policy = RetentionPolicy { + 
retention_period: "90 days".to_string(), + evaluation_schedule: "daily".to_string(), + timestamp_type: RetentionTimestampType::Primary, }; assert_eq!(retention_policy, expected_retention_policy); } @@ -880,6 +919,7 @@ mod tests { let retention_policy_yaml = r#" period: 90 days schedule: daily + timestamp_type: secondary "#; let retention_policy = serde_yaml::from_str::(retention_policy_yaml).unwrap(); @@ -887,6 +927,7 @@ mod tests { let expected_retention_policy = RetentionPolicy { retention_period: "90 days".to_string(), evaluation_schedule: "daily".to_string(), + timestamp_type: RetentionTimestampType::Secondary, }; assert_eq!(retention_policy, expected_retention_policy); } @@ -898,6 +939,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "1 hour".to_string(), evaluation_schedule: "hourly".to_string(), + timestamp_type: RetentionTimestampType::Primary, }; assert_eq!( retention_policy.retention_period().unwrap(), @@ -907,6 +949,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "foo".to_string(), evaluation_schedule: "hourly".to_string(), + timestamp_type: RetentionTimestampType::Primary, }; assert_eq!( retention_policy.retention_period().unwrap_err().to_string(), @@ -931,6 +974,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "1 hour".to_string(), evaluation_schedule: "@hourly".to_string(), + timestamp_type: RetentionTimestampType::Primary, }; assert_eq!( retention_policy.evaluation_schedule().unwrap(), @@ -941,6 +985,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "1 hour".to_string(), evaluation_schedule: "hourly".to_string(), + timestamp_type: RetentionTimestampType::Primary, }; assert_eq!( retention_policy.evaluation_schedule().unwrap(), @@ -951,6 +996,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "1 hour".to_string(), evaluation_schedule: "0 * * * * *".to_string(), + timestamp_type: RetentionTimestampType::Primary, }; let evaluation_schedule = retention_policy.evaluation_schedule().unwrap(); assert_eq!(evaluation_schedule.seconds().count(), 1); @@ -964,6 +1010,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "1 hour".to_string(), evaluation_schedule: "hourly".to_string(), + timestamp_type: RetentionTimestampType::Primary, }; retention_policy.validate().unwrap(); } @@ -971,6 +1018,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "foo".to_string(), evaluation_schedule: "hourly".to_string(), + timestamp_type: RetentionTimestampType::Primary, }; retention_policy.validate().unwrap_err(); } @@ -978,6 +1026,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "1 hour".to_string(), evaluation_schedule: "foo".to_string(), + timestamp_type: RetentionTimestampType::Primary, }; retention_policy.validate().unwrap_err(); } @@ -990,6 +1039,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "1 hour".to_string(), evaluation_schedule: schedule_str.to_string(), + timestamp_type: RetentionTimestampType::Primary, }; let next_evaluation_duration = chrono::Duration::nanoseconds( diff --git a/quickwit/quickwit-config/src/index_config/serialize.rs b/quickwit/quickwit-config/src/index_config/serialize.rs index 24fcc6d1ac2..01b3692a85d 100644 --- a/quickwit/quickwit-config/src/index_config/serialize.rs +++ b/quickwit/quickwit-config/src/index_config/serialize.rs @@ -255,6 +255,7 @@ mod test { invalid_index_config.retention_policy_opt = Some(RetentionPolicy { retention_period: 
"90 days".to_string(), evaluation_schedule: "hourly".to_string(), + timestamp_type: Default::default(), }); let validation_err = invalid_index_config .build_and_validate(None) diff --git a/quickwit/quickwit-config/src/index_template/mod.rs b/quickwit/quickwit-config/src/index_template/mod.rs index d312edab1be..32fcad37e8d 100644 --- a/quickwit/quickwit-config/src/index_template/mod.rs +++ b/quickwit/quickwit-config/src/index_template/mod.rs @@ -177,6 +177,7 @@ impl crate::TestableForRegression for IndexTemplate { retention_policy_opt: Some(RetentionPolicy { retention_period: "42 days".to_string(), evaluation_schedule: "daily".to_string(), + timestamp_type: Default::default(), }), } } @@ -236,6 +237,7 @@ mod tests { index_template.retention_policy_opt = Some(RetentionPolicy { retention_period: "42 days".to_string(), evaluation_schedule: "hourly".to_string(), + timestamp_type: Default::default(), }); let default_index_root_uri = Uri::for_test("s3://test-bucket/indexes"); @@ -291,6 +293,7 @@ mod tests { index_template.retention_policy_opt = Some(RetentionPolicy { retention_period: "".to_string(), evaluation_schedule: "".to_string(), + timestamp_type: Default::default(), }); let error = index_template.validate().unwrap_err(); assert!( diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 21ef6455eed..04771add011 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -46,8 +46,8 @@ pub use cluster_config::ClusterConfig; use index_config::serialize::{IndexConfigV0_8, VersionedIndexConfig}; pub use index_config::{ IndexConfig, IndexingResources, IndexingSettings, IngestSettings, RetentionPolicy, - SearchSettings, build_doc_mapper, load_index_config_from_user_config, load_index_config_update, - prepare_doc_mapping_update, + RetentionTimestampType, SearchSettings, build_doc_mapper, load_index_config_from_user_config, + load_index_config_update, prepare_doc_mapping_update, }; pub use quickwit_doc_mapper::DocMapping; use serde::Serialize; @@ -118,6 +118,7 @@ pub fn disable_ingest_v1() -> bool { PulsarSourceParams, RegionOrEndpoint, RetentionPolicy, + RetentionTimestampType, SearchSettings, SourceConfigV0_7, SourceConfigV0_8, diff --git a/quickwit/quickwit-doc-mapper/src/doc_mapper/doc_mapper_impl.rs b/quickwit/quickwit-doc-mapper/src/doc_mapper/doc_mapper_impl.rs index 9530c9bb31c..9d5e40d1f6a 100644 --- a/quickwit/quickwit-doc-mapper/src/doc_mapper/doc_mapper_impl.rs +++ b/quickwit/quickwit-doc-mapper/src/doc_mapper/doc_mapper_impl.rs @@ -75,6 +75,8 @@ pub struct DocMapper { timestamp_field_name: Option, /// Timestamp field path (name parsed) timestamp_field_path: Option>, + /// Secondary timestamp field name. + secondary_timestamp_field_name: Option, /// Root node of the field mapping tree. /// See [`MappingNode`]. 
    field_mappings: MappingNode,
@@ -138,6 +140,7 @@ impl From<DocMapper> for DocMapperBuilder {
             mode: default_doc_mapper.mode,
             field_mappings: default_doc_mapper.field_mappings.into(),
             timestamp_field: default_doc_mapper.timestamp_field_name,
+            secondary_timestamp_field: default_doc_mapper.secondary_timestamp_field_name,
             tag_fields: default_doc_mapper.tag_field_names,
             partition_key: partition_key_opt,
             max_num_partitions: default_doc_mapper.max_num_partitions,
@@ -288,6 +291,7 @@ impl TryFrom<DocMapperBuilder> for DocMapper {
             default_search_field_names,
             timestamp_field_name: doc_mapping.timestamp_field,
             timestamp_field_path,
+            secondary_timestamp_field_name: doc_mapping.secondary_timestamp_field,
             field_mappings,
             concatenate_dynamic_fields,
             tag_field_names,
@@ -667,6 +671,11 @@ impl DocMapper {
         self.timestamp_field_name.as_deref()
     }

+    /// Returns the secondary timestamp field name.
+    pub fn secondary_timestamp_field_name(&self) -> Option<&str> {
+        self.secondary_timestamp_field_name.as_deref()
+    }
+
     /// Returns the tag `NameField`s on the current schema.
     /// Returns an error if a tag field is not found in this schema.
     pub fn tag_named_fields(&self) -> anyhow::Result<Vec<NamedField>> {
diff --git a/quickwit/quickwit-doc-mapper/src/doc_mapping.rs b/quickwit/quickwit-doc-mapper/src/doc_mapping.rs
index 2fae5d45452..d8afa4b16e9 100644
--- a/quickwit/quickwit-doc-mapper/src/doc_mapping.rs
+++ b/quickwit/quickwit-doc-mapper/src/doc_mapping.rs
@@ -126,6 +126,13 @@ pub struct DocMapping {
     #[serde(default)]
     pub timestamp_field: Option<String>,

+    /// Field holding the secondary timestamp. A secondary timestamp field can
+    /// be added to an existing index, but it cannot be changed afterwards. If
+    /// the secondary timestamp is missing from a document in the split, the
+    /// range is not set in the split metadata.
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub secondary_timestamp_field: Option<String>,
+
     /// Declares the low cardinality fields for which the values are recorded directly in the
     /// splits metadata.
#[schema(value_type = Vec)] @@ -199,6 +206,7 @@ mod tests { }, ], timestamp_field: Some("timestamp".to_string()), + secondary_timestamp_field: None, tag_fields: BTreeSet::from_iter(["level".to_string()]), partition_key: Some("tenant_id".to_string()), max_num_partitions: NonZeroU32::new(100).unwrap(), diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index 2cfa0614e40..d5d493d0f18 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -695,6 +695,7 @@ mod tests { let retention_policy = RetentionPolicy { retention_period: "42 hours".to_string(), evaluation_schedule: "hourly".to_string(), + timestamp_type: Default::default(), }; index_config.retention_policy_opt = Some(retention_policy.clone()); diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index 84ba3987f4a..64a08d3f5da 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -40,7 +40,7 @@ use quickwit_proto::metastore::{ use quickwit_proto::types::{DocMappingUid, PublishToken}; use quickwit_query::get_quickwit_fastfield_normalizer_manager; use serde::Serialize; -use tantivy::schema::Schema; +use tantivy::schema::{Field, Schema, Value}; use tantivy::store::{Compressor, ZstdCompressor}; use tantivy::tokenizer::TokenizerManager; use tantivy::{DateTime, IndexBuilder, IndexSettings}; @@ -94,6 +94,7 @@ struct IndexerState { publish_token_opt: Option, schema: Schema, doc_mapping_uid: DocMappingUid, + secondary_time_field_opt: Option, tokenizer_manager: TokenizerManager, max_num_partitions: NonZeroU32, index_settings: IndexSettings, @@ -326,6 +327,28 @@ impl IndexerState { if let Some(timestamp) = timestamp_opt { record_timestamp(timestamp, &mut indexed_split.split_attrs.time_range); } + if let Some(secondary_timestamp_field) = self.secondary_time_field_opt { + let secondary_timestamp_opt = doc + .get_first(secondary_timestamp_field) + .and_then(|val| val.as_datetime()); + if let Some(secondary_timestamp) = secondary_timestamp_opt { + record_timestamp( + secondary_timestamp, + &mut indexed_split.split_attrs.secondary_time_range, + ); + } else { + // The secondary timestamp should always be present. If not, + // we cannot derive a range for the split. + indexed_split.split_attrs.secondary_time_range = + Some(DateTime::MIN..=DateTime::MAX); + quickwit_common::rate_limited_warn!( + limit_per_min = 1, + index_id = self.pipeline_id.index_uid.index_id, + split_id = indexed_split.split_id(), + "secondary timestamp field missing" + ); + } + } let _protect_guard = ctx.protect_zone(); indexed_split .index_writer @@ -340,6 +363,12 @@ impl IndexerState { } } +fn extract_secondary_timestamp_field(doc_mapper: &DocMapper) -> Option { + let schema = doc_mapper.schema(); + let timestamp_field_name = doc_mapper.secondary_timestamp_field_name()?; + schema.get_field(timestamp_field_name).ok() +} + /// A workbench hosts the set of `IndexedSplit` that are being built. 
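In short, the rule the indexer hunk above applies per document is: extend the secondary range like the primary one, but a single document without the field permanently widens the range to the `DateTime::MIN..=DateTime::MAX` sentinel. A simplified, self-contained sketch, with plain `i64` standing in for `tantivy::DateTime`:

```rust
use std::ops::RangeInclusive;

type Ts = i64; // stand-in for tantivy::DateTime in this sketch

// Mirrors the `record_timestamp` helper already used for the primary timestamp.
fn record_timestamp(ts: Ts, range: &mut Option<RangeInclusive<Ts>>) {
    let (start, end) = match range.take() {
        Some(r) => (ts.min(*r.start()), ts.max(*r.end())),
        None => (ts, ts),
    };
    *range = Some(start..=end);
}

fn record_secondary(ts_opt: Option<Ts>, range: &mut Option<RangeInclusive<Ts>>) {
    match ts_opt {
        Some(ts) => record_timestamp(ts, range),
        // One document without the field poisons the split's secondary range.
        None => *range = Some(Ts::MIN..=Ts::MAX),
    }
}

fn main() {
    let mut range = None;
    record_secondary(Some(10), &mut range);
    record_secondary(Some(5), &mut range);
    assert_eq!(range, Some(5..=10));
    record_secondary(None, &mut range);
    assert_eq!(range, Some(Ts::MIN..=Ts::MAX));
}
```

The sentinel is filtered out again in `create_split_metadata` (see `split_attrs.rs` below), so such a split carries no secondary range in its metadata and is therefore never deleted by a secondary-timestamp retention policy.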
struct IndexingWorkbench { workbench_id: Ulid, @@ -570,6 +599,7 @@ impl Indexer { publish_token_opt: None, schema, doc_mapping_uid: doc_mapper.doc_mapping_uid(), + secondary_time_field_opt: extract_secondary_timestamp_field(&doc_mapper), tokenizer_manager: tokenizer_manager.tantivy_manager().clone(), index_settings, max_num_partitions: doc_mapper.max_num_partitions(), @@ -1662,4 +1692,163 @@ mod tests { universe.assert_quit().await; Ok(()) } + + fn doc_mapper_with_secondary_time() -> DocMapper { + const JSON_CONFIG_VALUE: &str = r#" + { + "store_source": true, + "index_field_presence": true, + "default_search_fields": ["body"], + "timestamp_field": "timestamp", + "secondary_timestamp_field": "event.created", + "field_mappings": [ + { + "name": "timestamp", + "type": "datetime", + "output_format": "unix_timestamp_secs", + "fast": true + }, + { + "name": "body", + "type": "text", + "stored": true + }, + { + "name": "event", + "type": "object", + "field_mappings": [ + { + "name": "created", + "type": "datetime", + "output_format": "unix_timestamp_secs", + "fast": true + } + ] + } + ] + }"#; + serde_json::from_str::(JSON_CONFIG_VALUE).unwrap() + } + + #[tokio::test] + async fn test_indexer_primary_and_secondary_time() -> anyhow::Result<()> { + let index_uid = IndexUid::new_with_random_ulid("test-index"); + let pipeline_id = IndexingPipelineId { + index_uid: index_uid.clone(), + source_id: "test-source".to_string(), + node_id: NodeId::from("test-node"), + pipeline_uid: PipelineUid::default(), + }; + let doc_mapper = Arc::new(doc_mapper_with_secondary_time()); + let last_delete_opstamp = 10; + let schema = doc_mapper.schema(); + let body_field = schema.get_field("body").unwrap(); + let timestamp_field = schema.get_field("timestamp").unwrap(); + let event_created_field = schema.get_field("event.created").unwrap(); + let indexing_directory = TempDirectory::for_test(); + let mut indexing_settings = IndexingSettings::for_test(); + indexing_settings.split_num_docs_target = 3; + let universe = Universe::with_accelerated_time(); + let (index_serializer_mailbox, index_serializer_inbox) = universe.create_test_mailbox(); + let mut mock_metastore = MockMetastoreService::new(); + mock_metastore.expect_publish_splits().never(); + mock_metastore + .expect_last_delete_opstamp() + .times(1) + .returning(move |delete_opstamp_request| { + assert_eq!(delete_opstamp_request.index_uid(), &index_uid); + Ok(LastDeleteOpstampResponse::new(last_delete_opstamp)) + }); + mock_metastore.expect_publish_splits().never(); + let indexer = Indexer::new( + pipeline_id, + doc_mapper, + MetastoreServiceClient::from_mock(mock_metastore), + indexing_directory, + indexing_settings, + None, + index_serializer_mailbox, + ); + let (indexer_mailbox, indexer_handle) = universe.spawn_builder().spawn(indexer); + indexer_mailbox + .send_message(ProcessedDocBatch::new( + vec![ + ProcessedDoc { + doc: doc!( + body_field=>"this is a test document", + timestamp_field=>DateTime::from_timestamp_secs(1_662_000_435), + event_created_field=>DateTime::from_timestamp_secs(1_663_000_435), + ), + timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_000_435)), + partition: 1, + num_bytes: 30, + }, + ProcessedDoc { + doc: doc!( + body_field=>"this is a test document 2", + timestamp_field=>DateTime::from_timestamp_secs(1_662_000_535), + event_created_field=>DateTime::from_timestamp_secs(1_663_000_535), + ), + timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_000_535)), + partition: 1, + num_bytes: 30, + }, + ], + 
SourceCheckpointDelta::from_range(4..6), + false, + )) + .await?; + indexer_mailbox + .send_message(ProcessedDocBatch::new( + vec![ + ProcessedDoc { + doc: doc!( + body_field=>"this is a test document 3", + timestamp_field=>DateTime::from_timestamp_secs(1_662_000_635), + event_created_field=>DateTime::from_timestamp_secs(1_663_000_635), + ), + timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_000_635)), + partition: 1, + num_bytes: 30, + }, + ProcessedDoc { + doc: doc!( + body_field=>"this is a test document 4", + timestamp_field=>DateTime::from_timestamp_secs(1_662_000_735), + event_created_field=>DateTime::from_timestamp_secs(1_663_000_735), + ), + timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_000_735)), + partition: 1, + num_bytes: 30, + }, + ], + SourceCheckpointDelta::from_range(6..8), + false, + )) + .await?; + + indexer_handle.process_pending_and_observe().await; + let messages: Vec = index_serializer_inbox.drain_for_test_typed(); + assert_eq!(messages.len(), 1); + let batch = messages.into_iter().next().unwrap(); + assert_eq!(batch.commit_trigger, CommitTrigger::NumDocsLimit); + assert_eq!(batch.splits.len(), 1); + let new_split_attrs = &batch.splits[0].split_attrs; + assert_eq!(new_split_attrs.num_docs, 4); + + assert_eq!( + new_split_attrs.time_range.as_ref().unwrap(), + &(DateTime::from_timestamp_secs(1_662_000_435) + ..=DateTime::from_timestamp_secs(1_662_000_735)) + ); + assert_eq!( + new_split_attrs.secondary_time_range.as_ref().unwrap(), + &(DateTime::from_timestamp_secs(1_663_000_435) + ..=DateTime::from_timestamp_secs(1_663_000_735)) + ); + + batch.splits.into_iter().next().unwrap().finalize()?; + universe.assert_quit().await; + Ok(()) + } } diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index 4c1a5215752..51010bd423c 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -209,6 +209,30 @@ fn merge_time_range(splits: &[SplitMetadata]) -> Option }) } +fn merge_secondary_time_range_if_exists( + splits: &[SplitMetadata], +) -> Option> { + if splits + .iter() + .any(|split| split.secondary_time_range.is_none()) + { + return None; + } + + splits + .iter() + .flat_map(|split| split.secondary_time_range.clone()) + .flat_map(|secondary_time_range| { + vec![*secondary_time_range.start(), *secondary_time_range.end()].into_iter() + }) + .minmax() + .into_option() + .map(|(min_timestamp, max_timestamp)| { + DateTime::from_timestamp_secs(min_timestamp) + ..=DateTime::from_timestamp_secs(max_timestamp) + }) +} + fn sum_doc_sizes_in_bytes(splits: &[SplitMetadata]) -> u64 { splits .iter() @@ -253,6 +277,7 @@ pub fn merge_split_attrs( ) -> anyhow::Result { let partition_id = combine_partition_ids_aux(splits.iter().map(|split| split.partition_id)); let time_range: Option> = merge_time_range(splits); + let secondary_time_range = merge_secondary_time_range_if_exists(splits); let uncompressed_docs_size_in_bytes = sum_doc_sizes_in_bytes(splits); let num_docs = sum_num_docs(splits); let replaced_split_ids: Vec = splits @@ -283,6 +308,7 @@ pub fn merge_split_attrs( partition_id, replaced_split_ids, time_range, + secondary_time_range, num_docs, uncompressed_docs_size_in_bytes, delete_opstamp, @@ -466,6 +492,7 @@ impl MergeExecutor { partition_id: split.partition_id, replaced_split_ids: vec![split.split_id.clone()], time_range, + secondary_time_range: None, num_docs, uncompressed_docs_size_in_bytes, delete_opstamp: 
last_delete_opstamp, diff --git a/quickwit/quickwit-indexing/src/actors/packager.rs b/quickwit/quickwit-indexing/src/actors/packager.rs index 9dbb1fd8963..18e0bb40d73 100644 --- a/quickwit/quickwit-indexing/src/actors/packager.rs +++ b/quickwit/quickwit-indexing/src/actors/packager.rs @@ -526,6 +526,7 @@ mod tests { num_docs, uncompressed_docs_size_in_bytes: num_docs * 15, time_range: timerange_opt, + secondary_time_range: None, replaced_split_ids: Vec::new(), delete_opstamp: 0, num_merge_ops: 0, diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index d6fe2751aaa..2a012858587 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -587,6 +587,7 @@ mod tests { DateTime::from_timestamp_secs(1_628_203_589) ..=DateTime::from_timestamp_secs(1_628_203_640), ), + secondary_time_range: None, uncompressed_docs_size_in_bytes: 1_000, num_docs: 10, replaced_split_ids: Vec::new(), @@ -701,6 +702,7 @@ mod tests { DateTime::from_timestamp_secs(1_628_203_589) ..=DateTime::from_timestamp_secs(1_628_203_640), ), + secondary_time_range: None, replaced_split_ids: vec![ "replaced-split-1".to_string(), "replaced-split-2".to_string(), @@ -728,6 +730,7 @@ mod tests { DateTime::from_timestamp_secs(1_628_203_589) ..=DateTime::from_timestamp_secs(1_628_203_640), ), + secondary_time_range: None, replaced_split_ids: vec![ "replaced-split-1".to_string(), "replaced-split-2".to_string(), @@ -849,6 +852,7 @@ mod tests { split_id: "test-split".to_string(), partition_id: 3u64, time_range: None, + secondary_time_range: None, uncompressed_docs_size_in_bytes: 1_000, num_docs: 10, replaced_split_ids: Vec::new(), @@ -1030,6 +1034,7 @@ mod tests { DateTime::from_timestamp_secs(1_628_203_589) ..=DateTime::from_timestamp_secs(1_628_203_640), ), + secondary_time_range: None, uncompressed_docs_size_in_bytes: 1_000, num_docs: 10, replaced_split_ids: Vec::new(), diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs index cd272bdc34c..e129feede9b 100644 --- a/quickwit/quickwit-indexing/src/models/indexed_split.rs +++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs @@ -108,6 +108,7 @@ impl IndexedSplitBuilder { replaced_split_ids: Vec::new(), uncompressed_docs_size_in_bytes: 0, time_range: None, + secondary_time_range: None, delete_opstamp: last_delete_opstamp, num_merge_ops: 0, }, diff --git a/quickwit/quickwit-indexing/src/models/split_attrs.rs b/quickwit/quickwit-indexing/src/models/split_attrs.rs index 2e0504bdd35..dde48fab25a 100644 --- a/quickwit/quickwit-indexing/src/models/split_attrs.rs +++ b/quickwit/quickwit-indexing/src/models/split_attrs.rs @@ -59,6 +59,7 @@ pub struct SplitAttrs { pub uncompressed_docs_size_in_bytes: u64, pub time_range: Option>, + pub secondary_time_range: Option>, pub replaced_split_ids: Vec, @@ -100,12 +101,22 @@ pub fn create_split_metadata( .as_ref() .map(|range| range.start().into_timestamp_secs()..=range.end().into_timestamp_secs()); + let secondary_time_range = split_attrs + .secondary_time_range + .as_ref() + // don't record the range if an event was missing the secondary timestamp + .filter(|range| range != &&(DateTime::MIN..=DateTime::MAX)) + .map(|range| range.start().into_timestamp_secs()..=range.end().into_timestamp_secs()); + let mut maturity = merge_policy.split_maturity(split_attrs.num_docs as usize, split_attrs.num_merge_ops); if let Some(max_maturity) = 
max_maturity_before_end_of_retention( retention_policy, create_timestamp, time_range.as_ref().map(|time_range| *time_range.end()), + secondary_time_range + .as_ref() + .map(|time_range| *time_range.end()), ) { maturity = maturity.min(max_maturity); } @@ -118,6 +129,7 @@ pub fn create_split_metadata( partition_id: split_attrs.partition_id, num_docs: split_attrs.num_docs as usize, time_range, + secondary_time_range, uncompressed_docs_size_in_bytes: split_attrs.uncompressed_docs_size_in_bytes, create_timestamp, maturity, @@ -134,9 +146,14 @@ fn max_maturity_before_end_of_retention( retention_policy: Option<&quickwit_config::RetentionPolicy>, create_timestamp: i64, time_range_end: Option, + secondary_time_range_end: Option, ) -> Option { - let time_range_end = time_range_end? as u64; - let retention_period_s = retention_policy?.retention_period().ok()?.as_secs(); + let retention_policy = retention_policy?; + let time_range_end = match retention_policy.timestamp_type { + quickwit_config::RetentionTimestampType::Primary => time_range_end? as u64, + quickwit_config::RetentionTimestampType::Secondary => secondary_time_range_end? as u64, + }; + let retention_period_s = retention_policy.retention_period().ok()?.as_secs(); let maturity = if let Some(maturation_period_s) = (time_range_end + retention_period_s).checked_sub(create_timestamp as u64) @@ -165,6 +182,7 @@ mod tests { let retention_policy = quickwit_config::RetentionPolicy { evaluation_schedule: "daily".to_string(), retention_period: "300 sec".to_string(), + timestamp_type: Default::default(), }; let create_timestamp = 1000; @@ -174,6 +192,7 @@ mod tests { Some(&retention_policy), create_timestamp, Some(200), + None, ), Some(SplitMaturity::Mature) ); @@ -184,6 +203,7 @@ mod tests { Some(&retention_policy), create_timestamp, Some(750), + None, ), Some(SplitMaturity::Immature { maturation_period: Duration::from_secs(50) @@ -192,14 +212,19 @@ mod tests { // no retention policy assert_eq!( - max_maturity_before_end_of_retention(None, create_timestamp, Some(850),), + max_maturity_before_end_of_retention(None, create_timestamp, Some(850), None), None, ); // no timestamp_range.end but a retention policy, that's odd, don't change anything about // the maturity period assert_eq!( - max_maturity_before_end_of_retention(Some(&retention_policy), create_timestamp, None,), + max_maturity_before_end_of_retention( + Some(&retention_policy), + create_timestamp, + None, + None + ), None, ); } diff --git a/quickwit/quickwit-integration-tests/src/tests/mod.rs b/quickwit/quickwit-integration-tests/src/tests/mod.rs index 519537bb5fb..34cefdc46a7 100644 --- a/quickwit/quickwit-integration-tests/src/tests/mod.rs +++ b/quickwit/quickwit-integration-tests/src/tests/mod.rs @@ -19,6 +19,7 @@ mod ingest_v2_tests; mod kafka_tests; mod no_cp_tests; mod otlp_tests; +mod secondary_timestamp; #[cfg(feature = "sqs-localstack-tests")] mod sqs_tests; mod tls_tests; diff --git a/quickwit/quickwit-integration-tests/src/tests/secondary_timestamp.rs b/quickwit/quickwit-integration-tests/src/tests/secondary_timestamp.rs new file mode 100644 index 00000000000..9ae3b1e5bea --- /dev/null +++ b/quickwit/quickwit-integration-tests/src/tests/secondary_timestamp.rs @@ -0,0 +1,352 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use quickwit_config::ConfigFormat; +use quickwit_config::service::QuickwitService; +use quickwit_metastore::SplitState; +use quickwit_rest_client::rest_client::CommitType; +use quickwit_serve::{ListSplitsQueryParams, SearchRequestQueryString}; +use serde_json::json; + +use crate::ingest_json; +use crate::test_utils::{ClusterSandboxBuilder, ingest}; + +#[tokio::test] +async fn test_secondary_timestamp_happy_path() { + quickwit_common::setup_logging_for_tests(); + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; + let index_id = "test-secondary-timestamp"; + let index_config = format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: body + type: text + - name: event_time + type: datetime + fast: true + - name: ingestion_time + type: datetime + fast: true + timestamp_field: event_time + secondary_timestamp_field: ingestion_time + indexing_settings: + commit_timeout_secs: 1 + merge_policy: + type: stable_log + merge_factor: 2 + max_merge_factor: 2 + retention: + period: 20 years + schedule: daily + timestamp_type: secondary + "# + ); + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .create(index_config.clone(), ConfigFormat::Yaml, false) + .await + .unwrap(); + + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "first record", "event_time": 1735689600, "ingestion_time": 1735776000}), + CommitType::Auto, + ) + .await + .unwrap(); + + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "second record", "event_time": 1735689601, "ingestion_time": 1735776001}), + CommitType::Auto, + ) + .await + .unwrap(); + + sandbox + .wait_for_splits(index_id, Some(vec![SplitState::Published]), 1) + .await + .unwrap(); + + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "third record", "event_time": 1735689602, "ingestion_time": 1735776002}), + CommitType::Auto, + ) + .await + .unwrap(); + + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "fourth record", "event_time": 1735689603, "ingestion_time": 1735776003}), + CommitType::Auto, + ) + .await + .unwrap(); + + // Wait for splits to merge, since we created 2 splits and merge factor is 2. 
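The merged-split assertions below rely on the merge rule added in `merge_executor.rs`: the secondary range of a merged split is the envelope of the inputs' ranges, but only when every input split has one; if any input lacks it (for example a split written before the field existed), the merged split gets `None`. A standalone sketch over plain `i64` bounds:

```rust
use std::ops::RangeInclusive;

fn merge_secondary_ranges(inputs: &[Option<RangeInclusive<i64>>]) -> Option<RangeInclusive<i64>> {
    // One input without a secondary range disables it for the merged split.
    if inputs.iter().any(Option::is_none) {
        return None;
    }
    let mut bounds = inputs.iter().flatten().map(|r| (*r.start(), *r.end()));
    let (mut min, mut max) = bounds.next()?;
    for (start, end) in bounds {
        min = min.min(start);
        max = max.max(end);
    }
    Some(min..=max)
}

fn main() {
    assert_eq!(merge_secondary_ranges(&[Some(1..=5), Some(3..=9)]), Some(1..=9));
    assert_eq!(merge_secondary_ranges(&[Some(1..=5), None]), None);
}
```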
+ sandbox + .wait_for_splits(index_id, Some(vec![SplitState::MarkedForDeletion]), 2) + .await + .unwrap(); + sandbox.assert_hit_count(index_id, "body:record", 4).await; + + let merged_split = sandbox + .rest_client(QuickwitService::Metastore) + .splits(index_id) + .list(ListSplitsQueryParams { + split_states: Some(vec![SplitState::Published]), + ..Default::default() + }) + .await + .unwrap() + .into_iter() + .next() + .unwrap(); + + assert_eq!( + merged_split.split_metadata.time_range, + Some(1735689600..=1735689603) + ); // 2025-01-01 + assert_eq!( + merged_split.split_metadata.secondary_time_range, + Some(1735776000..=1735776003) + ); // 2025-01-02 + + // Delete the index to avoid potential hanging on shutdown + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .delete(index_id, false) + .await + .unwrap(); + + sandbox.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_secondary_timestamp_update() { + quickwit_common::setup_logging_for_tests(); + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; + let index_id = "test-secondary-timestamp"; + let index_config = format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: body + type: text + - name: event_time + type: datetime + fast: true + - name: ingestion_time + type: datetime + fast: true + timestamp_field: event_time + indexing_settings: + commit_timeout_secs: 1 + merge_policy: + type: stable_log + merge_factor: 2 + max_merge_factor: 2 + retention: + period: 20 years + schedule: daily + timestamp_type: secondary + "# + ); + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .create(index_config.clone(), ConfigFormat::Yaml, false) + .await + .unwrap(); + + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "first record", "event_time": 1735689600, "ingestion_time": 1735776000}), + CommitType::Auto, + ) + .await + .unwrap(); + + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "second record", "event_time": 1735689601, "ingestion_time": 1735776001}), + CommitType::Auto, + ) + .await + .unwrap(); + + sandbox + .wait_for_splits(index_id, Some(vec![SplitState::Published]), 1) + .await + .unwrap(); + + let updated_index_config = format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: body + type: text + - name: event_time + type: datetime + fast: true + - name: ingestion_time + type: datetime + fast: true + timestamp_field: event_time + secondary_timestamp_field: ingestion_time + indexing_settings: + commit_timeout_secs: 1 + merge_policy: + type: stable_log + merge_factor: 2 + max_merge_factor: 2 + retention: + period: 20 years + schedule: daily + timestamp_type: secondary + "# + ); + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .update( + index_id, + updated_index_config.clone(), + ConfigFormat::Yaml, + false, + ) + .await + .unwrap(); + + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "third record", "event_time": 1735689602, "ingestion_time": 1735776002}), + CommitType::Auto, + ) + .await + .unwrap(); + + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "fourth record", "event_time": 1735689603, "ingestion_time": 1735776003}), + CommitType::Auto, + ) + .await + .unwrap(); + + // We don't expect splits to be merged since the doc mapping was updated + sandbox + .wait_for_splits(index_id, 
Some(vec![SplitState::Published]), 2) + .await + .unwrap(); + sandbox.assert_hit_count(index_id, "body:record", 4).await; + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + let mut splits = sandbox + .rest_client(QuickwitService::Metastore) + .splits(index_id) + .list(ListSplitsQueryParams { + split_states: Some(vec![SplitState::Published]), + ..Default::default() + }) + .await + .unwrap(); + + splits.sort_by_key(|s| *s.split_metadata.time_range.as_ref().unwrap().start()); + + assert_eq!( + splits[0].split_metadata.time_range, + Some(1735689600..=1735689601) + ); // 2025-01-01 + assert_eq!(splits[0].split_metadata.secondary_time_range, None); + + assert_eq!( + splits[1].split_metadata.time_range, + Some(1735689602..=1735689603) + ); // 2025-01-01 + assert_eq!( + splits[1].split_metadata.secondary_time_range, + Some(1735776002..=1735776003) + ); // 2025-01-02 + + let response = sandbox + .rest_client(QuickwitService::Indexer) + .search( + index_id, + SearchRequestQueryString { + query: "*".to_string(), + max_hits: 10, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(response.hits.len(), 4); + + let response = sandbox + .rest_client(QuickwitService::Indexer) + .search( + index_id, + SearchRequestQueryString { + query: "ingestion_time:[2025-01-02T00:00:00 TO 2025-01-02T00:00:01]".to_string(), + max_hits: 10, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(response.hits.len(), 2); + + let response = sandbox + .rest_client(QuickwitService::Indexer) + .search( + index_id, + SearchRequestQueryString { + query: "ingestion_time:[2025-01-02T00:00:02 TO 2025-01-02T00:00:03]".to_string(), + max_hits: 10, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(response.hits.len(), 2); + + // Delete the index to avoid potential hanging on shutdown + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .delete(index_id, false) + .await + .unwrap(); + + sandbox.shutdown().await.unwrap(); +} diff --git a/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs b/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs index 38a1fbd3e8f..6e43ad5def3 100644 --- a/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs +++ b/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs @@ -308,6 +308,7 @@ mod tests { index.retention_policy_opt = Some(RetentionPolicy { retention_period: retention_period.to_string(), evaluation_schedule: EVALUATION_SCHEDULE.to_string(), + timestamp_type: Default::default(), }) } index @@ -341,6 +342,7 @@ mod tests { let scheduler = RetentionPolicy { retention_period: "".to_string(), evaluation_schedule: EVALUATION_SCHEDULE.to_string(), + timestamp_type: Default::default(), }; scheduler.duration_until_next_evaluation().unwrap() + Duration::from_secs(1) diff --git a/quickwit/quickwit-metastore/migrations_sk/postgresql/1_add-event-creation-range-fields.down.sql b/quickwit/quickwit-metastore/migrations_sk/postgresql/1_add-event-creation-range-fields.down.sql new file mode 100644 index 00000000000..0483299e982 --- /dev/null +++ b/quickwit/quickwit-metastore/migrations_sk/postgresql/1_add-event-creation-range-fields.down.sql @@ -0,0 +1,5 @@ +ALTER TABLE public.splits + DROP COLUMN secondary_time_range_start; + +ALTER TABLE public.splits + DROP COLUMN secondary_time_range_end; \ No newline at end of file diff --git a/quickwit/quickwit-metastore/migrations_sk/postgresql/1_add-event-creation-range-fields.up.sql 
b/quickwit/quickwit-metastore/migrations_sk/postgresql/1_add-event-creation-range-fields.up.sql
new file mode 100644
index 00000000000..0cde6a924ae
--- /dev/null
+++ b/quickwit/quickwit-metastore/migrations_sk/postgresql/1_add-event-creation-range-fields.up.sql
@@ -0,0 +1,5 @@
+ALTER TABLE public.splits
+    ADD COLUMN secondary_time_range_start BIGINT;
+
+ALTER TABLE public.splits
+    ADD COLUMN secondary_time_range_end BIGINT;
\ No newline at end of file
diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs
index 15aeeb21a5c..df1432d2720 100644
--- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs
+++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs
@@ -742,6 +742,23 @@ fn split_query_predicate(split: &&Split, query: &ListSplitsQuery) -> bool {
         }
     }

+    if let Some(range) = &split.split_metadata.secondary_time_range
+        && !query.secondary_time_range.overlaps_with(range.clone())
+    {
+        return false;
+    }
+    if let Some(v) = query.max_secondary_time_range_end {
+        match (
+            &split.split_metadata.secondary_time_range,
+            &split.split_metadata.time_range,
+        ) {
+            (Some(secondary_time_range), _) if secondary_time_range.end() > &v => return false,
+            (None, Some(time_range)) if time_range.end() > &v => return false,
+            (None, None) => return false,
+            _ => {}
+        }
+    }
+
     if let Some(node_id) = &query.node_id
         && split.split_metadata.node_id != *node_id
     {
diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs
index db50d097ce7..98f2f1d5039 100644
--- a/quickwit/quickwit-metastore/src/metastore/mod.rs
+++ b/quickwit/quickwit-metastore/src/metastore/mod.rs
@@ -658,6 +658,12 @@ pub struct ListSplitsQuery {
     /// The maximum time range end to filter by.
     pub max_time_range_end: Option<i64>,

+    /// The time range to filter by on the secondary timestamp.
+    pub secondary_time_range: FilterRange<i64>,
+
+    /// The maximum time range end to filter by on the secondary timestamp.
+    pub max_secondary_time_range_end: Option<i64>,
+
     /// The delete opstamp range to filter by.
     pub delete_opstamp: FilterRange<u64>,

@@ -725,6 +731,8 @@ impl ListSplitsQuery {
             tags: None,
             time_range: Default::default(),
             max_time_range_end: None,
+            secondary_time_range: Default::default(),
+            max_secondary_time_range_end: None,
             delete_opstamp: Default::default(),
             update_timestamp: Default::default(),
             create_timestamp: Default::default(),
@@ -749,6 +757,8 @@
             tags: None,
             time_range: Default::default(),
             max_time_range_end: None,
+            secondary_time_range: Default::default(),
+            max_secondary_time_range_end: None,
             delete_opstamp: Default::default(),
             update_timestamp: Default::default(),
             create_timestamp: Default::default(),
@@ -769,6 +779,8 @@
             tags: None,
             time_range: Default::default(),
             max_time_range_end: None,
+            secondary_time_range: Default::default(),
+            max_secondary_time_range_end: None,
             delete_opstamp: Default::default(),
             update_timestamp: Default::default(),
             create_timestamp: Default::default(),
@@ -849,6 +861,28 @@ impl ListSplitsQuery {
         self
     }

+    /// Sets the field's upper bound to match values that are
+    /// *less than* the provided value.
+    pub fn with_secondary_time_range_end_lt(mut self, v: i64) -> Self {
+        self.secondary_time_range.end = Bound::Excluded(v);
+        self
+    }
+
+    /// Sets the field's lower bound to match values that are
+    /// *greater than or equal to* the provided value.
+ pub fn with_secondary_time_range_start_gte(mut self, v: i64) -> Self { + self.secondary_time_range.start = Bound::Included(v); + self + } + + /// Retains only splits with a secondary time range end that is defined and + /// *less than or equal to* the provided value. If the secondary time range + /// end is not defined, falls back to the primary. + pub fn with_max_secondary_time_range_end(mut self, v: i64) -> Self { + self.max_secondary_time_range_end = Some(v); + self + } + /// Sets the field's lower bound to match values that are /// *less than or equal to* the provided value. pub fn with_delete_opstamp_lte(mut self, v: u64) -> Self { diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index e63a9400688..39925146ea5 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -129,6 +129,9 @@ impl PostgresqlMetastore { run_migrations(&connection_pool, skip_migrations, skip_locking).await?; + super::migrator_sk::run_fork_migrations(&connection_pool, skip_migrations, skip_locking) + .await?; + let metastore = PostgresqlMetastore { uri: connection_uri.clone(), connection_pool, @@ -592,6 +595,8 @@ impl MetastoreService for PostgresqlMetastore { let mut split_ids = Vec::with_capacity(splits_metadata.len()); let mut time_range_start_list = Vec::with_capacity(splits_metadata.len()); let mut time_range_end_list = Vec::with_capacity(splits_metadata.len()); + let mut secondary_time_range_start_list = Vec::with_capacity(splits_metadata.len()); + let mut secondary_time_range_end_list = Vec::with_capacity(splits_metadata.len()); let mut tags_list = Vec::with_capacity(splits_metadata.len()); let mut splits_metadata_json = Vec::with_capacity(splits_metadata.len()); let mut delete_opstamps = Vec::with_capacity(splits_metadata.len()); @@ -612,6 +617,16 @@ impl MetastoreService for PostgresqlMetastore { let time_range_end = split_metadata.time_range.map(|range| *range.end()); time_range_end_list.push(time_range_end); + let secondary_time_range_start = split_metadata + .secondary_time_range + .as_ref() + .map(|range| *range.start()); + secondary_time_range_start_list.push(secondary_time_range_start); + let secondary_time_range_end = split_metadata + .secondary_time_range + .map(|range| *range.end()); + secondary_time_range_end_list.push(secondary_time_range_end); + let tags: Vec = split_metadata.tags.into_iter().collect(); tags_list.push(sqlx::types::Json(tags)); split_ids.push(split_metadata.split_id); @@ -624,25 +639,29 @@ impl MetastoreService for PostgresqlMetastore { run_with_tx!(self.connection_pool, tx, "stage splits", { let upserted_split_ids: Vec = sqlx::query_scalar(r#" INSERT INTO splits - (split_id, time_range_start, time_range_end, tags, split_metadata_json, delete_opstamp, maturity_timestamp, split_state, index_uid, node_id) + (split_id, time_range_start, time_range_end, secondary_time_range_start, secondary_time_range_end, tags, split_metadata_json, delete_opstamp, maturity_timestamp, split_state, index_uid, node_id) SELECT split_id, time_range_start, time_range_end, + secondary_time_range_start, + secondary_time_range_end, ARRAY(SELECT json_array_elements_text(tags_json::json)) as tags, split_metadata_json, delete_opstamp, to_timestamp(maturity_timestamp), - $9 as split_state, - $10 as index_uid, + $11 as split_state, + $12 as index_uid, node_id FROM - UNNEST($1, $2, $3, $4, $5, $6, $7, $8) - AS staged_splits 
                (split_id, time_range_start, time_range_end, tags_json, split_metadata_json, delete_opstamp, maturity_timestamp, node_id)
+                UNNEST($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
+                AS staged_splits (split_id, time_range_start, time_range_end, secondary_time_range_start, secondary_time_range_end, tags_json, split_metadata_json, delete_opstamp, maturity_timestamp, node_id)
             ON CONFLICT(index_uid, split_id) DO UPDATE SET
                 time_range_start = excluded.time_range_start,
                 time_range_end = excluded.time_range_end,
+                secondary_time_range_start = excluded.secondary_time_range_start,
+                secondary_time_range_end = excluded.secondary_time_range_end,
                 tags = excluded.tags,
                 split_metadata_json = excluded.split_metadata_json,
                 delete_opstamp = excluded.delete_opstamp,
@@ -656,6 +675,8 @@ impl MetastoreService for PostgresqlMetastore {
             .bind(&split_ids)
             .bind(time_range_start_list)
             .bind(time_range_end_list)
+            .bind(secondary_time_range_start_list)
+            .bind(secondary_time_range_end_list)
             .bind(tags_list)
             .bind(splits_metadata_json)
             .bind(delete_opstamps)
diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/migrator_sk.rs b/quickwit/quickwit-metastore/src/metastore/postgres/migrator_sk.rs
new file mode 100644
index 00000000000..74188446e78
--- /dev/null
+++ b/quickwit/quickwit-metastore/src/metastore/postgres/migrator_sk.rs
@@ -0,0 +1,296 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::BTreeMap;
+
+use quickwit_proto::metastore::{MetastoreError, MetastoreResult};
+use sqlx::migrate::{Migrate, Migrator};
+use sqlx::{Acquire, PgConnection, Postgres};
+use tracing::{error, instrument};
+
+use super::pool::TrackedPool;
+
+fn get_migrations() -> Migrator {
+    sqlx::migrate!("migrations_sk/postgresql")
+}
+
+/// Configures the search path to a separate schema.
+///
+/// This is necessary to create a separate migration table for schema changes
+/// belonging to the fork.
+async fn set_schema_to_fork_migrations_temporarily(conn: &mut PgConnection) -> MetastoreResult<()> {
+    sqlx::query("SET LOCAL search_path TO sk_fork_migrations")
+        .execute(conn)
+        .await?;
+    Ok(())
+}
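The `SET LOCAL search_path` statement above is the core of the isolation trick: run inside a transaction, it redirects schema-unqualified names to `sk_fork_migrations` only until the transaction ends, so sqlx creates its `_sqlx_migrations` bookkeeping table for the fork's migrations there instead of in `public`, and the fork's migration history never collides with the upstream one. A condensed sketch of the flow (assuming sqlx 0.7-style APIs; pool setup and the actual migrator call are elided):

```rust
use sqlx::PgPool;

async fn run_in_fork_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;
    sqlx::query("CREATE SCHEMA IF NOT EXISTS sk_fork_migrations")
        .execute(&mut *tx)
        .await?;
    // `SET LOCAL` only lasts until the end of the enclosing transaction, so
    // the connection's search_path is untouched once the tx commits.
    sqlx::query("SET LOCAL search_path TO sk_fork_migrations")
        .execute(&mut *tx)
        .await?;
    // ... run sqlx::migrate!("migrations_sk/postgresql") on this connection;
    // its `_sqlx_migrations` table is then created inside `sk_fork_migrations` ...
    tx.commit().await?;
    Ok(())
}
```

Note that the migration files themselves schema-qualify the target table (`ALTER TABLE public.splits ...`), so the redirected search path affects only sqlx's bookkeeping, not the columns being added.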
+/// Runs migrations in parallel to the upstream migrations.
+///
+/// We should stick to simple schema changes that do not interfere with
+/// upstream migrations (e.g., adding columns).
+#[instrument(skip_all)]
+pub(super) async fn run_fork_migrations(
+    pool: &TrackedPool<Postgres>,
+    skip_migrations: bool,
+    skip_locking: bool,
+) -> MetastoreResult<()> {
+    let mut tx = pool.begin().await?;
+    let conn = tx.acquire().await?;
+
+    if !skip_migrations {
+        sqlx::query("CREATE SCHEMA IF NOT EXISTS sk_fork_migrations")
+            .execute(&mut *conn)
+            .await?;
+    }
+
+    set_schema_to_fork_migrations_temporarily(&mut *conn).await?;
+
+    let mut migrator = get_migrations();
+
+    if skip_locking {
+        migrator.set_locking(false);
+    }
+
+    if !skip_migrations {
+        // `run_direct` is a hidden function, used to get around the annoying
+        // "implementation of `Acquire` is not general enough" error that we
+        // get otherwise.
+        let migrate_result = migrator.run_direct(conn).await;
+
+        let Err(migrate_error) = migrate_result else {
+            tx.commit().await?;
+            return Ok(());
+        };
+        tx.rollback().await?;
+        error!(error=%migrate_error, "failed to run PostgreSQL migrations");
+
+        Err(MetastoreError::Internal {
+            message: "failed to run PostgreSQL migrations".to_string(),
+            cause: migrate_error.to_string(),
+        })
+    } else {
+        check_migrations(migrator, conn).await
+    }
+}
+
+async fn check_migrations(migrator: Migrator, conn: &mut PgConnection) -> MetastoreResult<()> {
+    let dirty = match conn.dirty_version().await {
+        Ok(dirty) => dirty,
+        Err(migrate_error) => {
+            error!(error=%migrate_error, "failed to validate PostgreSQL migrations");
+
+            return Err(MetastoreError::Internal {
+                message: "failed to validate PostgreSQL migrations".to_string(),
+                cause: migrate_error.to_string(),
+            });
+        }
+    };
+    if let Some(dirty) = dirty {
+        error!("migration {dirty} is dirty");
+
+        return Err(MetastoreError::Internal {
+            message: "failed to validate PostgreSQL migrations".to_string(),
+            cause: format!("migration {dirty} is dirty"),
+        });
+    };
+    let applied_migrations = match conn.list_applied_migrations().await {
+        Ok(applied_migrations) => applied_migrations,
+        Err(migrate_error) => {
+            error!(error=%migrate_error, "failed to validate PostgreSQL migrations");
+
+            return Err(MetastoreError::Internal {
+                message: "failed to validate PostgreSQL migrations".to_string(),
+                cause: migrate_error.to_string(),
+            });
+        }
+    };
+    let expected_migrations: BTreeMap<_, _> = migrator
+        .iter()
+        .filter(|migration| migration.migration_type.is_up_migration())
+        .map(|migration| (migration.version, migration))
+        .collect();
+    if applied_migrations.len() < expected_migrations.len() {
+        error!(
+            "missing migrations, expected {} migrations, only {} present in database",
+            expected_migrations.len(),
+            applied_migrations.len()
+        );
+
+        return Err(MetastoreError::Internal {
+            message: "failed to validate PostgreSQL migrations".to_string(),
+            cause: format!(
+                "missing migrations, expected {} migrations, only {} present in database",
+                expected_migrations.len(),
+                applied_migrations.len()
+            ),
+        });
+    }
+    for applied_migration in applied_migrations {
+        let Some(migration) = expected_migrations.get(&applied_migration.version) else {
+            error!(
+                "found unknown migration {} in database",
+                applied_migration.version
+            );
+
+            return Err(MetastoreError::Internal {
+                message: "failed to validate PostgreSQL migrations".to_string(),
+                cause: format!(
+                    "found unknown migration {} in database",
+                    applied_migration.version
+                ),
+            });
+        };
+        if migration.checksum != applied_migration.checksum {
+            error!(
+                "migration {} differs between database and expected value",
+                applied_migration.version
+            );
+
+            return Err(MetastoreError::Internal {
+                message: "failed to validate PostgreSQL migrations".to_string(),
+                cause: format!(
+                    "migration {} differs between database and expected value",
+                    applied_migration.version
+                ),
+            });
+        }
+    }
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use quickwit_common::uri::Uri;
+    use sqlx::migrate::Migrate;
+    use sqlx::{Acquire, Postgres};
+
+    use super::{get_migrations, run_fork_migrations};
+    use crate::metastore::postgres::migrator::run_migrations;
+    use crate::metastore::postgres::migrator_sk::set_schema_to_fork_migrations_temporarily;
+    use crate::metastore::postgres::pool::TrackedPool;
+    use crate::metastore::postgres::utils::establish_connection;
+
+    async fn get_search_path(pool: &TrackedPool<Postgres>) -> String {
+        let mut conn = pool.acquire().await.unwrap();
+        sqlx::query_scalar("SHOW search_path")
+            .fetch_one(&mut *conn)
+            .await
+            .unwrap()
+    }
+
+    #[tokio::test]
+    #[serial_test::file_serial]
+    async fn test_metastore_check_migration() {
+        let _ = tracing_subscriber::fmt::try_init();
+
+        dotenvy::dotenv().ok();
+        let uri: Uri = std::env::var("QW_TEST_DATABASE_URL")
+            .expect("environment variable `QW_TEST_DATABASE_URL` should be set")
+            .parse()
+            .expect("environment variable `QW_TEST_DATABASE_URL` should be a valid URI");
+
+        {
+            let connection_pool =
+                establish_connection(&uri, 1, 1, Duration::from_secs(2), None, None, false)
+                    .await
+                    .unwrap();
+
+            let original_search_path = get_search_path(&connection_pool).await;
+
+            // make sure upstream migrations are run
+            run_migrations(&connection_pool, false, false)
+                .await
+                .unwrap();
+
+            run_fork_migrations(&connection_pool, false, false)
+                .await
+                .unwrap();
+
+            // we just ran the migrations, nothing else to run
+            run_fork_migrations(&connection_pool, true, false)
+                .await
+                .unwrap();
+
+            let migrations = get_migrations();
+            let last_migration = migrations
+                .iter()
+                .map(|migration| migration.version)
+                .max()
+                .expect("no migration exists?");
+            let up_migration = migrations
+                .iter()
+                .find(|migration| {
+                    migration.version == last_migration
+                        && migration.migration_type.is_up_migration()
+                })
+                .unwrap();
+            let down_migration = migrations
+                .iter()
+                .find(|migration| {
+                    migration.version == last_migration
+                        && migration.migration_type.is_down_migration()
+                })
+                .unwrap();
+
+            // verify that the search path modification doesn't leak
+            let current_search_path = get_search_path(&connection_pool).await;
+            assert_eq!(
+                original_search_path, current_search_path,
+                "search_path should not be modified after migrations"
+            );
+
+            // check that the read-only validation fails when a migration hasn't been applied
+            {
+                let mut tx = connection_pool.begin().await.unwrap();
+                let conn = tx.acquire().await.unwrap();
+                set_schema_to_fork_migrations_temporarily(&mut *conn)
+                    .await
+                    .unwrap();
+                conn.revert(down_migration).await.unwrap();
+                tx.commit().await.unwrap();
+            }
+
+            run_fork_migrations(&connection_pool, true, false)
+                .await
+                .unwrap_err();
+
+            {
+                let mut tx = connection_pool.begin().await.unwrap();
+                let conn = tx.acquire().await.unwrap();
+                set_schema_to_fork_migrations_temporarily(&mut *conn)
+                    .await
+                    .unwrap();
+                conn.apply(up_migration).await.unwrap();
+                tx.commit().await.unwrap();
+            }
+        }
+
+        {
+            let connection_pool =
+                establish_connection(&uri, 1, 5, Duration::from_secs(2), None, None, true)
+                    .await
+                    .unwrap();
+            // error because we are in read only mode, and we try to run migrations
+            run_fork_migrations(&connection_pool, false, false)
+                .await
+                .unwrap_err();
+            // okay because all migrations were already run before
run_fork_migrations(&connection_pool, true, false) + .await + .unwrap(); + } + } +} diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs b/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs index 0dc445e071e..84a04f4b4f5 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs @@ -17,6 +17,7 @@ mod factory; mod metastore; mod metrics; mod migrator; +mod migrator_sk; mod model; mod pool; mod split_stream; diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs index 117f37047d9..86853c531b4 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs @@ -79,6 +79,8 @@ pub enum Splits { SplitState, TimeRangeStart, TimeRangeEnd, + SecondaryTimeRangeStart, + SecondaryTimeRangeEnd, CreateTimestamp, UpdateTimestamp, PublishTimestamp, @@ -128,6 +130,10 @@ pub(super) struct PgSplit { pub index_uid: IndexUid, /// Delete opstamp. pub delete_opstamp: i64, + /// The min timestamp for the secondary time dimension. + pub secondary_time_range_start: Option, + /// The max timestamp for the secondary time dimension. + pub secondary_time_range_end: Option, } impl PgSplit { diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs index 28519b8a294..b5769201948 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs @@ -158,6 +158,48 @@ pub(super) fn append_query_filters_and_order_by( Bound::Unbounded => {} }; + if let Some(v) = query.max_secondary_time_range_end { + sql.cond_where(any![ + Expr::col(Splits::SecondaryTimeRangeEnd).lte(v), + sea_query::all![ + Expr::col(Splits::SecondaryTimeRangeEnd).is_null(), + Expr::col(Splits::TimeRangeEnd).lte(v) + ] + ]); + } + + match query.secondary_time_range.start { + Bound::Included(v) => { + sql.cond_where(any![ + Expr::col(Splits::SecondaryTimeRangeEnd).gte(v), + Expr::col(Splits::SecondaryTimeRangeEnd).is_null() + ]); + } + Bound::Excluded(v) => { + sql.cond_where(any![ + Expr::col(Splits::SecondaryTimeRangeEnd).gt(v), + Expr::col(Splits::SecondaryTimeRangeEnd).is_null() + ]); + } + Bound::Unbounded => {} + }; + + match query.secondary_time_range.end { + Bound::Included(v) => { + sql.cond_where(any![ + Expr::col(Splits::SecondaryTimeRangeStart).lte(v), + Expr::col(Splits::SecondaryTimeRangeStart).is_null() + ]); + } + Bound::Excluded(v) => { + sql.cond_where(any![ + Expr::col(Splits::SecondaryTimeRangeStart).lt(v), + Expr::col(Splits::SecondaryTimeRangeStart).is_null() + ]); + } + Bound::Unbounded => {} + }; + match &query.mature { Bound::Included(evaluation_datetime) => { sql.cond_where(any![ diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs index fe88fe379d3..829029e5d43 100644 --- a/quickwit/quickwit-metastore/src/split_metadata.rs +++ b/quickwit/quickwit-metastore/src/split_metadata.rs @@ -97,6 +97,9 @@ pub struct SplitMetadata { /// the split, expressed in seconds. pub time_range: Option>, + /// The min / max ingestion time in the split. + pub secondary_time_range: Option>, + /// Timestamp for tracking when the split was created. 
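The SQL conditions added to `append_query_filters_and_order_by` above encode the same fallback rule as the in-memory predicate in the file-backed metastore: for retention purposes a split's secondary range end is authoritative when present, the primary range end is used when it is not, and a split with neither range is never a deletion candidate. A standalone sketch of that rule:

```rust
use std::ops::RangeInclusive;

/// Does this split satisfy `max_secondary_time_range_end = v`?
fn end_at_or_before(
    secondary: Option<&RangeInclusive<i64>>,
    primary: Option<&RangeInclusive<i64>>,
    v: i64,
) -> bool {
    match (secondary, primary) {
        // A recorded secondary range is authoritative.
        (Some(range), _) => *range.end() <= v,
        // No secondary range: fall back to the primary range.
        (None, Some(range)) => *range.end() <= v,
        // No range at all: never eligible for deletion.
        (None, None) => false,
    }
}

fn main() {
    assert!(end_at_or_before(Some(&(0..=10)), Some(&(0..=99)), 10));
    assert!(!end_at_or_before(Some(&(0..=11)), None, 10));
    assert!(end_at_or_before(None, Some(&(0..=10)), 10));
    assert!(!end_at_or_before(None, None, 10));
}
```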
diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs
index fe88fe379d3..829029e5d43 100644
--- a/quickwit/quickwit-metastore/src/split_metadata.rs
+++ b/quickwit/quickwit-metastore/src/split_metadata.rs
@@ -97,6 +97,9 @@ pub struct SplitMetadata {
     /// the split, expressed in seconds.
     pub time_range: Option<RangeInclusive<i64>>,
 
+    /// The min / max ingestion time in the split.
+    pub secondary_time_range: Option<RangeInclusive<i64>>,
+
     /// Timestamp for tracking when the split was created.
     pub create_timestamp: i64,
 
@@ -148,6 +151,7 @@ impl fmt::Debug for SplitMetadata {
             &self.uncompressed_docs_size_in_bytes,
         );
         debug_struct.field("time_range", &self.time_range);
+        debug_struct.field("secondary_time_range", &self.secondary_time_range);
         debug_struct.field("create_timestamp", &self.create_timestamp);
         debug_struct.field("maturity", &self.maturity);
         if !self.tags.is_empty() {
@@ -273,6 +277,7 @@ impl quickwit_config::TestableForRegression for SplitMetadata {
             num_docs: 12303,
             uncompressed_docs_size_in_bytes: 234234,
             time_range: Some(121000..=130198),
+            secondary_time_range: None,
             create_timestamp: 3,
             maturity: SplitMaturity::Immature {
                 maturation_period: Duration::from_secs(4),
@@ -406,6 +411,7 @@ mod tests {
             num_docs: 100,
             uncompressed_docs_size_in_bytes: 1024,
             time_range: Some(0..=100),
+            secondary_time_range: Some(120000..=130000),
             create_timestamp: 1629867600,
             maturity: SplitMaturity::Mature,
             tags: {
@@ -423,14 +429,14 @@ mod tests {
             doc_mapping_uid: DocMappingUid::default(),
         };
 
-        let expected_output = "SplitMetadata { split_id: \"split-1\", index_uid: IndexUid { \
-                               index_id: \"00000000-0000-0000-0000-000000000000\", \
-                               incarnation_id: Ulid(0) }, partition_id: 0, source_id: \
-                               \"source-1\", node_id: \"node-1\", num_docs: 100, \
-                               uncompressed_docs_size_in_bytes: 1024, time_range: Some(0..=100), \
-                               create_timestamp: 1629867600, maturity: Mature, tags: \
-                               \"{\\\"🐱\\\", \\\"😻\\\", \\\"😼\\\", \\\"😿\\\", and 1 more}\", \
-                               footer_offsets: 0..1024, delete_opstamp: 0, num_merge_ops: 0 }";
+        let expected_output =
+            "SplitMetadata { split_id: \"split-1\", index_uid: IndexUid { index_id: \
+             \"00000000-0000-0000-0000-000000000000\", incarnation_id: Ulid(0) }, partition_id: \
+             0, source_id: \"source-1\", node_id: \"node-1\", num_docs: 100, \
+             uncompressed_docs_size_in_bytes: 1024, time_range: Some(0..=100), \
+             secondary_time_range: Some(120000..=130000), create_timestamp: 1629867600, maturity: \
+             Mature, tags: \"{\\\"🐱\\\", \\\"😻\\\", \\\"😼\\\", \\\"😿\\\", and 1 more}\", \
+             footer_offsets: 0..1024, delete_opstamp: 0, num_merge_ops: 0 }";
 
         assert_eq!(format!("{split_metadata:?}"), expected_output);
     }
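In the versioned model below, the new field is declared with `#[serde(default, skip_serializing_if = "Option::is_none")]`, so metadata written before this change still deserializes, and a split without a secondary range serializes exactly as before. A self-contained sketch of that round-trip behavior with a stripped-down stand-in struct (hypothetical, for illustration only):

use std::ops::RangeInclusive;

use serde::{Deserialize, Serialize};

// Stripped-down stand-in for SplitMetadataV0_8, showing why the optional
// field is backward and forward compatible.
#[derive(Serialize, Deserialize)]
struct SplitMetadataSketch {
    time_range: Option<RangeInclusive<i64>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    secondary_time_range: Option<RangeInclusive<i64>>,
}

fn main() {
    // Metadata written before the change has no `secondary_time_range` key
    // and still deserializes, defaulting the field to `None`.
    let old_json = r#"{"time_range":{"start":0,"end":100}}"#;
    let split: SplitMetadataSketch = serde_json::from_str(old_json).unwrap();
    assert!(split.secondary_time_range.is_none());
    // `None` is skipped on write, so the output matches the old format.
    assert_eq!(serde_json::to_string(&split).unwrap(), old_json);
}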
diff --git a/quickwit/quickwit-metastore/src/split_metadata_version.rs b/quickwit/quickwit-metastore/src/split_metadata_version.rs
index 8325290be92..5f6204c85b7 100644
--- a/quickwit/quickwit-metastore/src/split_metadata_version.rs
+++ b/quickwit/quickwit-metastore/src/split_metadata_version.rs
@@ -59,6 +59,11 @@ pub(crate) struct SplitMetadataV0_8 {
     /// the split.
     pub time_range: Option<RangeInclusive<i64>>,
 
+    #[schema(value_type = Option<Object>)]
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    /// The min / max ingestion time in the split.
+    pub secondary_time_range: Option<RangeInclusive<i64>>,
+
     /// Timestamp for tracking when the split was created.
     #[serde(default = "utc_now_timestamp")]
     pub create_timestamp: i64,
@@ -122,6 +127,7 @@ impl From<SplitMetadataV0_8> for SplitMetadata {
             num_docs: v8.num_docs,
             uncompressed_docs_size_in_bytes: v8.uncompressed_docs_size_in_bytes,
             time_range: v8.time_range,
+            secondary_time_range: v8.secondary_time_range,
             create_timestamp: v8.create_timestamp,
             maturity: v8.maturity,
             tags: v8.tags,
@@ -144,6 +150,7 @@ impl From<SplitMetadata> for SplitMetadataV0_8 {
             num_docs: split.num_docs,
             uncompressed_docs_size_in_bytes: split.uncompressed_docs_size_in_bytes,
             time_range: split.time_range,
+            secondary_time_range: split.secondary_time_range,
             create_timestamp: split.create_timestamp,
             maturity: split.maturity,
             tags: split.tags,
diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs
index 50e0f695a3c..5aea2e74ed3 100644
--- a/quickwit/quickwit-metastore/src/tests/index.rs
+++ b/quickwit/quickwit-metastore/src/tests/index.rs
@@ -110,6 +110,7 @@ pub async fn test_metastore_update_retention_policy<
     let new_retention_policy_opt = Some(RetentionPolicy {
         retention_period: String::from("3 days"),
         evaluation_schedule: String::from("daily"),
+        timestamp_type: Default::default(),
     });
 
     // set and unset retention policy multiple times
diff --git a/quickwit/quickwit-metastore/src/tests/list_splits.rs b/quickwit/quickwit-metastore/src/tests/list_splits.rs
index 429e169f0f0..11afe5955cd 100644
--- a/quickwit/quickwit-metastore/src/tests/list_splits.rs
+++ b/quickwit/quickwit-metastore/src/tests/list_splits.rs
@@ -276,6 +276,7 @@ pub async fn test_metastore_list_splits<
diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs
--- a/quickwit/quickwit-search/src/lib.rs
+++ b/quickwit/quickwit-search/src/lib.rs
+pub async fn list_relevant_splits_with_secondary_time(
+    index_uids: Vec<IndexUid>,
+    start_timestamp: Option<i64>,
+    end_timestamp: Option<i64>,
+    start_secondary_timestamp: Option<i64>,
+    end_secondary_timestamp: Option<i64>,
+    tags_filter_opt: Option<TagFilterAst>,
+    metastore: &mut MetastoreServiceClient,
+) -> crate::Result<Vec<SplitMetadata>> {
+    let Some(mut query) = ListSplitsQuery::try_from_index_uids(index_uids) else {
+        return Ok(Vec::new());
+    };
+    query = query.with_split_state(SplitState::Published);
+
+    if let Some(start_ts) = start_timestamp {
+        query = query.with_time_range_start_gte(start_ts);
+    }
+    if let Some(end_ts) = end_timestamp {
+        query = query.with_time_range_end_lt(end_ts);
+    }
+    if let Some(start_ts) = start_secondary_timestamp {
+        query = query.with_secondary_time_range_start_gte(start_ts);
+    }
+    if let Some(end_ts) = end_secondary_timestamp {
+        query = query.with_secondary_time_range_end_lt(end_ts);
+    }
+    if let Some(tags_filter) = tags_filter_opt {
+        query = query.with_tags_filter(tags_filter);
+    }
+    let list_splits_request = ListSplitsRequest::try_from_list_splits_query(&query)?;
+    let splits_metadata: Vec<SplitMetadata> = metastore
+        .list_splits(list_splits_request)
+        .await?
+        .collect_splits_metadata()
+        .await?;
+    Ok(splits_metadata)
+}
+
 /// Resolve index patterns and returns IndexMetadata for found indices.
 /// Patterns follow the elastic search patterns.
 pub async fn resolve_index_patterns(
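Before the `root.rs` changes below, a condensed usage sketch of the new helper. `fetch_splits_for_day` and its bound values are hypothetical; in the patch itself, the bounds come from the search request and from `refine_start_end_timestamp_from_ast` applied to the secondary timestamp field:

// Hypothetical wrapper showing how the helper is meant to be called: prune on
// the primary time range and, when a bound is known, on the secondary range.
async fn fetch_splits_for_day(
    index_uids: Vec<IndexUid>,
    day_start: i64, // inclusive, seconds since epoch
    day_end: i64,   // exclusive
    metastore: &mut MetastoreServiceClient,
) -> crate::Result<Vec<SplitMetadata>> {
    list_relevant_splits_with_secondary_time(
        index_uids,
        Some(day_start), // primary start (>=)
        Some(day_end),   // primary end (<)
        None,            // no secondary lower bound
        Some(day_end),   // secondary end (<), hypothetical choice
        None,            // no tag filter
        metastore,
    )
    .await
}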
diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs
index 141f04da64d..c9f1f766e4a 100644
--- a/quickwit/quickwit-search/src/root.rs
+++ b/quickwit/quickwit-search/src/root.rs
@@ -56,7 +56,7 @@ use crate::search_response_rest::StorageRequestCount;
 use crate::service::SearcherContext;
 use crate::{
     SearchError, SearchJobPlacer, SearchPlanResponseRest, SearchServiceClient,
-    extract_split_and_footer_offsets, list_relevant_splits,
+    extract_split_and_footer_offsets, list_relevant_splits_with_secondary_time,
 };
 
 /// Maximum accepted scroll TTL.
@@ -277,6 +277,35 @@ fn validate_request_and_build_metadata(
     })
 }
 
+fn validate_secondary_time(index_metadata: &[IndexMetadata]) -> crate::Result<Option<String>> {
+    let mut secondary_timestamp_field_opt = None;
+    for index_metadata in index_metadata {
+        let index_secondary_timestamp_field_opt = index_metadata
+            .index_config
+            .doc_mapping
+            .secondary_timestamp_field
+            .as_deref();
+        if let Some(index_secondary_timestamp_field) = index_secondary_timestamp_field_opt {
+            match secondary_timestamp_field_opt {
+                Some(secondary_timestamp_field)
+                    if secondary_timestamp_field != index_secondary_timestamp_field =>
+                {
+                    return Err(SearchError::InvalidQuery(
+                        "the secondary timestamp field (if present) must be the same for all \
+                         indexes"
+                            .to_string(),
+                    ));
+                }
+                None => {
+                    secondary_timestamp_field_opt =
+                        Some(index_secondary_timestamp_field.to_string());
+                }
+                _ => {}
+            }
+        }
+    }
+    Ok(secondary_timestamp_field_opt)
+}
+
 /// Validate sort field types.
 fn validate_sort_field_types(
     schema: &Schema,
@@ -1113,6 +1142,7 @@ async fn refine_and_list_matches(
     query_ast_resolved: QueryAst,
     sort_fields_is_datetime: HashMap<String, bool>,
     timestamp_field_opt: Option<String>,
+    secondary_timestamp_field_opt: Option<String>,
 ) -> crate::Result<Vec<SplitMetadata>> {
     let index_uids = indexes_metadata
         .iter()
@@ -1132,14 +1162,28 @@ async fn refine_and_list_matches(
             &mut search_request.end_timestamp,
         );
     }
+
+    let mut start_secondary_timestamp_opt: Option<i64> = None;
+    let mut end_secondary_timestamp_opt: Option<i64> = None;
+    if let Some(secondary_timestamp_field) = &secondary_timestamp_field_opt {
+        refine_start_end_timestamp_from_ast(
+            &query_ast_resolved,
+            secondary_timestamp_field,
+            &mut start_secondary_timestamp_opt,
+            &mut end_secondary_timestamp_opt,
+        );
+    }
+
     let tag_filter_ast = extract_tags_from_query(query_ast_resolved);
 
     // TODO if search after is set, we sort by timestamp and we don't want to count all results,
     // we can refine more here. Same if we sort by _shard_doc
-    let split_metadatas: Vec<SplitMetadata> = list_relevant_splits(
+    let split_metadatas: Vec<SplitMetadata> = list_relevant_splits_with_secondary_time(
         index_uids,
         search_request.start_timestamp,
         search_request.end_timestamp,
+        start_secondary_timestamp_opt,
+        end_secondary_timestamp_opt,
         tag_filter_ast,
         metastore,
     )
@@ -1185,6 +1229,7 @@ async fn plan_splits_for_root_search(
     }
 
     let request_metadata = validate_request_and_build_metadata(&indexes_metadata, search_request)?;
+    let secondary_timestamp_field_opt = validate_secondary_time(&indexes_metadata)?;
     let split_metadatas = refine_and_list_matches(
         metastore,
         search_request,
@@ -1192,6 +1237,7 @@
         request_metadata.query_ast_resolved,
         request_metadata.sort_fields_is_datetime,
         request_metadata.timestamp_field_opt,
+        secondary_timestamp_field_opt,
     )
     .await?;
     Ok((
@@ -1309,6 +1355,7 @@ pub async fn search_plan(
         .map_err(|err| SearchError::Internal(format!("failed to build doc mapper. cause: {err}")))?;
 
     let request_metadata = validate_request_and_build_metadata(&indexes_metadata, &search_request)?;
+    let secondary_timestamp_field_opt = validate_secondary_time(&indexes_metadata)?;
     let split_metadatas = refine_and_list_matches(
         &mut metastore,
         &mut search_request,
@@ -1316,6 +1363,7 @@
         request_metadata.query_ast_resolved.clone(),
         request_metadata.sort_fields_is_datetime,
         request_metadata.timestamp_field_opt,
+        secondary_timestamp_field_opt,
    )
    .await?;
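The core of `validate_secondary_time` is a fold that either finds a single common secondary timestamp field across all target indexes or rejects the request; indexes without one are ignored. The same rule in isolation, decoupled from `IndexMetadata` (an illustrative sketch; `common_secondary_field` is a hypothetical name):

// Sketch of the validation rule: Ok(Some(field)) if all configured secondary
// timestamp fields agree, Ok(None) if none is configured, Err on a mismatch.
fn common_secondary_field<'a>(
    fields: impl IntoIterator<Item = Option<&'a str>>,
) -> Result<Option<&'a str>, String> {
    let mut common: Option<&str> = None;
    // `flatten` drops the indexes that have no secondary timestamp field.
    for field in fields.into_iter().flatten() {
        match common {
            Some(existing) if existing != field => {
                return Err(format!(
                    "secondary timestamp field mismatch: `{existing}` vs `{field}`"
                ));
            }
            None => common = Some(field),
            _ => {}
        }
    }
    Ok(common)
}

// e.g. common_secondary_field([Some("ingest_ts"), None, Some("ingest_ts")])
//      == Ok(Some("ingest_ts"))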