Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,8 @@ async fn test_cmd_update_index() {
index_metadata.index_config.retention_policy_opt,
Some(RetentionPolicy {
retention_period: String::from("1 week"),
evaluation_schedule: String::from("daily")
evaluation_schedule: String::from("daily"),
timestamp_type: Default::default(),
})
);

Expand Down
50 changes: 50 additions & 0 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,20 @@ pub struct SearchSettings {
pub default_search_fields: Vec<String>,
}

#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize, Default, utoipa::ToSchema)]
#[serde(rename_all = "lowercase")]
pub enum RetentionTimestampType {
#[default]
Primary,
Secondary,
}

impl RetentionTimestampType {
pub fn is_primary(&self) -> bool {
matches!(self, RetentionTimestampType::Primary)
}
}

#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct RetentionPolicy {
Expand All @@ -220,6 +234,11 @@ pub struct RetentionPolicy {
#[serde(default = "RetentionPolicy::default_schedule")]
#[serde(rename = "schedule")]
pub evaluation_schedule: String,

/// The target timestamp field to use for retention evaluation. When the
/// range is not in the split's metadata, the split is never deleted.
#[serde(default, skip_serializing_if = "RetentionTimestampType::is_primary")]
pub timestamp_type: RetentionTimestampType,
}

impl RetentionPolicy {
Expand Down Expand Up @@ -467,6 +486,7 @@ impl crate::TestableForRegression for IndexConfig {
message_mapping,
],
timestamp_field: Some("timestamp".to_string()),
secondary_timestamp_field: None,
tag_fields: BTreeSet::from_iter(["tenant_id".to_string(), "log_level".to_string()]),
partition_key: Some("tenant_id".to_string()),
max_num_partitions: NonZeroU32::new(100).unwrap(),
Expand Down Expand Up @@ -502,6 +522,7 @@ impl crate::TestableForRegression for IndexConfig {
let retention_policy_opt = Some(RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "daily".to_string(),
timestamp_type: RetentionTimestampType::Primary,
});
IndexConfig {
index_id: "my-index".to_string(),
Expand Down Expand Up @@ -674,6 +695,7 @@ mod tests {
let expected_retention_policy = RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "daily".to_string(),
timestamp_type: RetentionTimestampType::Primary,
};
assert_eq!(
index_config.retention_policy_opt.unwrap(),
Expand Down Expand Up @@ -853,6 +875,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "hourly".to_string(),
timestamp_type: RetentionTimestampType::Primary,
};
let retention_policy_yaml = serde_yaml::to_string(&retention_policy).unwrap();
assert_eq!(
Expand All @@ -873,20 +896,38 @@ mod tests {
let expected_retention_policy = RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "hourly".to_string(),
timestamp_type: RetentionTimestampType::Primary,
};
assert_eq!(retention_policy, expected_retention_policy);
}
{
let retention_policy_yaml = r#"
period: 90 days
schedule: daily
"#;
let retention_policy =
serde_yaml::from_str::<RetentionPolicy>(retention_policy_yaml).unwrap();

let expected_retention_policy = RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "daily".to_string(),
timestamp_type: RetentionTimestampType::Primary,
};
assert_eq!(retention_policy, expected_retention_policy);
}
{
let retention_policy_yaml = r#"
period: 90 days
schedule: daily
timestamp_type: secondary
"#;
let retention_policy =
serde_yaml::from_str::<RetentionPolicy>(retention_policy_yaml).unwrap();

let expected_retention_policy = RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "daily".to_string(),
timestamp_type: RetentionTimestampType::Secondary,
};
assert_eq!(retention_policy, expected_retention_policy);
}
Expand All @@ -898,6 +939,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "hourly".to_string(),
timestamp_type: RetentionTimestampType::Primary,
};
assert_eq!(
retention_policy.retention_period().unwrap(),
Expand All @@ -907,6 +949,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "foo".to_string(),
evaluation_schedule: "hourly".to_string(),
timestamp_type: RetentionTimestampType::Primary,
};
assert_eq!(
retention_policy.retention_period().unwrap_err().to_string(),
Expand All @@ -931,6 +974,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "@hourly".to_string(),
timestamp_type: RetentionTimestampType::Primary,
};
assert_eq!(
retention_policy.evaluation_schedule().unwrap(),
Expand All @@ -941,6 +985,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "hourly".to_string(),
timestamp_type: RetentionTimestampType::Primary,
};
assert_eq!(
retention_policy.evaluation_schedule().unwrap(),
Expand All @@ -951,6 +996,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "0 * * * * *".to_string(),
timestamp_type: RetentionTimestampType::Primary,
};
let evaluation_schedule = retention_policy.evaluation_schedule().unwrap();
assert_eq!(evaluation_schedule.seconds().count(), 1);
Expand All @@ -964,20 +1010,23 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "hourly".to_string(),
timestamp_type: RetentionTimestampType::Primary,
};
retention_policy.validate().unwrap();
}
{
let retention_policy = RetentionPolicy {
retention_period: "foo".to_string(),
evaluation_schedule: "hourly".to_string(),
timestamp_type: RetentionTimestampType::Primary,
};
retention_policy.validate().unwrap_err();
}
{
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "foo".to_string(),
timestamp_type: RetentionTimestampType::Primary,
};
retention_policy.validate().unwrap_err();
}
Expand All @@ -990,6 +1039,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: schedule_str.to_string(),
timestamp_type: RetentionTimestampType::Primary,
};

