Using the shard throughput information in the scheduling logic. (#5196)
* Using the shard throughput information in the scheduling logic.

* Added CLI flags
fulmicoton authored Jul 5, 2024
1 parent b600b4a commit 43e5ced
Showing 8 changed files with 129 additions and 19 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/Cargo.toml
@@ -13,6 +13,7 @@ license.workspace = true
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
+bytesize = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
62 changes: 52 additions & 10 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -28,11 +28,13 @@ use std::time::{Duration, Instant};

use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
+use once_cell::sync::OnceCell;
use quickwit_proto::indexing::{
    ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
+    PIPELINE_THROUGHTPUT,
};
use quickwit_proto::metastore::SourceType;
-use quickwit_proto::types::{NodeId, ShardId};
+use quickwit_proto::types::NodeId;
use scheduling::{SourceToSchedule, SourceToScheduleType};
use serde::Serialize;
use tracing::{debug, info, warn};
@@ -41,7 +43,7 @@ use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::change_tracker::{NotifyChangeOnDrop, RebuildNotifier};
use crate::indexing_scheduler::scheduling::build_physical_indexing_plan;
use crate::metrics::ShardLocalityMetrics;
-use crate::model::{ControlPlaneModel, ShardLocations};
+use crate::model::{ControlPlaneModel, ShardEntry, ShardLocations};
use crate::{IndexerNodeInfo, IndexerPool};

pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration =
@@ -120,6 +122,44 @@ impl fmt::Debug for IndexingScheduler {
    }
}

+fn enable_variable_shard_load() -> bool {
+    static IS_SHARD_LOAD_CP_ENABLED: OnceCell<bool> = OnceCell::new();
+    *IS_SHARD_LOAD_CP_ENABLED.get_or_init(|| {
+        !quickwit_common::get_bool_from_env("QW_DISABLE_VARIABLE_SHARD_LOAD", false)
+    })
+}

+/// Computes the CPU load associated with a single shard of a given index.
+///
+/// The array passed contains all of the data we have about the shards of the index.
+/// This function averages their statistics.
+///
+/// For the moment, this function only takes into account the measured throughput,
+/// and assumes a constant CPU cost of 4 vCPUs per 20mb/s of throughput.
+///
+/// It does not take into account the variation that could arise from differences in
+/// doc mapping, the nature of the data, etc.
+fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
+    if enable_variable_shard_load() {
+        let num_shards = shard_entries.len().max(1) as u64;
+        let average_throughput_per_shard_bytes: u64 = shard_entries
+            .iter()
+            .map(|shard_entry| shard_entry.ingestion_rate.0 as u64 * bytesize::MIB)
+            .sum::<u64>()
+            .div_ceil(num_shards)
+            // A shard's throughput cannot exceed PIPELINE_THROUGHTPUT in the long term
+            // (this is enforced by the configuration).
+            .min(PIPELINE_THROUGHTPUT.as_u64());
+        let num_cpu_millis = (PIPELINE_FULL_CAPACITY.cpu_millis() as u64
+            * average_throughput_per_shard_bytes)
+            / PIPELINE_THROUGHTPUT.as_u64();
+        const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
+        NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
+    } else {
+        NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4).unwrap()
+    }
+}

fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
    let mut sources = Vec::new();

@@ -146,22 +186,24 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
                // Expect: the source should exist since we just read it from `get_source_configs`.
                // Note that we keep all shards, including Closed shards:
                // A closed shard still needs to be indexed.
-                let shard_ids: Vec<ShardId> = model
+                let shard_entries: Vec<&ShardEntry> = model
                    .get_shards_for_source(&source_uid)
                    .expect("source should exist")
-                    .keys()
-                    .cloned()
+                    .values()
                    .collect();
-                if shard_ids.is_empty() {
+                if shard_entries.is_empty() {
                    continue;
                }
+                let shard_ids = shard_entries
+                    .iter()
+                    .map(|shard_entry| shard_entry.shard_id().clone())
+                    .collect();
+                let load_per_shard = compute_load_per_shard(&shard_entries[..]);
                sources.push(SourceToSchedule {
                    source_uid,
                    source_type: SourceToScheduleType::Sharded {
                        shard_ids,
-                        // FIXME
-                        load_per_shard: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4)
-                            .unwrap(),
+                        load_per_shard,
                    },
                });
            }
@@ -540,7 +582,7 @@ mod tests {
    use proptest::{prop_compose, proptest};
    use quickwit_config::{IndexConfig, KafkaSourceParams, SourceConfig, SourceParams};
    use quickwit_metastore::IndexMetadata;
-    use quickwit_proto::types::{IndexUid, PipelineUid, SourceUid};
+    use quickwit_proto::types::{IndexUid, PipelineUid, ShardId, SourceUid};

    use super::*;
    use crate::model::ShardLocations;
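To make the new cost model concrete: `compute_load_per_shard` maps measured shard throughput linearly onto pipeline CPU capacity. A minimal, self-contained sketch of the arithmetic with hypothetical numbers (the constants are inlined here; in the real code they come from `PIPELINE_FULL_CAPACITY` and `PIPELINE_THROUGHTPUT`):

```rust
fn main() {
    // Hypothetical example: the shards of a source average 5 MiB/s of ingestion.
    let average_throughput_per_shard_bytes: u64 = 5 * 1024 * 1024;
    // PIPELINE_FULL_CAPACITY is 4_000 CPU millis; PIPELINE_THROUGHTPUT is
    // ByteSize::mb(20), i.e. 20_000_000 bytes per second.
    let num_cpu_millis = (4_000u64 * average_throughput_per_shard_bytes) / 20_000_000;
    // 5 MiB/s therefore costs roughly one vCPU per shard. The real function also
    // clamps the result below at 50 CPU millis and caps throughput at 20mb/s.
    assert_eq!(num_cpu_millis, 1_048);
    // With QW_DISABLE_VARIABLE_SHARD_LOAD=true, the old fixed estimate applies instead:
    assert_eq!(4_000u32 / 4, 1_000); // one quarter of a pipeline per shard
}
```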
@@ -565,10 +565,26 @@ fn inflate_node_capacities_if_necessary(problem: &mut SchedulingProblem) {
    else {
        return;
    };

+    // We first artificially scale down the node capacities.
+    //
+    // The node capacity is an estimate of the amount of CPU available on a given indexer node.
+    // It has two purposes:
+    // - under heavy load, indexers receive work proportional to their relative capacities;
+    // - under low load, the absolute magnitude is used by the scheduler to decide whether to
+    //   prefer a balanced workload over other criteria (all pipelines from the same index on
+    //   the same node, indexing local shards, etc.).
+    //
+    // The default CPU capacity is detected from the OS. Using these values directly leads to
+    // a non-uniform distribution of the load, which is very confusing for users. We therefore
+    // artificially scale down the indexer capacities.
    problem.scale_node_capacities(0.3f32);

    let min_indexer_capacity = (0..problem.num_indexers())
        .map(|indexer_ord| problem.indexer_cpu_capacity(indexer_ord))
        .min()
        .expect("At least one indexer is required");

+    assert_ne!(min_indexer_capacity.cpu_millis(), 0);
    if min_indexer_capacity.cpu_millis() < largest_shard_load.get() {
        let scaling_factor =
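The 0.3 scale-down in the comment above is easiest to see with toy numbers; a hypothetical sketch (the real scaling lives in `scale_node_capacities`, and the exact rounding is its concern):

```rust
fn main() {
    // Hypothetical: two indexers detected with 16 and 8 vCPUs respectively.
    let capacities_mcpu = [16_000u32, 8_000];
    // After the 0.3 scale-down, the absolute capacities shrink while the 2:1
    // ratio driving proportional work assignment is preserved.
    let scaled: Vec<u32> = capacities_mcpu
        .iter()
        .map(|&mcpu| (mcpu as f32 * 0.3) as u32)
        .collect();
    assert_eq!(scaled, vec![4_800, 2_400]);
}
```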
@@ -163,7 +163,7 @@ fn assert_remove_extraneous_shards_post_condition(
// Relieve sources from the nodes that are exceeding their maximum load.

fn enforce_indexers_cpu_capacity(problem: &SchedulingProblem, solution: &mut SchedulingSolution) {
-    for indexer_assignment in solution.indexer_assignments.iter_mut() {
+    for indexer_assignment in &mut solution.indexer_assignments {
        let indexer_cpu_capacity: CpuCapacity =
            problem.indexer_cpu_capacity(indexer_assignment.indexer_ord);
        enforce_indexer_cpu_capacity(problem, indexer_cpu_capacity, indexer_assignment);
@@ -753,6 +753,35 @@ mod tests {
        assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
    }

+    #[test]
+    fn test_problem_unbalanced_simple() {
+        let mut problem = SchedulingProblem::with_indexer_cpu_capacities(vec![
+            CpuCapacity::from_cpu_millis(1),
+            CpuCapacity::from_cpu_millis(1),
+        ]);
+        problem.add_source(1, NonZeroU32::new(10).unwrap());
+        for _ in 0..10 {
+            problem.add_source(1, NonZeroU32::new(1).unwrap());
+        }
+        let previous_solution = problem.new_solution();
+        let solution = solve(problem.clone(), previous_solution);
+        let available_capacities: Vec<u32> = solution
+            .indexer_assignments
+            .iter()
+            .map(|indexer_assignment: &IndexerAssignment| {
+                indexer_assignment.total_cpu_load(&problem)
+            })
+            .collect();
+        assert_eq!(available_capacities.len(), 2);
+        let (min, max) = available_capacities
+            .into_iter()
+            .minmax()
+            .into_option()
+            .unwrap();
+        assert_eq!(min, 10);
+        assert_eq!(max, 10);
+    }

    proptest! {
        #[test]
        fn test_proptest_post_conditions((problem, solution) in problem_solution_strategy()) {
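The expected numbers in `test_problem_unbalanced_simple` follow from simple arithmetic, restated here outside the test with the loads inlined:

```rust
fn main() {
    // One source of load 10 plus ten sources of load 1, on two identical indexers.
    let loads = [10u32, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1];
    let total: u32 = loads.iter().sum();
    assert_eq!(total, 20);
    // A balanced solution carries exactly 10 CPU millis per indexer, which is
    // what the min/max assertions in the test verify.
    assert_eq!(total / 2, 10);
}
```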
@@ -79,7 +79,7 @@ impl Source {
    }
}

-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct SchedulingProblem {
    sources: Vec<Source>,
    indexer_cpu_capacities: Vec<CpuCapacity>,
24 changes: 18 additions & 6 deletions quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs
@@ -21,6 +21,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, Weak};

+use once_cell::sync::OnceCell;
use quickwit_common::thread_pool::run_cpu_intensive;
use quickwit_config::{build_doc_mapper, DocMapping, SearchSettings};
use quickwit_doc_mapper::DocMapper;
@@ -107,18 +108,29 @@ fn validate_doc_batch_impl(
    (doc_batch, parse_failures)
}

+fn is_document_validation_enabled() -> bool {
+    static IS_DOCUMENT_VALIDATION_ENABLED: OnceCell<bool> = OnceCell::new();
+    *IS_DOCUMENT_VALIDATION_ENABLED.get_or_init(|| {
+        !quickwit_common::get_bool_from_env("QW_DISABLE_DOCUMENT_VALIDATION", false)
+    })
+}

/// Parses the JSON documents contained in the batch and applies the doc mapper. Returns the
/// original batch and a list of parse failures.
pub(super) async fn validate_doc_batch(
    doc_batch: DocBatchV2,
    doc_mapper: Arc<dyn DocMapper>,
) -> IngestV2Result<(DocBatchV2, Vec<ParseFailure>)> {
-    run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper))
-        .await
-        .map_err(|error| {
-            let message = format!("failed to validate documents: {error}");
-            IngestV2Error::Internal(message)
-        })
+    if is_document_validation_enabled() {
+        run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper))
+            .await
+            .map_err(|error| {
+                let message = format!("failed to validate documents: {error}");
+                IngestV2Error::Internal(message)
+            })
+    } else {
+        Ok((doc_batch, Vec::new()))
+    }
}

#[cfg(test)]
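One operational note, hedged since it follows from the `OnceCell` rather than from documentation: the flag is read once and memoized, so the environment variable must be set before the first ingest request is processed. A sketch:

```rust
use std::env;

fn main() {
    // Must run before the first call to is_document_validation_enabled(),
    // since its result is cached for the lifetime of the process.
    env::set_var("QW_DISABLE_DOCUMENT_VALIDATION", "true");
    // From this point on, validate_doc_batch returns the batch unchanged,
    // with an empty list of parse failures, instead of running the doc mapper.
}
```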
11 changes: 10 additions & 1 deletion quickwit/quickwit-proto/src/indexing/mod.rs
@@ -22,6 +22,7 @@ use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::ops::{Add, Mul, Sub};

+use bytesize::ByteSize;
use quickwit_actors::AskError;
use quickwit_common::pubsub::Event;
use quickwit_common::tower::{MakeLoadShedError, RpcName};
@@ -176,9 +177,17 @@ impl Display for PipelineMetrics {
    }
}
/// One full pipeline (including merging) is assumed to consume 4 CPU threads.
-/// The actual number somewhere between 3 and 4.
+/// The actual number is somewhere between 3 and 4. Quickwit is not very sensitive to this number.
+///
+/// It simply impacts the point where we prefer to work on balancing the load over the different
+/// indexers and the point where we prefer improving other features of the system (shard locality,
+/// grouping pipelines associated with a given index on the same node, etc.).
pub const PIPELINE_FULL_CAPACITY: CpuCapacity = CpuCapacity::from_cpu_millis(4_000u32);

+/// One full pipeline (including merging) is supposed to have the capacity to index at least
+/// 20mb/s. This is a defensive value: in reality, this is typically above 30mb/s.
+pub const PIPELINE_THROUGHTPUT: ByteSize = ByteSize::mb(20);

/// The CpuCapacity represents an amount of CPU resource available.
///
/// It is usually expressed in CPU millis (for instance, one full CPU thread is
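Taken together, the two constants imply the unit cost used by `compute_load_per_shard` earlier in this commit; a back-of-the-envelope restatement with the constant values inlined:

```rust
fn main() {
    // 4_000 CPU millis buys 20 MB/s of sustained indexing throughput,
    // i.e. 200 CPU millis per MB/s of shard throughput.
    let mcpu_per_mb_per_s = 4_000u32 / 20;
    assert_eq!(mcpu_per_mb_per_s, 200);
    // The 50 mCPU floor in compute_load_per_shard thus corresponds to
    // roughly 0.25 MB/s of ingestion per shard.
}
```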
