Skip to content

Commit

Permalink
Add endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jan 21, 2025
1 parent f4006c4 commit d7c318d
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 22 deletions.
10 changes: 5 additions & 5 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ use serde_json::Value as JsonValue;
use siphasher::sip::SipHasher;
use source_config::FileSourceParamsForSerde;
pub use source_config::{
load_source_config_from_user_config, FileSourceMessageType, FileSourceNotification,
FileSourceParams, FileSourceSqs, KafkaSourceParams, KinesisSourceParams, PubSubSourceParams,
PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint, SourceConfig, SourceInputFormat,
SourceParams, TransformConfig, VecSourceParams, VoidSourceParams, CLI_SOURCE_ID,
INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
load_source_config_from_user_config, load_source_config_update, FileSourceMessageType,
FileSourceNotification, FileSourceParams, FileSourceSqs, KafkaSourceParams,
KinesisSourceParams, PubSubSourceParams, PulsarSourceAuth, PulsarSourceParams,
RegionOrEndpoint, SourceConfig, SourceInputFormat, SourceParams, TransformConfig,
VecSourceParams, VoidSourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
};
use tracing::warn;

Expand Down
45 changes: 44 additions & 1 deletion quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ use regex::Regex;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
pub use serialize::load_source_config_from_user_config;
// For backward compatibility.
use serialize::VersionedSourceConfig;
pub use serialize::{load_source_config_from_user_config, load_source_config_update};
use siphasher::sip::SipHasher;

