Skip to content

Commit

Permalink
Various Lambda fixes (#5016)
Browse files Browse the repository at this point in the history
* Various Lambda fixes on HEAD

* Fix clippy lint

* Logging improvements
  • Loading branch information
rdettai authored Jun 3, 2024
1 parent fc7638b commit f8590d5
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 23 deletions.
3 changes: 3 additions & 0 deletions distribution/lambda/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ package:
then
pushd ../../quickwit/
rustc --version
# TODO: remove --disable-optimizations once we upgrade to a cargo-lambda release (> 1.2.1)
# that includes the fix for https://github.com/cargo-lambda/cargo-lambda/issues/649
cargo lambda build \
-p quickwit-lambda \
--disable-optimizations \
--release \
--output-format zip \
--target x86_64-unknown-linux-gnu
Expand Down
8 changes: 7 additions & 1 deletion distribution/lambda/cdk/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,13 @@ def get_logs(
last_event_id = ""
last_event_found = True
start_time = time.time()
while time.time() - start_time < timeout:
describe_resp = client.describe_log_groups(logGroupNamePrefix=log_group_name)
group_names = [group["logGroupName"] for group in describe_resp["logGroups"]]
if log_group_name in group_names:
break
print(f"log group not found, retrying...")
time.sleep(3)
while time.time() - start_time < timeout:
for page in paginator.paginate(
logGroupName=log_group_name,
Expand All @@ -268,7 +275,6 @@ def get_logs(
last_event_id = event["eventId"]
yield event["message"]
if event["message"].startswith("REPORT"):
lower_time_bound = int(event["timestamp"])
last_event_id = "REPORT"
break
if last_event_id == "REPORT":
Expand Down
60 changes: 41 additions & 19 deletions quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ use quickwit_indexing::IndexingPipeline;
use quickwit_ingest::IngesterPool;
use quickwit_janitor::{start_janitor_service, JanitorService};
use quickwit_metastore::{
CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadata, IndexMetadataResponseExt,
AddSourceRequestExt, CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadata,
IndexMetadataResponseExt,
};
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::metastore::{
CreateIndexRequest, IndexMetadataRequest, MetastoreError, MetastoreService,
AddSourceRequest, CreateIndexRequest, IndexMetadataRequest, MetastoreError, MetastoreService,
MetastoreServiceClient, ResetSourceCheckpointRequest,
};
use quickwit_proto::types::PipelineUid;
Expand All @@ -60,7 +61,7 @@ use crate::indexer::environment::{
DISABLE_JANITOR, DISABLE_MERGE, INDEX_CONFIG_URI, MAX_CHECKPOINTS,
};

const LAMBDA_SOURCE_ID: &str = "_ingest-lambda-source";
const LAMBDA_SOURCE_ID: &str = "ingest-lambda-source";

/// The indexing service needs to update its cluster chitchat state so that the control plane is
/// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service
Expand Down Expand Up @@ -154,29 +155,47 @@ pub(super) async fn configure_source(
}

/// Check if the index exists, creating or overwriting it if necessary.
///
/// If the index exists but lacks the Lambda source ([`LAMBDA_SOURCE_ID`]),
/// the source is added to it.
pub(super) async fn init_index_if_necessary(
metastore: &mut MetastoreServiceClient,
storage_resolver: &StorageResolver,
default_index_root_uri: &Uri,
overwrite: bool,
source_config: &SourceConfig,
) -> anyhow::Result<IndexMetadata> {
let metadata_result = metastore
.index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone()))
.await;
let metadata = match metadata_result {
Ok(_) if overwrite => {
info!(
index_id = *INDEX_ID,
"Overwrite enabled, clearing existing index",
);
let mut index_service = IndexService::new(metastore.clone(), storage_resolver.clone());
index_service.clear_index(&INDEX_ID).await?;
metastore
.index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone()))
.await?
.deserialize_index_metadata()?
Ok(metadata_resp) => {
let current_metadata = metadata_resp.deserialize_index_metadata()?;
let mut metadata_changed = false;
if overwrite {
info!(index_uid = %current_metadata.index_uid, "overwrite enabled, clearing existing index");
let mut index_service =
IndexService::new(metastore.clone(), storage_resolver.clone());
index_service.clear_index(&INDEX_ID).await?;
metadata_changed = true;
}
if !current_metadata.sources.contains_key(LAMBDA_SOURCE_ID) {
let add_source_request = AddSourceRequest::try_from_source_config(
current_metadata.index_uid.clone(),
source_config,
)?;
metastore.add_source(add_source_request).await?;
metadata_changed = true;
}
if metadata_changed {
metastore
.index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone()))
.await?
.deserialize_index_metadata()?
} else {
current_metadata
}
}
Ok(metadata_resp) => metadata_resp.deserialize_index_metadata()?,
Err(MetastoreError::NotFound(_)) => {
info!(
index_id = *INDEX_ID,
Expand All @@ -191,10 +210,13 @@ pub(super) async fn init_index_if_necessary(
index_config.index_id,
);
}
let create_resp = metastore
.create_index(CreateIndexRequest::try_from_index_config(&index_config)?)
.await?;
info!("index created");
let create_index_request = CreateIndexRequest::try_from_index_and_source_configs(
&index_config,
std::slice::from_ref(source_config),
)?;
let create_resp = metastore.create_index(create_index_request).await?;

info!(index_uid = %create_resp.index_uid(), "index created");
create_resp.deserialize_index_metadata()?
}
Err(e) => bail!(e),
Expand Down
7 changes: 4 additions & 3 deletions quickwit/quickwit-lambda/src/indexer/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,18 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
let (config, storage_resolver, mut metastore) =
load_node_config(CONFIGURATION_TEMPLATE).await?;

let source_config =
configure_source(args.input_path, args.input_format, args.vrl_script).await?;

let index_metadata = init_index_if_necessary(
&mut metastore,
&storage_resolver,
&config.default_index_root_uri,
args.overwrite,
&source_config,
)
.await?;

let source_config =
configure_source(args.input_path, args.input_format, args.vrl_script).await?;

let mut services = vec![QuickwitService::Indexer];
if !*DISABLE_JANITOR {
services.push(QuickwitService::Janitor);
Expand Down

0 comments on commit f8590d5

Please sign in to comment.