Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't enumerate index_uid when requesting splits to gc #5489

Merged
merged 5 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 43 additions & 36 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use quickwit_proto::types::{IndexUid, SplitId};
use quickwit_storage::{BulkDeleteError, Storage};
use thiserror::Error;
use time::OffsetDateTime;
use tracing::{error, instrument};
use tracing::{error, info, instrument};

/// The maximum number of splits that the GC should delete per attempt.
const DELETE_SPLITS_BATCH_SIZE: usize = 10_000;
Expand Down Expand Up @@ -122,8 +122,8 @@ pub async fn run_garbage_collect(

let index_uids: Vec<IndexUid> = indexes.keys().cloned().collect();

let Some(list_splits_query_for_index_uids) =
ListSplitsQuery::try_from_index_uids(index_uids.clone())
// TODO maybe we want to do a ListSplitsQuery::for_all_indexes and post-filter ourselves here
let Some(list_splits_query_for_index_uids) = ListSplitsQuery::try_from_index_uids(index_uids)
else {
return Ok(SplitRemovalInfo::default());
};
Expand Down Expand Up @@ -187,7 +187,6 @@ pub async fn run_garbage_collect(
OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64;

Ok(delete_splits_marked_for_deletion_several_indexes(
index_uids,
updated_before_timestamp,
metastore,
indexes,
Expand Down Expand Up @@ -221,20 +220,15 @@ async fn delete_splits(
)
.await
} else {
error!(
"we are trying to GC without knowing the storage, this shouldn't \
happen"
// In practice, this can happen in two cases: (1) the index was created after this
// run started and one of its splits has already expired, which likely means a very
// long GC run; or (2) GC is run on a single index from the CLI.
quickwit_common::rate_limited_warn!(
limit_per_min = 2,
index_uid=%index_uid,
"we are trying to GC without knowing the storage",
);
Err(DeleteSplitsError {
successes: Vec::new(),
storage_error: None,
storage_failures: splits_metadata_to_delete
.into_iter()
.map(|split| split.as_split_info())
.collect(),
metastore_error: None,
metastore_failures: Vec::new(),
})
Ok(Vec::new())
}
}
})
Expand Down Expand Up @@ -304,11 +298,12 @@ async fn list_splits_metadata(
/// Removes any splits marked for deletion which haven't been
/// updated after `updated_before_timestamp` in batches of `DELETE_SPLITS_BATCH_SIZE` (10,000) splits.
///
/// Only splits from index_uids in the `storages` map will be deleted.
///
/// The aim of this is to spread the load out across a longer period
/// rather than short, heavy bursts on the metastore and storage system itself.
#[instrument(skip(index_uids, storages, metastore, progress_opt, metrics), fields(num_indexes=%index_uids.len()))]
#[instrument(skip(storages, metastore, progress_opt, metrics), fields(num_indexes=%storages.len()))]
async fn delete_splits_marked_for_deletion_several_indexes(
index_uids: Vec<IndexUid>,
updated_before_timestamp: i64,
metastore: MetastoreServiceClient,
storages: HashMap<IndexUid, Arc<dyn Storage>>,
Expand All @@ -317,18 +312,22 @@ async fn delete_splits_marked_for_deletion_several_indexes(
) -> SplitRemovalInfo {
let mut split_removal_info = SplitRemovalInfo::default();

let Some(list_splits_query) = ListSplitsQuery::try_from_index_uids(index_uids) else {
error!("failed to create list splits query. this should never happen");
return split_removal_info;
};
// We query all indexes because that query is more efficient and we almost always want all
// indexes anyway. The exception is garbage collecting a single index from the command line:
// in that case, we will log a number of warnings. I (trinity) consider that an acceptable
// price for more generic code that needs fewer special cases in tests, but we could check
// the number of index uids instead if we think that is a better idea.
let list_splits_query = ListSplitsQuery::for_all_indexes();
fulmicoton marked this conversation as resolved.
Show resolved Hide resolved

let mut list_splits_query = list_splits_query
.with_split_state(SplitState::MarkedForDeletion)
.with_update_timestamp_lte(updated_before_timestamp)
.with_limit(DELETE_SPLITS_BATCH_SIZE)
.sort_by_index_uid();

loop {
let mut splits_to_delete_possibly_remaining = true;

while splits_to_delete_possibly_remaining {
let splits_metadata_to_delete: Vec<SplitMetadata> = match protect_future(
progress_opt,
list_splits_metadata(&metastore, &list_splits_query),
Expand All @@ -342,19 +341,33 @@ async fn delete_splits_marked_for_deletion_several_indexes(
}
};

// We page through the list of splits to delete using a limit and a `search_after` trick.
// To detect if this is the last page, we check if the number of splits is less than the
// limit.
assert!(splits_metadata_to_delete.len() <= DELETE_SPLITS_BATCH_SIZE);
splits_to_delete_possibly_remaining =
splits_metadata_to_delete.len() == DELETE_SPLITS_BATCH_SIZE;

// set split after which to search for the next loop
let Some(last_split_metadata) = splits_metadata_to_delete.last() else {
break;
};
list_splits_query = list_splits_query.after_split(last_split_metadata);

let num_splits_to_delete = splits_metadata_to_delete.len();
let mut splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>> =
HashMap::with_capacity(storages.len());

let splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>> =
splits_metadata_to_delete
.into_iter()
.map(|meta| (meta.index_uid.clone(), meta))
.into_group_map();
for meta in splits_metadata_to_delete {
if !storages.contains_key(&meta.index_uid) {
info!(index_uid=?meta.index_uid, "split not listed in storage map: skipping");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't log often, but when it does, it may log a lot, so it should be rate-limited.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

continue;
}
if let Some(splits) = splits_metadata_to_delete_per_index.get_mut(&meta.index_uid) {
splits.push(meta);
} else {
splits_metadata_to_delete_per_index.insert(meta.index_uid.clone(), vec![meta]);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be expressed with the entry api, which imo is easier to read

Copy link
Contributor

@fulmicoton fulmicoton Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I was avoiding the needless 10k allocation, but yeah, maybe it does not matter. I will switch to the entry API.

}
}

// ignore return we continue either way
let _: Result<(), ()> = delete_splits(
Expand All @@ -366,12 +379,6 @@ async fn delete_splits_marked_for_deletion_several_indexes(
&mut split_removal_info,
)
.await;

if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE {
// stop the gc if this was the last batch
// we are guaranteed to make progress due to .after_split()
break;
}
}

split_removal_info
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1764,14 +1764,14 @@ mod tests {
.expect_list_splits()
.withf(|request| {
let list_splits_query = request.deserialize_list_splits_query().unwrap();
list_splits_query.index_uids == [("test-index-0", 0)]
list_splits_query.index_uids.unwrap() == [("test-index-0", 0)]
})
.return_once(|_request| Ok(ServiceStream::empty()));
mock_metastore
.expect_list_splits()
.withf(|request| {
let list_splits_query = request.deserialize_list_splits_query().unwrap();
list_splits_query.index_uids == [("test-index-1", 0), ("test-index-2", 0)]
list_splits_query.index_uids.unwrap() == [("test-index-1", 0), ("test-index-2", 0)]
})
.return_once(|_request| {
let splits = vec![Split {
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ mod tests {
.times(1)
.withf(move |list_splits_request| {
let list_split_query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(list_split_query.index_uids, &[index_uid.clone()]);
assert_eq!(list_split_query.index_uids, Some(vec![index_uid.clone()]));
assert_eq!(
list_split_query.split_states,
vec![quickwit_metastore::SplitState::Published]
Expand Down
45 changes: 29 additions & 16 deletions quickwit/quickwit-janitor/src/actors/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,13 @@ mod tests {
.times(2)
.returning(move |list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(query.index_uids[0], index_uid_clone,);
let splits = match query.split_states[0] {
SplitState::Staged => make_splits("test-index", &["a"], SplitState::Staged),
SplitState::Staged => {
assert_eq!(query.index_uids.unwrap()[0], index_uid_clone);
make_splits("test-index", &["a"], SplitState::Staged)
}
SplitState::MarkedForDeletion => {
assert!(query.index_uids.is_none());
let expected_deletion_timestamp = OffsetDateTime::now_utc()
.unix_timestamp()
- split_deletion_grace_period().as_secs() as i64;
Expand Down Expand Up @@ -394,14 +397,19 @@ mod tests {
.times(2)
.returning(|list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(&query.index_uids[0].index_id, "test-index");
let splits = match query.split_states[0] {
SplitState::Staged => make_splits("test-index", &["a"], SplitState::Staged),
SplitState::MarkedForDeletion => make_splits(
"test-index",
&["a", "b", "c"],
SplitState::MarkedForDeletion,
),
SplitState::Staged => {
assert_eq!(&query.index_uids.unwrap()[0].index_id, "test-index");
make_splits("test-index", &["a"], SplitState::Staged)
}
SplitState::MarkedForDeletion => {
assert!(query.index_uids.is_none());
make_splits(
"test-index",
&["a", "b", "c"],
SplitState::MarkedForDeletion,
)
}
_ => panic!("only Staged and MarkedForDeletion expected."),
};
let splits = ListSplitsResponse::try_from_splits(splits).unwrap();
Expand Down Expand Up @@ -469,10 +477,13 @@ mod tests {
.times(6)
.returning(|list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(&query.index_uids[0].index_id, "test-index");
let splits = match query.split_states[0] {
SplitState::Staged => make_splits("test-index", &["a"], SplitState::Staged),
SplitState::Staged => {
assert_eq!(&query.index_uids.unwrap()[0].index_id, "test-index");
make_splits("test-index", &["a"], SplitState::Staged)
}
SplitState::MarkedForDeletion => {
assert!(&query.index_uids.is_none());
make_splits("test-index", &["a", "b"], SplitState::MarkedForDeletion)
}
_ => panic!("only Staged and MarkedForDeletion expected."),
Expand Down Expand Up @@ -633,11 +644,6 @@ mod tests {
.times(3)
.returning(|list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(query.index_uids.len(), 2);
assert!(["test-index-1", "test-index-2"]
.contains(&query.index_uids[0].index_id.as_ref()));
assert!(["test-index-1", "test-index-2"]
.contains(&query.index_uids[1].index_id.as_ref()));
let splits_ids_string: Vec<String> =
(0..8000).map(|seq| format!("split-{seq:04}")).collect();
let splits_ids: Vec<&str> = splits_ids_string
Expand All @@ -646,11 +652,18 @@ mod tests {
.collect();
let mut splits = match query.split_states[0] {
SplitState::Staged => {
let index_uids = query.index_uids.unwrap();
assert_eq!(index_uids.len(), 2);
assert!(["test-index-1", "test-index-2"]
.contains(&index_uids[0].index_id.as_ref()));
assert!(["test-index-1", "test-index-2"]
.contains(&index_uids[1].index_id.as_ref()));
let mut splits = make_splits("test-index-1", &["a"], SplitState::Staged);
splits.append(&mut make_splits("test-index-2", &["a"], SplitState::Staged));
splits
}
SplitState::MarkedForDeletion => {
assert!(query.index_uids.is_none());
assert_eq!(query.limit, Some(10_000));
let mut splits =
make_splits("test-index-1", &splits_ids, SplitState::MarkedForDeletion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ mod tests {
.returning(|list_splits_request| {
let query = list_splits_request.deserialize_list_splits_query().unwrap();
assert_eq!(query.split_states, &[SplitState::Published]);
let splits = match query.index_uids[0].index_id.as_ref() {
let splits = match query.index_uids.unwrap()[0].index_id.as_ref() {
"index-1" => {
vec![
make_split("split-1", Some(1000..=5000)),
Expand Down
Loading
Loading