Skip to content

Commit 5c1469d

Browse files
authored
Populate ES bulk response items on success too (#5019)
1 parent ec502f0 commit 5c1469d

File tree

9 files changed

+114
-12
lines changed

9 files changed

+114
-12
lines changed

quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ async fn elastic_ingest_bulk(
146146
let bulk_response = ElasticBulkResponse {
147147
took_millis,
148148
errors,
149-
items: Vec::new(),
149+
actions: Vec::new(),
150150
};
151151
Ok(bulk_response)
152152
}

quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs

Lines changed: 79 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,19 @@ pub(crate) struct ElasticBulkResponse {
4141
#[serde(rename = "took")]
4242
pub took_millis: u64,
4343
pub errors: bool,
44-
pub items: Vec<ElasticBulkItemAction>,
44+
#[serde(rename = "items")]
45+
pub actions: Vec<ElasticBulkAction>,
4546
}
4647

47-
#[derive(Debug, Serialize, Deserialize)]
48-
pub(crate) enum ElasticBulkItemAction {
48+
#[derive(Debug, Clone, Serialize, Deserialize)]
49+
pub(crate) enum ElasticBulkAction {
4950
#[serde(rename = "create")]
5051
Create(ElasticBulkItem),
5152
#[serde(rename = "index")]
5253
Index(ElasticBulkItem),
5354
}
5455

55-
#[derive(Debug, Serialize, Deserialize)]
56+
#[derive(Debug, Clone, Serialize, Deserialize)]
5657
pub(crate) struct ElasticBulkItem {
5758
#[serde(rename = "_index")]
5859
pub index_id: IndexId,
@@ -63,7 +64,7 @@ pub(crate) struct ElasticBulkItem {
6364
pub error: Option<ElasticBulkError>,
6465
}
6566

66-
#[derive(Debug, Serialize, Deserialize)]
67+
#[derive(Debug, Clone, Serialize, Deserialize)]
6768
pub(crate) struct ElasticBulkError {
6869
#[serde(rename = "index")]
6970
pub index_id: Option<IndexId>,
@@ -132,8 +133,32 @@ pub(crate) async fn elastic_bulk_ingest_v2(
132133
};
133134
let ingest_response_v2 = ingest_router.ingest(ingest_request).await?;
134135
let errors = !ingest_response_v2.failures.is_empty();
135-
let mut items = Vec::new();
136+
let mut actions: Vec<ElasticBulkAction> = Vec::new();
136137

138+
for success in ingest_response_v2.successes {
139+
let es_doc_ids = per_subrequest_id_es_doc_ids
140+
.remove(&success.subrequest_id)
141+
.ok_or_else(|| {
142+
ElasticsearchError::new(
143+
StatusCode::INTERNAL_SERVER_ERROR,
144+
format!(
145+
"could not find subrequest `{}` in bulk request",
146+
success.subrequest_id
147+
),
148+
None,
149+
)
150+
})?;
151+
for es_doc_id in es_doc_ids {
152+
let item = ElasticBulkItem {
153+
index_id: success.index_uid().index_id.clone(),
154+
es_doc_id,
155+
status: StatusCode::CREATED,
156+
error: None,
157+
};
158+
let action = ElasticBulkAction::Index(item);
159+
actions.push(action);
160+
}
161+
}
137162
for failure in ingest_response_v2.failures {
138163
let es_doc_ids = per_subrequest_id_es_doc_ids
139164
.remove(&failure.subrequest_id)
@@ -161,7 +186,25 @@ pub(crate) async fn elastic_bulk_ingest_v2(
161186
status: StatusCode::NOT_FOUND,
162187
error: Some(error),
163188
};
164-
items.push(ElasticBulkItemAction::Index(item));
189+
let action = ElasticBulkAction::Index(item);
190+
actions.push(action);
191+
}
192+
}
193+
IngestFailureReason::Timeout => {
194+
for es_doc_id in es_doc_ids {
195+
let error = ElasticBulkError {
196+
index_id: Some(failure.index_id.clone()),
197+
exception: ErrorCauseException::Timeout,
198+
reason: format!("timeout [{}]", failure.index_id),
199+
};
200+
let item = ElasticBulkItem {
201+
index_id: failure.index_id.clone(),
202+
es_doc_id,
203+
status: StatusCode::REQUEST_TIMEOUT,
204+
error: Some(error),
205+
};
206+
let action = ElasticBulkAction::Index(item);
207+
actions.push(action);
165208
}
166209
}
167210
_ => {
@@ -174,7 +217,7 @@ pub(crate) async fn elastic_bulk_ingest_v2(
174217
let bulk_response = ElasticBulkResponse {
175218
took_millis,
176219
errors,
177-
items,
220+
actions,
178221
};
179222
Ok(bulk_response)
180223
}
@@ -274,6 +317,33 @@ mod tests {
274317

275318
let bulk_response: ElasticBulkResponse = serde_json::from_slice(response.body()).unwrap();
276319
assert!(!bulk_response.errors);
320+
321+
let mut items = bulk_response
322+
.actions
323+
.into_iter()
324+
.map(|action| match action {
325+
ElasticBulkAction::Create(item) => item,
326+
ElasticBulkAction::Index(item) => item,
327+
})
328+
.collect::<Vec<_>>();
329+
assert_eq!(items.len(), 3);
330+
331+
items.sort_by(|left, right| {
332+
left.index_id
333+
.cmp(&right.index_id)
334+
.then(left.es_doc_id.cmp(&right.es_doc_id))
335+
});
336+
assert_eq!(items[0].index_id, "my-index-1");
337+
assert!(items[0].es_doc_id.is_none());
338+
assert_eq!(items[0].status, StatusCode::CREATED);
339+
340+
assert_eq!(items[1].index_id, "my-index-1");
341+
assert_eq!(items[1].es_doc_id.as_ref().unwrap(), "1");
342+
assert_eq!(items[1].status, StatusCode::CREATED);
343+
344+
assert_eq!(items[2].index_id, "my-index-2");
345+
assert_eq!(items[2].es_doc_id.as_ref().unwrap(), "1");
346+
assert_eq!(items[2].status, StatusCode::CREATED);
277347
}
278348

279349
#[tokio::test]
@@ -466,6 +536,6 @@ mod tests {
466536

467537
let bulk_response: ElasticBulkResponse = serde_json::from_slice(response.body()).unwrap();
468538
assert!(bulk_response.errors);
469-
assert_eq!(bulk_response.items.len(), 3);
539+
assert_eq!(bulk_response.actions.len(), 3);
470540
}
471541
}

quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ pub enum ErrorCauseException {
142142
IllegalArgument,
143143
#[serde(rename = "index_not_found_exception")]
144144
IndexNotFound,
145+
#[serde(rename = "timeout_exception")]
146+
Timeout,
145147
}
146148

147149
impl ErrorCauseException {
@@ -150,6 +152,7 @@ impl ErrorCauseException {
150152
Self::ActionRequestValidation => "action_request_validation_exception",
151153
Self::IllegalArgument => "illegal_argument_exception",
152154
Self::IndexNotFound => "index_not_found_exception",
155+
Self::Timeout => "timeout_exception",
153156
}
154157
}
155158
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
ndjson:
2+
- index: { "_index": "test-index-happy-path", "_id": "1" }
3+
- message: Hello, World!
4+
- index: { "_index": "test-index-happy-path" }
5+
- message: Hola, Mundo!
6+
status_code: 200
7+
expected:
8+
errors: false
9+
items:
10+
- index:
11+
_index: test-index-happy-path
12+
_id: "1"
13+
status: 201
14+
- index:
15+
_index: test-index-happy-path
16+
status: 201

quickwit/rest-api-tests/scenarii/es_compatibility/bulk/_setup.elasticsearch.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,3 @@ json: {
1515
}
1616
}
1717
}
18-

quickwit/rest-api-tests/scenarii/es_compatibility/bulk/_setup.quickwit.yaml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,24 @@ method: POST
88
api_root: http://localhost:7280/api/v1/
99
endpoint: indexes/
1010
json:
11-
version: "0.7"
11+
version: "0.8"
1212
index_id: test-index
1313
doc_mapping:
1414
field_mappings:
1515
- name: message
1616
type: text
1717
sleep_after: 3
18+
---
19+
# Create index template
20+
method: POST
21+
api_root: http://localhost:7280/api/v1/
22+
endpoint: templates
23+
json:
24+
version: "0.8"
25+
template_id: test-index-template
26+
index_id_patterns:
27+
- test-index-happy-path*
28+
doc_mapping:
29+
mode: dynamic
30+
indexing_settings:
31+
commit_timeout_secs: 1

0 commit comments

Comments
 (0)