let next_evaluation_duration = chrono::Duration::nanoseconds(
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/index_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ mod test {
invalid_index_config.retention_policy_opt = Some(RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "hourly".to_string(),
timestamp_type: Default::default(),
});
let validation_err = invalid_index_config
.build_and_validate(None)
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-config/src/index_template/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ impl crate::TestableForRegression for IndexTemplate {
retention_policy_opt: Some(RetentionPolicy {
retention_period: "42 days".to_string(),
evaluation_schedule: "daily".to_string(),
timestamp_type: Default::default(),
}),
}
}
Expand Down Expand Up @@ -236,6 +237,7 @@ mod tests {
index_template.retention_policy_opt = Some(RetentionPolicy {
retention_period: "42 days".to_string(),
evaluation_schedule: "hourly".to_string(),
timestamp_type: Default::default(),
});
let default_index_root_uri = Uri::for_test("s3://test-bucket/indexes");

Expand Down Expand Up @@ -291,6 +293,7 @@ mod tests {
index_template.retention_policy_opt = Some(RetentionPolicy {
retention_period: "".to_string(),
evaluation_schedule: "".to_string(),
timestamp_type: Default::default(),
});
let error = index_template.validate().unwrap_err();
assert!(
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ pub use cluster_config::ClusterConfig;
use index_config::serialize::{IndexConfigV0_8, VersionedIndexConfig};
pub use index_config::{
IndexConfig, IndexingResources, IndexingSettings, IngestSettings, RetentionPolicy,
SearchSettings, build_doc_mapper, load_index_config_from_user_config, load_index_config_update,
prepare_doc_mapping_update,
RetentionTimestampType, SearchSettings, build_doc_mapper, load_index_config_from_user_config,
load_index_config_update, prepare_doc_mapping_update,
};
pub use quickwit_doc_mapper::DocMapping;
use serde::Serialize;
Expand Down Expand Up @@ -118,6 +118,7 @@ pub fn disable_ingest_v1() -> bool {
PulsarSourceParams,
RegionOrEndpoint,
RetentionPolicy,
RetentionTimestampType,
SearchSettings,
SourceConfigV0_7,
SourceConfigV0_8,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ pub struct DocMapper {
timestamp_field_name: Option<String>,
/// Timestamp field path (name parsed)
timestamp_field_path: Option<Vec<String>>,
/// Secondary timestamp field name.
secondary_timestamp_field_name: Option<String>,
/// Root node of the field mapping tree.
/// See [`MappingNode`].
field_mappings: MappingNode,
Expand Down Expand Up @@ -138,6 +140,7 @@ impl From<DocMapper> for DocMapperBuilder {
mode: default_doc_mapper.mode,
field_mappings: default_doc_mapper.field_mappings.into(),
timestamp_field: default_doc_mapper.timestamp_field_name,
secondary_timestamp_field: default_doc_mapper.secondary_timestamp_field_name,
tag_fields: default_doc_mapper.tag_field_names,
partition_key: partition_key_opt,
max_num_partitions: default_doc_mapper.max_num_partitions,
Expand Down Expand Up @@ -288,6 +291,7 @@ impl TryFrom<DocMapperBuilder> for DocMapper {
default_search_field_names,
timestamp_field_name: doc_mapping.timestamp_field,
timestamp_field_path,
secondary_timestamp_field_name: doc_mapping.secondary_timestamp_field,
field_mappings,
concatenate_dynamic_fields,
tag_field_names,
Expand Down Expand Up @@ -667,6 +671,11 @@ impl DocMapper {
self.timestamp_field_name.as_deref()
}

/// Returns the secondary timestamp field name.
pub fn secondary_timestamp_field_name(&self) -> Option<&str> {
self.secondary_timestamp_field_name.as_deref()
}

/// Returns the tag `NameField`s on the current schema.
/// Returns an error if a tag field is not found in this schema.
pub fn tag_named_fields(&self) -> anyhow::Result<Vec<NamedField>> {
Expand Down
8 changes: 8 additions & 0 deletions quickwit/quickwit-doc-mapper/src/doc_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ pub struct DocMapping {
#[serde(default)]
pub timestamp_field: Option<String>,

/// Field with the secondary timestamp. A new secondary time can be added
/// but it cannot be changed. If the secondary timestamp is missing from a
/// document in the split, the range is not set in the split metadata.
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub secondary_timestamp_field: Option<String>,

/// Declares the low cardinality fields for which the values ​​are recorded directly in the
/// splits metadata.
#[schema(value_type = Vec<String>)]
Expand Down Expand Up @@ -199,6 +206,7 @@ mod tests {
},
],
timestamp_field: Some("timestamp".to_string()),
secondary_timestamp_field: None,
tag_fields: BTreeSet::from_iter(["level".to_string()]),
partition_key: Some("tenant_id".to_string()),
max_num_partitions: NonZeroU32::new(100).unwrap(),
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "42 hours".to_string(),
evaluation_schedule: "hourly".to_string(),
timestamp_type: Default::default(),
};
index_config.retention_policy_opt = Some(retention_policy.clone());

Expand Down
Loading
Loading