From d0ce1723ed60e9fc8decea1dcd51bcd9bf0e1863 Mon Sep 17 00:00:00 2001
From: Adrien Guillo
Date: Wed, 2 Oct 2024 16:49:33 -0400
Subject: [PATCH] Better shutdown of merge pipelines.

Before this PR, when the last indexing pipeline of an index was shut down,
the merge pipeline associated with that index was abruptly shut down too.

This PR makes two changes to this behavior. First, we wait for ongoing or
pending merges to be executed before shutting down the merge pipeline.
Second, the merge policy gets the opportunity to offer a list of extra
merges to run. This functionality is introduced to help users who migrated
from Elasticsearch and rely on daily indexes.

Closes #5474
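For illustration, here is a minimal sketch of how the new knobs can be set.
The two field names come from this patch; the surrounding code is an
assumption for illustration, not part of the change:

    use quickwit_config::merge_policy_config::ConstWriteAmplificationMergePolicyConfig;

    fn main() {
        // Allow at most two extra merge operations at shutdown, and never
        // fold splits already holding more than 10M docs into a finalize
        // merge.
        let config = ConstWriteAmplificationMergePolicyConfig {
            max_finalize_merge_operations: 2,
            max_finalize_split_num_docs: Some(10_000_000),
            ..Default::default()
        };
        assert!(config.max_finalize_merge_operations > 0);
    }

With the default `max_finalize_merge_operations: 0`, no extra merges are
scheduled and shutdown simply drains the merges that are already pending or
ongoing.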
---
 .../src/merge_policy_config.rs                |  19 ++
 .../src/actors/indexing_service.rs            |  36 ++--
 .../src/actors/merge_pipeline.rs              | 122 +++++++++--
 .../src/actors/merge_planner.rs               |  40 +++-
 quickwit/quickwit-indexing/src/actors/mod.rs  |   2 +-
 .../quickwit-indexing/src/actors/publisher.rs |  20 ++
 .../merge_policy/const_write_amplification.rs | 201 +++++++++++++++++-
 .../quickwit-indexing/src/merge_policy/mod.rs |  50 ++++-
 .../merge_policy/stable_log_merge_policy.rs   |  10 +-
 9 files changed, 437 insertions(+), 63 deletions(-)

diff --git a/quickwit/quickwit-config/src/merge_policy_config.rs b/quickwit/quickwit-config/src/merge_policy_config.rs
index 2f96ed49d78..a4ed1010e73 100644
--- a/quickwit/quickwit-config/src/merge_policy_config.rs
+++ b/quickwit/quickwit-config/src/merge_policy_config.rs
@@ -21,6 +21,10 @@ use std::time::Duration;
 
 use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
 
+fn is_zero(value: &usize) -> bool {
+    *value == 0
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)]
 #[serde(deny_unknown_fields)]
 pub struct ConstWriteAmplificationMergePolicyConfig {
@@ -42,6 +46,15 @@ pub struct ConstWriteAmplificationMergePolicyConfig {
     #[serde(deserialize_with = "parse_human_duration")]
     #[serde(serialize_with = "serialize_duration")]
     pub maturation_period: Duration,
+    #[serde(default)]
+    #[serde(skip_serializing_if = "is_zero")]
+    pub max_finalize_merge_operations: usize,
+    /// Splits with a number of docs higher than
+    /// `max_finalize_split_num_docs` will not be considered
+    /// for finalize split merge operations.
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub max_finalize_split_num_docs: Option<usize>,
 }
 
 impl Default for ConstWriteAmplificationMergePolicyConfig {
@@ -51,6 +64,8 @@ impl Default for ConstWriteAmplificationMergePolicyConfig {
             merge_factor: default_merge_factor(),
             max_merge_factor: default_max_merge_factor(),
             maturation_period: default_maturation_period(),
+            max_finalize_merge_operations: 0,
+            max_finalize_split_num_docs: None,
         }
     }
 }
@@ -146,6 +161,10 @@ impl Default for MergePolicyConfig {
 }
 
 impl MergePolicyConfig {
+    pub fn noop() -> Self {
+        MergePolicyConfig::Nop
+    }
+
     pub fn validate(&self) -> anyhow::Result<()> {
         let (merge_factor, max_merge_factor) = match self {
             MergePolicyConfig::Nop => {
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
index df71cc92ea4..7f50f9c3ad7 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -27,8 +27,9 @@ use async_trait::async_trait;
 use futures::TryStreamExt;
 use itertools::Itertools;
 use quickwit_actors::{
     Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Healthz, Mailbox,
     Observation,
 };
+use super::merge_pipeline::FinishPendingMergesAndShutdownPipeline;
 use quickwit_cluster::Cluster;
 use quickwit_common::fs::get_cache_directory_path;
@@ -504,21 +504,27 @@ impl IndexingService {
             .merge_pipeline_handles
             .remove_entry(&merge_pipeline_to_shutdown)
         {
-            // We kill the merge pipeline to avoid waiting a merge operation to finish as it can
-            // be long.
+            // We gracefully shut down the merge pipeline so that the in-flight
+            // merges can complete.
             info!(
                 index_uid=%merge_pipeline_to_shutdown.index_uid,
                 source_id=%merge_pipeline_to_shutdown.source_id,
-                "no more indexing pipeline on this index and source, killing merge pipeline"
+                "shutting down orphan merge pipeline"
             );
-            merge_pipeline_handle.handle.kill().await;
+            // The queue capacity of the merge pipeline is unbounded, so `.send_message(...)`
+            // should not block.
+            // We avoid using `.quit()` here because it waits for the actor to exit.
+            merge_pipeline_handle
+                .handle
+                .mailbox()
+                .send_message(FinishPendingMergesAndShutdownPipeline)
+                .await
+                .expect("merge pipeline mailbox should not be full");
         }
     }
-        // Finally remove the merge pipeline with an exit status.
+        // Finally, we remove the completed or failed merge pipelines.
         self.merge_pipeline_handles
-            .retain(|_, merge_pipeline_mailbox_handle| {
-                merge_pipeline_mailbox_handle.handle.state().is_running()
-            });
+            .retain(|_, merge_pipeline_handle| merge_pipeline_handle.handle.state().is_running());
         self.counters.num_running_merge_pipelines = self.merge_pipeline_handles.len();
 
         self.update_chitchat_running_plan().await;
@@ -543,23 +549,23 @@ impl IndexingService {
         immature_splits_opt: Option<Vec<SplitMetadata>>,
         ctx: &ActorContext<Self>,
     ) -> Result<Mailbox<MergePlanner>, IndexingError> {
-        if let Some(merge_pipeline_mailbox_handle) = self
+        if let Some(merge_pipeline_handle) = self
             .merge_pipeline_handles
             .get(&merge_pipeline_params.pipeline_id)
         {
-            return Ok(merge_pipeline_mailbox_handle.mailbox.clone());
+            return Ok(merge_pipeline_handle.mailbox.clone());
         }
         let merge_pipeline_id = merge_pipeline_params.pipeline_id.clone();
         let merge_pipeline =
             MergePipeline::new(merge_pipeline_params, immature_splits_opt, ctx.spawn_ctx());
         let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone();
         let (_pipeline_mailbox, pipeline_handle) = ctx.spawn_actor().spawn(merge_pipeline);
-        let merge_pipeline_mailbox_handle = MergePipelineHandle {
+        let merge_pipeline_handle = MergePipelineHandle {
             mailbox: merge_planner_mailbox.clone(),
             handle: pipeline_handle,
         };
         self.merge_pipeline_handles
-            .insert(merge_pipeline_id, merge_pipeline_mailbox_handle);
+            .insert(merge_pipeline_id, merge_pipeline_handle);
         self.counters.num_running_merge_pipelines += 1;
         Ok(merge_planner_mailbox)
     }
@@ -1190,7 +1196,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_indexing_service_apply_plan() {
-        const PARAMS_FINGERPRINT: u64 = 3865067856550546352u64;
+        const PARAMS_FINGERPRINT: u64 = 3865067856550546352;
         quickwit_common::setup_logging_for_tests();
         let transport = ChannelTransport::default();
diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
index 2e3475f5759..0e7e5c998f1 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -42,7 +42,8 @@ use time::OffsetDateTime;
 use tokio::sync::Semaphore;
 use tracing::{debug, error, info, instrument};
 
-use super::MergeSchedulerService;
+use super::publisher::DisconnectMergePlanner;
+use super::{MergeSchedulerService, RunFinalizeMergePolicyAndQuit};
 use crate::actors::indexing_pipeline::wait_duration_before_retry;
 use crate::actors::merge_split_downloader::MergeSplitDownloader;
 use crate::actors::publisher::PublisherType;
@@ -56,6 +57,22 @@ use crate::split_store::IndexingSplitStore;
 /// concurrently.
 static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10);
 
+/// Instructs the merge pipeline to stop itself.
+/// Merges that have already been scheduled are not aborted.
+///
+/// In addition, the finalize merge policy is executed to schedule a few
+/// extra merges.
+///
+/// After receiving `FinishPendingMergesAndShutdownPipeline`, the merge
+/// pipeline loop is disconnected. In other words, the connection from the
+/// merge publisher back to the merge planner is cut, so that the merge
+/// pipeline terminates naturally.
+///
+/// Supervision keeps running. However, it will not restart the pipeline in
+/// case of failure; it will just kill all of the merge pipeline actors (for
+/// instance, if one of them is stuck).
+#[derive(Debug, Clone, Copy)]
+pub struct FinishPendingMergesAndShutdownPipeline;
+
 struct MergePipelineHandles {
     merge_planner: ActorHandle<MergePlanner>,
     merge_split_downloader: ActorHandle<MergeSplitDownloader>,
@@ -96,6 +113,8 @@ pub struct MergePipeline {
     kill_switch: KillSwitch,
     /// Immature splits passed to the merge planner the first time the pipeline is spawned.
     initial_immature_splits_opt: Option<Vec<SplitMetadata>>,
+    // Once this is set to true, we don't respawn pipeline actors if they fail.
+    shutdown_initiated: bool,
 }
 
 #[async_trait]
@@ -141,6 +160,7 @@ impl MergePipeline {
             merge_planner_inbox,
             merge_planner_mailbox,
             initial_immature_splits_opt,
+            shutdown_initiated: false,
         }
     }
@@ -251,7 +271,7 @@ impl MergePipeline {
             Some(self.merge_planner_mailbox.clone()),
             None,
         );
-        let (merge_publisher_mailbox, merge_publisher_handler) = ctx
+        let (merge_publisher_mailbox, merge_publisher_handle) = ctx
             .spawn_actor()
             .set_kill_switch(self.kill_switch.clone())
             .set_backpressure_micros_counter(
@@ -271,7 +291,7 @@ impl MergePipeline {
             self.params.max_concurrent_split_uploads,
             self.params.event_broker.clone(),
         );
-        let (merge_uploader_mailbox, merge_uploader_handler) = ctx
+        let (merge_uploader_mailbox, merge_uploader_handle) = ctx
            .spawn_actor()
            .set_kill_switch(self.kill_switch.clone())
            .spawn(merge_uploader);
@@ -279,7 +299,7 @@ impl MergePipeline {
         // Merge Packager
         let tag_fields = self.params.doc_mapper.tag_named_fields()?;
         let merge_packager = Packager::new("MergePackager", tag_fields, merge_uploader_mailbox);
-        let (merge_packager_mailbox, merge_packager_handler) = ctx
+        let (merge_packager_mailbox, merge_packager_handle) = ctx
             .spawn_actor()
             .set_kill_switch(self.kill_switch.clone())
             .spawn(merge_packager);
@@ -300,7 +320,7 @@ impl MergePipeline {
             merge_executor_io_controls,
             merge_packager_mailbox,
         );
-        let (merge_executor_mailbox, merge_executor_handler) = ctx
+        let (merge_executor_mailbox, merge_executor_handle) = ctx
             .spawn_actor()
             .set_kill_switch(self.kill_switch.clone())
             .set_backpressure_micros_counter(
@@ -316,7 +336,7 @@ impl MergePipeline {
             executor_mailbox: merge_executor_mailbox,
             io_controls: split_downloader_io_controls,
         };
-        let (merge_split_downloader_mailbox, merge_split_downloader_handler) = ctx
+        let (merge_split_downloader_mailbox, merge_split_downloader_handle) = ctx
             .spawn_actor()
             .set_kill_switch(self.kill_switch.clone())
             .set_backpressure_micros_counter(
@@ -334,7 +354,7 @@ impl MergePipeline {
             merge_split_downloader_mailbox,
             self.params.merge_scheduler_service.clone(),
         );
-        let (_, merge_planner_handler) = ctx
+        let (_, merge_planner_handle) = ctx
             .spawn_actor()
             .set_kill_switch(self.kill_switch.clone())
             .set_mailboxes(
@@ -346,12 +366,12 @@ impl MergePipeline {
         self.previous_generations_statistics = self.statistics.clone();
         self.statistics.generation += 1;
         self.handles_opt = Some(MergePipelineHandles {
-            merge_planner: merge_planner_handler,
-            merge_split_downloader: merge_split_downloader_handler,
-            merge_executor: merge_executor_handler,
-            merge_packager: merge_packager_handler,
-            merge_uploader: merge_uploader_handler,
-            merge_publisher: merge_publisher_handler,
+            merge_planner: merge_planner_handle,
+            merge_split_downloader: merge_split_downloader_handle,
+            merge_executor: merge_executor_handle,
+            merge_packager: merge_packager_handle,
+            merge_uploader: merge_uploader_handle,
+            merge_publisher: merge_publisher_handle,
             next_check_for_progress: Instant::now() + *HEARTBEAT,
         });
         Ok(())
     }
@@ -359,14 +379,14 @@ impl MergePipeline {
     async fn terminate(&mut self) {
         self.kill_switch.kill();
-        if let Some(handlers) = self.handles_opt.take() {
+        if let Some(handles) = self.handles_opt.take() {
             tokio::join!(
-                handlers.merge_planner.kill(),
-                handlers.merge_split_downloader.kill(),
-                handlers.merge_executor.kill(),
-                handlers.merge_packager.kill(),
-                handlers.merge_uploader.kill(),
-                handlers.merge_publisher.kill(),
+                handles.merge_planner.kill(),
+                handles.merge_split_downloader.kill(),
+                handles.merge_executor.kill(),
+                handles.merge_packager.kill(),
+                handles.merge_uploader.kill(),
+                handles.merge_publisher.kill(),
             );
         }
     }
@@ -412,6 +432,7 @@ impl MergePipeline {
             ctx.schedule_self_msg(*quickwit_actors::HEARTBEAT, Spawn { retry_count: 0 });
         }
         Health::Success => {
+            info!(index_uid=%self.params.pipeline_id.index_uid, "merge pipeline success, shutting down");
             return Err(ActorExitStatus::Success);
         }
     }
@@ -467,6 +488,45 @@ impl Handler<SuperviseLoop> for MergePipeline {
     }
 }
 
+#[async_trait]
+impl Handler<FinishPendingMergesAndShutdownPipeline> for MergePipeline {
+    type Reply = ();
+    async fn handle(
+        &mut self,
+        _: FinishPendingMergesAndShutdownPipeline,
+        _ctx: &ActorContext<Self>,
+    ) -> Result<(), ActorExitStatus> {
+        info!(index_uid=%self.params.pipeline_id.index_uid, "shutdown merge pipeline initiated");
+        // From now on, we will not respawn the pipeline if it fails.
+        self.shutdown_initiated = true;
+        if let Some(handles) = &self.handles_opt {
+            // This disconnects the merge planner from the merge publisher,
+            // breaking the merge pipeline feedback loop.
+            //
+            // As a result, the pipeline will naturally terminate
+            // once all of the pending / ongoing merge operations are completed.
+            let _ = handles
+                .merge_publisher
+                .mailbox()
+                .send_message(DisconnectMergePlanner)
+                .await;
+
+            // We also initiate the merge planner finalization routine.
+            // Depending on the merge policy, it may emit a few more merge
+            // operations.
+            let _ = handles
+                .merge_planner
+                .mailbox()
+                .send_message(RunFinalizeMergePolicyAndQuit)
+                .await;
+        } else {
+            // We won't respawn the pipeline in the future, so there is nothing
+            // to do here.
+        }
+        Ok(())
+    }
+}
+
 #[async_trait]
 impl Handler<Spawn> for MergePipeline {
     type Reply = ();
@@ -476,6 +536,9 @@ impl Handler<Spawn> for MergePipeline {
         spawn: Spawn,
         ctx: &ActorContext<Self>,
     ) -> Result<(), ActorExitStatus> {
+        if self.shutdown_initiated {
+            return Ok(());
+        }
         if self.handles_opt.is_some() {
             return Ok(());
         }
@@ -530,6 +593,7 @@ mod tests {
     use quickwit_storage::RamStorage;
 
     use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams};
+    use crate::actors::{MergePlanner, Publisher};
     use crate::merge_policy::default_merge_policy;
     use crate::IndexingSplitStore;
@@ -576,12 +640,24 @@ mod tests {
             event_broker: Default::default(),
         };
         let pipeline = MergePipeline::new(pipeline_params, None, universe.spawn_ctx());
-        let (_pipeline_mailbox, pipeline_handler) = universe.spawn_builder().spawn(pipeline);
-        let (pipeline_exit_status, pipeline_statistics) = pipeline_handler.quit().await;
+        let _merge_planner_mailbox = pipeline.merge_planner_mailbox().clone();
+        let (pipeline_mailbox, pipeline_handle) = universe.spawn_builder().spawn(pipeline);
+        pipeline_mailbox
+            .ask(super::FinishPendingMergesAndShutdownPipeline)
+            .await
+            .unwrap();
+
+        let (pipeline_exit_status, pipeline_statistics) = pipeline_handle.join().await;
         assert_eq!(pipeline_statistics.generation, 1);
         assert_eq!(pipeline_statistics.num_spawn_attempts, 1);
         assert_eq!(pipeline_statistics.num_published_splits, 0);
-        assert!(matches!(pipeline_exit_status, ActorExitStatus::Quit));
+        assert!(matches!(pipeline_exit_status, ActorExitStatus::Success));
+
+        // Check that the merge pipeline actors have been properly cleaned up.
+        assert!(universe.get_one::<MergePlanner>().is_none());
+        assert!(universe.get_one::<Publisher>().is_none());
+        assert!(universe.get_one::<MergePipeline>().is_none());
+        universe.assert_quit().await;
         Ok(())
     }
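Note: the unit test above doubles as a usage sketch for the new message.
Assuming `FinishPendingMergesAndShutdownPipeline` is re-exported from the
`actors` module (the import paths below are assumptions, not part of this
diff), a graceful shutdown from the caller's side looks like this:

    use quickwit_actors::{ActorExitStatus, ActorHandle, Mailbox};
    use quickwit_indexing::actors::{FinishPendingMergesAndShutdownPipeline, MergePipeline};

    // Hedged sketch: ask the pipeline to wrap up, then wait for it to exit.
    async fn shutdown_merge_pipeline(
        mailbox: &Mailbox<MergePipeline>,
        handle: ActorHandle<MergePipeline>,
    ) -> anyhow::Result<()> {
        // `ask` resolves once the message has been processed: at that point
        // the publisher-to-planner loop is cut and the finalize merge policy
        // has been triggered.
        mailbox.ask(FinishPendingMergesAndShutdownPipeline).await?;
        // `join` resolves once the pending and ongoing merges have been
        // published and every actor of the pipeline has exited.
        let (exit_status, _statistics) = handle.join().await;
        anyhow::ensure!(matches!(exit_status, ActorExitStatus::Success));
        Ok(())
    }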
diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs
index 7b04513056d..900e2122eb9 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs
@@ -38,6 +38,9 @@ use crate::merge_policy::MergeOperation;
 use crate::models::NewSplits;
 use crate::MergePolicy;
 
+#[derive(Debug)]
+pub(crate) struct RunFinalizeMergePolicyAndQuit;
+
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 struct MergePartition {
     partition_id: u64,
@@ -78,6 +81,7 @@ pub struct MergePlanner {
     known_split_ids_recompute_attempt_id: usize,
 
     merge_policy: Arc<dyn MergePolicy>,
+
     merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
     merge_scheduler_service: Mailbox<MergeSchedulerService>,
@@ -126,6 +130,22 @@ impl Actor for MergePlanner {
     }
 }
 
+#[async_trait]
+impl Handler<RunFinalizeMergePolicyAndQuit> for MergePlanner {
+    type Reply = ();
+
+    async fn handle(
+        &mut self,
+        _run_finalize: RunFinalizeMergePolicyAndQuit,
+        ctx: &ActorContext<Self>,
+    ) -> Result<(), ActorExitStatus> {
+        // Run the merge policy finalization routine, schedule the resulting
+        // merge operations, and exit successfully.
+        self.send_merge_ops(true, ctx).await?;
+        Err(ActorExitStatus::Success)
+    }
+}
+
 #[async_trait]
 impl Handler<PlanMerge> for MergePlanner {
     type Reply = ();
@@ -138,7 +158,7 @@ impl Handler<PlanMerge> for MergePlanner {
         if plan_merge.incarnation_started_at == self.incarnation_started_at {
             // Note we ignore messages that could be coming from a different incarnation.
             // (See comment on `Self::incarnation_start_at`.)
-            self.send_merge_ops(ctx).await?;
+            self.send_merge_ops(false, ctx).await?;
         }
         self.recompute_known_splits_if_necessary();
         Ok(())
@@ -155,7 +175,7 @@ impl Handler<NewSplits> for MergePlanner {
         ctx: &ActorContext<Self>,
     ) -> Result<(), ActorExitStatus> {
         self.record_splits_if_necessary(new_splits.new_splits);
-        self.send_merge_ops(ctx).await?;
+        self.send_merge_ops(false, ctx).await?;
         self.recompute_known_splits_if_necessary();
         Ok(())
     }
@@ -273,12 +293,18 @@ impl MergePlanner {
     }
     async fn compute_merge_ops(
         &mut self,
+        is_finalize: bool,
         ctx: &ActorContext<Self>,
     ) -> Result<Vec<MergeOperation>, ActorExitStatus> {
         let mut merge_operations = Vec::new();
         for young_splits in self.partitioned_young_splits.values_mut() {
             if !young_splits.is_empty() {
-                merge_operations.extend(self.merge_policy.operations(young_splits));
+                let operations = if is_finalize {
+                    self.merge_policy.finalize_operations(young_splits)
+                } else {
+                    self.merge_policy.operations(young_splits)
+                };
+                merge_operations.extend(operations);
             }
             ctx.record_progress();
             ctx.yield_now().await;
@@ -289,13 +315,17 @@ impl MergePlanner {
         Ok(merge_operations)
     }
 
-    async fn send_merge_ops(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
+    async fn send_merge_ops(
+        &mut self,
+        is_finalize: bool,
+        ctx: &ActorContext<Self>,
+    ) -> Result<(), ActorExitStatus> {
         // We identify all of the merge operations we want to run and leave it
         // to the merge scheduler to decide in which order these should be scheduled.
         //
         // The merge scheduler has the merit of knowing about merge operations from other
         // index as well.
-        let merge_ops = self.compute_merge_ops(ctx).await?;
+        let merge_ops = self.compute_merge_ops(is_finalize, ctx).await?;
         for merge_operation in merge_ops {
             info!(merge_operation=?merge_operation, "schedule merge operation");
             let tracked_merge_operation = self
diff --git a/quickwit/quickwit-indexing/src/actors/mod.rs b/quickwit/quickwit-indexing/src/actors/mod.rs
index 9552a426e29..ab70f06df35 100644
--- a/quickwit/quickwit-indexing/src/actors/mod.rs
+++ b/quickwit/quickwit-indexing/src/actors/mod.rs
@@ -42,7 +42,7 @@ pub use indexing_pipeline::{IndexingPipeline, IndexingPipelineParams};
 pub use indexing_service::{IndexingService, IndexingServiceCounters, INDEXING_DIR_NAME};
 pub use merge_executor::{combine_partition_ids, merge_split_attrs, MergeExecutor};
 pub use merge_pipeline::MergePipeline;
-pub use merge_planner::MergePlanner;
+pub(crate) use merge_planner::{MergePlanner, RunFinalizeMergePolicyAndQuit};
 pub use merge_scheduler_service::{schedule_merge, MergePermit, MergeSchedulerService};
 pub use merge_split_downloader::MergeSplitDownloader;
 pub use packager::Packager;
diff --git a/quickwit/quickwit-indexing/src/actors/publisher.rs b/quickwit/quickwit-indexing/src/actors/publisher.rs
index 4e999e7f7a2..24bdd361065 100644
--- a/quickwit/quickwit-indexing/src/actors/publisher.rs
+++ b/quickwit/quickwit-indexing/src/actors/publisher.rs
@@ -51,6 +51,11 @@ impl PublisherType {
     }
 }
 
+/// Disconnects the merge planner feedback loop.
+/// This message is used to cut the merge pipeline loop and let it terminate.
+#[derive(Debug)]
+pub(crate) struct DisconnectMergePlanner;
+
 #[derive(Clone)]
 pub struct Publisher {
     publisher_type: PublisherType,
@@ -97,6 +102,21 @@ impl Actor for Publisher {
     }
 }
 
+#[async_trait]
+impl Handler<DisconnectMergePlanner> for Publisher {
+    type Reply = ();
+
+    async fn handle(
+        &mut self,
+        _: DisconnectMergePlanner,
+        _ctx: &ActorContext<Self>,
+    ) -> Result<(), quickwit_actors::ActorExitStatus> {
+        info!("disconnecting merge planner mailbox");
+        self.merge_planner_mailbox_opt = None;
+        Ok(())
+    }
+}
+
 #[async_trait]
 impl Handler<SplitsUpdate> for Publisher {
     type Reply = ();
diff --git a/quickwit/quickwit-indexing/src/merge_policy/const_write_amplification.rs b/quickwit/quickwit-indexing/src/merge_policy/const_write_amplification.rs
index 3e9290ef57e..38e7b72c981 100644
--- a/quickwit/quickwit-indexing/src/merge_policy/const_write_amplification.rs
+++ b/quickwit/quickwit-indexing/src/merge_policy/const_write_amplification.rs
@@ -18,15 +18,20 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use std::collections::HashMap;
+use std::ops::RangeInclusive;
 
 use quickwit_config::merge_policy_config::ConstWriteAmplificationMergePolicyConfig;
 use quickwit_config::IndexingSettings;
 use quickwit_metastore::{SplitMaturity, SplitMetadata};
 use time::OffsetDateTime;
+use tracing::info;
 
 use super::MergeOperation;
 use crate::merge_policy::MergePolicy;
 
+// Smallest number of splits in a finalize merge.
+const FINALIZE_MIN_MERGE_FACTOR: usize = 3;
+
 /// The `ConstWriteAmplificationMergePolicy` has been designed for a use
 /// case where there are a several index partitions with different sizes,
 /// and partitions tend to be searched separately. (e.g. partitioning by tenant.)
@@ -82,6 +87,8 @@ impl ConstWriteAmplificationMergePolicy {
             merge_factor: 3,
             max_merge_factor: 5,
             maturation_period: Duration::from_secs(3600),
+            max_finalize_merge_operations: 0,
+            max_finalize_split_num_docs: None,
         };
         Self::new(config, 10_000_000)
     }
@@ -92,10 +99,11 @@ impl ConstWriteAmplificationMergePolicy {
     fn single_merge_operation_within_num_merge_op_level(
         &self,
         splits: &mut Vec<SplitMetadata>,
+        merge_factor_range: RangeInclusive<usize>,
     ) -> Option<MergeOperation> {
         let mut num_splits_in_merge = 0;
         let mut num_docs_in_merge = 0;
-        for split in splits.iter().take(self.config.max_merge_factor) {
+        for split in splits.iter().take(*merge_factor_range.end()) {
             num_docs_in_merge += split.num_docs;
             num_splits_in_merge += 1;
             if num_docs_in_merge >= self.split_num_docs_target {
                 break;
             }
         }
@@ -103,7 +111,7 @@ impl ConstWriteAmplificationMergePolicy {
         if (num_docs_in_merge < self.split_num_docs_target)
-            && (num_splits_in_merge < self.config.merge_factor)
+            && (num_splits_in_merge < *merge_factor_range.start())
         {
             return None;
         }
@@ -123,19 +131,26 @@ impl ConstWriteAmplificationMergePolicy {
                 .then_with(|| left.split_id().cmp(right.split_id()))
         });
         let mut merge_operations = Vec::new();
-        while let Some(merge_op) = self.single_merge_operation_within_num_merge_op_level(splits) {
+        while let Some(merge_op) =
+            self.single_merge_operation_within_num_merge_op_level(splits, self.merge_factor_range())
+        {
             merge_operations.push(merge_op);
         }
         merge_operations
     }
+
+    fn merge_factor_range(&self) -> RangeInclusive<usize> {
+        self.config.merge_factor..=self.config.max_merge_factor
+    }
 }
 
 impl MergePolicy for ConstWriteAmplificationMergePolicy {
     fn operations(&self, splits: &mut Vec<SplitMetadata>) -> Vec<MergeOperation> {
         let mut group_by_num_merge_ops: HashMap<usize, Vec<SplitMetadata>> = HashMap::default();
         let mut mature_splits = Vec::new();
+        let now = OffsetDateTime::now_utc();
         for split in splits.drain(..) {
-            if split.is_mature(OffsetDateTime::now_utc()) {
+            if split.is_mature(now) {
                 mature_splits.push(split);
             } else {
                 group_by_num_merge_ops
@@ -149,11 +164,75 @@ impl MergePolicy for ConstWriteAmplificationMergePolicy {
         for splits_in_group in group_by_num_merge_ops.values_mut() {
             let merge_ops = self.merge_operations_within_num_merge_op_level(splits_in_group);
             merge_operations.extend(merge_ops);
+            // Re-add the splits that were not used in a merge operation to the splits vector.
             splits.append(splits_in_group);
         }
         merge_operations
     }
 
+    fn finalize_operations(&self, splits: &mut Vec<SplitMetadata>) -> Vec<MergeOperation> {
+        if self.config.max_finalize_merge_operations == 0 {
+            return Vec::new();
+        }
+
+        let now = OffsetDateTime::now_utc();
+
+        // We first isolate mature splits. Let's not touch them.
+        let (mature_splits, mut young_splits): (Vec<SplitMetadata>, Vec<SplitMetadata>) =
+            splits.drain(..).partition(|split: &SplitMetadata| {
+                if let Some(max_finalize_split_num_docs) = self.config.max_finalize_split_num_docs
+                {
+                    if split.num_docs > max_finalize_split_num_docs {
+                        return true;
+                    }
+                }
+                split.is_mature(now)
+            });
+        splits.extend(mature_splits);
+
+        // We then sort the splits by reverse creation date and split id.
+        // You may notice that this is the opposite of the order used in the
+        // rest of the policy.
+        //
+        // This is because these are the youngest splits. If we are limited in
+        // the number of merges we can run, we might as well focus on the
+        // youngest (== smallest) ones for that last merge.
+        young_splits.sort_by(|left, right| {
+            left.create_timestamp
+                .cmp(&right.create_timestamp)
+                .reverse()
+                .then_with(|| left.split_id().cmp(right.split_id()))
+        });
+        let mut merge_operations = Vec::new();
+        while merge_operations.len() < self.config.max_finalize_merge_operations {
+            let min_merge_factor = FINALIZE_MIN_MERGE_FACTOR.min(self.config.max_merge_factor);
+            let merge_factor_range = min_merge_factor..=self.config.max_merge_factor;
+            if let Some(merge_op) = self.single_merge_operation_within_num_merge_op_level(
+                &mut young_splits,
+                merge_factor_range,
+            ) {
+                merge_operations.push(merge_op);
+            } else {
+                break;
+            }
+        }
+
+        // Re-add the young splits that were not used in any merge operation.
+        splits.extend(young_splits);
+
+        assert!(merge_operations.len() <= self.config.max_finalize_merge_operations);
+
+        let num_splits_per_merge_op: Vec<usize> =
+            merge_operations.iter().map(|op| op.splits.len()).collect();
+        let num_docs_per_merge_op: Vec<usize> = merge_operations
+            .iter()
+            .map(|op| op.splits.iter().map(|split| split.num_docs).sum::<usize>())
+            .collect();
+        info!(
+            num_splits_per_merge_op=?num_splits_per_merge_op,
+            num_docs_per_merge_op=?num_docs_per_merge_op,
+            "finalize merge operations");
+        merge_operations
+    }
+
     fn split_maturity(&self, split_num_docs: usize, split_num_merge_ops: usize) -> SplitMaturity {
         if split_num_merge_ops >= self.config.max_merge_ops {
             return SplitMaturity::Mature;
@@ -372,7 +451,7 @@ mod tests {
         let final_splits = crate::merge_policy::tests::aux_test_simulate_merge_planner_num_docs(
             Arc::new(merge_policy.clone()),
             &vals[..],
-            |splits| {
+            &|splits| {
                 let mut num_merge_ops_counts: HashMap<usize, usize> = HashMap::default();
                 for split in splits {
                     *num_merge_ops_counts.entry(split.num_merge_ops).or_default() += 1;
                 }
@@ -392,4 +471,116 @@ mod tests {
         assert_eq!(final_splits.len(), 49);
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_simulate_const_write_amplification_merge_policy_with_finalize() {
+        let mut merge_policy = ConstWriteAmplificationMergePolicy::for_test();
+        merge_policy.config.max_merge_factor = 10;
+        merge_policy.config.merge_factor = 10;
+        merge_policy.split_num_docs_target = 10_000_000;
+
+        let vals: Vec<usize> = vec![1; 9 + 90 + 900]; //< 999 splits with a single doc each.
+
+        let num_final_splits_given_max_finalize_merge_operations =
+            |split_num_docs: Vec<usize>, max_finalize_merge_operations: usize| {
+                let mut merge_policy_clone = merge_policy.clone();
+                merge_policy_clone.config.max_finalize_merge_operations =
+                    max_finalize_merge_operations;
+                async move {
+                    crate::merge_policy::tests::aux_test_simulate_merge_planner_num_docs(
+                        Arc::new(merge_policy_clone),
+                        &split_num_docs[..],
+                        &|_splits| {},
+                    )
+                    .await
+                    .unwrap()
+                }
+            };
+
+        assert_eq!(
+            num_final_splits_given_max_finalize_merge_operations(vals.clone(), 0)
+                .await
+                .len(),
+            27
+        );
+        assert_eq!(
+            num_final_splits_given_max_finalize_merge_operations(vals.clone(), 1)
+                .await
+                .len(),
+            18
+        );
+        assert_eq!(
+            num_final_splits_given_max_finalize_merge_operations(vals.clone(), 2)
+                .await
+                .len(),
+            9
+        );
+        assert_eq!(
+            num_final_splits_given_max_finalize_merge_operations(vals.clone(), 3)
+                .await
+                .len(),
+            3
+        );
+        assert_eq!(
+            num_final_splits_given_max_finalize_merge_operations(vec![1; 6], 1)
+                .await
+                .len(),
+            1
+        );
+        assert_eq!(
+            num_final_splits_given_max_finalize_merge_operations(vec![1; 3], 1)
+                .await
+                .len(),
+            1
+        );
+        assert_eq!(
+            num_final_splits_given_max_finalize_merge_operations(vec![1; 2], 1)
+                .await
+                .len(),
+            2
+        );
+
+        // We check that the youngest splits are merged in priority.
+        let final_splits = num_final_splits_given_max_finalize_merge_operations(
+            vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
+            1,
+        )
+        .await;
+        assert_eq!(final_splits.len(), 2);
+
+        let mut split_num_docs: Vec<usize> = final_splits
+            .iter()
+            .map(|split| split.num_docs)
+            .collect();
+        split_num_docs.sort();
+        assert_eq!(split_num_docs[0], 11);
+        assert_eq!(split_num_docs[1], 55);
+    }
+
+    #[tokio::test]
+    async fn test_simulate_const_write_amplification_merge_policy_with_finalize_max_num_docs() {
+        let mut merge_policy = ConstWriteAmplificationMergePolicy::for_test();
+        merge_policy.config.max_merge_factor = 10;
+        merge_policy.config.merge_factor = 10;
+        merge_policy.split_num_docs_target = 10_000_000;
+        merge_policy.config.max_finalize_split_num_docs = Some(999_999);
+        merge_policy.config.max_finalize_merge_operations = 3;
+
+        let split_num_docs: Vec<usize> = vec![999_999, 1_000_000, 999_999, 999_999];
+
+        let final_splits = crate::merge_policy::tests::aux_test_simulate_merge_planner_num_docs(
+            Arc::new(merge_policy),
+            &split_num_docs[..],
+            &|_splits| {},
+        )
+        .await
+        .unwrap();
+
+        assert_eq!(final_splits.len(), 2);
+        let mut split_num_docs: Vec<usize> =
+            final_splits.iter().map(|split| split.num_docs).collect();
+        split_num_docs.sort();
+        assert_eq!(split_num_docs[0], 1_000_000);
+        assert_eq!(split_num_docs[1], 999_999 * 3);
+    }
 }
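The selection rule exercised by these tests is worth spelling out: walk the
young splits youngest first, greedily take up to `max_merge_factor` of them
(stopping early once `split_num_docs_target` is reached), and emit the merge
only if it hits the docs target or contains at least
`min(FINALIZE_MIN_MERGE_FACTOR, max_merge_factor)` splits. A self-contained
sketch of that loop over plain doc counts (all names here are illustrative,
not part of the diff):

    use std::ops::RangeInclusive;

    /// Greedily groups young splits (assumed sorted youngest first) into
    /// merge operations. Returns the doc counts of each group; `num_docs`
    /// keeps the leftovers.
    fn select_finalize_merges(
        num_docs: &mut Vec<usize>,
        merge_factor_range: RangeInclusive<usize>,
        split_num_docs_target: usize,
        max_ops: usize,
    ) -> Vec<Vec<usize>> {
        let mut ops = Vec::new();
        while ops.len() < max_ops {
            let mut docs_in_merge = 0;
            let mut splits_in_merge = 0;
            for &docs in num_docs.iter().take(*merge_factor_range.end()) {
                docs_in_merge += docs;
                splits_in_merge += 1;
                if docs_in_merge >= split_num_docs_target {
                    break;
                }
            }
            // Too small a group: stop rather than emit a pointless merge.
            if docs_in_merge < split_num_docs_target
                && splits_in_merge < *merge_factor_range.start()
            {
                break;
            }
            ops.push(num_docs.drain(..splits_in_merge).collect());
        }
        ops
    }

    fn main() {
        // Eleven single-doc splits, factor range 3..=10, one allowed merge:
        // the ten youngest collapse into one group, one split is left over.
        let mut young = vec![1; 11];
        let ops = select_finalize_merges(&mut young, 3..=10, 10_000_000, 1);
        assert_eq!(ops, vec![vec![1; 10]]);
        assert_eq!(young, vec![1]);
    }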
diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs
index 69fccbbede0..06519932bbd 100644
--- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs
+++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs
@@ -151,6 +151,17 @@ pub trait MergePolicy: Send + Sync + fmt::Debug {
     /// Returns the list of merge operations that should be performed.
     fn operations(&self, splits: &mut Vec<SplitMetadata>) -> Vec<MergeOperation>;
 
+    /// After the last indexing pipeline has been shut down, Quickwit
+    /// finishes the ongoing merge operations and eventually needs to shut
+    /// down the merge pipeline too.
+    ///
+    /// This method makes it possible to offer a last list of merge operations
+    /// before the merge pipeline is really shut down.
+    ///
+    /// This is especially useful for users relying on a one-index-per-day scheme.
+    fn finalize_operations(&self, _splits: &mut Vec<SplitMetadata>) -> Vec<MergeOperation> {
+        Vec::new()
+    }
+
     /// Returns split maturity.
     /// A split is either:
     /// - `Mature` if it does not undergo new merge operations.
@@ -167,8 +178,7 @@ pub trait MergePolicy: Send + Sync + fmt::Debug {
 }
 
 pub fn merge_policy_from_settings(settings: &IndexingSettings) -> Arc<dyn MergePolicy> {
-    let merge_policy_config = settings.merge_policy.clone();
-    match merge_policy_config {
+    match settings.merge_policy.clone() {
         MergePolicyConfig::Nop => Arc::new(NopMergePolicy),
         MergePolicyConfig::ConstWriteAmplification(config) => {
             let merge_policy =
@@ -183,7 +193,12 @@ pub fn merge_policy_from_settings(settings: &IndexingSettings) -> Arc<dyn MergePolicy> {
 }
 
 pub fn default_merge_policy() -> Arc<dyn MergePolicy> {
-    merge_policy_from_settings(&IndexingSettings::default())
+    let indexing_settings = IndexingSettings::default();
+    merge_policy_from_settings(&indexing_settings)
+}
+
+pub fn nop_merge_policy() -> Arc<dyn MergePolicy> {
+    Arc::new(NopMergePolicy)
 }
 
 struct SplitShortDebug<'a>(&'a SplitMetadata);
@@ -219,6 +234,7 @@ pub mod tests {
     use super::*;
     use crate::actors::{
         merge_split_attrs, MergePlanner, MergeSchedulerService, MergeSplitDownloader,
+        RunFinalizeMergePolicyAndQuit,
     };
     use crate::models::{create_split_metadata, NewSplits};
@@ -396,10 +412,10 @@ pub mod tests {
         merged_split
     }
 
-    pub async fn aux_test_simulate_merge_planner<CheckFn: Fn(&[SplitMetadata])>(
+    async fn aux_test_simulate_merge_planner(
         merge_policy: Arc<dyn MergePolicy>,
         incoming_splits: Vec<SplitMetadata>,
-        check_final_configuration: CheckFn,
+        check_final_configuration: &dyn Fn(&[SplitMetadata]),
     ) -> anyhow::Result<Vec<SplitMetadata>> {
         let universe = Universe::new();
         let (merge_task_mailbox, merge_task_inbox) =
             universe.create_test_mailbox::<MergeSplitDownloader>();
@@ -420,7 +436,7 @@ pub mod tests {
         let mut split_index: HashMap<String, SplitMetadata> = HashMap::default();
         let (merge_planner_mailbox, merge_planner_handler) =
             universe.spawn_builder().spawn(merge_planner);
-        let mut split_metadatas: Vec<SplitMetadata> = Vec::new();
+
         for split in incoming_splits {
             split_index.insert(split.split_id().to_string(), split.clone());
             merge_planner_mailbox
@@ -443,9 +459,25 @@ pub mod tests {
                 .send_message(NewSplits { new_splits })
                 .await?;
             }
-            split_metadatas = split_index.values().cloned().collect();
+            let split_metadatas: Vec<SplitMetadata> = split_index.values().cloned().collect();
             check_final_configuration(&split_metadatas);
         }
+
+        merge_planner_mailbox
+            .send_message(RunFinalizeMergePolicyAndQuit)
+            .await
+            .unwrap();
+
+        let obs = merge_planner_handler.process_pending_and_observe().await;
+        assert_eq!(obs.obs_type, quickwit_actors::ObservationType::PostMortem);
+
+        let merge_tasks = merge_task_inbox.drain_for_test_typed::<MergeTask>();
+        for merge_task in merge_tasks {
+            apply_merge(&merge_policy, &mut split_index, &merge_task);
+        }
+
+        let split_metadatas: Vec<SplitMetadata> = split_index.values().cloned().collect();
+        universe.assert_quit().await;
         Ok(split_metadatas)
     }
@@ -473,10 +505,10 @@ pub mod tests {
         }
     }
 
-    pub async fn aux_test_simulate_merge_planner_num_docs<CheckFn: Fn(&[SplitMetadata])>(
+    pub async fn aux_test_simulate_merge_planner_num_docs(
         merge_policy: Arc<dyn MergePolicy>,
         batch_num_docs: &[usize],
-        check_final_configuration: CheckFn,
+        check_final_configuration: &dyn Fn(&[SplitMetadata]),
     ) -> anyhow::Result<Vec<SplitMetadata>> {
         let split_metadatas: Vec<SplitMetadata> = batch_num_docs
             .iter()
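Because `finalize_operations` ships with a default empty body, existing merge
policies compile unchanged and simply schedule no extra merges at shutdown.
For a policy that opts in, a minimal sketch could look like the following
(the import paths, `MergeOperation::new`, and the long maturation period are
assumptions for illustration; the test-only helper items of the trait are
elided):

    use std::time::Duration;

    use quickwit_metastore::{SplitMaturity, SplitMetadata};

    use crate::merge_policy::{MergeOperation, MergePolicy};

    /// Illustrative policy: never merges while indexing, but collapses all
    /// the remaining splits into a single merge operation at shutdown.
    #[derive(Debug)]
    struct MergeOnShutdownPolicy;

    impl MergePolicy for MergeOnShutdownPolicy {
        fn operations(&self, _splits: &mut Vec<SplitMetadata>) -> Vec<MergeOperation> {
            Vec::new()
        }

        fn finalize_operations(&self, splits: &mut Vec<SplitMetadata>) -> Vec<MergeOperation> {
            if splits.len() < 2 {
                return Vec::new();
            }
            vec![MergeOperation::new(splits.drain(..).collect())]
        }

        fn split_maturity(&self, _num_docs: usize, _num_merge_ops: usize) -> SplitMaturity {
            // Keep splits immature so that the merge planner still hands them
            // to `finalize_operations` at shutdown time.
            SplitMaturity::Immature {
                maturation_period: Duration::from_secs(365 * 24 * 3600),
            }
        }
    }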
diff --git a/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs b/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs
index c3858f55dca..a486ea02cd8 100644
--- a/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs
+++ b/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs
@@ -662,7 +662,7 @@ mod tests {
         aux_test_simulate_merge_planner_num_docs(
             Arc::new(merge_policy.clone()),
             &vec![10_000; 100_000],
-            |splits| {
+            &|splits| {
                 let num_docs = splits.iter().map(|split| split.num_docs as u64).sum();
                 assert!(splits.len() <= merge_policy.max_num_splits_ideal_case(num_docs))
             },
@@ -691,7 +691,7 @@ mod tests {
         aux_test_simulate_merge_planner_num_docs(
             Arc::new(merge_policy.clone()),
             &batch_num_docs,
-            |splits| {
+            &|splits| {
                 let num_docs = splits.iter().map(|split| split.num_docs as u64).sum();
                 assert!(splits.len() <= merge_policy.max_num_splits_worst_case(num_docs));
             },
@@ -708,7 +708,7 @@ mod tests {
         aux_test_simulate_merge_planner_num_docs(
             Arc::new(merge_policy.clone()),
             &batch_num_docs,
-            |splits| {
+            &|splits| {
                 let num_docs = splits.iter().map(|split| split.num_docs as u64).sum();
                 assert!(splits.len() <= merge_policy.max_num_splits_worst_case(num_docs));
             },
@@ -723,7 +723,7 @@ mod tests {
         aux_test_simulate_merge_planner_num_docs(
             Arc::new(merge_policy.clone()),
             &vec![10_000; 1_000],
-            |splits| {
+            &|splits| {
                 let num_docs = splits.iter().map(|split| split.num_docs as u64).sum();
                 assert!(splits.len() <= merge_policy.max_num_splits_ideal_case(num_docs));
             },
@@ -739,7 +739,7 @@ mod tests {
         aux_test_simulate_merge_planner_num_docs(
             Arc::new(merge_policy.clone()),
             &vals[..],
-            |splits| {
+            &|splits| {
                 let num_docs = splits.iter().map(|split| split.num_docs as u64).sum();
                 assert!(splits.len() <= merge_policy.max_num_splits_worst_case(num_docs));
             },