diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs
index bda258689d9..36dff368304 100644
--- a/quickwit/quickwit-indexing/src/test_utils.rs
+++ b/quickwit/quickwit-indexing/src/test_utils.rs
@@ -286,7 +286,7 @@ pub fn mock_split_meta(split_id: &str, index_uid: &IndexUid) -> SplitMetadata {
         index_uid: index_uid.clone(),
         split_id: split_id.to_string(),
         partition_id: 13u64,
-        num_docs: 10,
+        num_docs: if split_id == "split1" { 1_000_000 } else { 10 },
         uncompressed_docs_size_in_bytes: 256,
         time_range: Some(121000..=130198),
         create_timestamp: 0,
diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs
index dc28ed8d5e8..5b05a9d6674 100644
--- a/quickwit/quickwit-search/src/root.rs
+++ b/quickwit/quickwit-search/src/root.rs
@@ -1406,9 +1406,10 @@ async fn assign_client_fetch_docs_jobs(
 }
 
 // Measure the cost associated to searching in a given split metadata.
-fn compute_split_cost(_split_metadata: &SplitMetadata) -> usize {
-    // TODO: Have a smarter cost, by smoothing the number of docs.
-    1
+fn compute_split_cost(split_metadata: &SplitMetadata) -> usize {
+    // TODO: This formula could be tuned a lot more. The general idea is that there is a fixed
+    // cost to searching a split, plus a roughly linear cost that grows with the size of the split.
+    5 + split_metadata.num_docs / 100_000
 }
 
 /// Builds a LeafSearchRequest to one node, from a list of [`SearchJob`].
diff --git a/quickwit/quickwit-search/src/search_job_placer.rs b/quickwit/quickwit-search/src/search_job_placer.rs
index dc708763ff1..64330fedac4 100644
--- a/quickwit/quickwit-search/src/search_job_placer.rs
+++ b/quickwit/quickwit-search/src/search_job_placer.rs
@@ -28,6 +28,7 @@ use async_trait::async_trait;
 use quickwit_common::pubsub::EventSubscriber;
 use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash};
 use quickwit_proto::search::{ReportSplit, ReportSplitsRequest};
+use tracing::warn;
 
 use crate::{SearchJob, SearchServiceClient, SearcherPool, SEARCH_METRICS};
 
@@ -177,13 +178,29 @@ impl SearchJobPlacer {
         let mut job_assignments: HashMap<SocketAddr, (SearchServiceClient, Vec<SearchJob>)> =
             HashMap::with_capacity(num_nodes);
 
+        let total_load: usize = jobs.iter().map(|job| job.cost()).sum();
+
+        // Allow around 5% load disparity. Round up so we never end up in a case where
+        // target_load * num_nodes < total_load.
+        // Some of our tests need 2 splits to be placed on 2 different searchers, and it makes
+        // sense for these tests to keep doing so (they exercise the root merge). We could either
+        // make the allowed difference stricter, pick the right split names ("split6" instead of
+        // "split2" works), or modify mock_split_meta() so that not all splits have the same job
+        // cost. For now I went with the mock_split_meta() changes.
+        const ALLOWED_DIFFERENCE: usize = 105;
+        let target_load = (total_load * ALLOWED_DIFFERENCE).div_ceil(num_nodes * 100);
         for job in jobs {
             sort_by_rendez_vous_hash(&mut candidate_nodes, job.split_id());
-            // Select the least loaded node.
-            let chosen_node_idx = if candidate_nodes.len() >= 2 {
-                usize::from(candidate_nodes[0].load > candidate_nodes[1].load)
+
+            let (chosen_node_idx, chosen_node) = if let Some((idx, node)) = candidate_nodes
+                .iter_mut()
+                .enumerate()
+                .find(|(_pos, node)| node.load < target_load)
+            {
+                (idx, node)
             } else {
-                0
+                warn!("found no lightly loaded searcher for split, this should never happen");
+                (0, &mut candidate_nodes[0])
             };
             let metric_node_idx = match chosen_node_idx {
                 0 => "0",
@@ -194,8 +211,6 @@ impl SearchJobPlacer {
                 .job_assigned_total
                 .with_label_values([metric_node_idx])
                 .inc();
-
-            let chosen_node = &mut candidate_nodes[chosen_node_idx];
             chosen_node.load += job.cost();
 
             job_assignments
@@ -406,19 +421,75 @@ mod tests {
                 vec![
                     SearchJob::for_test("split5", 5),
                     SearchJob::for_test("split4", 4),
-                    SearchJob::for_test("split2", 2),
+                    SearchJob::for_test("split3", 3),
                 ],
             ),
             (
                 expected_searcher_addr_2,
                 vec![
                     SearchJob::for_test("split6", 6),
-                    SearchJob::for_test("split3", 3),
+                    SearchJob::for_test("split2", 2),
                     SearchJob::for_test("split1", 1),
                 ],
             ),
         ];
         assert_eq!(assigned_jobs, expected_assigned_jobs);
     }
+        {
+            let searcher_pool = searcher_pool_for_test([
+                ("127.0.0.1:1001", MockSearchService::new()),
+                ("127.0.0.1:1002", MockSearchService::new()),
+            ]);
+            let search_job_placer = SearchJobPlacer::new(searcher_pool);
+            let jobs = vec![
+                SearchJob::for_test("split1", 1000),
+                SearchJob::for_test("split2", 1),
+            ];
+            let mut assigned_jobs: Vec<(SocketAddr, Vec<SearchJob>)> = search_job_placer
+                .assign_jobs(jobs, &HashSet::default())
+                .await
+                .unwrap()
+                .map(|(client, jobs)| (client.grpc_addr(), jobs))
+                .collect();
+            assigned_jobs.sort_unstable_by_key(|(node_uid, _)| *node_uid);
+
+            let expected_searcher_addr_1: SocketAddr = ([127, 0, 0, 1], 1001).into();
+            let expected_searcher_addr_2: SocketAddr = ([127, 0, 0, 1], 1002).into();
+            let expected_assigned_jobs = vec![
+                (
+                    expected_searcher_addr_1,
+                    vec![SearchJob::for_test("split1", 1000)],
+                ),
+                (
+                    expected_searcher_addr_2,
+                    vec![SearchJob::for_test("split2", 1)],
+                ),
+            ];
+            assert_eq!(assigned_jobs, expected_assigned_jobs);
+        }
     }
+
+    #[tokio::test]
+    async fn test_search_job_placer_many_splits() {
+        let searcher_pool = searcher_pool_for_test([
+            ("127.0.0.1:1001", MockSearchService::new()),
+            ("127.0.0.1:1002", MockSearchService::new()),
+            ("127.0.0.1:1003", MockSearchService::new()),
+            ("127.0.0.1:1004", MockSearchService::new()),
+            ("127.0.0.1:1005", MockSearchService::new()),
+        ]);
+        let search_job_placer = SearchJobPlacer::new(searcher_pool);
+        let jobs = (0..1000)
+            .map(|id| SearchJob::for_test(&format!("split{id}"), 1))
+            .collect();
+        let jobs_len: Vec<usize> = search_job_placer
+            .assign_jobs(jobs, &HashSet::default())
+            .await
+            .unwrap()
+            .map(|(_, jobs)| jobs.len())
+            .collect();
+        for job_len in jobs_len {
+            assert!(job_len <= 1050 / 5);
+        }
     }
 }
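
Note for reviewers: the arithmetic behind the new `compute_split_cost` is easy to sanity-check in isolation. The sketch below is illustrative only (it inlines the formula on a bare `num_docs` instead of taking a `SplitMetadata`); the two inputs mirror the values used by `mock_split_meta` after this patch.

```rust
// Illustrative restatement of the new cost formula from root.rs,
// inlined on a bare num_docs instead of a SplitMetadata.
fn compute_split_cost(num_docs: usize) -> usize {
    // Fixed cost of 5 per split, plus one unit per 100k documents.
    5 + num_docs / 100_000
}

fn main() {
    // The 10-doc mock splits all cost the same...
    assert_eq!(compute_split_cost(10), 5);
    // ...while the 1M-doc "split1" from mock_split_meta now costs 3x more,
    // so splits no longer look interchangeable to the job placer.
    assert_eq!(compute_split_cost(1_000_000), 15);
}
```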
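Similarly, the capping logic added to `assign_jobs` can be exercised outside the crate. The sketch below is a simplified stand-in (plain node indices instead of `SearchServiceClient` entries, and a rotating candidate order instead of rendezvous hashing), but it reproduces the `target_load` math and the bound asserted in `test_search_job_placer_many_splits`.

```rust
// Simplified stand-in for the capped assignment loop in assign_jobs.
fn assign(job_costs: &[usize], num_nodes: usize) -> Vec<usize> {
    const ALLOWED_DIFFERENCE: usize = 105;
    let total_load: usize = job_costs.iter().sum();
    // Round up so that target_load * num_nodes >= total_load always holds.
    let target_load = (total_load * ALLOWED_DIFFERENCE).div_ceil(num_nodes * 100);
    let mut loads = vec![0usize; num_nodes];
    let mut assignment = Vec::with_capacity(job_costs.len());
    for (i, &cost) in job_costs.iter().enumerate() {
        // The real code orders candidates by rendezvous hashing on the split id;
        // a rotating order is enough to exercise the capping behavior here.
        let order: Vec<usize> = (0..num_nodes).map(|k| (i + k) % num_nodes).collect();
        // Pick the first candidate under the target load, falling back to the
        // most-preferred node (the real code logs a warning in that case).
        let chosen = order
            .iter()
            .copied()
            .find(|&node| loads[node] < target_load)
            .unwrap_or(order[0]);
        loads[chosen] += cost;
        assignment.push(chosen);
    }
    assignment
}

fn main() {
    // 1000 unit-cost jobs over 5 searchers: total_load = 1000 and
    // target_load = ceil(1000 * 105 / (5 * 100)) = 210, i.e. 1050 / 5.
    let assignment = assign(&vec![1; 1000], 5);
    for node in 0..5 {
        let load = assignment.iter().filter(|&&n| n == node).count();
        // Mirrors the bound checked in test_search_job_placer_many_splits.
        assert!(load <= 1050 / 5);
    }
}
```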