diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index 8ef52c80656..60ea2a9429e 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -121,8 +121,6 @@ pub struct IndexingSettings { pub merge_policy: MergePolicyConfig, #[serde(default)] pub resources: IndexingResources, - #[serde(default = "MergePolicyConfig::noop")] - pub finalize_merge_policy: MergePolicyConfig, } impl IndexingSettings { @@ -163,7 +161,6 @@ impl Default for IndexingSettings { docstore_compression_level: Self::default_docstore_compression_level(), split_num_docs_target: Self::default_split_num_docs_target(), merge_policy: MergePolicyConfig::default(), - finalize_merge_policy: MergePolicyConfig::noop(), resources: IndexingResources::default(), } } diff --git a/quickwit/quickwit-config/src/merge_policy_config.rs b/quickwit/quickwit-config/src/merge_policy_config.rs index 757af7571b5..a4ed1010e73 100644 --- a/quickwit/quickwit-config/src/merge_policy_config.rs +++ b/quickwit/quickwit-config/src/merge_policy_config.rs @@ -21,6 +21,10 @@ use std::time::Duration; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; +fn is_zero(value: &usize) -> bool { + *value == 0 +} + #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)] #[serde(deny_unknown_fields)] pub struct ConstWriteAmplificationMergePolicyConfig { @@ -42,6 +46,15 @@ pub struct ConstWriteAmplificationMergePolicyConfig { #[serde(deserialize_with = "parse_human_duration")] #[serde(serialize_with = "serialize_duration")] pub maturation_period: Duration, + #[serde(default)] + #[serde(skip_serializing_if = "is_zero")] + pub max_finalize_merge_operations: usize, + /// Splits with a number of docs higher than + /// `max_finalize_split_num_docs` will not be considered + /// for finalize split merge operations. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub max_finalize_split_num_docs: Option, } impl Default for ConstWriteAmplificationMergePolicyConfig { @@ -51,6 +64,8 @@ impl Default for ConstWriteAmplificationMergePolicyConfig { merge_factor: default_merge_factor(), max_merge_factor: default_max_merge_factor(), maturation_period: default_maturation_period(), + max_finalize_merge_operations: 0, + max_finalize_split_num_docs: None, } } } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 643ed0498cb..7ab58bb873f 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -619,7 +619,7 @@ mod tests { use super::{IndexingPipeline, *}; use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams}; - use crate::merge_policy::{default_merge_policy, NopMergePolicy}; + use crate::merge_policy::default_merge_policy; #[test] fn test_wait_duration() { @@ -908,7 +908,6 @@ mod tests { metastore: metastore.clone(), split_store: split_store.clone(), merge_policy: default_merge_policy(), - finalize_merge_policy: Arc::new(NopMergePolicy), max_concurrent_split_uploads: 2, merge_io_throughput_limiter_opt: None, merge_scheduler_service: universe.get_or_spawn_one(), diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 7654c9e55e2..7f50f9c3ad7 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -284,14 +284,8 @@ impl IndexingService { let message = format!("failed to spawn indexing pipeline: {error}"); IndexingError::Internal(message) })?; - let merge_policy = crate::merge_policy::merge_policy_from_settings( - index_config.indexing_settings.merge_policy.clone(), - &index_config.indexing_settings, - ); - let finalize_merge_policy = crate::merge_policy::merge_policy_from_settings( - index_config.indexing_settings.finalize_merge_policy.clone(), - &index_config.indexing_settings, - ); + let merge_policy = + crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings); let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone()); let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) @@ -306,7 +300,6 @@ impl IndexingService { split_store: split_store.clone(), merge_scheduler_service: self.merge_scheduler_service.clone(), merge_policy: merge_policy.clone(), - finalize_merge_policy, merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(), max_concurrent_split_uploads: self.max_concurrent_split_uploads, event_broker: self.event_broker.clone(), @@ -1203,7 +1196,7 @@ mod tests { #[tokio::test] async fn test_indexing_service_apply_plan() { - const PARAMS_FINGERPRINT: u64 = 3865067856550546352u64; + const PARAMS_FINGERPRINT: u64 = 3865067856550546352; quickwit_common::setup_logging_for_tests(); let transport = ChannelTransport::default(); diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 61f8fa194c7..8228e46d9c8 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -42,9 +42,8 @@ use time::OffsetDateTime; use tokio::sync::Semaphore; use tracing::{debug, error, info, instrument}; -use super::merge_planner::RunFinalizeMergePolicy; use super::publisher::DisconnectMergePlanner; -use super::MergeSchedulerService; +use super::{MergeSchedulerService, RunFinalizeMergePolicyAndQuit}; use crate::actors::indexing_pipeline::wait_duration_before_retry; use crate::actors::merge_split_downloader::MergeSplitDownloader; use crate::actors::publisher::PublisherType; @@ -114,7 +113,7 @@ pub struct MergePipeline { kill_switch: KillSwitch, /// Immature splits passed to the merge planner the first time the pipeline is spawned. initial_immature_splits_opt: Option>, - // After it is set to true, we don't respawn. + // After it is set to true, we don't respawn pipeline actors if they fail. shutdown_initiated: bool, } @@ -352,7 +351,6 @@ impl MergePipeline { &self.params.pipeline_id, immature_splits, self.params.merge_policy.clone(), - self.params.finalize_merge_policy.clone(), merge_split_downloader_mailbox, self.params.merge_scheduler_service.clone(), ); @@ -500,14 +498,25 @@ impl Handler for MergePipeline { // From now on, we will not respawn the pipeline if it fails. self.shutdown_initiated = true; if let Some(handles) = &self.handles_opt { + // This disconnects the merge planner from the merge publisher, + // breaking the merge planner pipeline loop. + // + // As a result, the pipeline will naturally terminate + // once all of the pending / ongoing merge operations are completed. let _ = handles .merge_publisher .mailbox() - .send_message(DisconnectMergePlanner); + .send_message(DisconnectMergePlanner) + .await; + + // We also initiate the merge planner finalization routine. + // Depending on the merge policy, it may emit a few more merge + // operations. let _ = handles .merge_planner .mailbox() - .send_message(RunFinalizeMergePolicy); + .send_message(RunFinalizeMergePolicyAndQuit) + .await; } else { // we won't respawn the pipeline in the future, so there is nothing // to do here. @@ -561,7 +570,6 @@ pub struct MergePipelineParams { pub merge_scheduler_service: Mailbox, pub split_store: IndexingSplitStore, pub merge_policy: Arc, - pub finalize_merge_policy: Arc, pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline. pub merge_io_throughput_limiter_opt: Option, pub event_broker: EventBroker, @@ -583,7 +591,7 @@ mod tests { use quickwit_storage::RamStorage; use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams}; - use crate::merge_policy::{default_merge_policy, nop_merge_policy}; + use crate::merge_policy::default_merge_policy; use crate::IndexingSplitStore; #[tokio::test] @@ -624,7 +632,6 @@ mod tests { merge_scheduler_service: universe.get_or_spawn_one(), split_store, merge_policy: default_merge_policy(), - finalize_merge_policy: nop_merge_policy(), max_concurrent_split_uploads: 2, merge_io_throughput_limiter_opt: None, event_broker: Default::default(), diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs index 7cb692affc6..900e2122eb9 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs @@ -39,7 +39,7 @@ use crate::models::NewSplits; use crate::MergePolicy; #[derive(Debug)] -pub(crate) struct RunFinalizeMergePolicy; +pub(crate) struct RunFinalizeMergePolicyAndQuit; #[derive(Debug, Clone, PartialEq, Eq, Hash)] struct MergePartition { @@ -81,7 +81,6 @@ pub struct MergePlanner { known_split_ids_recompute_attempt_id: usize, merge_policy: Arc, - finalize_merge_policy: Arc, merge_split_downloader_mailbox: Mailbox, merge_scheduler_service: Mailbox, @@ -132,12 +131,12 @@ impl Actor for MergePlanner { } #[async_trait] -impl Handler for MergePlanner { +impl Handler for MergePlanner { type Reply = (); async fn handle( &mut self, - _plan_merge: RunFinalizeMergePolicy, + _plan_merge: RunFinalizeMergePolicyAndQuit, ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { // Note we ignore messages that could be coming from a different incarnation. @@ -193,7 +192,6 @@ impl MergePlanner { pipeline_id: &MergePipelineId, immature_splits: Vec, merge_policy: Arc, - finalize_merge_policy: Arc, merge_split_downloader_mailbox: Mailbox, merge_scheduler_service: Mailbox, ) -> MergePlanner { @@ -206,7 +204,6 @@ impl MergePlanner { known_split_ids_recompute_attempt_id: 0, partitioned_young_splits: Default::default(), merge_policy, - finalize_merge_policy, merge_split_downloader_mailbox, merge_scheduler_service, ongoing_merge_operations_inventory: Inventory::default(), @@ -302,12 +299,12 @@ impl MergePlanner { let mut merge_operations = Vec::new(); for young_splits in self.partitioned_young_splits.values_mut() { if !young_splits.is_empty() { - let merge_policy = if is_finalize { - &self.finalize_merge_policy + let operations = if is_finalize { + self.merge_policy.finalize_operations(young_splits) } else { - &self.merge_policy + self.merge_policy.operations(young_splits) }; - merge_operations.extend(merge_policy.operations(young_splits)); + merge_operations.extend(operations); } ctx.record_progress(); ctx.yield_now().await; @@ -380,7 +377,7 @@ mod tests { use crate::actors::MergePlanner; use crate::merge_policy::{ - merge_policy_from_settings, MergePolicy, MergeTask, NopMergePolicy, StableLogMergePolicy, + merge_policy_from_settings, MergePolicy, MergeTask, StableLogMergePolicy, }; use crate::models::NewSplits; @@ -441,7 +438,6 @@ mod tests { &pipeline_id, Vec::new(), merge_policy, - Arc::new(NopMergePolicy), merge_split_downloader_mailbox, universe.get_or_spawn_one(), ); @@ -562,15 +558,11 @@ mod tests { 2, ), ]; - let merge_policy: Arc = - merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); - let finalize_merge_policy: Arc = - merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); + let merge_policy: Arc = merge_policy_from_settings(&indexing_settings); let merge_planner = MergePlanner::new( &pipeline_id, immature_splits.clone(), merge_policy, - finalize_merge_policy, merge_split_downloader_mailbox, universe.get_or_spawn_one(), ); @@ -658,17 +650,11 @@ mod tests { 2, ), ]; - let merge_policy: Arc = - merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); - let finalize_merge_policy: Arc = merge_policy_from_settings( - indexing_settings.finalize_merge_policy.clone(), - &indexing_settings, - ); + let merge_policy: Arc = merge_policy_from_settings(&indexing_settings); let merge_planner = MergePlanner::new( &pipeline_id, immature_splits.clone(), merge_policy, - finalize_merge_policy, merge_split_downloader_mailbox, universe.get_or_spawn_one(), ); @@ -729,15 +715,11 @@ mod tests { 2, ), ]; - let merge_policy: Arc = - merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); - let finalize_merge_policy: Arc = - merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); + let merge_policy: Arc = merge_policy_from_settings(&indexing_settings); let merge_planner = MergePlanner::new( &pipeline_id, immature_splits.clone(), merge_policy, - finalize_merge_policy, merge_split_downloader_mailbox, universe.get_or_spawn_one(), ); diff --git a/quickwit/quickwit-indexing/src/actors/mod.rs b/quickwit/quickwit-indexing/src/actors/mod.rs index 9552a426e29..ab70f06df35 100644 --- a/quickwit/quickwit-indexing/src/actors/mod.rs +++ b/quickwit/quickwit-indexing/src/actors/mod.rs @@ -42,7 +42,7 @@ pub use indexing_pipeline::{IndexingPipeline, IndexingPipelineParams}; pub use indexing_service::{IndexingService, IndexingServiceCounters, INDEXING_DIR_NAME}; pub use merge_executor::{combine_partition_ids, merge_split_attrs, MergeExecutor}; pub use merge_pipeline::MergePipeline; -pub use merge_planner::MergePlanner; +pub(crate) use merge_planner::{MergePlanner, RunFinalizeMergePolicyAndQuit}; pub use merge_scheduler_service::{schedule_merge, MergePermit, MergeSchedulerService}; pub use merge_split_downloader::MergeSplitDownloader; pub use packager::Packager; diff --git a/quickwit/quickwit-indexing/src/actors/publisher.rs b/quickwit/quickwit-indexing/src/actors/publisher.rs index 481c4971b71..24bdd361065 100644 --- a/quickwit/quickwit-indexing/src/actors/publisher.rs +++ b/quickwit/quickwit-indexing/src/actors/publisher.rs @@ -111,8 +111,8 @@ impl Handler for Publisher { _: DisconnectMergePlanner, _ctx: &ActorContext, ) -> Result<(), quickwit_actors::ActorExitStatus> { - info!("disconnecting merge planner mailbox."); - self.merge_planner_mailbox_opt.take(); + info!("disconnecting merge planner mailbox"); + self.merge_planner_mailbox_opt = None; Ok(()) } } diff --git a/quickwit/quickwit-indexing/src/merge_policy/const_write_amplification.rs b/quickwit/quickwit-indexing/src/merge_policy/const_write_amplification.rs index 3e9290ef57e..45a14a30002 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/const_write_amplification.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/const_write_amplification.rs @@ -18,15 +18,20 @@ // along with this program. If not, see . use std::collections::HashMap; +use std::ops::RangeInclusive; use quickwit_config::merge_policy_config::ConstWriteAmplificationMergePolicyConfig; use quickwit_config::IndexingSettings; use quickwit_metastore::{SplitMaturity, SplitMetadata}; use time::OffsetDateTime; +use tracing::info; use super::MergeOperation; use crate::merge_policy::MergePolicy; +// Smallest number of splits in a finalize merge. +const FINALIZE_MIN_MERGE_FACTOR: usize = 3; + /// The `ConstWriteAmplificationMergePolicy` has been designed for a use /// case where there are a several index partitions with different sizes, /// and partitions tend to be searched separately. (e.g. partitioning by tenant.) @@ -82,6 +87,8 @@ impl ConstWriteAmplificationMergePolicy { merge_factor: 3, max_merge_factor: 5, maturation_period: Duration::from_secs(3600), + max_finalize_merge_operations: 0, + max_finalize_split_num_docs: None, }; Self::new(config, 10_000_000) } @@ -92,10 +99,11 @@ impl ConstWriteAmplificationMergePolicy { fn single_merge_operation_within_num_merge_op_level( &self, splits: &mut Vec, + merge_factor_range: RangeInclusive, ) -> Option { let mut num_splits_in_merge = 0; let mut num_docs_in_merge = 0; - for split in splits.iter().take(self.config.max_merge_factor) { + for split in splits.iter().take(*merge_factor_range.end()) { num_docs_in_merge += split.num_docs; num_splits_in_merge += 1; if num_docs_in_merge >= self.split_num_docs_target { @@ -103,7 +111,7 @@ impl ConstWriteAmplificationMergePolicy { } } if (num_docs_in_merge < self.split_num_docs_target) - && (num_splits_in_merge < self.config.merge_factor) + && (num_splits_in_merge < *merge_factor_range.start()) { return None; } @@ -123,19 +131,26 @@ impl ConstWriteAmplificationMergePolicy { .then_with(|| left.split_id().cmp(right.split_id())) }); let mut merge_operations = Vec::new(); - while let Some(merge_op) = self.single_merge_operation_within_num_merge_op_level(splits) { + while let Some(merge_op) = + self.single_merge_operation_within_num_merge_op_level(splits, self.merge_factor_range()) + { merge_operations.push(merge_op); } merge_operations } + + fn merge_factor_range(&self) -> RangeInclusive { + self.config.max_merge_factor..=self.config.merge_factor + } } impl MergePolicy for ConstWriteAmplificationMergePolicy { fn operations(&self, splits: &mut Vec) -> Vec { let mut group_by_num_merge_ops: HashMap> = HashMap::default(); let mut mature_splits = Vec::new(); + let now = OffsetDateTime::now_utc(); for split in splits.drain(..) { - if split.is_mature(OffsetDateTime::now_utc()) { + if split.is_mature(now) { mature_splits.push(split); } else { group_by_num_merge_ops @@ -149,11 +164,75 @@ impl MergePolicy for ConstWriteAmplificationMergePolicy { for splits_in_group in group_by_num_merge_ops.values_mut() { let merge_ops = self.merge_operations_within_num_merge_op_level(splits_in_group); merge_operations.extend(merge_ops); + // we readd the splits that are not used in a merge operation into the splits vector. splits.append(splits_in_group); } merge_operations } + fn finalize_operations(&self, splits: &mut Vec) -> Vec { + if self.config.max_finalize_merge_operations == 0 { + return Vec::new(); + } + + let now = OffsetDateTime::now_utc(); + + // We first isolate mature splits. Let's not touch them. + let (mature_splits, mut young_splits): (Vec, Vec) = + splits.drain(..).partition(|split: &SplitMetadata| { + if let Some(max_finalize_split_num_docs) = self.config.max_finalize_split_num_docs { + if split.num_docs > max_finalize_split_num_docs { + return true; + } + } + split.is_mature(now) + }); + splits.extend(mature_splits); + + // We then sort the split by reverse creation date and split id. + // You may notice that reverse is the opposite of the rest of the policy. + // + // This is because these are the youngest splits. If we limit ourselves in the number of + // merge we will operate, we might as well focus on the young == smaller ones for that + // last merge. + young_splits.sort_by(|left, right| { + left.create_timestamp + .cmp(&right.create_timestamp) + .reverse() + .then_with(|| left.split_id().cmp(right.split_id())) + }); + let mut merge_operations = Vec::new(); + while merge_operations.len() < self.config.max_finalize_merge_operations { + let min_merge_factor = FINALIZE_MIN_MERGE_FACTOR.min(self.config.max_merge_factor); + let merge_factor_range = min_merge_factor..=self.config.max_merge_factor; + if let Some(merge_op) = self.single_merge_operation_within_num_merge_op_level( + &mut young_splits, + merge_factor_range, + ) { + merge_operations.push(merge_op); + } else { + break; + } + } + + // We readd the young splits that are not used in any merge operation. + splits.extend(young_splits); + + assert!(merge_operations.len() <= self.config.max_finalize_merge_operations); + + let num_splits_per_merge_op: Vec = + merge_operations.iter().map(|op| op.splits.len()).collect(); + let num_docs_per_merge_op: Vec = merge_operations + .iter() + .map(|op| op.splits.iter().map(|split| split.num_docs).sum::()) + .collect(); + info!( + num_splits_per_merge_op=?num_splits_per_merge_op, + num_docs_per_merge_op=?num_docs_per_merge_op, + "finalize merge operation"); + merge_operations + } + fn split_maturity(&self, split_num_docs: usize, split_num_merge_ops: usize) -> SplitMaturity { if split_num_merge_ops >= self.config.max_merge_ops { return SplitMaturity::Mature; @@ -372,7 +451,7 @@ mod tests { let final_splits = crate::merge_policy::tests::aux_test_simulate_merge_planner_num_docs( Arc::new(merge_policy.clone()), &vals[..], - |splits| { + &|splits| { let mut num_merge_ops_counts: HashMap = HashMap::default(); for split in splits { *num_merge_ops_counts.entry(split.num_merge_ops).or_default() += 1; @@ -392,4 +471,118 @@ mod tests { assert_eq!(final_splits.len(), 49); Ok(()) } + + #[tokio::test] + async fn test_simulate_const_write_amplification_merge_policy_with_finalize() { + let mut merge_policy = ConstWriteAmplificationMergePolicy::for_test(); + merge_policy.config.max_merge_factor = 10; + merge_policy.config.merge_factor = 10; + merge_policy.split_num_docs_target = 10_000_000; + + let vals: Vec = vec![1; 9 + 90 + 900]; //< 1_211 splits with a single doc each. + + let num_final_splits_given_max_finalize_merge_operations = + |split_num_docs: Vec, max_finalize_merge_operations: usize| { + let mut merge_policy_clone = merge_policy.clone(); + merge_policy_clone.config.max_finalize_merge_operations = + max_finalize_merge_operations; + async move { + crate::merge_policy::tests::aux_test_simulate_merge_planner_num_docs( + Arc::new(merge_policy_clone), + &split_num_docs[..], + &|_splits| {}, + ) + .await + .unwrap() + } + }; + + assert_eq!( + num_final_splits_given_max_finalize_merge_operations(vals.clone(), 0) + .await + .len(), + 27 + ); + assert_eq!( + num_final_splits_given_max_finalize_merge_operations(vals.clone(), 1) + .await + .len(), + 18 + ); + assert_eq!( + num_final_splits_given_max_finalize_merge_operations(vals.clone(), 2) + .await + .len(), + 9 + ); + assert_eq!( + num_final_splits_given_max_finalize_merge_operations(vals.clone(), 3) + .await + .len(), + 3 + ); + assert_eq!( + num_final_splits_given_max_finalize_merge_operations(vec![1; 6], 1) + .await + .len(), + 1 + ); + assert_eq!( + num_final_splits_given_max_finalize_merge_operations(vec![1; 3], 1) + .await + .len(), + 1 + ); + assert_eq!( + num_final_splits_given_max_finalize_merge_operations(vec![1; 2], 1) + .await + .len(), + 2 + ); + + // We check that the youngest splits are merged in priority. + let final_splits = num_final_splits_given_max_finalize_merge_operations( + vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], + 1, + ) + .await; + assert_eq!(final_splits.len(), 2); + + let mut split_num_docs: Vec = final_splits + .iter() + .map(|split| split.num_docs) + .collect::>(); + split_num_docs.sort(); + assert_eq!(split_num_docs[0], 11); + assert_eq!(split_num_docs[1], 55); + } + + #[tokio::test] + async fn test_simulate_const_write_amplification_merge_policy_with_finalize_max_num_docs() { + let mut merge_policy = ConstWriteAmplificationMergePolicy::for_test(); + merge_policy.config.max_merge_factor = 10; + merge_policy.config.merge_factor = 10; + merge_policy.split_num_docs_target = 10_000_000; + merge_policy.config.max_finalize_split_num_docs = Some(999_999); + merge_policy.config.max_finalize_merge_operations = 3; + + let split_num_docs: Vec = vec![999_999, 1_000_000, 999_999, 999_999]; + + let final_splits = crate::merge_policy::tests::aux_test_simulate_merge_planner_num_docs( + Arc::new(merge_policy), + &split_num_docs[..], + &|_splits| {}, + ) + .await + .unwrap(); + + assert_eq!(final_splits.len(), 2); + let mut split_num_docs: Vec = final_splits + .iter() + .map(|split| split.num_docs) + .collect(); + split_num_docs.sort(); + assert_eq!(split_num_docs[0], 1_000_000); + assert_eq!(split_num_docs[1], 999_999 * 3); + } } diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index 95622a11460..06519932bbd 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -151,6 +151,17 @@ pub trait MergePolicy: Send + Sync + fmt::Debug { /// Returns the list of merge operations that should be performed. fn operations(&self, splits: &mut Vec) -> Vec; + /// After the last indexing pipeline has been shutdown, quickwit + /// finishes the ongoing merge operations, and eventually needs to shut it down. + /// + /// This method makes it possible to offer a last list of merge operations before + /// really shutting down the merge policy. + /// + /// This is especially useful for users relying on a one-index-per-day scheme. + fn finalize_operations(&self, _splits: &mut Vec) -> Vec { + Vec::new() + } + /// Returns split maturity. /// A split is either: /// - `Mature` if it does not undergo new merge operations. @@ -166,11 +177,8 @@ pub trait MergePolicy: Send + Sync + fmt::Debug { fn check_is_valid(&self, _merge_op: &MergeOperation, _remaining_splits: &[SplitMetadata]) {} } -pub fn merge_policy_from_settings( - merge_policy_config: MergePolicyConfig, - settings: &IndexingSettings, -) -> Arc { - match merge_policy_config { +pub fn merge_policy_from_settings(settings: &IndexingSettings) -> Arc { + match settings.merge_policy.clone() { MergePolicyConfig::Nop => Arc::new(NopMergePolicy), MergePolicyConfig::ConstWriteAmplification(config) => { let merge_policy = @@ -186,7 +194,7 @@ pub fn merge_policy_from_settings( pub fn default_merge_policy() -> Arc { let indexing_settings = IndexingSettings::default(); - merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings) + merge_policy_from_settings(&indexing_settings) } pub fn nop_merge_policy() -> Arc { @@ -226,6 +234,7 @@ pub mod tests { use super::*; use crate::actors::{ merge_split_attrs, MergePlanner, MergeSchedulerService, MergeSplitDownloader, + RunFinalizeMergePolicyAndQuit, }; use crate::models::{create_split_metadata, NewSplits}; @@ -403,10 +412,10 @@ pub mod tests { merged_split } - pub async fn aux_test_simulate_merge_planner( + async fn aux_test_simulate_merge_planner( merge_policy: Arc, incoming_splits: Vec, - check_final_configuration: CheckFn, + check_final_configuration: &dyn Fn(&[SplitMetadata]), ) -> anyhow::Result> { let universe = Universe::new(); let (merge_task_mailbox, merge_task_inbox) = @@ -421,14 +430,13 @@ pub mod tests { &pipeline_id.merge_pipeline_id(), Vec::new(), merge_policy.clone(), - Arc::new(NopMergePolicy), merge_task_mailbox, universe.get_or_spawn_one::(), ); let mut split_index: HashMap = HashMap::default(); let (merge_planner_mailbox, merge_planner_handler) = universe.spawn_builder().spawn(merge_planner); - let mut split_metadatas: Vec = Vec::new(); + for split in incoming_splits { split_index.insert(split.split_id().to_string(), split.clone()); merge_planner_mailbox @@ -451,9 +459,25 @@ pub mod tests { .send_message(NewSplits { new_splits }) .await?; } - split_metadatas = split_index.values().cloned().collect(); + let split_metadatas: Vec = split_index.values().cloned().collect(); check_final_configuration(&split_metadatas); } + + merge_planner_mailbox + .send_message(RunFinalizeMergePolicyAndQuit) + .await + .unwrap(); + + let obs = merge_planner_handler.process_pending_and_observe().await; + assert_eq!(obs.obs_type, quickwit_actors::ObservationType::PostMortem); + + let merge_tasks = merge_task_inbox.drain_for_test_typed::(); + for merge_task in merge_tasks { + apply_merge(&merge_policy, &mut split_index, &merge_task); + } + + let split_metadatas: Vec = split_index.values().cloned().collect(); + universe.assert_quit().await; Ok(split_metadatas) } @@ -481,10 +505,10 @@ pub mod tests { } } - pub async fn aux_test_simulate_merge_planner_num_docs( + pub async fn aux_test_simulate_merge_planner_num_docs( merge_policy: Arc, batch_num_docs: &[usize], - check_final_configuration: CheckFn, + check_final_configuration: &dyn Fn(&[SplitMetadata]), ) -> anyhow::Result> { let split_metadatas: Vec = batch_num_docs .iter() diff --git a/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs b/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs index c3858f55dca..a486ea02cd8 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs @@ -662,7 +662,7 @@ mod tests { aux_test_simulate_merge_planner_num_docs( Arc::new(merge_policy.clone()), &vec![10_000; 100_000], - |splits| { + &|splits| { let num_docs = splits.iter().map(|split| split.num_docs as u64).sum(); assert!(splits.len() <= merge_policy.max_num_splits_ideal_case(num_docs)) }, @@ -691,7 +691,7 @@ mod tests { aux_test_simulate_merge_planner_num_docs( Arc::new(merge_policy.clone()), &batch_num_docs, - |splits| { + &|splits| { let num_docs = splits.iter().map(|split| split.num_docs as u64).sum(); assert!(splits.len() <= merge_policy.max_num_splits_worst_case(num_docs)); }, @@ -708,7 +708,7 @@ mod tests { aux_test_simulate_merge_planner_num_docs( Arc::new(merge_policy.clone()), &batch_num_docs, - |splits| { + &|splits| { let num_docs = splits.iter().map(|split| split.num_docs as u64).sum(); assert!(splits.len() <= merge_policy.max_num_splits_worst_case(num_docs)); }, @@ -723,7 +723,7 @@ mod tests { aux_test_simulate_merge_planner_num_docs( Arc::new(merge_policy.clone()), &vec![10_000; 1_000], - |splits| { + &|splits| { let num_docs = splits.iter().map(|split| split.num_docs as u64).sum(); assert!(splits.len() <= merge_policy.max_num_splits_ideal_case(num_docs)); }, @@ -739,7 +739,7 @@ mod tests { aux_test_simulate_merge_planner_num_docs( Arc::new(merge_policy.clone()), &vals[..], - |splits| { + &|splits| { let num_docs = splits.iter().map(|split| split.num_docs as u64).sum(); assert!(splits.len() <= merge_policy.max_num_splits_worst_case(num_docs)); }, diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index 072f7338af3..5c25c7ae1c7 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -176,10 +176,7 @@ impl DeleteTaskPipeline { ctx.spawn_actor().supervise(publisher); let split_store = IndexingSplitStore::create_without_local_store_for_test(self.index_storage.clone()); - let merge_policy = merge_policy_from_settings( - index_config.indexing_settings.merge_policy.clone(), - &index_config.indexing_settings, - ); + let merge_policy = merge_policy_from_settings(&index_config.indexing_settings); let uploader = Uploader::new( UploaderType::DeleteUploader, self.metastore.clone(),