added cli flags
fulmicoton committed Jul 5, 2024
1 parent 62dbaf8 commit 8d56388
Showing 3 changed files with 46 additions and 22 deletions.
40 changes: 26 additions & 14 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
```diff
@@ -28,6 +28,7 @@ use std::time::{Duration, Instant};
 
 use fnv::{FnvHashMap, FnvHashSet};
 use itertools::Itertools;
+use once_cell::sync::OnceCell;
 use quickwit_proto::indexing::{
     ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
     PIPELINE_THROUGHTPUT,
@@ -121,6 +122,13 @@ impl fmt::Debug for IndexingScheduler {
     }
 }
 
+fn enable_variable_shard_load() -> bool {
+    static IS_SHARD_LOAD_CP_ENABLED: OnceCell<bool> = OnceCell::new();
+    *IS_SHARD_LOAD_CP_ENABLED.get_or_init(|| {
+        !quickwit_common::get_bool_from_env("QW_DISABLE_VARIABLE_SHARD_LOAD", false)
+    })
+}
+
 /// Computes the CPU load associated to a single shard of a given index.
 ///
 /// The array passed contains all of the data we have about the shards of the index.
@@ -132,20 +140,24 @@ impl fmt::Debug for IndexingScheduler {
 /// It does not take into account the variation that could arise from the different
 /// doc mapping / nature of the data, etc.
 fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
-    let num_shards = shard_entries.len().max(1) as u64;
-    let average_throughput_per_shard_bytes: u64 = shard_entries
-        .iter()
-        .map(|shard_entry| shard_entry.ingestion_rate.0 as u64 * bytesize::MIB)
-        .sum::<u64>()
-        .div_ceil(num_shards)
-        // A shard throughput cannot exceed PIPELINE_THROUGHPUT in the long term (this is enforced
-        // by the configuration).
-        .min(PIPELINE_THROUGHTPUT.as_u64());
-    let num_cpu_millis = (PIPELINE_FULL_CAPACITY.cpu_millis() as u64
-        * average_throughput_per_shard_bytes)
-        / PIPELINE_THROUGHTPUT.as_u64();
-    const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
-    NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
+    if enable_variable_shard_load() {
+        let num_shards = shard_entries.len().max(1) as u64;
+        let average_throughput_per_shard_bytes: u64 = shard_entries
+            .iter()
+            .map(|shard_entry| shard_entry.ingestion_rate.0 as u64 * bytesize::MIB)
+            .sum::<u64>()
+            .div_ceil(num_shards)
+            // A shard throughput cannot exceed PIPELINE_THROUGHPUT in the long term (this is
+            // enforced by the configuration).
+            .min(PIPELINE_THROUGHTPUT.as_u64());
+        let num_cpu_millis = (PIPELINE_FULL_CAPACITY.cpu_millis() as u64
+            * average_throughput_per_shard_bytes)
+            / PIPELINE_THROUGHTPUT.as_u64();
+        const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
+        NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
+    } else {
+        NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4).unwrap()
+    }
 }
 
 fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
```
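For intuition about the new code path, here is a minimal, self-contained sketch of the same arithmetic. The constant values are assumptions chosen for illustration (the real `PIPELINE_FULL_CAPACITY` and `PIPELINE_THROUGHTPUT` are defined in `quickwit_proto::indexing`), and `load_per_shard` is a hypothetical stand-in, not the function above:

```rust
use std::num::NonZeroU32;

// Assumed stand-ins for the quickwit_proto constants, for illustration only.
const PIPELINE_FULL_CAPACITY_CPU_MILLIS: u64 = 4_000; // one pipeline ~ 4 vCPUs
const PIPELINE_THROUGHPUT_BYTES: u64 = 20 * 1024 * 1024; // 20 MiB/s
const MIN_CPU_LOAD_PER_SHARD: u32 = 50;

// Load grows linearly with the average per-shard throughput,
// capped at one pipeline's throughput and floored at 50 cpu millis.
fn load_per_shard(ingestion_rates_mib_per_sec: &[u64]) -> NonZeroU32 {
    let num_shards = ingestion_rates_mib_per_sec.len().max(1) as u64;
    let avg_throughput_bytes = ingestion_rates_mib_per_sec
        .iter()
        .map(|mib| mib * 1024 * 1024)
        .sum::<u64>()
        .div_ceil(num_shards)
        .min(PIPELINE_THROUGHPUT_BYTES);
    let cpu_millis =
        PIPELINE_FULL_CAPACITY_CPU_MILLIS * avg_throughput_bytes / PIPELINE_THROUGHPUT_BYTES;
    NonZeroU32::new((cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
}

fn main() {
    // Three shards ingesting 5, 10 and 15 MiB/s average out to 10 MiB/s,
    // half a pipeline's throughput: each shard is charged 2_000 cpu millis.
    assert_eq!(load_per_shard(&[5, 10, 15]).get(), 2_000);
    // An idle shard is floored at MIN_CPU_LOAD_PER_SHARD.
    assert_eq!(load_per_shard(&[0]).get(), 50);
}
```

When `QW_DISABLE_VARIABLE_SHARD_LOAD` is set, the `else` branch restores the previous behavior: a flat quarter of a pipeline per shard (1,000 cpu millis under the assumed constants), regardless of measured throughput.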
2 changes: 1 addition & 1 deletion
```diff
@@ -79,7 +79,7 @@ impl Source {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct SchedulingProblem {
     sources: Vec<Source>,
     indexer_cpu_capacities: Vec<CpuCapacity>,
```
26 changes: 19 additions & 7 deletions quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs
```diff
@@ -21,6 +21,7 @@ use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::sync::{Arc, Weak};
 
+use once_cell::sync::OnceCell;
 use quickwit_common::thread_pool::run_cpu_intensive;
 use quickwit_config::{build_doc_mapper, DocMapping, SearchSettings};
 use quickwit_doc_mapper::DocMapper;
@@ -77,7 +78,7 @@ fn validate_doc_batch_impl(
 ) -> (DocBatchV2, Vec<ParseFailure>) {
     let mut parse_failures: Vec<ParseFailure> = Vec::new();
     for (doc_uid, doc) in doc_batch.docs() {
-        let Ok(json_doc) = serde_json::from_slice::<serde_json_borrow::Value>(&doc) else {
+        let Ok(json_doc) = serde_json::from_slice::<JsonValue>(&doc) else {
             let parse_failure = ParseFailure {
                 doc_uid: Some(doc_uid),
                 reason: ParseFailureReason::InvalidJson as i32,
@@ -107,18 +108,29 @@
     (doc_batch, parse_failures)
 }
 
+fn is_document_validation_enabled() -> bool {
+    static IS_DOCUMENT_VALIDATION_ENABLED: OnceCell<bool> = OnceCell::new();
+    *IS_DOCUMENT_VALIDATION_ENABLED.get_or_init(|| {
+        !quickwit_common::get_bool_from_env("QW_DISABLE_DOCUMENT_VALIDATION", false)
+    })
+}
+
 /// Parses the JSON documents contained in the batch and applies the doc mapper. Returns the
 /// original batch and a list of parse failures.
 pub(super) async fn validate_doc_batch(
     doc_batch: DocBatchV2,
     doc_mapper: Arc<dyn DocMapper>,
 ) -> IngestV2Result<(DocBatchV2, Vec<ParseFailure>)> {
-    run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper))
-        .await
-        .map_err(|error| {
-            let message = format!("failed to validate documents: {error}");
-            IngestV2Error::Internal(message)
-        })
+    if is_document_validation_enabled() {
+        run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper))
+            .await
+            .map_err(|error| {
+                let message = format!("failed to validate documents: {error}");
+                IngestV2Error::Internal(message)
+            })
+    } else {
+        Ok((doc_batch, Vec::new()))
+    }
 }
 
 #[cfg(test)]
```
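Both kill switches follow the same pattern: the environment variable is read once, on first call, and the result is cached in a `OnceCell` for the lifetime of the process. Below is a standalone sketch of that latching behavior, using `std::sync::OnceLock` and a hypothetical `QW_DISABLE_EXAMPLE` variable in place of `quickwit_common::get_bool_from_env`:

```rust
use std::sync::OnceLock;

// Hypothetical flag mirroring the pattern of the two functions above.
fn is_example_feature_enabled() -> bool {
    static ENABLED: OnceLock<bool> = OnceLock::new();
    *ENABLED.get_or_init(|| {
        // Kill-switch semantics: the feature is on unless explicitly disabled.
        !std::env::var("QW_DISABLE_EXAMPLE")
            .map(|value| value == "true" || value == "1")
            .unwrap_or(false)
    })
}

fn main() {
    // NB: set_var is safe in edition 2021; the 2024 edition marks it unsafe.
    std::env::set_var("QW_DISABLE_EXAMPLE", "true");
    assert!(!is_example_feature_enabled());
    // The first call latched the result: removing the variable later
    // in the process has no effect.
    std::env::remove_var("QW_DISABLE_EXAMPLE");
    assert!(!is_example_feature_enabled());
}
```

A practical consequence: `QW_DISABLE_DOCUMENT_VALIDATION` and `QW_DISABLE_VARIABLE_SHARD_LOAD` must be set before the node starts; toggling them on a running process does nothing. With validation disabled, `validate_doc_batch` short-circuits to `Ok((doc_batch, Vec::new()))`, so malformed documents are no longer reported as parse failures at ingest time.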
