Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: continue vacuum drop table on per-table cleanup failures #16424

Merged
merged 15 commits into from
Sep 11, 2024
71 changes: 26 additions & 45 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2866,55 +2866,34 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

let table_infos = do_get_table_history(self, db_filter, left_num).await?;

// check if reach the limit
if let Some(left_num) = left_num {
let num = min(left_num, table_infos.len());
for table_info in table_infos.iter().take(num) {
let (table_info, db_id) = table_info;
let limit = left_num.unwrap_or(table_infos.len());
let this_db_drop_table_infos = &table_infos[..limit];

if limit >= this_db_drop_table_infos.len() && drop_db {
drop_ids.push(DroppedId::Db {
db_id: db_info.database_id.db_id,
db_name: db_info.name_ident.database_name().to_string(),
tables: this_db_drop_table_infos
.iter()
.map(|(table_info, _)| {
(table_info.ident.table_id, table_info.name.clone())
})
.collect(),
});
} else {
for (table_info, db_id) in this_db_drop_table_infos {
drop_ids.push(DroppedId::Table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
));
drop_table_infos.push(table_info.clone());
}

// if limit is Some, append DroppedId::Db only when table_infos is empty
if drop_db && table_infos.is_empty() {
drop_ids.push(DroppedId::Db(
db_info.database_id.db_id,
db_info.name_ident.database_name().to_string(),
));
}
if num == left_num {
return Ok(ListDroppedTableResp {
drop_table_infos,
drop_ids,
});
}
} else {
table_infos.iter().for_each(|(table_info, db_id)| {
if !drop_db {
drop_ids.push(DroppedId::Table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
))
}
});
drop_table_infos.extend(
table_infos
.into_iter()
.map(|(table_info, _)| table_info)
.collect::<Vec<_>>(),
);
if drop_db {
drop_ids.push(DroppedId::Db(
db_info.database_id.db_id,
db_info.name_ident.database_name().to_string(),
));
}
}
drop_table_infos.extend(
this_db_drop_table_infos
.iter()
.map(|(table_info, _)| table_info.clone()),
);
}

