diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 765fbeed124..32c19fb6c1a 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5830,6 +5830,7 @@ version = "0.8.0" dependencies = [ "anyhow", "async-trait", + "bytesize", "fnv", "futures", "itertools 0.13.0", diff --git a/quickwit/quickwit-control-plane/Cargo.toml b/quickwit/quickwit-control-plane/Cargo.toml index ffb57b1b732..024704736e0 100644 --- a/quickwit/quickwit-control-plane/Cargo.toml +++ b/quickwit/quickwit-control-plane/Cargo.toml @@ -13,6 +13,7 @@ license.workspace = true [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +bytesize = { workspace = true } fnv = { workspace = true } futures = { workspace = true } itertools = { workspace = true } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 0fc16189925..110e08a5139 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -28,11 +28,13 @@ use std::time::{Duration, Instant}; use fnv::{FnvHashMap, FnvHashSet}; use itertools::Itertools; +use once_cell::sync::OnceCell; use quickwit_proto::indexing::{ ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY, + PIPELINE_THROUGHTPUT, }; use quickwit_proto::metastore::SourceType; -use quickwit_proto::types::{NodeId, ShardId}; +use quickwit_proto::types::NodeId; use scheduling::{SourceToSchedule, SourceToScheduleType}; use serde::Serialize; use tracing::{debug, info, warn}; @@ -41,7 +43,7 @@ use crate::indexing_plan::PhysicalIndexingPlan; use crate::indexing_scheduler::change_tracker::{NotifyChangeOnDrop, RebuildNotifier}; use crate::indexing_scheduler::scheduling::build_physical_indexing_plan; use crate::metrics::ShardLocalityMetrics; -use crate::model::{ControlPlaneModel, ShardLocations}; +use crate::model::{ControlPlaneModel, ShardEntry, ShardLocations}; use crate::{IndexerNodeInfo, IndexerPool}; pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration = @@ -120,6 +122,44 @@ impl fmt::Debug for IndexingScheduler { } } +fn enable_variable_shard_load() -> bool { + static IS_SHARD_LOAD_CP_ENABLED: OnceCell = OnceCell::new(); + *IS_SHARD_LOAD_CP_ENABLED.get_or_init(|| { + !quickwit_common::get_bool_from_env("QW_DISABLE_VARIABLE_SHARD_LOAD", false) + }) +} + +/// Computes the CPU load associated to a single shard of a given index. +/// +/// The array passed contains all of data we have about the shard of the index. +/// This function averages their statistics. +/// +/// For the moment, this function only takes in account the measured throughput, +/// and assumes a constant CPU usage of 4 vCPU = 20mb/s. +/// +/// It does not take in account the variation that could raise from the different +/// doc mapping / nature of the data, etc. +fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 { + if enable_variable_shard_load() { + let num_shards = shard_entries.len().max(1) as u64; + let average_throughput_per_shard_bytes: u64 = shard_entries + .iter() + .map(|shard_entry| shard_entry.ingestion_rate.0 as u64 * bytesize::MIB) + .sum::() + .div_ceil(num_shards) + // A shard throughput cannot exceed PIPELINE_THROUGHPUT in the long term (this is + // enforced by the configuration). + .min(PIPELINE_THROUGHTPUT.as_u64()); + let num_cpu_millis = (PIPELINE_FULL_CAPACITY.cpu_millis() as u64 + * average_throughput_per_shard_bytes) + / PIPELINE_THROUGHTPUT.as_u64(); + const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32; + NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap() + } else { + NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4).unwrap() + } +} + fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { let mut sources = Vec::new(); @@ -146,22 +186,24 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { // Expect: the source should exist since we just read it from `get_source_configs`. // Note that we keep all shards, including Closed shards: // A closed shards still needs to be indexed. - let shard_ids: Vec = model + let shard_entries: Vec<&ShardEntry> = model .get_shards_for_source(&source_uid) .expect("source should exist") - .keys() - .cloned() + .values() .collect(); - if shard_ids.is_empty() { + if shard_entries.is_empty() { continue; } + let shard_ids = shard_entries + .iter() + .map(|shard_entry| shard_entry.shard_id().clone()) + .collect(); + let load_per_shard = compute_load_per_shard(&shard_entries[..]); sources.push(SourceToSchedule { source_uid, source_type: SourceToScheduleType::Sharded { shard_ids, - // FIXME - load_per_shard: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4) - .unwrap(), + load_per_shard, }, }); } @@ -540,7 +582,7 @@ mod tests { use proptest::{prop_compose, proptest}; use quickwit_config::{IndexConfig, KafkaSourceParams, SourceConfig, SourceParams}; use quickwit_metastore::IndexMetadata; - use quickwit_proto::types::{IndexUid, PipelineUid, SourceUid}; + use quickwit_proto::types::{IndexUid, PipelineUid, ShardId, SourceUid}; use super::*; use crate::model::ShardLocations; diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs index ead4509f4aa..bc9d6ec6d91 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs @@ -565,10 +565,26 @@ fn inflate_node_capacities_if_necessary(problem: &mut SchedulingProblem) { else { return; }; + + // We first artificially scale down the node capacities. + // + // The node capacity is an estimate of the amount of CPU available on a given indexer node. + // It has two purpose, + // - under a lot of load, indexer will receive work proportional to their relative capacity. + // - under low load, the absolute magnitude will be used by the scheduler, to decide whether + // to prefer having a balanced workload over other criteria (all pipeline from a same index on + // the same node, indexing local shards, etc.). + // + // The default CPU capacity is detected from the OS. Using these values directly leads + // a non uniform distribution of the load which is very confusing for users. We artificially + // scale down the indexer capacities. + problem.scale_node_capacities(0.3f32); + let min_indexer_capacity = (0..problem.num_indexers()) .map(|indexer_ord| problem.indexer_cpu_capacity(indexer_ord)) .min() .expect("At least one indexer is required"); + assert_ne!(min_indexer_capacity.cpu_millis(), 0); if min_indexer_capacity.cpu_millis() < largest_shard_load.get() { let scaling_factor = diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs index 15feb765b72..d339d8e5459 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs @@ -163,7 +163,7 @@ fn assert_remove_extraneous_shards_post_condition( // Releave sources from the node that are exceeding their maximum load. fn enforce_indexers_cpu_capacity(problem: &SchedulingProblem, solution: &mut SchedulingSolution) { - for indexer_assignment in solution.indexer_assignments.iter_mut() { + for indexer_assignment in &mut solution.indexer_assignments { let indexer_cpu_capacity: CpuCapacity = problem.indexer_cpu_capacity(indexer_assignment.indexer_ord); enforce_indexer_cpu_capacity(problem, indexer_cpu_capacity, indexer_assignment); @@ -753,6 +753,35 @@ mod tests { assert_eq!(solution.indexer_assignments[0].num_shards(0), 1); } + #[test] + fn test_problem_unbalanced_simple() { + let mut problem = SchedulingProblem::with_indexer_cpu_capacities(vec![ + CpuCapacity::from_cpu_millis(1), + CpuCapacity::from_cpu_millis(1), + ]); + problem.add_source(1, NonZeroU32::new(10).unwrap()); + for _ in 0..10 { + problem.add_source(1, NonZeroU32::new(1).unwrap()); + } + let previous_solution = problem.new_solution(); + let solution = solve(problem.clone(), previous_solution); + let available_capacities: Vec = solution + .indexer_assignments + .iter() + .map(|indexer_assignment: &IndexerAssignment| { + indexer_assignment.total_cpu_load(&problem) + }) + .collect(); + assert_eq!(available_capacities.len(), 2); + let (min, max) = available_capacities + .into_iter() + .minmax() + .into_option() + .unwrap(); + assert_eq!(min, 10); + assert_eq!(max, 10); + } + proptest! { #[test] fn test_proptest_post_conditions((problem, solution) in problem_solution_strategy()) { diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs index 89d47fc50b4..eee1f416638 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs @@ -79,7 +79,7 @@ impl Source { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SchedulingProblem { sources: Vec, indexer_cpu_capacities: Vec, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs b/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs index 2748828131b..a93513a9584 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs @@ -21,6 +21,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::{Arc, Weak}; +use once_cell::sync::OnceCell; use quickwit_common::thread_pool::run_cpu_intensive; use quickwit_config::{build_doc_mapper, DocMapping, SearchSettings}; use quickwit_doc_mapper::DocMapper; @@ -107,18 +108,29 @@ fn validate_doc_batch_impl( (doc_batch, parse_failures) } +fn is_document_validation_enabled() -> bool { + static IS_DOCUMENT_VALIDATION_ENABLED: OnceCell = OnceCell::new(); + *IS_DOCUMENT_VALIDATION_ENABLED.get_or_init(|| { + !quickwit_common::get_bool_from_env("QW_DISABLE_DOCUMENT_VALIDATION", false) + }) +} + /// Parses the JSON documents contained in the batch and applies the doc mapper. Returns the /// original batch and a list of parse failures. pub(super) async fn validate_doc_batch( doc_batch: DocBatchV2, doc_mapper: Arc, ) -> IngestV2Result<(DocBatchV2, Vec)> { - run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper)) - .await - .map_err(|error| { - let message = format!("failed to validate documents: {error}"); - IngestV2Error::Internal(message) - }) + if is_document_validation_enabled() { + run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper)) + .await + .map_err(|error| { + let message = format!("failed to validate documents: {error}"); + IngestV2Error::Internal(message) + }) + } else { + Ok((doc_batch, Vec::new())) + } } #[cfg(test)] diff --git a/quickwit/quickwit-proto/src/indexing/mod.rs b/quickwit/quickwit-proto/src/indexing/mod.rs index b79311f21fd..968c575aee5 100644 --- a/quickwit/quickwit-proto/src/indexing/mod.rs +++ b/quickwit/quickwit-proto/src/indexing/mod.rs @@ -22,6 +22,7 @@ use std::fmt::{Display, Formatter}; use std::hash::Hash; use std::ops::{Add, Mul, Sub}; +use bytesize::ByteSize; use quickwit_actors::AskError; use quickwit_common::pubsub::Event; use quickwit_common::tower::{MakeLoadShedError, RpcName}; @@ -176,9 +177,17 @@ impl Display for PipelineMetrics { } /// One full pipeline (including merging) is assumed to consume 4 CPU threads. -/// The actual number somewhere between 3 and 4. +/// The actual number somewhere between 3 and 4. Quickwit is not super sensitive to this number. +/// +/// It simply impacts the point where we prefer to work on balancing the load over the different +/// indexers and the point where we prefer improving other feature of the system (shard locality, +/// grouping pipelines associated to a given index on the same node, etc.). pub const PIPELINE_FULL_CAPACITY: CpuCapacity = CpuCapacity::from_cpu_millis(4_000u32); +/// One full pipeline (including merging) is supposed to have the capacity to index at least 20mb/s. +/// This is a defensive value: In reality, this is typically above 30mb/s. +pub const PIPELINE_THROUGHTPUT: ByteSize = ByteSize::mb(20); + /// The CpuCapacity represents an amount of CPU resource available. /// /// It is usually expressed in CPU millis (For instance, one full CPU thread is