Skip to content

Commit

Permalink
Explicit 400 when using ingest V2 and wait_for
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Aug 27, 2024
1 parent 20606e5 commit 910c325
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 56 deletions.
4 changes: 4 additions & 0 deletions quickwit/quickwit-ingest/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, thiserror::Error, Serialize, Deserialize)]
pub enum IngestServiceError {
#[error("unimplemented: {0}")]
Unimplemented(String),
#[error("data corruption: {0}")]
Corruption(String),
#[error("index `{index_id}` already exists")]
Expand Down Expand Up @@ -141,6 +143,7 @@ impl From<IngestFailure> for IngestServiceError {
impl ServiceError for IngestServiceError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Self::Unimplemented(_) => ServiceErrorCode::BadRequest,
Self::Corruption(err_msg) => {
rate_limited_error!(
limit_per_min = 6,
Expand Down Expand Up @@ -196,6 +199,7 @@ impl From<CorruptedKey> for IngestServiceError {
impl From<IngestServiceError> for tonic::Status {
fn from(error: IngestServiceError) -> tonic::Status {
let code = match &error {
IngestServiceError::Unimplemented(_) => tonic::Code::InvalidArgument,
IngestServiceError::Corruption { .. } => tonic::Code::DataLoss,
IngestServiceError::IndexAlreadyExists { .. } => tonic::Code::AlreadyExists,
IngestServiceError::IndexNotFound { .. } => tonic::Code::NotFound,
Expand Down
99 changes: 51 additions & 48 deletions quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ async fn test_ingest_v2_index_not_found() {
ingest_json!({"body": "doc1"}),
None,
None,
CommitType::WaitFor,
CommitType::Auto,
)
.await
.unwrap_err();
Expand Down Expand Up @@ -281,7 +281,7 @@ async fn test_ingest_v2_happy_path() {
ingest_json!({"body": "doc1"}),
None,
None,
CommitType::WaitFor,
CommitType::Auto,
)
.await;
let Some(ingest_error) = ingest_res.err() else {
Expand Down Expand Up @@ -365,49 +365,52 @@ async fn test_commit_force() {
sandbox.shutdown().await.unwrap();
}

// #[tokio::test]
// async fn test_commit_wait_for() {
// initialize_tests();
// let mut sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
// let index_id = "test_commit_wait_for";
// let index_config = format!(
// r#"
// version: 0.8
// index_id: {index_id}
// doc_mapping:
// field_mappings:
// - name: body type: text
// indexing_settings:
// commit_timeout_secs: 2
// "#
// );

// // Create index
// sandbox
// .indexer_rest_client
// .indexes()
// .create(index_config, ConfigFormat::Yaml, false)
// .await
// .unwrap();

// sandbox.enable_ingest_v2();

// sandbox
// .indexer_rest_client
// .ingest(
// index_id,
// ingest_json!({"body": "wait"}),
// None,
// None,
// CommitType::WaitFor,
// )
// .await
// .unwrap();

// sandbox.assert_hit_count(index_id, "body:wait", 1).await;

// sandbox.shutdown().await.unwrap();
// }
#[tokio::test]
async fn test_commit_wait_for() {
initialize_tests();
let mut sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
let index_id = "test_commit_wait_for";
let index_config = format!(
r#"
version: 0.8
index_id: {index_id}
doc_mapping:
field_mappings:
- name: body
type: text
indexing_settings:
commit_timeout_secs: 2
"#
);

// Create index
sandbox
.indexer_rest_client
.indexes()
.create(index_config, ConfigFormat::Yaml, false)
.await
.unwrap();

sandbox.enable_ingest_v2();

let ingest_error = sandbox
.indexer_rest_client
.ingest(
index_id,
ingest_json!({"body": "wait"}),
None,
None,
CommitType::WaitFor,
)
.await
.unwrap_err();

// TODO https://github.com/quickwit-oss/quickwit/issues/5351
assert_eq!(ingest_error.status_code(), Some(StatusCode::BAD_REQUEST));
// sandbox.assert_hit_count(index_id, "body:wait", 1).await;

sandbox.shutdown().await.unwrap();
}

#[tokio::test]
async fn test_commit_auto() {
Expand Down Expand Up @@ -507,7 +510,7 @@ async fn test_very_large_index_name() {
&sandbox.indexer_rest_client,
index_id,
ingest_json!({"body": "not too long"}),
CommitType::WaitFor,
CommitType::Auto,
)
.await
.unwrap();
Expand Down Expand Up @@ -667,7 +670,7 @@ async fn test_shutdown_control_plane_early_shutdown() {
&sandbox.indexer_rest_client,
index_id,
ingest_json!({"body": "one"}),
CommitType::WaitFor,
CommitType::Force,
)
.await
.unwrap();
Expand Down Expand Up @@ -726,7 +729,7 @@ async fn test_shutdown_separate_indexer() {
&sandbox.indexer_rest_client,
index_id,
ingest_json!({"body": "one"}),
CommitType::WaitFor,
CommitType::Force,
)
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-proto/protos/quickwit/ingest.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ message ShardPKey {
enum CommitTypeV2 {
COMMIT_TYPE_V2_UNSPECIFIED = 0;
COMMIT_TYPE_V2_AUTO = 1;
COMMIT_TYPE_V2_WAIT = 2;
COMMIT_TYPE_V2_WAIT_FOR = 2;
COMMIT_TYPE_V2_FORCE = 3;
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use quickwit_proto::ingest::router::{
use quickwit_proto::ingest::CommitTypeV2;
use quickwit_proto::types::{DocUid, IndexId};
use serde::{Deserialize, Serialize};
use tracing::warn;

use super::model::ElasticException;
use crate::elasticsearch_api::model::{BulkAction, ElasticBulkOptions, ElasticsearchError};
Expand Down Expand Up @@ -140,8 +139,12 @@ pub(crate) async fn elastic_bulk_ingest_v2(
}
let commit_type: CommitTypeV2 = bulk_options.refresh.into();

if commit_type != CommitTypeV2::Auto {
warn!("ingest API v2 does not support the `refresh` parameter (yet)");
if commit_type == CommitTypeV2::WaitFor {
ElasticsearchError::new(
StatusCode::BAD_REQUEST,
"ingest API v2 does not support the `refresh=wait_for` parameter (yet)".to_string(),
Some(ElasticException::IllegalArgument),
);
}
let ingest_request_opt = ingest_request_builder.build(INGEST_V2_SOURCE_ID, commit_type);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl From<ElasticRefresh> for CommitTypeV2 {
match val {
ElasticRefresh::False => Self::Auto,
ElasticRefresh::True => Self::Force,
ElasticRefresh::WaitFor => Self::Wait,
ElasticRefresh::WaitFor => Self::WaitFor,
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ async fn ingest_v2(
ingest_options: IngestV2Options,
ingest_router: IngestRouterServiceClient,
) -> Result<IngestResponse, IngestServiceError> {
if ingest_options.commit_type == CommitTypeV2::WaitFor {
return Err(IngestServiceError::Unimplemented(
"ingest API v2 does not support the `refrcommitesh=wait_for` parameter (yet)"
.to_string(),
));
}
let mut doc_batch_builder = DocBatchV2Builder::default();
let mut doc_uid_generator = DocUidGenerator::default();

Expand Down

0 comments on commit 910c325

Please sign in to comment.