return Ok(ListDroppedTableResp {
Expand Down Expand Up @@ -2974,9 +2953,11 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<(), KVAppError> {
for drop_id in req.drop_ids {
match drop_id {
DroppedId::Db(db_id, db_name) => {
gc_dropped_db_by_id(self, db_id, &req.tenant, db_name).await?
}
DroppedId::Db {
db_id,
db_name,
tables: _,
} => gc_dropped_db_by_id(self, db_id, &req.tenant, db_name).await?,
DroppedId::Table(db_id, table_id, table_name) => {
gc_dropped_table_by_id(self, &req.tenant, db_id, table_id, table_name).await?
}
Expand Down
84 changes: 64 additions & 20 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4020,14 +4020,16 @@ impl SchemaApiTestSuite {
};

let res = mt.create_database(req).await?;
drop_ids_1.push(DroppedId::Db(
*res.db_id,
db_name.database_name().to_string(),
));
drop_ids_2.push(DroppedId::Db(
*res.db_id,
db_name.database_name().to_string(),
));
drop_ids_1.push(DroppedId::Db {
db_id: *res.db_id,
db_name: db_name.database_name().to_string(),
tables: vec![],
});
drop_ids_2.push(DroppedId::Db {
db_id: *res.db_id,
db_name: db_name.database_name().to_string(),
tables: vec![],
});

let req = CreateTableReq {
create_option: CreateOption::Create,
Expand Down Expand Up @@ -4063,7 +4065,11 @@ impl SchemaApiTestSuite {

let res = mt.create_database(create_db_req.clone()).await?;
let db_id = res.db_id;
drop_ids_2.push(DroppedId::Db(*db_id, "db2".to_string()));
drop_ids_2.push(DroppedId::Db {
db_id: *db_id,
db_name: "db2".to_string(),
tables: vec![],
});

info!("--- create and drop db2.tb1");
{
Expand Down Expand Up @@ -4262,15 +4268,47 @@ impl SchemaApiTestSuite {
left_table_id.cmp(right_table_id)
}
}
(DroppedId::Db(left_db_id, _), DroppedId::Db(right_db_id, _)) => {
left_db_id.cmp(right_db_id)
}
(DroppedId::Db(left_db_id, _), DroppedId::Table(right_db_id, _, _)) => {
left_db_id.cmp(right_db_id)
}
(DroppedId::Table(left_db_id, _, _), DroppedId::Db(right_db_id, _)) => {
left_db_id.cmp(right_db_id)
}
(
DroppedId::Db {
db_id: left_db_id, ..
},
DroppedId::Db {
db_id: right_db_id, ..
},
) => left_db_id.cmp(right_db_id),
(
DroppedId::Db {
db_id: left_db_id,
db_name: _,
tables: _,
},
DroppedId::Table(right_db_id, _, _),
) => left_db_id.cmp(right_db_id),
(
DroppedId::Table(left_db_id, _, _),
DroppedId::Db {
db_id: right_db_id,
db_name: _,
tables: _,
},
) => left_db_id.cmp(right_db_id),
}
}
fn is_dropped_id_eq(l: &DroppedId, r: &DroppedId) -> bool {
match (l, r) {
(
DroppedId::Db {
db_id: left_db_id,
db_name: left_db_name,
tables: _,
},
DroppedId::Db {
db_id: right_db_id,
db_name: right_db_name,
tables: _,
},
) => left_db_id == right_db_id && left_db_name == right_db_name,
_ => l == r,
}
}
// case 1: test AllDroppedTables with filter time
Expand All @@ -4285,7 +4323,10 @@ impl SchemaApiTestSuite {
// sort drop id by table id
let mut sort_drop_ids = resp.drop_ids;
sort_drop_ids.sort_by(cmp_dropped_id);
assert_eq!(sort_drop_ids, drop_ids_1);
assert_eq!(sort_drop_ids.len(), drop_ids_1.len());
for (id1, id2) in sort_drop_ids.iter().zip(drop_ids_1.iter()) {
assert!(is_dropped_id_eq(id1, id2));
}

let expected: BTreeSet<String> = [
"'db1'.'tb1'".to_string(),
Expand Down Expand Up @@ -4314,7 +4355,10 @@ impl SchemaApiTestSuite {
// sort drop id by table id
let mut sort_drop_ids = resp.drop_ids;
sort_drop_ids.sort_by(cmp_dropped_id);
assert_eq!(sort_drop_ids, drop_ids_2);
assert_eq!(sort_drop_ids.len(), drop_ids_2.len());
for (id1, id2) in sort_drop_ids.iter().zip(drop_ids_2.iter()) {
assert!(is_dropped_id_eq(id1, id2));
}

let expected: BTreeSet<String> = [
"'db1'.'tb1'".to_string(),
Expand Down
24 changes: 21 additions & 3 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::time::Duration;
use anyerror::func_name;
use chrono::DateTime;
use chrono::Utc;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::FieldIndex;
use databend_common_expression::TableField;
Expand Down Expand Up @@ -201,6 +202,20 @@ pub struct TableInfo {
pub db_type: DatabaseType,
}

impl TableInfo {
pub fn database_name(&self) -> Result<&str> {
if self.engine() != "FUSE" {
return Err(ErrorCode::Internal(format!(
"Invalid engine: {}",
self.engine()
)));
}
let database_name = self.desc.split('.').next().unwrap();
let database_name = &database_name[1..database_name.len() - 1];
Ok(database_name)
}
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableStatistics {
/// Number of rows
Expand Down Expand Up @@ -360,7 +375,7 @@ impl Default for TableMeta {
fn default() -> Self {
TableMeta {
schema: Arc::new(TableSchema::empty()),
engine: "".to_string(),
engine: "FUSE".to_string(),
engine_options: BTreeMap::new(),
storage_params: None,
part_prefix: "".to_string(),
Expand Down Expand Up @@ -907,8 +922,11 @@ pub struct ListDroppedTableReq {

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DroppedId {
// db id, db name
Db(u64, String),
Db {
db_id: u64,
db_name: String,
tables: Vec<(u64, String)>,
},
// db id, table id, table name
Table(u64, u64, String),
}
Expand Down
5 changes: 2 additions & 3 deletions src/query/ee/src/storages/fuse/operations/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_storages_fuse::FuseTable;
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropFileInfo;
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropTablesResult;
use databend_enterprise_vacuum_handler::VacuumHandler;
use databend_enterprise_vacuum_handler::VacuumHandlerWrapper;

use crate::storages::fuse::do_vacuum;
use crate::storages::fuse::operations::vacuum_temporary_files::do_vacuum_temporary_files;
use crate::storages::fuse::vacuum_drop_tables;

pub struct RealVacuumHandler {}

#[async_trait::async_trait]
Expand All @@ -49,7 +48,7 @@ impl VacuumHandler for RealVacuumHandler {
threads_nums: usize,
tables: Vec<Arc<dyn Table>>,
dry_run_limit: Option<usize>,
) -> Result<Option<Vec<VacuumDropFileInfo>>> {
) -> VacuumDropTablesResult {
vacuum_drop_tables(threads_nums, tables, dry_run_limit).await
}

Expand Down
Loading
Loading