Skip to content

Commit

Permalink
Introduce an explicit message to terminate merge pipelines, and
Browse files Browse the repository at this point in the history
introducing finalization merge policy.
  • Loading branch information
fulmicoton committed Oct 4, 2024
1 parent fe61044 commit bed2ae7
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 17 deletions.
3 changes: 3 additions & 0 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ pub struct IndexingSettings {
pub merge_policy: MergePolicyConfig,
#[serde(default)]
pub resources: IndexingResources,
#[serde(default = "MergePolicyConfig::noop")]
pub finalize_merge_policy: MergePolicyConfig,
}

impl IndexingSettings {
Expand Down Expand Up @@ -161,6 +163,7 @@ impl Default for IndexingSettings {
docstore_compression_level: Self::default_docstore_compression_level(),
split_num_docs_target: Self::default_split_num_docs_target(),
merge_policy: MergePolicyConfig::default(),
finalize_merge_policy: MergePolicyConfig::noop(),
resources: IndexingResources::default(),
}
}
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-config/src/merge_policy_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ impl Default for MergePolicyConfig {
}

impl MergePolicyConfig {
pub fn noop() -> Self {
MergePolicyConfig::Nop
}

pub fn validate(&self) -> anyhow::Result<()> {
let (merge_factor, max_merge_factor) = match self {
MergePolicyConfig::Nop => {
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ mod tests {

use super::{IndexingPipeline, *};
use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams};
use crate::merge_policy::default_merge_policy;
use crate::merge_policy::{default_merge_policy, NopMergePolicy};

#[test]
fn test_wait_duration() {
Expand Down Expand Up @@ -908,6 +908,7 @@ mod tests {
metastore: metastore.clone(),
split_store: split_store.clone(),
merge_policy: default_merge_policy(),
finalize_merge_policy: Arc::new(NopMergePolicy),
max_concurrent_split_uploads: 2,
merge_io_throughput_limiter_opt: None,
merge_scheduler_service: universe.get_or_spawn_one(),
Expand Down
11 changes: 9 additions & 2 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,14 @@ impl IndexingService {
let message = format!("failed to spawn indexing pipeline: {error}");
IndexingError::Internal(message)
})?;
let merge_policy =
crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings);
let merge_policy = crate::merge_policy::merge_policy_from_settings(
index_config.indexing_settings.merge_policy.clone(),
&index_config.indexing_settings,
);
let finalize_merge_policy = crate::merge_policy::merge_policy_from_settings(
index_config.indexing_settings.finalize_merge_policy.clone(),
&index_config.indexing_settings,
);
let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone());

let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)
Expand All @@ -300,6 +306,7 @@ impl IndexingService {
split_store: split_store.clone(),
merge_scheduler_service: self.merge_scheduler_service.clone(),
merge_policy: merge_policy.clone(),
finalize_merge_policy,
merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(),
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
event_broker: self.event_broker.clone(),
Expand Down
56 changes: 55 additions & 1 deletion quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ use time::OffsetDateTime;
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};

use super::merge_planner::RunFinalizeMergePolicy;
use super::publisher::DisconnectMergePlanner;
use super::MergeSchedulerService;
use crate::actors::indexing_pipeline::wait_duration_before_retry;
use crate::actors::merge_split_downloader::MergeSplitDownloader;
Expand All @@ -56,6 +58,22 @@ use crate::split_store::IndexingSplitStore;
/// concurrently.
static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10);

/// Instructs the merge pipeline that it should stops itself.
/// Merge that have already been scheduled are not aborted.
///
/// In addition, the finalizer merge policy will be executed to schedule a few
/// additional merges.
///
/// After reception the `FinalizeAndClosePipeline`, the merge pipeline loop will
/// be disconnected. In other words, the connection from the merge publisher to
/// the merge planner will be cut, so that the merge pipeline will terminate naturally.
///
/// Supervisation will still exist. However it will not restart the pipeline
/// in case of failure, it will just kill all of the merge pipeline actors. (for
/// instance, if one of the actor is stuck).
#[derive(Debug, Clone, Copy)]
pub struct FinishPendingMergesAndShutdownPipeline;

struct MergePipelineHandles {
merge_planner: ActorHandle<MergePlanner>,
merge_split_downloader: ActorHandle<MergeSplitDownloader>,
Expand Down Expand Up @@ -96,6 +114,8 @@ pub struct MergePipeline {
kill_switch: KillSwitch,
/// Immature splits passed to the merge planner the first time the pipeline is spawned.
initial_immature_splits_opt: Option<Vec<SplitMetadata>>,
// After it is set to true, we don't respawn.
shutdown_initiated: bool,
}

#[async_trait]
Expand Down Expand Up @@ -141,6 +161,7 @@ impl MergePipeline {
merge_planner_inbox,
merge_planner_mailbox,
initial_immature_splits_opt,
shutdown_initiated: false,
}
}

Expand Down Expand Up @@ -331,6 +352,7 @@ impl MergePipeline {
&self.params.pipeline_id,
immature_splits,
self.params.merge_policy.clone(),
self.params.finalize_merge_policy.clone(),
merge_split_downloader_mailbox,
self.params.merge_scheduler_service.clone(),
);
Expand Down Expand Up @@ -467,6 +489,33 @@ impl Handler<SuperviseLoop> for MergePipeline {
}
}

