Skip to content

Commit

Permalink
Process in-flight merges upon shutting down indexing pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Oct 3, 2024
1 parent 060dfd8 commit fe61044
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 35 deletions.
34 changes: 20 additions & 14 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use async_trait::async_trait;
use futures::TryStreamExt;
use itertools::Itertools;
use quickwit_actors::{
Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Healthz, Mailbox,
Observation,
Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Command, Handler, Healthz,
Mailbox, Observation,
};
use quickwit_cluster::Cluster;
use quickwit_common::fs::get_cache_directory_path;
Expand Down Expand Up @@ -504,21 +504,27 @@ impl IndexingService {
.merge_pipeline_handles
.remove_entry(&merge_pipeline_to_shutdown)
{
// We kill the merge pipeline to avoid waiting for a merge operation to finish, as it
// can take a long time.
// We gracefully shut down the merge pipeline so that the in-flight merges can
// complete.
info!(
index_uid=%merge_pipeline_to_shutdown.index_uid,
source_id=%merge_pipeline_to_shutdown.source_id,
"no more indexing pipeline on this index and source, killing merge pipeline"
"shutting down orphan merge pipeline"
);
merge_pipeline_handle.handle.kill().await;
// The queue capacity of the merge pipeline is unbounded, so `.send_message(...)`
// should not block.
// We avoid using `.quit()` here because it waits for the actor to exit.
merge_pipeline_handle
.handle
.mailbox()
.send_message(Command::Quit)
.await
.expect("merge pipeline mailbox should not be full");
}
}
// Finally remove the merge pipeline with an exit status.
// Finally, we remove the completed or failed merge pipelines.
self.merge_pipeline_handles
.retain(|_, merge_pipeline_mailbox_handle| {
merge_pipeline_mailbox_handle.handle.state().is_running()
});
.retain(|_, merge_pipeline_handle| merge_pipeline_handle.handle.state().is_running());
self.counters.num_running_merge_pipelines = self.merge_pipeline_handles.len();
self.update_chitchat_running_plan().await;

Expand All @@ -543,23 +549,23 @@ impl IndexingService {
immature_splits_opt: Option<Vec<SplitMetadata>>,
ctx: &ActorContext<Self>,
) -> Result<Mailbox<MergePlanner>, IndexingError> {
if let Some(merge_pipeline_mailbox_handle) = self
if let Some(merge_pipeline_handle) = self
.merge_pipeline_handles
.get(&merge_pipeline_params.pipeline_id)
{
return Ok(merge_pipeline_mailbox_handle.mailbox.clone());
return Ok(merge_pipeline_handle.mailbox.clone());
}
let merge_pipeline_id = merge_pipeline_params.pipeline_id.clone();
let merge_pipeline =
MergePipeline::new(merge_pipeline_params, immature_splits_opt, ctx.spawn_ctx());
let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone();
let (_pipeline_mailbox, pipeline_handle) = ctx.spawn_actor().spawn(merge_pipeline);
let merge_pipeline_mailbox_handle = MergePipelineHandle {
let merge_pipeline_handle = MergePipelineHandle {
mailbox: merge_planner_mailbox.clone(),
handle: pipeline_handle,
};
self.merge_pipeline_handles
.insert(merge_pipeline_id, merge_pipeline_mailbox_handle);
.insert(merge_pipeline_id, merge_pipeline_handle);
self.counters.num_running_merge_pipelines += 1;
Ok(merge_planner_mailbox)
}
Expand Down
42 changes: 21 additions & 21 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl MergePipeline {
Some(self.merge_planner_mailbox.clone()),
None,
);
let (merge_publisher_mailbox, merge_publisher_handler) = ctx
let (merge_publisher_mailbox, merge_publisher_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.set_backpressure_micros_counter(
Expand All @@ -271,15 +271,15 @@ impl MergePipeline {
self.params.max_concurrent_split_uploads,
self.params.event_broker.clone(),
);
let (merge_uploader_mailbox, merge_uploader_handler) = ctx
let (merge_uploader_mailbox, merge_uploader_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.spawn(merge_uploader);

// Merge Packager
let tag_fields = self.params.doc_mapper.tag_named_fields()?;
let merge_packager = Packager::new("MergePackager", tag_fields, merge_uploader_mailbox);
let (merge_packager_mailbox, merge_packager_handler) = ctx
let (merge_packager_mailbox, merge_packager_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.spawn(merge_packager);
Expand All @@ -300,7 +300,7 @@ impl MergePipeline {
merge_executor_io_controls,
merge_packager_mailbox,
);
let (merge_executor_mailbox, merge_executor_handler) = ctx
let (merge_executor_mailbox, merge_executor_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.set_backpressure_micros_counter(
Expand All @@ -316,7 +316,7 @@ impl MergePipeline {
executor_mailbox: merge_executor_mailbox,
io_controls: split_downloader_io_controls,
};
let (merge_split_downloader_mailbox, merge_split_downloader_handler) = ctx
let (merge_split_downloader_mailbox, merge_split_downloader_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.set_backpressure_micros_counter(
Expand All @@ -334,7 +334,7 @@ impl MergePipeline {
merge_split_downloader_mailbox,
self.params.merge_scheduler_service.clone(),
);
let (_, merge_planner_handler) = ctx
let (_, merge_planner_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.set_mailboxes(
Expand All @@ -346,27 +346,27 @@ impl MergePipeline {
self.previous_generations_statistics = self.statistics.clone();
self.statistics.generation += 1;
self.handles_opt = Some(MergePipelineHandles {
merge_planner: merge_planner_handler,
merge_split_downloader: merge_split_downloader_handler,
merge_executor: merge_executor_handler,
merge_packager: merge_packager_handler,
merge_uploader: merge_uploader_handler,
merge_publisher: merge_publisher_handler,
merge_planner: merge_planner_handle,
merge_split_downloader: merge_split_downloader_handle,
merge_executor: merge_executor_handle,
merge_packager: merge_packager_handle,
merge_uploader: merge_uploader_handle,
merge_publisher: merge_publisher_handle,
next_check_for_progress: Instant::now() + *HEARTBEAT,
});
Ok(())
}

async fn terminate(&mut self) {
self.kill_switch.kill();
if let Some(handlers) = self.handles_opt.take() {
if let Some(handles) = self.handles_opt.take() {
tokio::join!(
handlers.merge_planner.kill(),
handlers.merge_split_downloader.kill(),
handlers.merge_executor.kill(),
handlers.merge_packager.kill(),
handlers.merge_uploader.kill(),
handlers.merge_publisher.kill(),
handles.merge_planner.kill(),
handles.merge_split_downloader.kill(),
handles.merge_executor.kill(),
handles.merge_packager.kill(),
handles.merge_uploader.kill(),
handles.merge_publisher.kill(),
);
}
}
Expand Down Expand Up @@ -576,8 +576,8 @@ mod tests {
event_broker: Default::default(),
};
let pipeline = MergePipeline::new(pipeline_params, None, universe.spawn_ctx());
let (_pipeline_mailbox, pipeline_handler) = universe.spawn_builder().spawn(pipeline);
let (pipeline_exit_status, pipeline_statistics) = pipeline_handler.quit().await;
let (_pipeline_mailbox, pipeline_handle) = universe.spawn_builder().spawn(pipeline);
let (pipeline_exit_status, pipeline_statistics) = pipeline_handle.quit().await;
assert_eq!(pipeline_statistics.generation, 1);
assert_eq!(pipeline_statistics.num_spawn_attempts, 1);
assert_eq!(pipeline_statistics.num_published_splits, 0);
Expand Down

0 comments on commit fe61044

Please sign in to comment.