Skip to content

Commit

Permalink
feat: continue vacuum drop table on per-table cleanup failures (#16424)
Browse files Browse the repository at this point in the history
* feat: continue vacuum drop table on per-table cleanup failures

* modify ut

* fix ut

* fix ut

* fix db name, gc_drop_tables()

* fix ut

* add test

* fix ut

* fix test name

* fix ut

* refactor DroppedId

* unify code

* make lint

* fix

* add comment
  • Loading branch information
SkyFan2002 authored Sep 11, 2024
1 parent 16c7d99 commit dcf1eab
Show file tree
Hide file tree
Showing 10 changed files with 337 additions and 166 deletions.
71 changes: 26 additions & 45 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2865,56 +2865,35 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
};

let table_infos = do_get_table_history(self, db_filter, left_num).await?;
let take_num = left_num.unwrap_or(usize::MAX);

// check if reach the limit
if let Some(left_num) = left_num {
let num = min(left_num, table_infos.len());
for table_info in table_infos.iter().take(num) {
let (table_info, db_id) = table_info;
// A DB can be removed only when all its tables are removed.
if drop_db && take_num > table_infos.len() {
drop_ids.push(DroppedId::Db {
db_id: db_info.database_id.db_id,
db_name: db_info.name_ident.database_name().to_string(),
tables: table_infos
.iter()
.map(|(table_info, _)| {
(table_info.ident.table_id, table_info.name.clone())
})
.collect(),
});
} else {
for (table_info, db_id) in table_infos.iter().take(take_num) {
drop_ids.push(DroppedId::Table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
));
drop_table_infos.push(table_info.clone());
}

// if limit is Some, append DroppedId::Db only when table_infos is empty
if drop_db && table_infos.is_empty() {
drop_ids.push(DroppedId::Db(
db_info.database_id.db_id,
db_info.name_ident.database_name().to_string(),
));
}
if num == left_num {
return Ok(ListDroppedTableResp {
drop_table_infos,
drop_ids,
});
}
} else {
table_infos.iter().for_each(|(table_info, db_id)| {
if !drop_db {
drop_ids.push(DroppedId::Table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
))
}
});
drop_table_infos.extend(
table_infos
.into_iter()
.map(|(table_info, _)| table_info)
.collect::<Vec<_>>(),
);
if drop_db {
drop_ids.push(DroppedId::Db(
db_info.database_id.db_id,
db_info.name_ident.database_name().to_string(),
));
}
}
drop_table_infos.extend(
table_infos
.iter()
.take(take_num)
.map(|(table_info, _)| table_info.clone()),
);
}