#[async_trait]
impl Handler<FinishPendingMergesAndShutdownPipeline> for MergePipeline {
type Reply = ();
async fn handle(
&mut self,
_: FinishPendingMergesAndShutdownPipeline,
_ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
// From now on, we will not respawn the pipeline if it fails.
self.shutdown_initiated = true;
if let Some(handles) = &self.handles_opt {
let _ = handles
.merge_publisher
.mailbox()
.send_message(DisconnectMergePlanner);
let _ = handles
.merge_planner
.mailbox()
.send_message(RunFinalizeMergePolicy);
} else {
// we won't respawn the pipeline in the future, so there is nothing
// to do here.
}
Ok(())
}
}

#[async_trait]
impl Handler<Spawn> for MergePipeline {
type Reply = ();
Expand All @@ -476,6 +525,9 @@ impl Handler<Spawn> for MergePipeline {
spawn: Spawn,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if self.shutdown_initiated {
return Ok(());
}
if self.handles_opt.is_some() {
return Ok(());
}
Expand Down Expand Up @@ -509,6 +561,7 @@ pub struct MergePipelineParams {
pub merge_scheduler_service: Mailbox<MergeSchedulerService>,
pub split_store: IndexingSplitStore,
pub merge_policy: Arc<dyn MergePolicy>,
pub finalize_merge_policy: Arc<dyn MergePolicy>,
pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline.
pub merge_io_throughput_limiter_opt: Option<Limiter>,
pub event_broker: EventBroker,
Expand All @@ -530,7 +583,7 @@ mod tests {
use quickwit_storage::RamStorage;

use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams};
use crate::merge_policy::default_merge_policy;
use crate::merge_policy::{default_merge_policy, nop_merge_policy};
use crate::IndexingSplitStore;

#[tokio::test]
Expand Down Expand Up @@ -571,6 +624,7 @@ mod tests {
merge_scheduler_service: universe.get_or_spawn_one(),
split_store,
merge_policy: default_merge_policy(),
finalize_merge_policy: nop_merge_policy(),
max_concurrent_split_uploads: 2,
merge_io_throughput_limiter_opt: None,
event_broker: Default::default(),
Expand Down
66 changes: 57 additions & 9 deletions quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ use crate::merge_policy::MergeOperation;
use crate::models::NewSplits;
use crate::MergePolicy;

#[derive(Debug)]
pub(crate) struct RunFinalizeMergePolicy;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MergePartition {
partition_id: u64,
Expand Down Expand Up @@ -78,6 +81,8 @@ pub struct MergePlanner {
known_split_ids_recompute_attempt_id: usize,

merge_policy: Arc<dyn MergePolicy>,
finalize_merge_policy: Arc<dyn MergePolicy>,

merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,

Expand Down Expand Up @@ -126,6 +131,22 @@ impl Actor for MergePlanner {
}
}

#[async_trait]
impl Handler<RunFinalizeMergePolicy> for MergePlanner {
type Reply = ();

async fn handle(
&mut self,
_plan_merge: RunFinalizeMergePolicy,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
// Note we ignore messages that could be coming from a different incarnation.
// (See comment on `Self::incarnation_start_at`.)
self.send_merge_ops(true, ctx).await?;
Err(ActorExitStatus::Success)
}
}

#[async_trait]
impl Handler<PlanMerge> for MergePlanner {
type Reply = ();
Expand All @@ -138,7 +159,7 @@ impl Handler<PlanMerge> for MergePlanner {
if plan_merge.incarnation_started_at == self.incarnation_started_at {
// Note we ignore messages that could be coming from a different incarnation.
// (See comment on `Self::incarnation_start_at`.)
self.send_merge_ops(ctx).await?;
self.send_merge_ops(false, ctx).await?;
}
self.recompute_known_splits_if_necessary();
Ok(())
Expand All @@ -155,7 +176,7 @@ impl Handler<NewSplits> for MergePlanner {
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.record_splits_if_necessary(new_splits.new_splits);
self.send_merge_ops(ctx).await?;
self.send_merge_ops(false, ctx).await?;
self.recompute_known_splits_if_necessary();
Ok(())
}
Expand All @@ -172,6 +193,7 @@ impl MergePlanner {
pipeline_id: &MergePipelineId,
immature_splits: Vec<SplitMetadata>,
merge_policy: Arc<dyn MergePolicy>,
finalize_merge_policy: Arc<dyn MergePolicy>,
merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,
) -> MergePlanner {
Expand All @@ -184,6 +206,7 @@ impl MergePlanner {
known_split_ids_recompute_attempt_id: 0,
partitioned_young_splits: Default::default(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
merge_scheduler_service,
ongoing_merge_operations_inventory: Inventory::default(),
Expand Down Expand Up @@ -273,12 +296,18 @@ impl MergePlanner {
}
async fn compute_merge_ops(
&mut self,
is_finalize: bool,
ctx: &ActorContext<Self>,
) -> Result<Vec<MergeOperation>, ActorExitStatus> {
let mut merge_operations = Vec::new();
for young_splits in self.partitioned_young_splits.values_mut() {
if !young_splits.is_empty() {
merge_operations.extend(self.merge_policy.operations(young_splits));
let merge_policy = if is_finalize {
&self.finalize_merge_policy
} else {
&self.merge_policy
};
merge_operations.extend(merge_policy.operations(young_splits));
}
ctx.record_progress();
ctx.yield_now().await;
Expand All @@ -289,13 +318,17 @@ impl MergePlanner {
Ok(merge_operations)
}

async fn send_merge_ops(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
async fn send_merge_ops(
&mut self,
is_finalize: bool,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
// We identify all of the merge operations we want to run and leave it
// to the merge scheduler to decide in which order these should be scheduled.
//
// The merge scheduler has the merit of knowing about merge operations from other
// index as well.
let merge_ops = self.compute_merge_ops(ctx).await?;
let merge_ops = self.compute_merge_ops(is_finalize, ctx).await?;
for merge_operation in merge_ops {
info!(merge_operation=?merge_operation, "schedule merge operation");
let tracked_merge_operation = self
Expand Down Expand Up @@ -347,7 +380,7 @@ mod tests {

use crate::actors::MergePlanner;
use crate::merge_policy::{
merge_policy_from_settings, MergePolicy, MergeTask, StableLogMergePolicy,
merge_policy_from_settings, MergePolicy, MergeTask, NopMergePolicy, StableLogMergePolicy,
};
use crate::models::NewSplits;

Expand Down Expand Up @@ -408,6 +441,7 @@ mod tests {
&pipeline_id,
Vec::new(),
merge_policy,
Arc::new(NopMergePolicy),
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down Expand Up @@ -528,11 +562,15 @@ mod tests {
2,
),
];
let merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(&indexing_settings);
let merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let finalize_merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let merge_planner = MergePlanner::new(
&pipeline_id,
immature_splits.clone(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down Expand Up @@ -620,11 +658,17 @@ mod tests {
2,
),
];
let merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(&indexing_settings);
let merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let finalize_merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(
indexing_settings.finalize_merge_policy.clone(),
&indexing_settings,
);
let merge_planner = MergePlanner::new(
&pipeline_id,
immature_splits.clone(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down Expand Up @@ -685,11 +729,15 @@ mod tests {
2,
),
];
let merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(&indexing_settings);
let merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let finalize_merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let merge_planner = MergePlanner::new(
&pipeline_id,
immature_splits.clone(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down
Loading

0 comments on commit bed2ae7

Please sign in to comment.