From 9fb6d3c36b00faa0fcf56208e7b50853f47a9a36 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Wed, 1 Nov 2023 12:04:58 +0900
Subject: [PATCH] Ensures the load of pipelines does not exceed 80%. (#4054)

* Ensures the load of pipelines does not exceed 80%.

The allocation attempts to match the previous plan as much as possible.
It also includes some hysteresis effect to avoid flip-flopping between
N and N+1 pipelines.

Closes #4010

* CR comment
---
 quickwit/quickwit-common/src/lib.rs           |   7 +
 .../src/indexing_scheduler/scheduling/mod.rs  | 256 +++++++++++++++++-
 2 files changed, 250 insertions(+), 13 deletions(-)

diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs
index d3a2f6af94b..3079ef25696 100644
--- a/quickwit/quickwit-common/src/lib.rs
+++ b/quickwit/quickwit-common/src/lib.rs
@@ -249,6 +249,13 @@ mod tests {
         assert_eq!(div_ceil(5, 2), 3);
         assert_eq!(div_ceil(6, 2), 3);
 
+        assert_eq!(div_ceil(3, 3), 1);
+        assert_eq!(div_ceil(2, 3), 1);
+        assert_eq!(div_ceil(1, 3), 1);
+        assert_eq!(div_ceil(0, 3), 0);
+        assert_eq!(div_ceil(-1, 3), 0);
+        assert_eq!(div_ceil(-2, 3), 0);
+
         assert_eq!(div_ceil(-5, 1), -5);
         assert_eq!(div_ceil(-5, 2), -2);
         assert_eq!(div_ceil(-6, 2), -3);
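The new assertions pin down the contract of `div_ceil`: the quotient is rounded toward positive infinity, including for negative operands. A minimal sketch consistent with these assertions (hypothetical: the actual implementation lives in `quickwit-common` and is not part of this diff):

```rust
fn div_ceil(lhs: i32, rhs: i32) -> i32 {
    let quotient = lhs / rhs; // Rust's `/` truncates toward zero.
    let remainder = lhs % rhs;
    // Bump the quotient only when the exact result is positive and was
    // truncated downward, i.e. when lhs and rhs share the same sign.
    if remainder != 0 && (remainder > 0) == (rhs > 0) {
        quotient + 1
    } else {
        quotient
    }
}
```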
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
index dabc6def658..728b31a184d 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
@@ -29,13 +29,31 @@ use quickwit_proto::types::{IndexUid, ShardId};
 pub use scheduling_logic_model::Load;
 use scheduling_logic_model::{IndexerOrd, SourceOrd};
 use tracing::error;
+use tracing::log::warn;
 
 use crate::indexing_plan::PhysicalIndexingPlan;
 use crate::indexing_scheduler::scheduling::scheduling_logic_model::{
     SchedulingProblem, SchedulingSolution,
 };
+use crate::indexing_scheduler::PIPELINE_FULL_LOAD;
 use crate::SourceUid;
 
+/// If we have several pipelines and the load per pipeline
+/// falls below this threshold, we reduce the number of pipelines.
+///
+/// Note that even for 2 pipelines, this creates a hysteresis effect.
+///
+/// Starting from a single pipeline, an overall load above 80%
+/// is enough to trigger the creation of a second pipeline.
+///
+/// Coming back to a single pipeline requires the load per pipeline
+/// to drop to 30%, which translates into an overall load of 60%.
+const LOAD_PER_PIPELINE_LOW_THRESHOLD: u32 = PIPELINE_FULL_LOAD * 3 / 10;
+
+/// That's 80% of `PIPELINE_FULL_LOAD`.
+const MAX_LOAD_PER_PIPELINE: u32 = PIPELINE_FULL_LOAD * 8 / 10;
+
 fn indexing_task(source_uid: SourceUid, shard_ids: Vec<ShardId>) -> IndexingTask {
     IndexingTask {
         index_uid: source_uid.index_uid.to_string(),
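To make the two thresholds concrete, here is the arithmetic spelled out. `PIPELINE_FULL_LOAD` is defined elsewhere in `indexing_scheduler` and does not appear in this diff, so the value below is an assumption chosen purely for illustration:

```rust
// Assumed value, for illustration only.
const PIPELINE_FULL_LOAD: u32 = 1_000;
const LOAD_PER_PIPELINE_LOW_THRESHOLD: u32 = PIPELINE_FULL_LOAD * 3 / 10; // 300
const MAX_LOAD_PER_PIPELINE: u32 = PIPELINE_FULL_LOAD * 8 / 10; // 800

fn main() {
    // Scale up: a single pipeline above 80% load exceeds MAX_LOAD_PER_PIPELINE,
    // which triggers the creation of a second pipeline.
    assert!(850 > MAX_LOAD_PER_PIPELINE);
    // Scale down: with two pipelines, we only merge back into one once the
    // load per pipeline drops below 30%, i.e. an overall load under 60%.
    let overall_load = 590;
    assert!(overall_load / 2 < LOAD_PER_PIPELINE_LOW_THRESHOLD);
    // Anywhere in between (60%..80% overall), the current count is kept.
}
```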
@@ -238,8 +256,113 @@ pub enum SourceToScheduleType {
     IngestV1,
 }
 
+fn group_shards_into_pipelines(
+    source_uid: &SourceUid,
+    shard_ids: &[ShardId],
+    previous_indexing_tasks: &[IndexingTask],
+    load_per_shard: Load,
+) -> Vec<IndexingTask> {
+    let num_shards = shard_ids.len() as u32;
+    if num_shards == 0 {
+        return Vec::new();
+    }
+    let max_num_shards_per_pipeline: NonZeroU32 =
+        NonZeroU32::new(MAX_LOAD_PER_PIPELINE / load_per_shard).unwrap_or_else(|| {
+            // We throttle shards at ingestion to ensure that a shard does not
+            // exceed 5MB/s.
+            //
+            // This value has been chosen to make sure that one full pipeline
+            // should always be able to handle the load of one shard.
+            //
+            // However, it is possible for the system to take more than this
+            // when it is playing catch-up.
+            //
+            // This is a transitory state, and not a problem per se.
+            warn!("load per shard is higher than `MAX_LOAD_PER_PIPELINE`");
+            NonZeroU32::new(1).unwrap()
+        });
+
+    // We compute the number of pipelines we will create, baking in some hysteresis effect here.
+    // We have two different thresholds: one to increase and one to decrease the number of pipelines.
+    let min_num_pipelines: u32 =
+        (num_shards + max_num_shards_per_pipeline.get() - 1) / max_num_shards_per_pipeline;
+    let max_num_pipelines: u32 = (num_shards * load_per_shard) / LOAD_PER_PIPELINE_LOW_THRESHOLD;
+    let previous_num_pipelines = previous_indexing_tasks.len() as u32;
+    let num_pipelines: u32 = if previous_num_pipelines > min_num_pipelines {
+        previous_num_pipelines.min(max_num_pipelines)
+    } else {
+        min_num_pipelines
+    };
+
+    let mut pipelines: Vec<Vec<ShardId>> = std::iter::repeat_with(Vec::new)
+        .take((previous_num_pipelines as usize).max(num_pipelines as usize))
+        .collect();
+
+    let mut unassigned_shard_ids: Vec<ShardId> = Vec::new();
+    let previous_pipeline_map: FnvHashMap<ShardId, usize> = previous_indexing_tasks
+        .iter()
+        .enumerate()
+        .flat_map(|(pipeline_ord, indexing_task)| {
+            indexing_task
+                .shard_ids
+                .iter()
+                .map(move |shard_id| (*shard_id, pipeline_ord))
+        })
+        .collect();
+
+    for &shard in shard_ids {
+        if let Some(pipeline_ord) = previous_pipeline_map.get(&shard).copied() {
+            // Whenever possible, we allocate the shard to its previous pipeline.
+            let best_pipeline_for_shard = &mut pipelines[pipeline_ord];
+            if best_pipeline_for_shard.len() < max_num_shards_per_pipeline.get() as usize {
+                best_pipeline_for_shard.push(shard);
+            } else {
+                unassigned_shard_ids.push(shard);
+            }
+        } else {
+            unassigned_shard_ids.push(shard);
+        }
+    }
+
+    // If needed, let's remove some pipelines. We simply remove the pipelines
+    // with the fewest shards.
+    pipelines.sort_by_key(|shards| std::cmp::Reverse(shards.len()));
+    for removed_pipeline_shards in pipelines.drain(num_pipelines as usize..) {
+        unassigned_shard_ids.extend(removed_pipeline_shards);
+    }
+
+    // Now we allocate the remaining unassigned shards.
+    // We simply allocate each of them to the pipeline with the lowest load
+    // (i.e. the fewest shards).
+    for shard in unassigned_shard_ids {
+        let best_pipeline_for_shard: &mut Vec<ShardId> = pipelines
+            .iter_mut()
+            .min_by_key(|shards| shards.len())
+            .unwrap();
+        best_pipeline_for_shard.push(shard);
+    }
+
+    let mut indexing_tasks: Vec<IndexingTask> = pipelines
+        .into_iter()
+        .map(|mut shard_ids| {
+            shard_ids.sort();
+            IndexingTask {
+                index_uid: source_uid.index_uid.to_string(),
+                source_id: source_uid.source_id.clone(),
+                shard_ids,
+            }
+        })
+        .collect();
+
+    indexing_tasks.sort_by_key(|indexing_task| indexing_task.shard_ids[0]);
+
+    indexing_tasks
+}
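The allocation policy above boils down to two rules: a shard sticks to its previous pipeline while capacity allows, and spills over to the least-loaded pipeline otherwise. The sketch below distills that policy with plain types; the function name and simplified signature are illustrative, not part of the patch:

```rust
use std::collections::HashMap;

/// Distilled allocation policy: sticky assignment first, then spill-over
/// to the pipeline that currently holds the fewest shards.
fn allocate_shards(
    shard_ids: &[u64],
    previous_pipeline_of: &HashMap<u64, usize>, // shard id -> previous pipeline ord
    num_pipelines: usize,
    max_shards_per_pipeline: usize,
) -> Vec<Vec<u64>> {
    let mut pipelines: Vec<Vec<u64>> = vec![Vec::new(); num_pipelines];
    let mut unassigned: Vec<u64> = Vec::new();
    // Rule 1: a shard goes back to its previous pipeline as long as that
    // pipeline still exists and has remaining capacity.
    for &shard in shard_ids {
        match previous_pipeline_of.get(&shard) {
            Some(&ord)
                if ord < num_pipelines
                    && pipelines[ord].len() < max_shards_per_pipeline =>
            {
                pipelines[ord].push(shard);
            }
            _ => unassigned.push(shard),
        }
    }
    // Rule 2: every other shard goes to the least-loaded pipeline.
    for shard in unassigned {
        pipelines.iter_mut().min_by_key(|p| p.len()).unwrap().push(shard);
    }
    pipelines
}
```

The real function adds the pipeline-count hysteresis around these two rules and emits `IndexingTask`s, but the stickiness logic is the same.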
+
+/// This function takes a scheduling solution (which abstracts away the notion
+/// of pipelines and shard ids) and builds a physical plan.
 fn convert_scheduling_solution_to_physical_plan(
-    solution: SchedulingSolution,
+    solution: &SchedulingSolution,
+    problem: &SchedulingProblem,
     id_to_ord_map: &IdToOrdMap,
     sources: &[SourceToSchedule],
     previous_plan_opt: Option<&PhysicalIndexingPlan>,
@@ -270,14 +393,25 @@ fn convert_scheduling_solution_to_physical_plan(
                 let shard_to_indexer_ord = previous_shard_to_indexer_map
                     .remove(&source_ord)
                     .unwrap_or_default();
-                let shard_ids_per_indexer =
+
+                let load_per_shard = problem.source_load_per_shard(source_ord);
+                let shard_ids_per_node: FnvHashMap<IndexerOrd, Vec<ShardId>> =
                     spread_shards_optimally(shards, indexer_num_shards, shard_to_indexer_ord);
-                for (indexer_ord, shard_ids_for_indexer) in shard_ids_per_indexer {
-                    let indexer_id = id_to_ord_map.indexer_id(indexer_ord);
-                    let indexing_task =
-                        indexing_task(source.source_uid.clone(), shard_ids_for_indexer);
-                    physical_indexing_plan.add_indexing_task(indexer_id, indexing_task);
+
+                for (node_ord, shard_ids_for_node) in shard_ids_per_node {
+                    let node_id = id_to_ord_map.indexer_id(node_ord);
+                    let indexing_tasks: &[IndexingTask] = previous_plan_opt
+                        .and_then(|previous_plan| previous_plan.indexer(node_id))
+                        .unwrap_or(&[]);
+                    let indexing_tasks = group_shards_into_pipelines(
+                        &source.source_uid,
+                        &shard_ids_for_node,
+                        indexing_tasks,
+                        load_per_shard.get(),
+                    );
+                    for indexing_task in indexing_tasks {
+                        physical_indexing_plan.add_indexing_task(node_id, indexing_task);
+                    }
                 }
             }
             SourceToScheduleType::NonSharded { .. } => {
@@ -352,7 +486,6 @@ pub fn build_physical_indexing_plan(
     }
 
     let mut problem = SchedulingProblem::with_indexer_maximum_load(indexer_max_loads);
-
     for source in sources {
         if let Some(source_ord) = populate_problem(source, &mut problem) {
             let registered_source_ord = id_to_ord_map.add_source_uid(source.source_uid.clone());
@@ -378,7 +511,8 @@ pub fn build_physical_indexing_plan(
     // Convert the new scheduling solution back to a physical plan.
     convert_scheduling_solution_to_physical_plan(
-        new_solution,
+        &new_solution,
+        &problem,
         &id_to_ord_map,
         sources,
         previous_plan_opt,
@@ -392,11 +526,12 @@ mod tests {
     use std::sync::atomic::{AtomicUsize, Ordering};
 
     use fnv::FnvHashMap;
-    use quickwit_proto::types::IndexUid;
+    use quickwit_proto::indexing::IndexingTask;
+    use quickwit_proto::types::{IndexUid, ShardId};
 
     use super::{
-        build_physical_indexing_plan, indexing_task, spread_shards_optimally, SourceToSchedule,
-        SourceToScheduleType,
+        build_physical_indexing_plan, group_shards_into_pipelines, indexing_task,
+        spread_shards_optimally, SourceToSchedule, SourceToScheduleType,
     };
     use crate::SourceUid;
@@ -473,7 +608,9 @@ mod tests {
         assert_eq!(
             &node2_plan,
             &[
-                indexing_task(source_uid0.clone(), vec![0, 1, 2, 3, 4, 5, 6, 7]),
+                indexing_task(source_uid0.clone(), vec![0, 3, 6]),
+                indexing_task(source_uid0.clone(), vec![1, 4, 7]),
+                indexing_task(source_uid0.clone(), vec![2, 5]),
                 indexing_task(source_uid2.clone(), vec![]),
             ]
         );
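The striped shard pattern asserted above ([0, 3, 6], [1, 4, 7], [2, 5]) falls directly out of the least-loaded rule: when there is no previous pipeline to stick to, as in this test's freshly built plan, shards are effectively dealt out round-robin. A tiny self-contained check of that behavior (`min_by_key` returns the first of equally minimal elements):

```rust
fn main() {
    // Deal 8 shards across 3 empty pipelines, always picking the pipeline
    // that currently holds the fewest shards.
    let mut pipelines: Vec<Vec<u32>> = vec![Vec::new(); 3];
    for shard in 0..8u32 {
        pipelines.iter_mut().min_by_key(|p| p.len()).unwrap().push(shard);
    }
    assert_eq!(pipelines, vec![vec![0, 3, 6], vec![1, 4, 7], vec![2, 5]]);
}
```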
@@ -519,4 +656,97 @@ mod tests {
             )
         }
     }
+
+    #[test]
+    fn test_group_shards_empty() {
+        let source_uid = source_id();
+        let indexing_tasks = group_shards_into_pipelines(&source_uid, &[], &[], 250);
+        assert!(indexing_tasks.is_empty());
+    }
+
+    fn make_indexing_tasks(
+        source_uid: &SourceUid,
+        shard_ids_grp: &[&[ShardId]],
+    ) -> Vec<IndexingTask> {
+        shard_ids_grp
+            .iter()
+            .copied()
+            .map(|shard_ids| IndexingTask {
+                index_uid: source_uid.index_uid.to_string(),
+                source_id: source_uid.source_id.clone(),
+                shard_ids: shard_ids.to_vec(),
+            })
+            .collect::<Vec<IndexingTask>>()
+    }
+
+    #[test]
+    fn test_group_shards_into_pipeline_simple() {
+        let source_uid = source_id();
+        let previous_indexing_tasks: Vec<IndexingTask> =
+            make_indexing_tasks(&source_uid, &[&[1, 2], &[3, 4, 5]]);
+        let indexing_tasks = group_shards_into_pipelines(
+            &source_uid,
+            &[0, 1, 3, 4, 5],
+            &previous_indexing_tasks,
+            250,
+        );
+        assert_eq!(indexing_tasks.len(), 2);
+        assert_eq!(&indexing_tasks[0].shard_ids, &[0, 1]);
+        assert_eq!(&indexing_tasks[1].shard_ids, &[3, 4, 5]);
+    }
+
+    #[test]
+    fn test_group_shards_load_per_shard_too_high() {
+        let source_uid = source_id();
+        let indexing_tasks = group_shards_into_pipelines(&source_uid, &[1, 2], &[], 1_000);
+        assert_eq!(indexing_tasks.len(), 2);
+    }
+
+    #[test]
+    fn test_group_shards_into_pipeline_hysteresis() {
+        let source_uid = source_id();
+        let previous_indexing_tasks: Vec<IndexingTask> = make_indexing_tasks(&source_uid, &[]);
+        let indexing_tasks_1 = group_shards_into_pipelines(
+            &source_uid,
+            &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+            &previous_indexing_tasks,
+            100,
+        );
+        assert_eq!(indexing_tasks_1.len(), 2);
+        assert_eq!(&indexing_tasks_1[0].shard_ids, &[0, 2, 4, 6, 8, 10]);
+        assert_eq!(&indexing_tasks_1[1].shard_ids, &[1, 3, 5, 7, 9]);
+        // With the same set of shards, an increase in load triggers the creation of a new task.
+        let indexing_tasks_2 = group_shards_into_pipelines(
+            &source_uid,
+            &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+            &indexing_tasks_1,
+            150,
+        );
+        assert_eq!(indexing_tasks_2.len(), 3);
+        assert_eq!(&indexing_tasks_2[0].shard_ids, &[0, 2, 4, 6, 8]);
+        assert_eq!(&indexing_tasks_2[1].shard_ids, &[1, 3, 5, 7, 9]);
+        assert_eq!(&indexing_tasks_2[2].shard_ids, &[10]);
+        // Now the load comes back to normal.
+        // The hysteresis takes effect: we do not switch back to 2 pipelines.
+        let indexing_tasks_3 = group_shards_into_pipelines(
+            &source_uid,
+            &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+            &indexing_tasks_2,
+            100,
+        );
+        assert_eq!(indexing_tasks_3.len(), 3);
+        assert_eq!(&indexing_tasks_3[0].shard_ids, &[0, 2, 4, 6, 8]);
+        assert_eq!(&indexing_tasks_3[1].shard_ids, &[1, 3, 5, 7, 9]);
+        assert_eq!(&indexing_tasks_3[2].shard_ids, &[10]);
+        // With an even lower load, we finally drop back to 2 pipelines.
+        let indexing_tasks_4 = group_shards_into_pipelines(
+            &source_uid,
+            &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+            &indexing_tasks_3,
+            80,
+        );
+        assert_eq!(indexing_tasks_4.len(), 2);
+        assert_eq!(&indexing_tasks_4[0].shard_ids, &[0, 2, 4, 6, 8, 10]);
+        assert_eq!(&indexing_tasks_4[1].shard_ids, &[1, 3, 5, 7, 9]);
+    }
 }
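The numbers in the hysteresis test line up with the thresholds if one assumes, as in the earlier illustration, `PIPELINE_FULL_LOAD = 1_000` (so `MAX_LOAD_PER_PIPELINE = 800` and `LOAD_PER_PIPELINE_LOW_THRESHOLD = 300`). The sketch below replays the pipeline-count decision from `group_shards_into_pipelines` on the test's inputs; the helper is a simplification, not the patched function itself:

```rust
// Simplified pipeline-count decision, under the assumed thresholds 800/300.
fn target_num_pipelines(num_shards: u32, load_per_shard: u32, previous: u32) -> u32 {
    // Fall back to one shard per pipeline when a shard exceeds 800.
    let max_shards_per_pipeline = (800 / load_per_shard).max(1);
    // Scale-up threshold: enough pipelines to stay under 80% each.
    let min_pipelines = (num_shards + max_shards_per_pipeline - 1) / max_shards_per_pipeline;
    // Scale-down threshold: never keep pipelines under 30% each.
    let max_pipelines = (num_shards * load_per_shard) / 300;
    if previous > min_pipelines {
        previous.min(max_pipelines)
    } else {
        min_pipelines
    }
}

fn main() {
    assert_eq!(target_num_pipelines(11, 100, 0), 2); // 1_100 total load: 2 pipelines
    assert_eq!(target_num_pipelines(11, 150, 2), 3); // load grows: scale up to 3
    assert_eq!(target_num_pipelines(11, 100, 3), 3); // back to 100: hysteresis keeps 3
    assert_eq!(target_num_pipelines(11, 80, 3), 2); // 880 / 300 = 2: finally scale down
}
```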