Ensures the load of pipelines does not exceed 80%. (#4054)
* Ensures the load of pipelines does not exceed 80%.

The allocation attempts to match the previous
plan as much as possible.

It also includes some hysteresis to avoid
flip-flopping between N and N+1 pipelines.

Closes #4010

* CR comment
fulmicoton authored Nov 1, 2023
1 parent a6eeba9 commit 9fb6d3c
Showing 2 changed files with 250 additions and 13 deletions.
7 changes: 7 additions & 0 deletions quickwit/quickwit-common/src/lib.rs
@@ -249,6 +249,13 @@ mod tests {
        assert_eq!(div_ceil(5, 2), 3);
        assert_eq!(div_ceil(6, 2), 3);

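        // The added cases below document `div_ceil` on negative operands: it rounds
        // toward positive infinity, e.g. div_ceil(-1, 3) == ceil(-1/3) == 0 and
        // div_ceil(-5, 2) == ceil(-2.5) == -2.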
        assert_eq!(div_ceil(3, 3), 1);
        assert_eq!(div_ceil(2, 3), 1);
        assert_eq!(div_ceil(1, 3), 1);
        assert_eq!(div_ceil(0, 3), 0);
        assert_eq!(div_ceil(-1, 3), 0);
        assert_eq!(div_ceil(-2, 3), 0);

        assert_eq!(div_ceil(-5, 1), -5);
        assert_eq!(div_ceil(-5, 2), -2);
        assert_eq!(div_ceil(-6, 2), -3);
256 changes: 243 additions & 13 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
@@ -29,13 +29,31 @@ use quickwit_proto::types::{IndexUid, ShardId};
pub use scheduling_logic_model::Load;
use scheduling_logic_model::{IndexerOrd, SourceOrd};
use tracing::error;
use tracing::log::warn;

use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::scheduling::scheduling_logic_model::{
    SchedulingProblem, SchedulingSolution,
};
use crate::indexing_scheduler::PIPELINE_FULL_LOAD;
use crate::SourceUid;

/// If the load of several pipelines drops below this threshold, we
/// reduce the number of pipelines.
///
/// Note that even for 2 pipelines, this creates a hysteresis effect.
///
/// Starting from a single pipeline, an overall load above 80% is enough
/// to trigger the creation of a second pipeline.
///
/// Coming back to a single pipeline requires the load per pipeline to
/// drop below 30%, which translates into an overall load of 60%.
const LOAD_PER_PIPELINE_LOW_THRESHOLD: u32 = PIPELINE_FULL_LOAD * 3 / 10;

/// That's 80% of a pipeline's full capacity (`PIPELINE_FULL_LOAD`).
const MAX_LOAD_PER_PIPELINE: u32 = PIPELINE_FULL_LOAD * 8 / 10;
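
// Worked example (illustrative only; `PIPELINE_FULL_LOAD` is defined in the parent
// module and 1_000 is just an assumed figure): with PIPELINE_FULL_LOAD = 1_000,
// LOAD_PER_PIPELINE_LOW_THRESHOLD = 300 and MAX_LOAD_PER_PIPELINE = 800. A single
// pipeline whose load climbs above 800 is split into two pipelines of ~400 each;
// merging back into a single pipeline only happens once each pipeline drops below
// 300, i.e. an overall load under 600.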

fn indexing_task(source_uid: SourceUid, shard_ids: Vec<ShardId>) -> IndexingTask {
    IndexingTask {
        index_uid: source_uid.index_uid.to_string(),
@@ -238,8 +256,113 @@ pub enum SourceToScheduleType {
    IngestV1,
}

fn group_shards_into_pipelines(
    source_uid: &SourceUid,
    shard_ids: &[ShardId],
    previous_indexing_tasks: &[IndexingTask],
    load_per_shard: Load,
) -> Vec<IndexingTask> {
    let num_shards = shard_ids.len() as u32;
    if num_shards == 0 {
        return Vec::new();
    }
    let max_num_shards_per_pipeline: NonZeroU32 =
        NonZeroU32::new(MAX_LOAD_PER_PIPELINE / load_per_shard).unwrap_or_else(|| {
            // We throttle shards at ingestion to ensure that a shard does not
            // exceed 5MB/s.
            //
            // This value has been chosen to make sure that one full pipeline
            // should always be able to handle the load of one shard.
            //
            // However, it is possible for the system to exceed this rate when
            // it is catching up.
            //
            // This is a transitory state, and not a problem per se.
            warn!("load per shard is higher than `MAX_LOAD_PER_PIPELINE`");
            NonZeroU32::new(1).unwrap()
        });
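
    // For instance (illustrative, with the assumed MAX_LOAD_PER_PIPELINE = 800 from
    // above): a load_per_shard of 250 allows at most 800 / 250 = 3 shards per pipeline.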

    // We compute the number of pipelines we will create, baking in some hysteresis:
    // we use two different thresholds for increasing and for decreasing the number
    // of pipelines.
    let min_num_pipelines: u32 =
        (num_shards + max_num_shards_per_pipeline.get() - 1) / max_num_shards_per_pipeline;
    let max_num_pipelines: u32 = (num_shards * load_per_shard) / LOAD_PER_PIPELINE_LOW_THRESHOLD;
    let previous_num_pipelines = previous_indexing_tasks.len() as u32;
    let num_pipelines: u32 = if previous_num_pipelines > min_num_pipelines {
        previous_num_pipelines.min(max_num_pipelines)
    } else {
        min_num_pipelines
    };
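
    // Worked example (illustrative, with the assumed thresholds 800/300 from above):
    // 11 shards at a load of 100 each give min_num_pipelines = ceil(11 / 8) = 2 and
    // max_num_pipelines = 1_100 / 300 = 3. With no previous plan we create 2
    // pipelines; if the previous plan had 3, we keep 3 (hysteresis).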

    let mut pipelines: Vec<Vec<ShardId>> = std::iter::repeat_with(Vec::new)
        .take((previous_num_pipelines as usize).max(num_pipelines as usize))
        .collect();

    let mut unassigned_shard_ids: Vec<ShardId> = Vec::new();
    let previous_pipeline_map: FnvHashMap<ShardId, usize> = previous_indexing_tasks
        .iter()
        .enumerate()
        .flat_map(|(pipeline_ord, indexing_task)| {
            indexing_task
                .shard_ids
                .iter()
                .map(move |shard_id| (*shard_id, pipeline_ord))
        })
        .collect();

    for &shard in shard_ids {
        if let Some(pipeline_ord) = previous_pipeline_map.get(&shard).copied() {
            // Whenever possible we allocate to the previous pipeline.
            let best_pipeline_for_shard = &mut pipelines[pipeline_ord];
            if best_pipeline_for_shard.len() < max_num_shards_per_pipeline.get() as usize {
                best_pipeline_for_shard.push(shard);
            } else {
                unassigned_shard_ids.push(shard);
            }
        } else {
            unassigned_shard_ids.push(shard);
        }
    }

    // If needed, let's remove some pipelines. We just remove the pipelines that have
    // the least number of shards.
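    // For example (illustrative): with pipelines holding [3, 3, 1] shards and
    // num_pipelines = 2, the 1-shard pipeline is drained back into the unassigned set.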
    pipelines.sort_by_key(|shards| std::cmp::Reverse(shards.len()));
    for removed_pipeline_shards in pipelines.drain(num_pipelines as usize..) {
        unassigned_shard_ids.extend(removed_pipeline_shards);
    }

    // Now we need to allocate the unassigned shards.
    // We simply assign each one to the pipeline that currently has the fewest
    // shards; since all shards of a source carry the same load, this is also
    // the least loaded pipeline.
    for shard in unassigned_shard_ids {
        let best_pipeline_for_shard: &mut Vec<ShardId> = pipelines
            .iter_mut()
            .min_by_key(|shards| shards.len())
            .unwrap();
        best_pipeline_for_shard.push(shard);
    }

    let mut indexing_tasks: Vec<IndexingTask> = pipelines
        .into_iter()
        .map(|mut shard_ids| {
            shard_ids.sort();
            IndexingTask {
                index_uid: source_uid.index_uid.to_string(),
                source_id: source_uid.source_id.clone(),
                shard_ids,
            }
        })
        .collect();

    indexing_tasks.sort_by_key(|indexing_task| indexing_task.shard_ids[0]);

    indexing_tasks
}

/// This function takes a scheduling solution (which abstracts the notion of pipelines,
/// and shard ids) and builds a physical plan.
fn convert_scheduling_solution_to_physical_plan(
-    solution: SchedulingSolution,
+    solution: &SchedulingSolution,
+    problem: &SchedulingProblem,
    id_to_ord_map: &IdToOrdMap,
    sources: &[SourceToSchedule],
    previous_plan_opt: Option<&PhysicalIndexingPlan>,
@@ -270,14 +393,25 @@ fn convert_scheduling_solution_to_physical_plan(
                let shard_to_indexer_ord = previous_shard_to_indexer_map
                    .remove(&source_ord)
                    .unwrap_or_default();
-                let shard_ids_per_indexer =
+
+                let load_per_shard = problem.source_load_per_shard(source_ord);
+                let shard_ids_per_node: FnvHashMap<IndexerOrd, Vec<ShardId>> =
                    spread_shards_optimally(shards, indexer_num_shards, shard_to_indexer_ord);

-                for (indexer_ord, shard_ids_for_indexer) in shard_ids_per_indexer {
-                    let indexer_id = id_to_ord_map.indexer_id(indexer_ord);
-                    let indexing_task =
-                        indexing_task(source.source_uid.clone(), shard_ids_for_indexer);
-                    physical_indexing_plan.add_indexing_task(indexer_id, indexing_task);
+                for (node_ord, shard_ids_for_node) in shard_ids_per_node {
+                    let node_id = id_to_ord_map.indexer_id(node_ord);
+                    let indexing_tasks: &[IndexingTask] = previous_plan_opt
+                        .and_then(|previous_plan| previous_plan.indexer(node_id))
+                        .unwrap_or(&[]);
+                    let indexing_tasks = group_shards_into_pipelines(
+                        &source.source_uid,
+                        &shard_ids_for_node,
+                        indexing_tasks,
+                        load_per_shard.get(),
+                    );
+                    for indexing_task in indexing_tasks {
+                        physical_indexing_plan.add_indexing_task(node_id, indexing_task);
+                    }
                }
            }
            SourceToScheduleType::NonSharded { .. } => {
@@ -352,7 +486,6 @@ pub fn build_physical_indexing_plan(
    }

    let mut problem = SchedulingProblem::with_indexer_maximum_load(indexer_max_loads);
-
    for source in sources {
        if let Some(source_ord) = populate_problem(source, &mut problem) {
            let registered_source_ord = id_to_ord_map.add_source_uid(source.source_uid.clone());
@@ -378,7 +511,8 @@

    // Convert the new scheduling solution back to a physical plan.
    convert_scheduling_solution_to_physical_plan(
-        new_solution,
+        &new_solution,
+        &problem,
        &id_to_ord_map,
        sources,
        previous_plan_opt,
@@ -392,11 +526,12 @@ mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use fnv::FnvHashMap;
-    use quickwit_proto::types::IndexUid;
+    use quickwit_proto::indexing::IndexingTask;
+    use quickwit_proto::types::{IndexUid, ShardId};

    use super::{
-        build_physical_indexing_plan, indexing_task, spread_shards_optimally, SourceToSchedule,
-        SourceToScheduleType,
+        build_physical_indexing_plan, group_shards_into_pipelines, indexing_task,
+        spread_shards_optimally, SourceToSchedule, SourceToScheduleType,
    };
    use crate::SourceUid;

@@ -473,7 +608,9 @@ mod tests {
        assert_eq!(
            &node2_plan,
            &[
-                indexing_task(source_uid0.clone(), vec![0, 1, 2, 3, 4, 5, 6, 7]),
+                indexing_task(source_uid0.clone(), vec![0, 3, 6]),
+                indexing_task(source_uid0.clone(), vec![1, 4, 7]),
+                indexing_task(source_uid0.clone(), vec![2, 5]),
                indexing_task(source_uid2.clone(), vec![]),
            ]
        );
@@ -519,4 +656,97 @@
            )
        }
    }

    #[test]
    fn test_group_shards_empty() {
        let source_uid = source_id();
        let indexing_tasks = group_shards_into_pipelines(&source_uid, &[], &[], 250);
        assert!(indexing_tasks.is_empty());
    }

    fn make_indexing_tasks(
        source_uid: &SourceUid,
        shard_ids_grp: &[&[ShardId]],
    ) -> Vec<IndexingTask> {
        shard_ids_grp
            .iter()
            .copied()
            .map(|shard_ids| IndexingTask {
                index_uid: source_uid.index_uid.to_string(),
                source_id: source_uid.source_id.clone(),
                shard_ids: shard_ids.to_vec(),
            })
            .collect::<Vec<IndexingTask>>()
    }

    #[test]
    fn test_group_shards_into_pipeline_simple() {
        let source_uid = source_id();
        let previous_indexing_tasks: Vec<IndexingTask> =
            make_indexing_tasks(&source_uid, &[&[1, 2], &[3, 4, 5]]);
        let indexing_tasks = group_shards_into_pipelines(
            &source_uid,
            &[0, 1, 3, 4, 5],
            &previous_indexing_tasks,
            250,
        );
        assert_eq!(indexing_tasks.len(), 2);
        assert_eq!(&indexing_tasks[0].shard_ids, &[0, 1]);
        assert_eq!(&indexing_tasks[1].shard_ids, &[3, 4, 5]);
    }

    #[test]
    fn test_group_shards_load_per_shard_too_high() {
        let source_uid = source_id();
        let indexing_tasks = group_shards_into_pipelines(&source_uid, &[1, 2], &[], 1_000);
        assert_eq!(indexing_tasks.len(), 2);
    }

    #[test]
    fn test_group_shards_into_pipeline_hysteresis() {
        let source_uid = source_id();
        let previous_indexing_tasks: Vec<IndexingTask> = make_indexing_tasks(&source_uid, &[]);
        let indexing_tasks_1 = group_shards_into_pipelines(
            &source_uid,
            &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            &previous_indexing_tasks,
            100,
        );
        assert_eq!(indexing_tasks_1.len(), 2);
        assert_eq!(&indexing_tasks_1[0].shard_ids, &[0, 2, 4, 6, 8, 10]);
        assert_eq!(&indexing_tasks_1[1].shard_ids, &[1, 3, 5, 7, 9]);
        // With the same set of shards, an increase in load triggers the creation of a new task.
        let indexing_tasks_2 = group_shards_into_pipelines(
            &source_uid,
            &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            &indexing_tasks_1,
            150,
        );
        assert_eq!(indexing_tasks_2.len(), 3);
        assert_eq!(&indexing_tasks_2[0].shard_ids, &[0, 2, 4, 6, 8]);
        assert_eq!(&indexing_tasks_2[1].shard_ids, &[1, 3, 5, 7, 9]);
        assert_eq!(&indexing_tasks_2[2].shard_ids, &[10]);
        // Now the load comes back to its initial value.
        // Hysteresis takes effect: we do not switch back to 2 pipelines.
        let indexing_tasks_3 = group_shards_into_pipelines(
            &source_uid,
            &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            &indexing_tasks_2,
            100,
        );
        assert_eq!(indexing_tasks_3.len(), 3);
        assert_eq!(&indexing_tasks_3[0].shard_ids, &[0, 2, 4, 6, 8]);
        assert_eq!(&indexing_tasks_3[1].shard_ids, &[1, 3, 5, 7, 9]);
        assert_eq!(&indexing_tasks_3[2].shard_ids, &[10]);
        // An even lower load makes us drop back to 2 pipelines.
        let indexing_tasks_4 = group_shards_into_pipelines(
            &source_uid,
            &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            &indexing_tasks_3,
            80,
        );
        assert_eq!(indexing_tasks_4.len(), 2);
        assert_eq!(&indexing_tasks_4[0].shard_ids, &[0, 2, 4, 6, 8, 10]);
        assert_eq!(&indexing_tasks_4[1].shard_ids, &[1, 3, 5, 7, 9]);
    }
}
