Skip to content

Commit

Permalink
Apply rate limiting before validating docs (#5170)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Jun 27, 2024
1 parent 6ac0ef8 commit b54d8f2
Showing 1 changed file with 142 additions and 27 deletions.
169 changes: 142 additions & 27 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,37 +520,13 @@ impl Ingester {
let follower_id_opt = shard.follower_id_opt().cloned();
let from_position_exclusive = shard.replication_position_inclusive.clone();

let (doc_batch, parse_failures) = match subrequest.doc_batch {
Some(doc_batch) if !doc_batch.is_empty() => {
// TODO: `validate_doc_batch` could remove from the batch the documents for
// which a parse failure occurred so we don't
// persist and replicate them unnecessarily.
// However, the doc processor metrics would be off.
validate_doc_batch(doc_batch, doc_mapper).await?
}
let doc_batch = match subrequest.doc_batch {
Some(doc_batch) if !doc_batch.is_empty() => doc_batch,
_ => {
warn!("received empty persist request");
(DocBatchV2::default(), Vec::new())
DocBatchV2::default()
}
};
let num_persisted_docs = (doc_batch.num_docs() - parse_failures.len()) as u32;

if num_persisted_docs == 0 {
let replication_position_inclusive =
Some(shard.replication_position_inclusive.clone());

let persist_success = PersistSuccess {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
replication_position_inclusive,
num_persisted_docs,
parse_failures,
};
persist_successes.push(persist_success);
continue;
}
let requested_capacity = estimate_size(&doc_batch);

if let Err(error) = check_enough_capacity(
Expand Down Expand Up @@ -592,7 +568,22 @@ impl Ingester {
persist_failures.push(persist_failure);
continue;
}
let (doc_batch, parse_failures) = validate_doc_batch(doc_batch, doc_mapper).await?;
let num_persisted_docs = (doc_batch.num_docs() - parse_failures.len()) as u32;

if num_persisted_docs == 0 {
let persist_success = PersistSuccess {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
replication_position_inclusive: Some(from_position_exclusive),
num_persisted_docs,
parse_failures,
};
persist_successes.push(persist_success);
continue;
}
let batch_num_bytes = doc_batch.num_bytes() as u64;
rate_meter.update(batch_num_bytes);
total_requested_capacity += requested_capacity;
Expand Down Expand Up @@ -1362,6 +1353,11 @@ mod tests {
self
}

/// Replaces the builder's `memory_capacity` with the given value and returns the builder so
/// further `with_*` calls (or `build`) can be chained.
pub fn with_memory_capacity(self, memory_capacity: ByteSize) -> Self {
    let mut builder = self;
    builder.memory_capacity = memory_capacity;
    builder
}

pub fn with_rate_limiter_settings(
mut self,
rate_limiter_settings: RateLimiterSettings,
Expand Down Expand Up @@ -1973,6 +1969,125 @@ mod tests {
assert!(parse_failure_2.message.contains("not declared"));
}

#[tokio::test]
async fn test_ingester_persist_checks_capacity_before_validating_docs() {
let (ingester_ctx, ingester) = IngesterForTest::default()
.with_memory_capacity(ByteSize(0))
.build()
.await;

let index_uid: IndexUid = IndexUid::for_test("test-index", 0);

let doc_mapping_uid = DocMappingUid::random();
let doc_mapping_json = format!(
r#"{{
"doc_mapping_uid": "{doc_mapping_uid}",
"mode": "strict",
"field_mappings": [{{"name": "doc", "type": "text"}}]
}}"#
);
let init_shards_request = InitShardsRequest {
subrequests: vec![InitShardSubrequest {
subrequest_id: 0,
shard: Some(Shard {
index_uid: Some(index_uid.clone()),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(0)),
shard_state: ShardState::Open as i32,
leader_id: ingester_ctx.node_id.to_string(),
doc_mapping_uid: Some(doc_mapping_uid),
..Default::default()
}),
doc_mapping_json,
}],
};
let response = ingester.init_shards(init_shards_request).await.unwrap();
assert_eq!(response.successes.len(), 1);
assert_eq!(response.failures.len(), 0);

let persist_request = PersistRequest {
leader_id: ingester_ctx.node_id.to_string(),
commit_type: CommitTypeV2::Force as i32,
subrequests: vec![PersistSubrequest {
subrequest_id: 0,
index_uid: Some(index_uid.clone()),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(0)),
doc_batch: Some(DocBatchV2::for_test(["", "[]", r#"{"foo": "bar"}"#])),
}],
};
let persist_response = ingester.persist(persist_request).await.unwrap();
assert_eq!(persist_response.leader_id, "test-ingester");
assert_eq!(persist_response.successes.len(), 0);
assert_eq!(persist_response.failures.len(), 1);

let persist_failure = &persist_response.failures[0];
assert_eq!(
persist_failure.reason(),
PersistFailureReason::ResourceExhausted
);
}

#[tokio::test]
async fn test_ingester_persist_applies_rate_limiting_before_validating_docs() {
let (ingester_ctx, ingester) = IngesterForTest::default()
.with_rate_limiter_settings(RateLimiterSettings {
burst_limit: 0,
rate_limit: ConstantRate::bytes_per_sec(ByteSize(0)),
refill_period: Duration::from_secs(1),
})
.build()
.await;

let index_uid: IndexUid = IndexUid::for_test("test-index", 0);

let doc_mapping_uid = DocMappingUid::random();
let doc_mapping_json = format!(
r#"{{
"doc_mapping_uid": "{doc_mapping_uid}",
"mode": "strict",
"field_mappings": [{{"name": "doc", "type": "text"}}]
}}"#
);
let init_shards_request = InitShardsRequest {
subrequests: vec![InitShardSubrequest {
subrequest_id: 0,
shard: Some(Shard {
index_uid: Some(index_uid.clone()),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(0)),
shard_state: ShardState::Open as i32,
leader_id: ingester_ctx.node_id.to_string(),
doc_mapping_uid: Some(doc_mapping_uid),
..Default::default()
}),
doc_mapping_json,
}],
};
let response = ingester.init_shards(init_shards_request).await.unwrap();
assert_eq!(response.successes.len(), 1);
assert_eq!(response.failures.len(), 0);

let persist_request = PersistRequest {
leader_id: ingester_ctx.node_id.to_string(),
commit_type: CommitTypeV2::Force as i32,
subrequests: vec![PersistSubrequest {
subrequest_id: 0,
index_uid: Some(index_uid.clone()),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(0)),
doc_batch: Some(DocBatchV2::for_test(["", "[]", r#"{"foo": "bar"}"#])),
}],
};
let persist_response = ingester.persist(persist_request).await.unwrap();
assert_eq!(persist_response.leader_id, "test-ingester");
assert_eq!(persist_response.successes.len(), 0);
assert_eq!(persist_response.failures.len(), 1);

let persist_failure = &persist_response.failures[0];
assert_eq!(persist_failure.reason(), PersistFailureReason::RateLimited);
}

// This test should be run manually and independently of other tests with the `failpoints`
// feature enabled:
// ```sh
Expand Down

0 comments on commit b54d8f2

Please sign in to comment.