Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't enumerate index_uid when requesting splits to gc #5489

Merged
merged 5 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 42 additions & 36 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use futures::{Future, StreamExt};
use itertools::Itertools;
use quickwit_common::metrics::IntCounter;
use quickwit_common::pretty::PrettySample;
use quickwit_common::Progress;
use quickwit_common::{rate_limited_info, Progress};
use quickwit_metastore::{
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitInfo,
SplitMetadata, SplitState,
Expand Down Expand Up @@ -122,8 +122,8 @@ pub async fn run_garbage_collect(

let index_uids: Vec<IndexUid> = indexes.keys().cloned().collect();

let Some(list_splits_query_for_index_uids) =
ListSplitsQuery::try_from_index_uids(index_uids.clone())
// TODO maybe we want to do a ListSplitsQuery::for_all_indexes and post-filter ourselves here
let Some(list_splits_query_for_index_uids) = ListSplitsQuery::try_from_index_uids(index_uids)
else {
return Ok(SplitRemovalInfo::default());
};
Expand Down Expand Up @@ -187,7 +187,6 @@ pub async fn run_garbage_collect(
OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64;

Ok(delete_splits_marked_for_deletion_several_indexes(
index_uids,
updated_before_timestamp,
metastore,
indexes,
Expand Down Expand Up @@ -221,20 +220,15 @@ async fn delete_splits(
)
.await
} else {
error!(
"we are trying to GC without knowing the storage, this shouldn't \
happen"
// in practice this can happen if the index was created between the start of
// the run and now, and one of its splits has already expired, which likely
// means a very long GC run, or if we run GC on a single index from the CLI.
quickwit_common::rate_limited_warn!(
limit_per_min = 2,
index_uid=%index_uid,
"we are trying to GC without knowing the storage",
);
Err(DeleteSplitsError {
successes: Vec::new(),
storage_error: None,
storage_failures: splits_metadata_to_delete
.into_iter()
.map(|split| split.as_split_info())
.collect(),
metastore_error: None,
metastore_failures: Vec::new(),
})
Ok(Vec::new())
}
}
})
Expand Down Expand Up @@ -304,11 +298,12 @@ async fn list_splits_metadata(
/// Removes any splits marked for deletion which haven't been
/// updated after `updated_before_timestamp` in batches of 1000 splits.
///
/// Only splits from index_uids in the `storages` map will be deleted.
///
/// The aim of this is to spread the load out across a longer period
/// rather than short, heavy bursts on the metastore and storage system itself.
#[instrument(skip(index_uids, storages, metastore, progress_opt, metrics), fields(num_indexes=%index_uids.len()))]
#[instrument(skip(storages, metastore, progress_opt, metrics), fields(num_indexes=%storages.len()))]
async fn delete_splits_marked_for_deletion_several_indexes(
index_uids: Vec<IndexUid>,
updated_before_timestamp: i64,
metastore: MetastoreServiceClient,
storages: HashMap<IndexUid, Arc<dyn Storage>>,
Expand All @@ -317,18 +312,22 @@ async fn delete_splits_marked_for_deletion_several_indexes(
) -> SplitRemovalInfo {
let mut split_removal_info = SplitRemovalInfo::default();

let Some(list_splits_query) = ListSplitsQuery::try_from_index_uids(index_uids) else {
error!("failed to create list splits query. this should never happen");
return split_removal_info;
};
// we ask for all indexes because the query is more efficient and we almost always want all
// indexes anyway. The exception is when garbage collecting a single index from the command
// line. In this case, we will log a bunch of warnings. I (trinity) consider it worth the more
// generic code, which needs fewer special cases while testing, but we could check the length of
// index_uids if we think it's a better idea.
let list_splits_query = ListSplitsQuery::for_all_indexes();
fulmicoton marked this conversation as resolved.
Show resolved Hide resolved

let mut list_splits_query = list_splits_query
.with_split_state(SplitState::MarkedForDeletion)
.with_update_timestamp_lte(updated_before_timestamp)
.with_limit(DELETE_SPLITS_BATCH_SIZE)
.sort_by_index_uid();

loop {
let mut splits_to_delete_possibly_remaining = true;

while splits_to_delete_possibly_remaining {
let splits_metadata_to_delete: Vec<SplitMetadata> = match protect_future(
progress_opt,
list_splits_metadata(&metastore, &list_splits_query),
Expand All @@ -342,19 +341,32 @@ async fn delete_splits_marked_for_deletion_several_indexes(
}
};

// We page through the list of splits to delete using a limit and a `search_after` trick.
// To detect if this is the last page, we check if the number of splits is less than the
// limit.
assert!(splits_metadata_to_delete.len() <= DELETE_SPLITS_BATCH_SIZE);
splits_to_delete_possibly_remaining =
splits_metadata_to_delete.len() == DELETE_SPLITS_BATCH_SIZE;

// set the split after which to search on the next loop iteration
let Some(last_split_metadata) = splits_metadata_to_delete.last() else {
break;
};
list_splits_query = list_splits_query.after_split(last_split_metadata);

let num_splits_to_delete = splits_metadata_to_delete.len();
let mut splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>> =
HashMap::with_capacity(storages.len());

let splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>> =
splits_metadata_to_delete
.into_iter()
.map(|meta| (meta.index_uid.clone(), meta))
.into_group_map();
for meta in splits_metadata_to_delete {
if !storages.contains_key(&meta.index_uid) {
rate_limited_info!(limit_per_min=6, index_uid=?meta.index_uid, "split not listed in storage map: skipping");
continue;
}
splits_metadata_to_delete_per_index
.entry(meta.index_uid.clone())
.or_default()
.push(meta);
}

// ignore the return value; we continue either way
let _: Result<(), ()> = delete_splits(
Expand All @@ -366,12 +378,6 @@ async fn delete_splits_marked_for_deletion_several_indexes(
&mut split_removal_info,
)
.await;

if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE {
// stop the gc if this was the last batch
// we are guaranteed to make progress due to .after_split()
break;
}
}

split_removal_info
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1764,14 +1764,14 @@ mod tests {
.expect_list_splits()
.withf(|request| {
let list_splits_query = request.deserialize_list_splits_query().unwrap();
list_splits_query.index_uids == [("test-index-0", 0)]
list_splits_query.index_uids.unwrap() == [("test-index-0", 0)]
})
.return_once(|_request| Ok(ServiceStream::empty()));
mock_metastore
.expect_list_splits()
.withf(|request| {
let list_splits_query = request.deserialize_list_splits_query().unwrap();
list_splits_query.index_uids == [("test-index-1", 0), ("test-index-2", 0)]
list_splits_query.index_uids.unwrap() == [("test-index-1", 0), ("test-index-2", 0)]
})
.return_once(|_request| {
let splits = vec![Split {
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ mod tests {
.times(1)
.withf(move |list_splits_request| {
let list_split_query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(list_split_query.index_uids, &[index_uid.clone()]);
assert_eq!(list_split_query.index_uids, Some(vec![index_uid.clone()]));
assert_eq!(
list_split_query.split_states,
vec![quickwit_metastore::SplitState::Published]
Expand Down
45 changes: 29 additions & 16 deletions quickwit/quickwit-janitor/src/actors/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,13 @@ mod tests {
.times(2)
.returning(move |list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(query.index_uids[0], index_uid_clone,);
let splits = match query.split_states[0] {
SplitState::Staged => make_splits("test-index", &["a"], SplitState::Staged),
SplitState::Staged => {
assert_eq!(query.index_uids.unwrap()[0], index_uid_clone);
make_splits("test-index", &["a"], SplitState::Staged)
}
SplitState::MarkedForDeletion => {
assert!(query.index_uids.is_none());
let expected_deletion_timestamp = OffsetDateTime::now_utc()
.unix_timestamp()
- split_deletion_grace_period().as_secs() as i64;
Expand Down Expand Up @@ -394,14 +397,19 @@ mod tests {
.times(2)
.returning(|list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(&query.index_uids[0].index_id, "test-index");
let splits = match query.split_states[0] {
SplitState::Staged => make_splits("test-index", &["a"], SplitState::Staged),
SplitState::MarkedForDeletion => make_splits(
"test-index",
&["a", "b", "c"],
SplitState::MarkedForDeletion,
),
SplitState::Staged => {
assert_eq!(&query.index_uids.unwrap()[0].index_id, "test-index");
make_splits("test-index", &["a"], SplitState::Staged)
}
SplitState::MarkedForDeletion => {
assert!(query.index_uids.is_none());
make_splits(
"test-index",
&["a", "b", "c"],
SplitState::MarkedForDeletion,
)
}
_ => panic!("only Staged and MarkedForDeletion expected."),
};
let splits = ListSplitsResponse::try_from_splits(splits).unwrap();
Expand Down Expand Up @@ -469,10 +477,13 @@ mod tests {
.times(6)
.returning(|list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(&query.index_uids[0].index_id, "test-index");
let splits = match query.split_states[0] {
SplitState::Staged => make_splits("test-index", &["a"], SplitState::Staged),
SplitState::Staged => {
assert_eq!(&query.index_uids.unwrap()[0].index_id, "test-index");
make_splits("test-index", &["a"], SplitState::Staged)
}
SplitState::MarkedForDeletion => {
assert!(&query.index_uids.is_none());
make_splits("test-index", &["a", "b"], SplitState::MarkedForDeletion)
}
_ => panic!("only Staged and MarkedForDeletion expected."),
Expand Down Expand Up @@ -633,11 +644,6 @@ mod tests {
.times(3)
.returning(|list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(query.index_uids.len(), 2);
assert!(["test-index-1", "test-index-2"]
.contains(&query.index_uids[0].index_id.as_ref()));
assert!(["test-index-1", "test-index-2"]
.contains(&query.index_uids[1].index_id.as_ref()));
let splits_ids_string: Vec<String> =
(0..8000).map(|seq| format!("split-{seq:04}")).collect();
let splits_ids: Vec<&str> = splits_ids_string
Expand All @@ -646,11 +652,18 @@ mod tests {
.collect();
let mut splits = match query.split_states[0] {
SplitState::Staged => {
let index_uids = query.index_uids.unwrap();
assert_eq!(index_uids.len(), 2);
assert!(["test-index-1", "test-index-2"]
.contains(&index_uids[0].index_id.as_ref()));
assert!(["test-index-1", "test-index-2"]
.contains(&index_uids[1].index_id.as_ref()));
let mut splits = make_splits("test-index-1", &["a"], SplitState::Staged);
splits.append(&mut make_splits("test-index-2", &["a"], SplitState::Staged));
splits
}
SplitState::MarkedForDeletion => {
assert!(query.index_uids.is_none());
assert_eq!(query.limit, Some(10_000));
let mut splits =
make_splits("test-index-1", &splits_ids, SplitState::MarkedForDeletion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ mod tests {
.returning(|list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(query.split_states, &[SplitState::Published]);
let splits = match query.index_uids[0].index_id.as_ref() {
let splits = match query.index_uids.unwrap()[0].index_id.as_ref() {
"index-1" => {
vec![
make_split("split-1", Some(1000..=5000)),
Expand Down
Loading
Loading