gc multiple indexes at once #5380

Merged · 5 commits · Sep 5, 2024
Changes from 3 commits
120 changes: 70 additions & 50 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
use std::time::Duration;

use futures::Future;
use itertools::Itertools;
use quickwit_common::pretty::PrettySample;
use quickwit_common::{Progress, ServiceStream};
use quickwit_metastore::{
@@ -66,6 +67,7 @@ where Fut: Future<Output = T> {
}

/// Information on what splits have and have not been cleaned up by the GC.
#[derive(Debug)]
pub struct SplitRemovalInfo {
/// The set of splits that have been removed.
pub removed_split_entries: Vec<SplitInfo>,
@@ -75,7 +77,7 @@ pub struct SplitRemovalInfo {

/// Detect all dangling splits and associated files from the index and removes them.
///
/// * `index_id` - The target index id.
/// * `indexes` - The target index uids and storages.
/// * `storage` - The storage managing the target index.
/// * `metastore` - The metastore managing the target index.
/// * `staged_grace_period` - Threshold period after which a staged split can be safely garbage
@@ -85,8 +87,7 @@ pub struct SplitRemovalInfo {
/// * `dry_run` - Should this only return a list of affected files without performing deletion.
/// * `progress` - For reporting progress (useful when called from within a quickwit actor).
pub async fn run_garbage_collect(
index_uid: IndexUid,
storage: Arc<dyn Storage>,
indexes: HashMap<IndexUid, Arc<dyn Storage>>,
metastore: MetastoreServiceClient,
staged_grace_period: Duration,
deletion_grace_period: Duration,
@@ -97,7 +98,9 @@ pub async fn run_garbage_collect(
let grace_period_timestamp =
OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64;

let query = ListSplitsQuery::for_index(index_uid.clone())
let index_uids: Vec<IndexUid> = indexes.keys().cloned().collect();

let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
.with_split_state(SplitState::Staged)
.with_update_timestamp_lte(grace_period_timestamp);

Expand All @@ -111,7 +114,7 @@ pub async fn run_garbage_collect(
.await?;

if dry_run {
let marked_for_deletion_query = ListSplitsQuery::for_index(index_uid.clone())
let marked_for_deletion_query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
.with_split_state(SplitState::MarkedForDeletion);
let marked_for_deletion_request =
ListSplitsRequest::try_from_list_splits_query(&marked_for_deletion_query)?;
@@ -135,13 +138,13 @@ }
}

// Schedule all eligible staged splits for delete
let split_ids: Vec<SplitId> = deletable_staged_splits
.iter()
.map(|split| split.split_id.to_string())
.collect();
if !split_ids.is_empty() {
let split_ids: HashMap<IndexUid, Vec<SplitId>> = deletable_staged_splits
.into_iter()
.map(|split| (split.index_uid, split.split_id))
.into_group_map();
for (index_uid, split_ids) in split_ids {
let mark_splits_for_deletion_request =
MarkSplitsForDeletionRequest::new(index_uid.clone(), split_ids);
MarkSplitsForDeletionRequest::new(index_uid, split_ids);
protect_future(
progress_opt,
metastore.mark_splits_for_deletion(mark_splits_for_deletion_request),
@@ -154,35 +157,33 @@
let updated_before_timestamp =
OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64;

let deleted_splits = delete_splits_marked_for_deletion(
index_uid,
delete_splits_marked_for_deletion(
index_uids,
updated_before_timestamp,
storage,
metastore,
indexes,
progress_opt,
)
.await;

Ok(deleted_splits)
.await
}
#[instrument(skip(storage, metastore, progress_opt))]
#[instrument(skip(storages, metastore, progress_opt))]
/// Removes any splits marked for deletion which haven't been
/// updated after `updated_before_timestamp` in batches of 1000 splits.
///
/// The aim of this is to spread the load out across a longer period
/// rather than short, heavy bursts on the metastore and storage system itself.
async fn delete_splits_marked_for_deletion(
index_uid: IndexUid,
index_uids: Vec<IndexUid>,
updated_before_timestamp: i64,
storage: Arc<dyn Storage>,
metastore: MetastoreServiceClient,
storages: HashMap<IndexUid, Arc<dyn Storage>>,
progress_opt: Option<&Progress>,
) -> SplitRemovalInfo {
) -> anyhow::Result<SplitRemovalInfo> {
let mut removed_splits = Vec::new();
let mut failed_splits = Vec::new();

loop {
let query = ListSplitsQuery::for_index(index_uid.clone())
'outer: loop {
let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
.with_split_state(SplitState::MarkedForDeletion)
.with_update_timestamp_lte(updated_before_timestamp)
.with_limit(DELETE_SPLITS_BATCH_SIZE);
@@ -219,31 +220,46 @@ async fn delete_splits_marked_for_deletion(
if num_splits_to_delete == 0 {
break;
}
let delete_splits_result = delete_splits_from_storage_and_metastore(
index_uid.clone(),
storage.clone(),
metastore.clone(),
splits_metadata_to_delete,
progress_opt,
)
.await;

match delete_splits_result {
Ok(entries) => removed_splits.extend(entries),
Err(delete_splits_error) => {
failed_splits.extend(delete_splits_error.storage_failures);
failed_splits.extend(delete_splits_error.metastore_failures);
break;
let splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>> =
splits_metadata_to_delete
.into_iter()
.map(|meta| (meta.index_uid.clone(), meta))
.into_group_map();

for (index_uid, splits_metadata_to_delete) in splits_metadata_to_delete_per_index {
let Some(storage) = storages.get(&index_uid).cloned() else {
error!("we are trying to GC without knowing the storage, this shouldn't happen");
// we stop here, otherwise we could easily end up looping indefinitely if there are more
// than DELETE_SPLITS_BATCH_SIZE splits to delete in this index
break 'outer;
};
let delete_splits_result = delete_splits_from_storage_and_metastore(
index_uid,
storage,
metastore.clone(),
splits_metadata_to_delete,
progress_opt,
)
.await;

match delete_splits_result {
Ok(entries) => removed_splits.extend(entries),
Err(delete_splits_error) => {
failed_splits.extend(delete_splits_error.storage_failures);
failed_splits.extend(delete_splits_error.metastore_failures);
break;
}
}
}
if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE {
break;
}
}
SplitRemovalInfo {
Ok(SplitRemovalInfo {
removed_split_entries: removed_splits,
failed_splits,
}
})
}

/// Delete a list of splits from the storage and the metastore.
Expand Down Expand Up @@ -369,6 +385,12 @@ mod tests {
use super::*;
use crate::run_garbage_collect;

fn hashmap<K: Eq + std::hash::Hash, V>(key: K, value: V) -> HashMap<K, V> {
let mut map = HashMap::new();
map.insert(key, value);
map
}

#[tokio::test]
async fn test_run_gc_marks_stale_staged_splits_for_deletion_after_grace_period() {
let storage = storage_for_test();
@@ -414,8 +436,7 @@

// The staging grace period hasn't passed yet so the split remains staged.
run_garbage_collect(
index_uid.clone(),
storage.clone(),
hashmap(index_uid.clone(), storage.clone()),
metastore.clone(),
Duration::from_secs(30),
Duration::from_secs(30),
@@ -442,8 +463,7 @@

// The staging grace period has passed so the split is marked for deletion.
run_garbage_collect(
index_uid.clone(),
storage.clone(),
hashmap(index_uid.clone(), storage.clone()),
metastore.clone(),
Duration::from_secs(0),
Duration::from_secs(30),
Expand Down Expand Up @@ -489,7 +509,7 @@ mod tests {
let split_id = "test-run-gc--split";
let split_metadata = SplitMetadata {
split_id: split_id.to_string(),
index_uid: IndexUid::new_with_random_ulid(index_id),
index_uid: index_uid.clone(),
..Default::default()
};
let stage_splits_request =
@@ -520,8 +540,7 @@

// The delete grace period hasn't passed yet so the split remains marked for deletion.
run_garbage_collect(
index_uid.clone(),
storage.clone(),
hashmap(index_uid.clone(), storage.clone()),
metastore.clone(),
Duration::from_secs(30),
Duration::from_secs(30),
@@ -548,8 +567,7 @@

// The delete grace period has passed so the split is deleted.
run_garbage_collect(
index_uid.clone(),
storage.clone(),
hashmap(index_uid.clone(), storage.clone()),
metastore.clone(),
Duration::from_secs(30),
Duration::from_secs(0),
Expand Down Expand Up @@ -584,8 +602,10 @@ mod tests {
.times(2)
.returning(|_| Ok(ServiceStream::empty()));
run_garbage_collect(
IndexUid::new_with_random_ulid("index-test-gc-deletes"),
storage.clone(),
hashmap(
IndexUid::new_with_random_ulid("index-test-gc-deletes"),
storage.clone(),
),
MetastoreServiceClient::from_mock(mock_metastore),
Duration::from_secs(30),
Duration::from_secs(30),
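The key mechanism in this file is that a single multi-index `ListSplits` query now returns splits from several indexes mixed together, and they are regrouped per index (via itertools' `into_group_map`) before being marked for deletion or deleted, so the metastore and storage are still addressed one index at a time. If a split's index has no known storage, the new code logs an error and breaks out of the whole batching loop (`'outer`) rather than retrying the same batch forever. Below is a minimal sketch of the grouping step, using hypothetical stand-in types rather than Quickwit's `SplitMetadata`:

```rust
use std::collections::HashMap;

use itertools::Itertools;

// Hypothetical stand-ins for Quickwit's IndexUid / SplitId types.
type IndexUid = String;
type SplitId = String;

struct SplitRef {
    index_uid: IndexUid,
    split_id: SplitId,
}

/// Groups splits coming from a multi-index list-splits response into
/// per-index batches, mirroring the grouping done in this PR.
fn group_by_index(splits: Vec<SplitRef>) -> HashMap<IndexUid, Vec<SplitId>> {
    splits
        .into_iter()
        .map(|split| (split.index_uid, split.split_id))
        // `into_group_map` turns an iterator of (key, value) pairs into a
        // HashMap<key, Vec<value>>, keeping the per-key insertion order.
        .into_group_map()
}

fn main() {
    let splits = vec![
        SplitRef { index_uid: "index-a".into(), split_id: "split-1".into() },
        SplitRef { index_uid: "index-b".into(), split_id: "split-2".into() },
        SplitRef { index_uid: "index-a".into(), split_id: "split-3".into() },
    ];
    let grouped = group_by_index(splits);
    assert_eq!(grouped["index-a"], vec!["split-1".to_string(), "split-3".to_string()]);
}
```

Each per-index batch then feeds one `MarkSplitsForDeletionRequest` or one `delete_splits_from_storage_and_metastore` call, as in the diff above.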
3 changes: 1 addition & 2 deletions quickwit/quickwit-index-management/src/index.rs
@@ -365,8 +365,7 @@ impl IndexService {
.await?;

let deleted_entries = run_garbage_collect(
index_uid,
storage,
[(index_uid, storage)].into_iter().collect(),
self.metastore.clone(),
grace_period,
// deletion_grace_period of zero, so that a cli call directly deletes splits after
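For callers, the signature change is that `run_garbage_collect` now takes a `HashMap<IndexUid, Arc<dyn Storage>>` instead of a single `index_uid`/`storage` pair; the single-index call site above keeps its old shape by collecting a one-element iterator. A hedged sketch of how a multi-index caller might assemble that map (the `resolve_storage` helper and the simplified types are assumptions, not part of the PR):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-ins for the Quickwit types involved.
type IndexUid = String;
trait Storage: Send + Sync {}

// Assumed helper: a real caller would go through the storage resolver here.
fn resolve_storage(_index_uid: &IndexUid) -> Arc<dyn Storage> {
    unimplemented!("resolve the storage backing this index")
}

/// Builds the `indexes` argument expected by the new `run_garbage_collect`.
fn build_gc_targets(index_uids: Vec<IndexUid>) -> HashMap<IndexUid, Arc<dyn Storage>> {
    index_uids
        .into_iter()
        .map(|index_uid| {
            let storage = resolve_storage(&index_uid);
            (index_uid, storage)
        })
        .collect()
}
```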