use crate::{disable_ingest_v1, enable_ingest_v2};
Expand Down Expand Up @@ -632,6 +632,7 @@ impl TransformConfig {

#[cfg(test)]
mod tests {
use std::num::NonZero;
use std::str::FromStr;

use quickwit_common::uri::Uri;
Expand Down Expand Up @@ -1392,4 +1393,46 @@ mod tests {
.unwrap();
assert_eq!(source_config.input_format, SourceInputFormat::PlainText);
}

/// Loading an update from the Kafka config file must produce the values found in the file,
/// regardless of what the existing config holds, and must fail when the existing config is of
/// a different source type.
#[tokio::test]
async fn test_update_kafka_source_config() {
    let config_path = get_source_config_filepath("kafka-source.json");
    let raw_config = std::fs::read(&config_path).unwrap();
    let config_uri = Uri::from_str(&config_path).unwrap();
    let format = ConfigFormat::sniff_from_uri(&config_uri).unwrap();

    // The existing config starts out identical to the file, then diverges on `num_pipelines`
    // so that the update can be observed.
    let mut current_config = load_source_config_from_user_config(format, &raw_config).unwrap();
    current_config.num_pipelines = NonZero::new(4).unwrap();

    let updated_config = load_source_config_update(format, &raw_config, &current_config).unwrap();

    let kafka_params = KafkaSourceParams {
        topic: "cloudera-cluster-logs".to_string(),
        client_log_level: None,
        client_params: json!({"bootstrap.servers": "localhost:9092"}),
        enable_backfill_mode: false,
    };
    let transform = TransformConfig {
        vrl_script: ".message = downcase(string!(.message))".to_string(),
        timezone: "local".to_string(),
    };
    let expected_config = SourceConfig {
        source_id: "hdfs-logs-kafka-source".to_string(),
        num_pipelines: NonZeroUsize::new(2).unwrap(),
        enabled: true,
        source_params: SourceParams::Kafka(kafka_params),
        transform_config: Some(transform),
        input_format: SourceInputFormat::Json,
    };
    assert_eq!(updated_config, expected_config);
    assert_eq!(updated_config.num_pipelines.get(), 2);

    // Switching the existing config to another source type must make the update fail: the
    // source type cannot be updated.
    let kinesis_params = KinesisSourceParams {
        stream_name: "my-stream".to_string(),
        region_or_endpoint: None,
        enable_backfill_mode: false,
    };
    current_config.source_params = SourceParams::Kinesis(kinesis_params);
    load_source_config_update(format, &raw_config, &current_config).unwrap_err();
}
}
28 changes: 27 additions & 1 deletion quickwit/quickwit-config/src/source_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use std::num::NonZeroUsize;

use anyhow::bail;
use anyhow::{bail, ensure};
use quickwit_proto::types::SourceId;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -64,6 +64,32 @@ pub fn load_source_config_from_user_config(
source_config_for_serialization.validate_and_build()
}

/// Parses and validates an updated source config, then checks it against the current one.
///
/// `config_content` is parsed using `config_format`, converted to its serialization form and
/// validated. The resulting config is only accepted if it keeps both the `source_id` and the
/// source type of `current_source_config`.
///
/// # Errors
///
/// Returns an error if the content cannot be parsed, if validation fails, or if the update
/// changes the `source_id` or the source type.
pub fn load_source_config_update(
    config_format: ConfigFormat,
    config_content: &[u8],
    current_source_config: &SourceConfig,
) -> anyhow::Result<SourceConfig> {
    let parsed_config: VersionedSourceConfig = config_format.parse(config_content)?;
    let config_for_serialization: SourceConfigForSerialization = parsed_config.into();
    let updated_source_config = config_for_serialization.validate_and_build()?;

    // The source ID identifies the source being updated: it must stay the same.
    let current_source_id = &current_source_config.source_id;
    let updated_source_id = &updated_source_config.source_id;
    ensure!(
        current_source_id == updated_source_id,
        "existing `source_id` {} does not match updated `source_id` {}",
        current_source_id,
        updated_source_id
    );

    // An update may change a source's parameters but not its type.
    let current_source_type = current_source_config.source_type();
    let updated_source_type = updated_source_config.source_type();
    ensure!(
        current_source_type == updated_source_type,
        "source type cannot be updated, current type: {}",
        current_source_type,
    );

    Ok(updated_source_config)
}

impl SourceConfigForSerialization {
/// Checks the validity of the `SourceConfig` as a "deserializable source".
///
Expand Down
38 changes: 36 additions & 2 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ use quickwit_indexing::check_source_connectivity;
use quickwit_metastore::{
AddSourceRequestExt, CreateIndexResponseExt, IndexMetadata, IndexMetadataResponseExt,
ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt,
MetastoreServiceStreamSplitsExt, SplitInfo, SplitMetadata, SplitState,
MetastoreServiceStreamSplitsExt, SplitInfo, SplitMetadata, SplitState, UpdateSourceRequestExt,
};
use quickwit_proto::metastore::{
serde_utils, AddSourceRequest, CreateIndexRequest, DeleteIndexRequest, EntityKind,
IndexMetadataRequest, ListIndexesMetadataRequest, ListSplitsRequest,
MarkSplitsForDeletionRequest, MetastoreError, MetastoreService, MetastoreServiceClient,
ResetSourceCheckpointRequest,
ResetSourceCheckpointRequest, UpdateSourceRequest,
};
use quickwit_proto::types::{IndexUid, SplitId};
use quickwit_proto::{ServiceError, ServiceErrorCode};
Expand Down Expand Up @@ -481,6 +481,40 @@ impl IndexService {
Ok(source)
}

/// Updates the configuration of a source belonging to the index identified by `index_uid`.
///
/// The connectivity of `source_config` is checked first; a failure there is reported as
/// `IndexServiceError::InvalidConfig`. The update is then submitted to the metastore, and the
/// source is read back from the index metadata.
///
/// Returns the source config found in the index metadata after the update.
///
/// # Errors
///
/// Returns an error if the connectivity check fails, if a metastore request fails, or if the
/// source is missing from the index metadata once the update has gone through.
pub async fn update_source(
    &mut self,
    index_uid: IndexUid,
    source_config: SourceConfig,
) -> Result<SourceConfig, IndexServiceError> {
    let source_id = source_config.source_id.clone();
    check_source_connectivity(&self.storage_resolver, &source_config)
        .await
        .map_err(IndexServiceError::InvalidConfig)?;
    // `index_uid` is cloned because its `index_id` is still needed below.
    let update_source_request =
        UpdateSourceRequest::try_from_source_config(index_uid.clone(), &source_config)?;
    self.metastore.update_source(update_source_request).await?;
    info!(
        "source `{}` successfully updated for index `{}`",
        source_id, index_uid.index_id,
    );
    // Read the source back from the metastore rather than echoing the caller's input.
    let index_metadata_request = IndexMetadataRequest::for_index_id(index_uid.index_id);
    let source = self
        .metastore
        .index_metadata(index_metadata_request)
        .await?
        .deserialize_index_metadata()?
        .sources
        .get(&source_id)
        .ok_or_else(|| {
            IndexServiceError::Internal(
                "updated source is not in index metadata, this should never happen".to_string(),
            )
        })?
        .clone();
    Ok(source)
}

pub async fn get_source(
&mut self,
index_id: &str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl MetastoreService for ControlPlaneMetastore {
}

// Forwards the toggle request to the control plane and returns its response.
async fn toggle_source(&self, request: ToggleSourceRequest) -> MetastoreResult<EmptyResponse> {
// NOTE(review): the two `let response` statements below look like the removed and the added
// side of a single diff hunk (the `.clone()` on `control_plane` was dropped) — presumably only
// the second one exists in the actual file; confirm against the repository.
let response = self.control_plane.clone().toggle_source(request).await?;
let response = self.control_plane.toggle_source(request).await?;
Ok(response)
}

Expand Down
101 changes: 91 additions & 10 deletions quickwit/quickwit-serve/src/index_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ use super::index_resource::{
};
use super::source_resource::{
__path_create_source, __path_delete_source, __path_reset_source_checkpoint,
__path_toggle_source, create_source_handler, delete_source_handler, get_source_handler,
get_source_shards_handler, reset_source_checkpoint_handler, toggle_source_handler,
ToggleSource,
__path_toggle_source, __path_update_source, create_source_handler, delete_source_handler,
get_source_handler, get_source_shards_handler, reset_source_checkpoint_handler,
toggle_source_handler, update_source_handler, ToggleSource,
};
use super::split_resource::{
__path_list_splits, __path_mark_splits_for_deletion, list_splits_handler,
Expand All @@ -62,6 +62,7 @@ use crate::simple_list::from_simple_list;
describe_index,
mark_splits_for_deletion,
create_source,
update_source,
reset_source_checkpoint,
toggle_source,
delete_source,
Expand Down Expand Up @@ -107,6 +108,7 @@ pub fn index_management_handlers(
.or(reset_source_checkpoint_handler(index_service.metastore()))
.or(toggle_source_handler(index_service.metastore()))
.or(create_source_handler(index_service.clone()))
.or(update_source_handler(index_service.clone()))
.or(get_source_handler(index_service.metastore()))
.or(delete_source_handler(index_service.metastore()))
.or(get_source_shards_handler(index_service.metastore()))
Expand Down Expand Up @@ -1109,6 +1111,92 @@ mod tests {
}
}

/// Exercises `PUT /indexes/{index_id}/sources/{source_id}` through the REST handlers: a valid
/// update is persisted in the metastore, whereas a payload whose `source_id` differs from the
/// one in the path is answered with a 400 and leaves the stored sources untouched.
#[tokio::test]
async fn test_update_source() {
    let metastore = metastore_for_test();
    let index_service = IndexService::new(metastore.clone(), StorageResolver::unconfigured());
    let mut node_config = NodeConfig::for_test();
    node_config.default_index_root_uri = Uri::for_test("file:///default-index-root-uri");
    let handler = super::index_management_handlers(index_service, Arc::new(node_config));

    // Create the index that will own the source.
    let create_index_resp = warp::test::request()
        .path("/indexes")
        .method("POST")
        .json(&true)
        .body(r#"{"version": "0.7", "index_id": "hdfs-logs", "doc_mapping": {"field_mappings":[{"name": "timestamp", "type": "i64", "fast": true, "indexed": true}]}}"#)
        .reply(&handler)
        .await;
    assert_eq!(create_index_resp.status(), 200);
    let create_index_json: serde_json::Value =
        serde_json::from_slice(create_index_resp.body()).unwrap();
    let expected_create_index_json = serde_json::json!({
        "index_config": {
            "index_id": "hdfs-logs",
            "index_uri": "file:///default-index-root-uri/hdfs-logs",
        }
    });
    assert_json_include!(actual: create_index_json, expected: expected_create_index_json);

    // Create the source with `batch_num_docs` set to 10.
    let create_source_body = r#"{"version": "0.7", "source_id": "vec-source", "source_type": "vec", "params": {"docs": [], "batch_num_docs": 10}}"#;
    let create_source_resp = warp::test::request()
        .path("/indexes/hdfs-logs/sources")
        .method("POST")
        .json(&true)
        .body(create_source_body)
        .reply(&handler)
        .await;
    assert_eq!(create_source_resp.status(), 200);

    // Update the source: same ID and type, `batch_num_docs` bumped to 20.
    let valid_update_body = r#"{"version": "0.7", "source_id": "vec-source", "source_type": "vec", "params": {"docs": [], "batch_num_docs": 20}}"#;
    let valid_update_resp = warp::test::request()
        .path("/indexes/hdfs-logs/sources/vec-source")
        .method("PUT")
        .json(&true)
        .body(valid_update_body)
        .reply(&handler)
        .await;
    assert_eq!(valid_update_resp.status(), 200);

    // The metastore must now hold the updated parameters.
    let metadata_after_update = metastore
        .index_metadata(IndexMetadataRequest::for_index_id("hdfs-logs".to_string()))
        .await
        .unwrap()
        .deserialize_index_metadata()
        .unwrap();
    assert!(metadata_after_update.sources.contains_key("vec-source"));
    let stored_source = metadata_after_update.sources.get("vec-source").unwrap();
    assert_eq!(stored_source.source_type(), SourceType::Vec);
    let expected_params = SourceParams::Vec(VecSourceParams {
        docs: Vec::new(),
        batch_num_docs: 20,
        partition: "".to_string(),
    });
    assert_eq!(stored_source.source_params, expected_params);

    // A payload carrying a different `source_id` than the path is forbidden.
    let mismatched_update_body = r#"{"version": "0.7", "source_id": "other-source-id", "source_type": "vec", "params": {"docs": [], "batch_num_docs": 20}}"#;
    let mismatched_update_resp = warp::test::request()
        .path("/indexes/hdfs-logs/sources/vec-source")
        .method("PUT")
        .json(&true)
        .body(mismatched_update_body)
        .reply(&handler)
        .await;
    assert_eq!(mismatched_update_resp.status(), 400);

    // The rejected update must not have touched the stored sources.
    let metadata_after_rejection = metastore
        .index_metadata(IndexMetadataRequest::for_index_id("hdfs-logs".to_string()))
        .await
        .unwrap()
        .deserialize_index_metadata()
        .unwrap();
    assert!(metadata_after_rejection.sources.contains_key("vec-source"));
    assert!(!metadata_after_rejection.sources.contains_key("other-source-id"));
}

#[tokio::test]
async fn test_delete_non_existing_source() {
let mut mock_metastore = MockMetastoreService::new();
Expand Down Expand Up @@ -1244,13 +1332,6 @@ mod tests {
let index_management_handler =
super::index_management_handlers(index_service, Arc::new(NodeConfig::for_test()))
.recover(recover_fn);
// Check server returns 405 if sources root path is used.
let resp = warp::test::request()
.path("/indexes/quickwit-demo-index/sources/source-to-toggle")
.method("PUT")
.reply(&index_management_handler)
.await;
assert_eq!(resp.status(), 405);
let resp = warp::test::request()
.path("/indexes/quickwit-demo-index/sources/source-to-toggle/toggle")
.method("PUT")
Expand Down
Loading

0 comments on commit d7c318d

Please sign in to comment.