return Ok(ListDroppedTableResp {
Expand Down Expand Up @@ -2974,9 +2953,11 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<(), KVAppError> {
for drop_id in req.drop_ids {
match drop_id {
DroppedId::Db(db_id, db_name) => {
gc_dropped_db_by_id(self, db_id, &req.tenant, db_name).await?
}
DroppedId::Db {
db_id,
db_name,
tables: _,
} => gc_dropped_db_by_id(self, db_id, &req.tenant, db_name).await?,
DroppedId::Table(db_id, table_id, table_name) => {
gc_dropped_table_by_id(self, &req.tenant, db_id, table_id, table_name).await?
}
Expand Down
84 changes: 64 additions & 20 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4020,14 +4020,16 @@ impl SchemaApiTestSuite {
};

let res = mt.create_database(req).await?;
drop_ids_1.push(DroppedId::Db(
*res.db_id,
db_name.database_name().to_string(),
));
drop_ids_2.push(DroppedId::Db(
*res.db_id,
db_name.database_name().to_string(),
));
drop_ids_1.push(DroppedId::Db {
db_id: *res.db_id,
db_name: db_name.database_name().to_string(),
tables: vec![],
});
drop_ids_2.push(DroppedId::Db {
db_id: *res.db_id,
db_name: db_name.database_name().to_string(),
tables: vec![],
});

let req = CreateTableReq {
create_option: CreateOption::Create,
Expand Down Expand Up @@ -4063,7 +4065,11 @@ impl SchemaApiTestSuite {

let res = mt.create_database(create_db_req.clone()).await?;
let db_id = res.db_id;
drop_ids_2.push(DroppedId::Db(*db_id, "db2".to_string()));
drop_ids_2.push(DroppedId::Db {
db_id: *db_id,
db_name: "db2".to_string(),
tables: vec![],
});

info!("--- create and drop db2.tb1");
{
Expand Down Expand Up @@ -4262,15 +4268,47 @@ impl SchemaApiTestSuite {
left_table_id.cmp(right_table_id)
}
}
(DroppedId::Db(left_db_id, _), DroppedId::Db(right_db_id, _)) => {
left_db_id.cmp(right_db_id)
}
(DroppedId::Db(left_db_id, _), DroppedId::Table(right_db_id, _, _)) => {
left_db_id.cmp(right_db_id)
}
(DroppedId::Table(left_db_id, _, _), DroppedId::Db(right_db_id, _)) => {
left_db_id.cmp(right_db_id)
}
(
DroppedId::Db {
db_id: left_db_id, ..
},
DroppedId::Db {
db_id: right_db_id, ..
},
) => left_db_id.cmp(right_db_id),
(
DroppedId::Db {
db_id: left_db_id,
db_name: _,
tables: _,
},
DroppedId::Table(right_db_id, _, _),
) => left_db_id.cmp(right_db_id),
(
DroppedId::Table(left_db_id, _, _),
DroppedId::Db {
db_id: right_db_id,
db_name: _,
tables: _,
},
) => left_db_id.cmp(right_db_id),
}
}
fn is_dropped_id_eq(l: &DroppedId, r: &DroppedId) -> bool {
match (l, r) {
(
DroppedId::Db {
db_id: left_db_id,
db_name: left_db_name,
tables: _,
},
DroppedId::Db {
db_id: right_db_id,
db_name: right_db_name,
tables: _,
},
) => left_db_id == right_db_id && left_db_name == right_db_name,
_ => l == r,
}
}
// case 1: test AllDroppedTables with filter time
Expand All @@ -4285,7 +4323,10 @@ impl SchemaApiTestSuite {
// sort drop id by table id
let mut sort_drop_ids = resp.drop_ids;
sort_drop_ids.sort_by(cmp_dropped_id);
assert_eq!(sort_drop_ids, drop_ids_1);
assert_eq!(sort_drop_ids.len(), drop_ids_1.len());
for (id1, id2) in sort_drop_ids.iter().zip(drop_ids_1.iter()) {
assert!(is_dropped_id_eq(id1, id2));
}

let expected: BTreeSet<String> = [
"'db1'.'tb1'".to_string(),
Expand Down Expand Up @@ -4314,7 +4355,10 @@ impl SchemaApiTestSuite {
// sort drop id by table id
let mut sort_drop_ids = resp.drop_ids;
sort_drop_ids.sort_by(cmp_dropped_id);
assert_eq!(sort_drop_ids, drop_ids_2);
assert_eq!(sort_drop_ids.len(), drop_ids_2.len());
for (id1, id2) in sort_drop_ids.iter().zip(drop_ids_2.iter()) {
assert!(is_dropped_id_eq(id1, id2));
}

let expected: BTreeSet<String> = [
"'db1'.'tb1'".to_string(),
Expand Down
24 changes: 21 additions & 3 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::time::Duration;
use anyerror::func_name;
use chrono::DateTime;
use chrono::Utc;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::FieldIndex;
use databend_common_expression::TableField;
Expand Down Expand Up @@ -201,6 +202,20 @@ pub struct TableInfo {
pub db_type: DatabaseType,
}

impl TableInfo {
pub fn database_name(&self) -> Result<&str> {
if self.engine() != "FUSE" {
return Err(ErrorCode::Internal(format!(
"Invalid engine: {}",
self.engine()
)));
}
let database_name = self.desc.split('.').next().unwrap();
let database_name = &database_name[1..database_name.len() - 1];
Ok(database_name)
}
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableStatistics {
/// Number of rows
Expand Down Expand Up @@ -360,7 +375,7 @@ impl Default for TableMeta {
fn default() -> Self {
TableMeta {
schema: Arc::new(TableSchema::empty()),
engine: "".to_string(),
engine: "FUSE".to_string(),
engine_options: BTreeMap::new(),
storage_params: None,
part_prefix: "".to_string(),
Expand Down Expand Up @@ -907,8 +922,11 @@ pub struct ListDroppedTableReq {

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DroppedId {
// db id, db name
Db(u64, String),
Db {
db_id: u64,
db_name: String,
tables: Vec<(u64, String)>,
},
// db id, table id, table name
Table(u64, u64, String),
}
Expand Down
5 changes: 2 additions & 3 deletions src/query/ee/src/storages/fuse/operations/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_storages_fuse::FuseTable;
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropFileInfo;
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropTablesResult;
use databend_enterprise_vacuum_handler::VacuumHandler;
use databend_enterprise_vacuum_handler::VacuumHandlerWrapper;

use crate::storages::fuse::do_vacuum;
use crate::storages::fuse::operations::vacuum_temporary_files::do_vacuum_temporary_files;
use crate::storages::fuse::vacuum_drop_tables;

pub struct RealVacuumHandler {}

#[async_trait::async_trait]
Expand All @@ -49,7 +48,7 @@ impl VacuumHandler for RealVacuumHandler {
threads_nums: usize,
tables: Vec<Arc<dyn Table>>,
dry_run_limit: Option<usize>,
) -> Result<Option<Vec<VacuumDropFileInfo>>> {
) -> VacuumDropTablesResult {
vacuum_drop_tables(threads_nums, tables, dry_run_limit).await
}

Expand Down
Loading

0 comments on commit dcf1eab

Please sign in to comment.