refactor: cleanup logic to get tables to vacuum #16450

Merged
merged 1 commit on Sep 13, 2024
143 changes: 56 additions & 87 deletions src/meta/api/src/schema_api_impl.rs
@@ -13,7 +13,6 @@
// limitations under the License.

use std::any::type_name;
use std::cmp::min;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashMap;
@@ -2812,7 +2811,9 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
) -> Result<ListDroppedTableResp, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

if let TableInfoFilter::AllDroppedTables(filter_drop_on) = &req.filter {
let the_limit = req.limit.unwrap_or(usize::MAX);

if let TableInfoFilter::DroppedTableOrDroppedDatabase(retention_boundary) = &req.filter {
let db_infos = self
.get_database_history(ListDatabaseReq {
tenant: req.inner.tenant().clone(),
@@ -2821,53 +2822,42 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
})
.await?;

let mut drop_table_infos = vec![];
let mut drop_ids = vec![];
let mut vacuum_table_infos = vec![];
let mut vacuum_ids = vec![];

for db_info in db_infos {
let mut drop_db = false;
let filter = match db_info.meta.drop_on {
Some(db_drop_on) => {
if let Some(filter_drop_on) = filter_drop_on {
if db_drop_on.timestamp() <= filter_drop_on.timestamp() {
// if db drop on before filter time, then get all the db tables.
drop_db = true;
TableInfoFilter::All
} else {
// else get all the db tables drop on before filter time.
TableInfoFilter::Dropped(Some(*filter_drop_on))
}
} else {
// while filter_drop_on is None, then get all the drop db tables
drop_db = true;
TableInfoFilter::All
}
}
None => {
// not drop db, only filter drop tables with filter drop on
TableInfoFilter::Dropped(*filter_drop_on)
}
};
if vacuum_table_infos.len() >= the_limit {
return Ok(ListDroppedTableResp {
drop_table_infos: vacuum_table_infos,
drop_ids: vacuum_ids,
});
}

let db_filter = (filter, db_info.clone());
// If the retention boundary is None, there is no time cutoff,
// so substitute the largest possible time.
let boundary = retention_boundary.unwrap_or(DateTime::<Utc>::MAX_UTC);

let left_num = if let Some(limit) = req.limit {
if drop_table_infos.len() >= limit {
return Ok(ListDroppedTableResp {
drop_table_infos,
drop_ids,
});
}
Some(limit - drop_table_infos.len())
let vacuum_db = {
let drop_on = db_info.meta.drop_on;
drop_on.is_some() && drop_on <= Some(boundary)
};

// If the whole db is to be vacuumed, vacuum all of its tables.
// Otherwise, choose only the tables dropped before the retention boundary.
let filter = if vacuum_db {
TableInfoFilter::All
} else {
None
TableInfoFilter::DroppedTables(*retention_boundary)
};

let table_infos = do_get_table_history(self, db_filter, left_num).await?;
let take_num = left_num.unwrap_or(usize::MAX);
let db_filter = (filter, db_info.clone());

let capacity = the_limit - vacuum_table_infos.len();
let table_infos = do_get_table_history(self, db_filter, capacity).await?;

// A DB can be removed only when all its tables are removed.
if drop_db && take_num > table_infos.len() {
drop_ids.push(DroppedId::Db {
if vacuum_db && capacity >= table_infos.len() {
vacuum_ids.push(DroppedId::Db {
db_id: db_info.database_id.db_id,
db_name: db_info.name_ident.database_name().to_string(),
tables: table_infos
@@ -2878,25 +2868,26 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
.collect(),
});
} else {
for (table_info, db_id) in table_infos.iter().take(take_num) {
drop_ids.push(DroppedId::Table(
for (table_info, db_id) in table_infos.iter().take(capacity) {
vacuum_ids.push(DroppedId::Table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
));
}
}
drop_table_infos.extend(

vacuum_table_infos.extend(
table_infos
.iter()
.take(take_num)
.take(capacity)
.map(|(table_info, _)| table_info.clone()),
);
}

return Ok(ListDroppedTableResp {
drop_table_infos,
drop_ids,
drop_table_infos: vacuum_table_infos,
drop_ids: vacuum_ids,
});
}
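Review note on the `vacuum_db` check above: it relies on `Option`'s ordering, where `None` sorts before every `Some`, so the explicit `is_some()` guard is what keeps never-dropped databases out of the vacuum set. A minimal, self-contained sketch of that comparison — `i64` timestamps stand in for `DateTime<Utc>`, and `should_vacuum_db` is an illustrative name, not a function from this PR:

```rust
fn should_vacuum_db(db_drop_on: Option<i64>, boundary: i64) -> bool {
    // `None` orders before every `Some`, so `db_drop_on <= Some(boundary)` alone would
    // also hold for a database that was never dropped; the `is_some()` guard is what
    // restricts vacuuming to databases that actually have a drop time.
    db_drop_on.is_some() && db_drop_on <= Some(boundary)
}

fn main() {
    assert!(should_vacuum_db(Some(100), 200)); // dropped before the boundary: vacuum
    assert!(!should_vacuum_db(Some(300), 200)); // dropped after the boundary: keep
    assert!(!should_vacuum_db(None, 200)); // never dropped: keep
}
```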

@@ -2923,16 +2914,11 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
meta: db_meta,
});
let db_filter = (req.filter, db_info);
let table_infos = do_get_table_history(self, db_filter, req.limit).await?;
let table_infos = do_get_table_history(self, db_filter, the_limit).await?;
let mut drop_ids = vec![];
let mut drop_table_infos = vec![];
let num = if let Some(limit) = req.limit {
min(limit, table_infos.len())
} else {
table_infos.len()
};
for table_info in table_infos.iter().take(num) {
let (table_info, db_id) = table_info;

for (table_info, db_id) in table_infos.iter().take(the_limit) {
drop_ids.push(DroppedId::Table(
*db_id,
table_info.ident.table_id,
@@ -3773,21 +3759,13 @@ async fn batch_filter_table_info(
continue;
};

#[allow(clippy::collapsible_else_if)]
if let TableInfoFilter::Dropped(drop_on) = filter {
if let Some(drop_on) = drop_on {
if let Some(meta_drop_on) = &seq_meta.drop_on {
if meta_drop_on.timestamp_millis() >= drop_on.timestamp_millis() {
continue;
}
} else {
continue;
}
} else {
//
if seq_meta.drop_on.is_none() {
continue;
}
if let TableInfoFilter::DroppedTables(retention_boundary) = filter {
let Some(meta_drop_on) = seq_meta.drop_on else {
continue;
};

if meta_drop_on > retention_boundary.unwrap_or(DateTime::<Utc>::MAX_UTC) {
continue;
}
}
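The rewritten filter in `batch_filter_table_info` boils down to one rule: skip tables that were never dropped, then keep only those dropped at or before the retention boundary, where a missing boundary means no upper limit. A stand-alone sketch of that rule under the same simplifying assumptions as above (`i64` timestamps instead of `chrono::DateTime<Utc>`; `keep_dropped_table` is illustrative, not part of the PR):

```rust
// Returns true if a table should be kept in the vacuum candidate list.
fn keep_dropped_table(table_drop_on: Option<i64>, retention_boundary: Option<i64>) -> bool {
    // Mirrors the `let ... else` above: a table that was never dropped is skipped.
    let Some(drop_on) = table_drop_on else {
        return false;
    };
    // No boundary means every dropped table qualifies, hence the `i64::MAX` fallback.
    drop_on <= retention_boundary.unwrap_or(i64::MAX)
}

fn main() {
    assert!(keep_dropped_table(Some(100), Some(200)));
    assert!(!keep_dropped_table(Some(300), Some(200)));
    assert!(keep_dropped_table(Some(300), None)); // unbounded: any dropped table
    assert!(!keep_dropped_table(None, None)); // never dropped
}
```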

@@ -3815,7 +3793,7 @@ type TableFilterInfoList<'a> = Vec<(&'a TableInfoFilter, &'a Arc<DatabaseInfo>,
#[fastrace::trace]
async fn get_gc_table_info(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
limit: Option<usize>,
limit: usize,
table_id_list: &TableFilterInfoList<'_>,
) -> Result<Vec<(Arc<TableInfo>, u64)>, KVAppError> {
let mut filter_tb_infos = vec![];
@@ -3842,11 +3820,8 @@ async fn get_gc_table_info(

filter_db_info_with_table_name_list.clear();

// check if reach the limit
if let Some(limit) = limit {
if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
}

@@ -3867,7 +3842,7 @@ async fn do_get_table_history(
async fn do_get_table_history(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
db_filter: (TableInfoFilter, Arc<DatabaseInfo>),
limit: Option<usize>,
limit: usize,
) -> Result<Vec<(Arc<TableInfo>, u64)>, KVAppError> {
let mut filter_tb_infos = vec![];

@@ -3934,11 +3909,8 @@ async fn do_get_table_history(
filter_tb_infos.extend(ret);
filter_db_info_with_table_id_list.clear();

// check if reach the limit
if let Some(limit) = limit {
if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
}

@@ -3947,11 +3919,8 @@ async fn do_get_table_history(
filter_tb_infos.extend(ret);
filter_db_info_with_table_id_list.clear();

// check if reach the limit
if let Some(limit) = limit {
if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
}
}
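A note on the limit handling in this file: normalizing `req.limit` with `unwrap_or(usize::MAX)` turns the optional limit into a plain `usize` once, so every later call site can use `take(the_limit)` instead of re-checking an `Option<usize>`. A minimal sketch of the pattern (a generic helper for illustration, not code from this PR):

```rust
// Normalize an optional limit once, then rely on `Iterator::take`; `take(usize::MAX)`
// is effectively unbounded for any realistic collection.
fn collect_limited<T>(items: impl IntoIterator<Item = T>, limit: Option<usize>) -> Vec<T> {
    let the_limit = limit.unwrap_or(usize::MAX);
    items.into_iter().take(the_limit).collect()
}

fn main() {
    assert_eq!(collect_limited(1..=5, Some(3)), vec![1, 2, 3]);
    assert_eq!(collect_limited(1..=5, None), vec![1, 2, 3, 4, 5]);
}
```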
14 changes: 7 additions & 7 deletions src/meta/api/src/schema_api_test_suite.rs
@@ -3396,7 +3396,7 @@ impl SchemaApiTestSuite {
{
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::AllDroppedTables(None),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let resp = mt.get_drop_table_infos(req).await?;
@@ -3606,7 +3606,7 @@ impl SchemaApiTestSuite {
{
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::AllDroppedTables(None),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let resp = mt.get_drop_table_infos(req).await?;
@@ -3803,7 +3803,7 @@ impl SchemaApiTestSuite {
{
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::AllDroppedTables(None),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let resp = mt.get_drop_table_infos(req).await?;
@@ -4316,7 +4316,7 @@ impl SchemaApiTestSuite {
let now = Utc::now();
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::AllDroppedTables(Some(now)),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(Some(now)),
limit: None,
};
let resp = mt.get_drop_table_infos(req).await?;
@@ -4348,7 +4348,7 @@ impl SchemaApiTestSuite {
{
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::AllDroppedTables(None),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let resp = mt.get_drop_table_infos(req).await?;
@@ -4557,7 +4557,7 @@ impl SchemaApiTestSuite {
for (limit, number, drop_ids) in limit_and_drop_ids {
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::AllDroppedTables(None),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit,
};
let resp = mt.get_drop_table_infos(req).await?;
@@ -5234,7 +5234,7 @@ impl SchemaApiTestSuite {
// vacuum drop table
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::AllDroppedTables(None),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let resp = mt.get_drop_table_infos(req).await?;
24 changes: 14 additions & 10 deletions src/meta/app/src/schema/table.rs
@@ -897,16 +897,20 @@ impl ListTableReq {

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TableInfoFilter {
// if datatime is some, filter only dropped tables which drop time before that,
// else filter all dropped tables
Dropped(Option<DateTime<Utc>>),
// filter all dropped tables, including all tables in dropped database and dropped tables in exist dbs,
// in this case, `ListTableReq`.db_name will be ignored
// return Tables in two cases:
// 1) if database drop before date time, then all table in this db will be return;
// 2) else, return all the tables drop before data time.
AllDroppedTables(Option<DateTime<Utc>>),
// return all tables, ignore drop on time.
/// Choose only dropped tables.
///
/// If `retention_boundary` is Some, choose only tables dropped before that time.
DroppedTables(Option<DateTime<Utc>>),
/// Choose dropped tables, or all tables in dropped databases.
///
/// In this case, `ListTableReq`.db_name will be ignored.
///
/// If `retention_boundary` is Some,
/// choose tables dropped before this time,
/// or databases dropped before this time (together with all of their tables).
DroppedTableOrDroppedDatabase(Option<DateTime<Utc>>),

/// Return all tables, ignoring the drop-on time.
All,
}
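To make the new variant's semantics concrete: under `DroppedTableOrDroppedDatabase(boundary)`, a database dropped at or before the boundary contributes all of its tables, while a live database contributes only its tables dropped at or before the boundary. A hedged sketch with simplified stand-in types (`i64` timestamps, no meta structs; `DbSketch` and `select_for_vacuum` are not part of the real API):

```rust
struct DbSketch {
    drop_on: Option<i64>,
    // (table name, table drop_on)
    tables: Vec<(String, Option<i64>)>,
}

fn select_for_vacuum(db: &DbSketch, retention_boundary: Option<i64>) -> Vec<String> {
    let boundary = retention_boundary.unwrap_or(i64::MAX);
    // A dropped database is vacuumed wholesale (see the `vacuum_db` check above).
    let vacuum_db = db.drop_on.is_some() && db.drop_on <= Some(boundary);
    db.tables
        .iter()
        .filter(|(_, drop_on)| vacuum_db || matches!(drop_on, Some(t) if *t <= boundary))
        .map(|(name, _)| name.clone())
        .collect()
}

fn main() {
    let live_db = DbSketch {
        drop_on: None,
        tables: vec![("kept".into(), None), ("dropped".into(), Some(100))],
    };
    // Only the dropped table qualifies in a live database.
    assert_eq!(select_for_vacuum(&live_db, Some(200)), vec!["dropped".to_string()]);

    let dropped_db = DbSketch { drop_on: Some(50), ..live_db };
    // Every table qualifies once the database itself is past the boundary.
    assert_eq!(select_for_vacuum(&dropped_db, Some(200)).len(), 2);
}
```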

@@ -128,9 +128,9 @@ impl Interpreter for VacuumDropTablesInterpreter {
);
// if database is empty, vacuum all tables
let filter = if self.plan.database.is_empty() {
TableInfoFilter::AllDroppedTables(Some(retention_time))
TableInfoFilter::DroppedTableOrDroppedDatabase(Some(retention_time))
} else {
TableInfoFilter::Dropped(Some(retention_time))
TableInfoFilter::DroppedTables(Some(retention_time))
};

let tenant = self.ctx.get_tenant();