diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 74b92fb1b33..163a9f5cab4 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -520,37 +520,13 @@ impl Ingester {
             let follower_id_opt = shard.follower_id_opt().cloned();
             let from_position_exclusive = shard.replication_position_inclusive.clone();

-            let (doc_batch, parse_failures) = match subrequest.doc_batch {
-                Some(doc_batch) if !doc_batch.is_empty() => {
-                    // TODO: `validate_doc_batch` could remove from the batch the documents for
-                    // which a parse failure occurred so we don't
-                    // persist and replicate them unnecessarily.
-                    // However, the doc processor metrics would be off.
-                    validate_doc_batch(doc_batch, doc_mapper).await?
-                }
+            let doc_batch = match subrequest.doc_batch {
+                Some(doc_batch) if !doc_batch.is_empty() => doc_batch,
                 _ => {
                     warn!("received empty persist request");
-                    (DocBatchV2::default(), Vec::new())
+                    DocBatchV2::default()
                 }
             };
-            let num_persisted_docs = (doc_batch.num_docs() - parse_failures.len()) as u32;
-
-            if num_persisted_docs == 0 {
-                let replication_position_inclusive =
-                    Some(shard.replication_position_inclusive.clone());
-
-                let persist_success = PersistSuccess {
-                    subrequest_id: subrequest.subrequest_id,
-                    index_uid: subrequest.index_uid,
-                    source_id: subrequest.source_id,
-                    shard_id: subrequest.shard_id,
-                    replication_position_inclusive,
-                    num_persisted_docs,
-                    parse_failures,
-                };
-                persist_successes.push(persist_success);
-                continue;
-            }
             let requested_capacity = estimate_size(&doc_batch);

             if let Err(error) = check_enough_capacity(
@@ -592,7 +568,22 @@
                 persist_failures.push(persist_failure);
                 continue;
             }
+            let (doc_batch, parse_failures) = validate_doc_batch(doc_batch, doc_mapper).await?;
+            let num_persisted_docs = (doc_batch.num_docs() - parse_failures.len()) as u32;

+            if num_persisted_docs == 0 {
+                let persist_success = PersistSuccess {
+                    subrequest_id: subrequest.subrequest_id,
+                    index_uid: subrequest.index_uid,
+                    source_id: subrequest.source_id,
+                    shard_id: subrequest.shard_id,
+                    replication_position_inclusive: Some(from_position_exclusive),
+                    num_persisted_docs,
+                    parse_failures,
+                };
+                persist_successes.push(persist_success);
+                continue;
+            }
             let batch_num_bytes = doc_batch.num_bytes() as u64;
             rate_meter.update(batch_num_bytes);
             total_requested_capacity += requested_capacity;
@@ -1362,6 +1353,11 @@ mod tests {
             self
         }

+        pub fn with_memory_capacity(mut self, memory_capacity: ByteSize) -> Self {
+            self.memory_capacity = memory_capacity;
+            self
+        }
+
         pub fn with_rate_limiter_settings(
             mut self,
             rate_limiter_settings: RateLimiterSettings,
@@ -1973,6 +1969,125 @@ mod tests {
         assert!(parse_failure_2.message.contains("not declared"));
     }

+    #[tokio::test]
+    async fn test_ingester_persist_checks_capacity_before_validating_docs() {
+        let (ingester_ctx, ingester) = IngesterForTest::default()
+            .with_memory_capacity(ByteSize(0))
+            .build()
+            .await;
+
+        let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
+
+        let doc_mapping_uid = DocMappingUid::random();
+        let doc_mapping_json = format!(
+            r#"{{
+                "doc_mapping_uid": "{doc_mapping_uid}",
+                "mode": "strict",
+                "field_mappings": [{{"name": "doc", "type": "text"}}]
+            }}"#
+        );
+        let init_shards_request = InitShardsRequest {
+            subrequests: vec![InitShardSubrequest {
+                subrequest_id: 0,
+                shard: Some(Shard {
+                    index_uid: Some(index_uid.clone()),
+                    source_id: "test-source".to_string(),
+                    shard_id: Some(ShardId::from(0)),
+                    shard_state: ShardState::Open as i32,
+                    leader_id: ingester_ctx.node_id.to_string(),
+                    doc_mapping_uid: Some(doc_mapping_uid),
+                    ..Default::default()
+                }),
+                doc_mapping_json,
+            }],
+        };
+        let response = ingester.init_shards(init_shards_request).await.unwrap();
+        assert_eq!(response.successes.len(), 1);
+        assert_eq!(response.failures.len(), 0);
+
+        let persist_request = PersistRequest {
+            leader_id: ingester_ctx.node_id.to_string(),
+            commit_type: CommitTypeV2::Force as i32,
+            subrequests: vec![PersistSubrequest {
+                subrequest_id: 0,
+                index_uid: Some(index_uid.clone()),
+                source_id: "test-source".to_string(),
+                shard_id: Some(ShardId::from(0)),
+                doc_batch: Some(DocBatchV2::for_test(["", "[]", r#"{"foo": "bar"}"#])),
+            }],
+        };
+        let persist_response = ingester.persist(persist_request).await.unwrap();
+        assert_eq!(persist_response.leader_id, "test-ingester");
+        assert_eq!(persist_response.successes.len(), 0);
+        assert_eq!(persist_response.failures.len(), 1);
+
+        let persist_failure = &persist_response.failures[0];
+        assert_eq!(
+            persist_failure.reason(),
+            PersistFailureReason::ResourceExhausted
+        );
+    }
+
+    #[tokio::test]
+    async fn test_ingester_persist_applies_rate_limiting_before_validating_docs() {
+        let (ingester_ctx, ingester) = IngesterForTest::default()
+            .with_rate_limiter_settings(RateLimiterSettings {
+                burst_limit: 0,
+                rate_limit: ConstantRate::bytes_per_sec(ByteSize(0)),
+                refill_period: Duration::from_secs(1),
+            })
+            .build()
+            .await;
+
+        let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
+
+        let doc_mapping_uid = DocMappingUid::random();
+        let doc_mapping_json = format!(
+            r#"{{
+                "doc_mapping_uid": "{doc_mapping_uid}",
+                "mode": "strict",
+                "field_mappings": [{{"name": "doc", "type": "text"}}]
+            }}"#
+        );
+        let init_shards_request = InitShardsRequest {
+            subrequests: vec![InitShardSubrequest {
+                subrequest_id: 0,
+                shard: Some(Shard {
+                    index_uid: Some(index_uid.clone()),
+                    source_id: "test-source".to_string(),
+                    shard_id: Some(ShardId::from(0)),
+                    shard_state: ShardState::Open as i32,
+                    leader_id: ingester_ctx.node_id.to_string(),
+                    doc_mapping_uid: Some(doc_mapping_uid),
+                    ..Default::default()
+                }),
+                doc_mapping_json,
+            }],
+        };
+        let response = ingester.init_shards(init_shards_request).await.unwrap();
+        assert_eq!(response.successes.len(), 1);
+        assert_eq!(response.failures.len(), 0);
+
+        let persist_request = PersistRequest {
+            leader_id: ingester_ctx.node_id.to_string(),
+            commit_type: CommitTypeV2::Force as i32,
+            subrequests: vec![PersistSubrequest {
+                subrequest_id: 0,
+                index_uid: Some(index_uid.clone()),
+                source_id: "test-source".to_string(),
+                shard_id: Some(ShardId::from(0)),
+                doc_batch: Some(DocBatchV2::for_test(["", "[]", r#"{"foo": "bar"}"#])),
+            }],
+        };
+        let persist_response = ingester.persist(persist_request).await.unwrap();
+        assert_eq!(persist_response.leader_id, "test-ingester");
+        assert_eq!(persist_response.successes.len(), 0);
+        assert_eq!(persist_response.failures.len(), 1);
+
+        let persist_failure = &persist_response.failures[0];
+        assert_eq!(persist_failure.reason(), PersistFailureReason::RateLimited);
+    }
+
     // This test should be run manually and independently of other tests with the `failpoints`
     // feature enabled:
     // ```sh
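For context on what the reordering in the first two hunks buys: before this patch, `validate_doc_batch` ran before the capacity and rate-limit checks, so a request could pay the full cost of parsing every document only to be rejected anyway. Below is a minimal, self-contained sketch of that ordering, not Quickwit code: the `ToyIngester` type, its fields, and the trivial "starts with `{`" validity check are all hypothetical stand-ins for `check_enough_capacity`, the rate limiter, and `validate_doc_batch`.

```rust
/// Illustrative only: hypothetical stand-ins, not Quickwit's actual types.
#[derive(Debug, PartialEq)]
enum Outcome {
    ResourceExhausted,
    RateLimited,
    Persisted { num_docs: usize, parse_failures: usize },
}

struct ToyIngester {
    used_bytes: u64,
    capacity_bytes: u64,
    rate_tokens: u64,
}

impl ToyIngester {
    fn persist(&mut self, raw_docs: &[&str]) -> Outcome {
        // 1. Cheap size estimate on the raw batch; no parsing happens here.
        let requested: u64 = raw_docs.iter().map(|doc| doc.len() as u64).sum();

        // 2. Capacity check first: a saturated ingester rejects without doing any work.
        if self.used_bytes + requested > self.capacity_bytes {
            return Outcome::ResourceExhausted;
        }
        // 3. Rate limiting second, still before validation.
        if requested > self.rate_tokens {
            return Outcome::RateLimited;
        }
        self.used_bytes += requested;
        self.rate_tokens -= requested;

        // 4. Only now pay for per-document validation (a toy stand-in for
        //    `validate_doc_batch`: anything not starting with '{' is a parse failure).
        let parse_failures = raw_docs
            .iter()
            .filter(|doc| !doc.trim_start().starts_with('{'))
            .count();
        Outcome::Persisted {
            num_docs: raw_docs.len() - parse_failures,
            parse_failures,
        }
    }
}

fn main() {
    // Zero capacity: rejected before any document is inspected, mirroring
    // `test_ingester_persist_checks_capacity_before_validating_docs`.
    let mut full = ToyIngester { used_bytes: 0, capacity_bytes: 0, rate_tokens: 1024 };
    assert_eq!(
        full.persist(&["", "[]", r#"{"foo": "bar"}"#]),
        Outcome::ResourceExhausted
    );

    // Capacity available but no rate budget: the rate limiter rejects next.
    let mut throttled = ToyIngester { used_bytes: 0, capacity_bytes: 1024, rate_tokens: 0 };
    assert_eq!(throttled.persist(&[r#"{"foo": "bar"}"#]), Outcome::RateLimited);
}
```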