Skip to content

Commit

Permalink
refactoring attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Sep 10, 2024
1 parent 2937cbb commit eb60ebe
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 101 deletions.
214 changes: 124 additions & 90 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ where Fut: Future<Output = T> {
}

/// Information on what splits have and have not been cleaned up by the GC.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct SplitRemovalInfo {
/// The set of splits that have been removed.
pub removed_split_entries: Vec<SplitInfo>,
Expand All @@ -94,17 +94,23 @@ pub async fn run_garbage_collect(
dry_run: bool,
progress_opt: Option<&Progress>,
) -> anyhow::Result<SplitRemovalInfo> {
// Select staged splits with staging timestamp older than grace period timestamp.



let grace_period_timestamp =
OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64;

let index_uids: Vec<IndexUid> = indexes.keys().cloned().collect();

let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
let Some(list_splits_query_for_index_uids) = ListSplitsQuery::try_from_index_uids(index_uids.clone()) else {
return Ok(SplitRemovalInfo::default())
};
let list_splits_query = list_splits_query_for_index_uids
.clone()
.with_split_state(SplitState::Staged)
.with_update_timestamp_lte(grace_period_timestamp);

let list_deletable_staged_request = ListSplitsRequest::try_from_list_splits_query(&query)?;
let list_deletable_staged_request = ListSplitsRequest::try_from_list_splits_query(&list_splits_query)?;
let deletable_staged_splits: Vec<SplitMetadata> = protect_future(
progress_opt,
metastore.list_splits(list_deletable_staged_request),
Expand All @@ -114,7 +120,7 @@ pub async fn run_garbage_collect(
.await?;

if dry_run {
let marked_for_deletion_query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
let marked_for_deletion_query = list_splits_query_for_index_uids
.with_split_state(SplitState::MarkedForDeletion);
let marked_for_deletion_request =
ListSplitsRequest::try_from_list_splits_query(&marked_for_deletion_query)?;
Expand Down Expand Up @@ -157,78 +163,25 @@ pub async fn run_garbage_collect(
let updated_before_timestamp =
OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64;

delete_splits_marked_for_deletion(
Ok(delete_splits_marked_for_deletion_several_indexes(
index_uids,
updated_before_timestamp,
metastore,
indexes,
progress_opt,
)
.await
.await)
}
#[instrument(skip(index_uids, storages, metastore, progress_opt), fields(num_indexes=%index_uids.len()))]
/// Removes any splits marked for deletion which haven't been
/// updated after `updated_before_timestamp` in batches of 1000 splits.
///
/// The aim of this is to spread the load out across a longer period
/// rather than short, heavy bursts on the metastore and storage system itself.
async fn delete_splits_marked_for_deletion(
index_uids: Vec<IndexUid>,
updated_before_timestamp: i64,

async fn delete_splits(
splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>>,
storages: &HashMap<IndexUid, Arc<dyn Storage>>,
metastore: MetastoreServiceClient,
storages: HashMap<IndexUid, Arc<dyn Storage>>,
progress_opt: Option<&Progress>,
) -> anyhow::Result<SplitRemovalInfo> {
let mut removed_splits = Vec::new();
let mut failed_splits = Vec::new();

loop {
let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
.with_split_state(SplitState::MarkedForDeletion)
.with_update_timestamp_lte(updated_before_timestamp)
.with_limit(DELETE_SPLITS_BATCH_SIZE)
.sort_by_index_uid();

let list_splits_request = match ListSplitsRequest::try_from_list_splits_query(&query) {
Ok(request) => request,
Err(error) => {
error!(error = ?error, "failed to build list splits request");
break;
}
};
let splits_stream_result =
protect_future(progress_opt, metastore.list_splits(list_splits_request)).await;
let splits_to_delete_stream: ServiceStream<MetastoreResult<ListSplitsResponse>> =
match splits_stream_result {
Ok(splits_stream) => splits_stream,
Err(error) => {
error!(error = ?error, "failed to fetch stream splits");
break;
}
};

let splits_metadata_to_delete: Vec<SplitMetadata> =
match splits_to_delete_stream.collect_splits_metadata().await {
Ok(splits) => splits,
Err(error) => {
error!(error = ?error, "failed to collect splits");
break;
}
};

let num_splits_to_delete = splits_metadata_to_delete.len();

if num_splits_to_delete == 0 {
break;
}

let splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>> =
splits_metadata_to_delete
.into_iter()
.map(|meta| (meta.index_uid.clone(), meta))
.into_group_map();

let exit = futures::stream::iter(splits_metadata_to_delete_per_index)
split_removal_info: &mut SplitRemovalInfo,
) -> Result<(), ()> {
let mut delete_split_from_index_res_stream =
futures::stream::iter(splits_metadata_to_delete_per_index)
.map(|(index_uid, splits_metadata_to_delete)| {
let storage = storages.get(&index_uid).cloned();
let metastore = metastore.clone();
Expand Down Expand Up @@ -260,33 +213,114 @@ async fn delete_splits_marked_for_deletion(
}
}
})
.buffer_unordered(10)
.fold(false, |previous_err, result| {
let err_here = match result {
Ok(entries) => {
removed_splits.extend(entries);
false
}
Err(delete_splits_error) => {
removed_splits.extend(delete_splits_error.successes);
failed_splits.extend(delete_splits_error.storage_failures);
failed_splits.extend(delete_splits_error.metastore_failures);
true
}
};
std::future::ready(previous_err || err_here)
})
.await;
if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || exit {
.buffer_unordered(10);
let mut error_encountered = false;
while let Some(delete_split_result) = delete_split_from_index_res_stream.next().await {
match delete_split_result {
Ok(entries) => {
split_removal_info.removed_split_entries.extend(entries);
}
Err(delete_split_error) => {
split_removal_info
.removed_split_entries
.extend(delete_split_error.successes);
split_removal_info
.failed_splits
.extend(delete_split_error.storage_failures);
split_removal_info
.failed_splits
.extend(delete_split_error.metastore_failures);
error_encountered = true;
}
}
}
if error_encountered {
Err(())
} else {
Ok(())
}
}

use anyhow::Context;

/// Fetch the list metadata from the metastore and returns them as a Vec.
async fn list_splits_metadata(metastore: &MetastoreServiceClient, query: &ListSplitsQuery) -> anyhow::Result<Vec<SplitMetadata>> {
let list_splits_request = ListSplitsRequest::try_from_list_splits_query(&query)
.context("failed to build list splits request")?;
let metastore = metastore.clone();
let splits_to_delete_stream =
metastore
.list_splits(list_splits_request).await
.context("failed to fetch stream splits")?;
let splits = splits_to_delete_stream.collect_splits_metadata().await
.context("failed to collect splits")?;
Ok(splits)
}

/// Removes any splits marked for deletion which haven't been
/// updated after `updated_before_timestamp` in batches of 1000 splits.
///
/// The aim of this is to spread the load out across a longer period
/// rather than short, heavy bursts on the metastore and storage system itself.
#[instrument(skip(index_uids, storages, metastore, progress_opt), fields(num_indexes=%index_uids.len()))]
async fn delete_splits_marked_for_deletion_several_indexes(
index_uids: Vec<IndexUid>,
updated_before_timestamp: i64,
metastore: MetastoreServiceClient,
storages: HashMap<IndexUid, Arc<dyn Storage>>,
progress_opt: Option<&Progress>,
) -> SplitRemovalInfo {
let mut split_removal_info = SplitRemovalInfo::default();

let Some(list_splits_query) = ListSplitsQuery::try_from_index_uids(index_uids) else {
error!("failed to create list splits query. this should never happen");
return split_removal_info;
};

let list_splits_query = list_splits_query
.with_split_state(SplitState::MarkedForDeletion)
.with_update_timestamp_lte(updated_before_timestamp)
.with_limit(DELETE_SPLITS_BATCH_SIZE)
.sort_by_index_uid();

loop {
let splits_metadata_to_delete: Vec<SplitMetadata> = match protect_future(progress_opt, list_splits_metadata(&metastore, &list_splits_query)).await {
Ok(splits) => splits,
Err(list_splits_err) => {
error!(error=?list_splits_err, "failed to list splits");
break;
}
};

let num_splits_to_delete = splits_metadata_to_delete.len();

if num_splits_to_delete == 0 {
break;
}

let splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>> =
splits_metadata_to_delete
.into_iter()
.map(|meta| (meta.index_uid.clone(), meta))
.into_group_map();

let delete_split_res = delete_splits(
splits_metadata_to_delete_per_index,
&storages,
metastore.clone(),
progress_opt,
&mut split_removal_info,
)
.await;

if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || delete_split_res.is_err() {
// stop the gc if this was the last batch or we encountered an error
// (otherwise we might try deleting the same splits in an endless loop)
break;
}
}
Ok(SplitRemovalInfo {
removed_split_entries: removed_splits,
failed_splits,
})

split_removal_info
}

/// Delete a list of splits from the storage and the metastore.
Expand Down
11 changes: 4 additions & 7 deletions quickwit/quickwit-metastore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,15 +662,12 @@ impl ListSplitsQuery {
}

/// Creates a new [`ListSplitsQuery`] from a non-empty list of index UIDs.
/// Returns an error if the list is empty.
pub fn try_from_index_uids(index_uids: Vec<IndexUid>) -> MetastoreResult<Self> {
/// Returns None if the list is empty.
pub fn try_from_index_uids(index_uids: Vec<IndexUid>) -> Option<Self> {
if index_uids.is_empty() {
return Err(MetastoreError::Internal {
message: "ListSplitQuery should define at least one index uid".to_string(),
cause: "".to_string(),
});
return None;
}
Ok(Self {
Some(Self {
index_uids,
node_id: None,
limit: None,
Expand Down
6 changes: 4 additions & 2 deletions quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,10 @@ pub async fn list_relevant_splits(
tags_filter_opt: Option<TagFilterAst>,
metastore: &mut MetastoreServiceClient,
) -> crate::Result<Vec<SplitMetadata>> {
let mut query =
ListSplitsQuery::try_from_index_uids(index_uids)?.with_split_state(SplitState::Published);
let Some(mut query) = ListSplitsQuery::try_from_index_uids(index_uids) else {
return Ok(Vec::new());
};
query = query.with_split_state(SplitState::Published);

if let Some(start_ts) = start_timestamp {
query = query.with_time_range_start_gte(start_ts);
Expand Down
7 changes: 5 additions & 2 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@ pub async fn root_list_terms(
.iter()
.map(|index_metadata| index_metadata.index_uid.clone())
.collect();
let mut query = quickwit_metastore::ListSplitsQuery::try_from_index_uids(index_uids)?
.with_split_state(quickwit_metastore::SplitState::Published);

let Some(mut query) = quickwit_metastore::ListSplitsQuery::try_from_index_uids(index_uids) else {
return Ok(ListTermsResponse::default());
};
query = query.with_split_state(quickwit_metastore::SplitState::Published);

if let Some(start_ts) = list_terms_request.start_timestamp {
query = query.with_time_range_start_gte(start_ts);
Expand Down

0 comments on commit eb60ebe

Please sign in to comment.