Skip to content

Commit

Permalink
Merge branch 'main' into refactor-inverted-index-fst
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh committed Sep 4, 2024
2 parents 4774885 + bfab76d commit 6212bc0
Show file tree
Hide file tree
Showing 20 changed files with 390 additions and 70 deletions.
14 changes: 12 additions & 2 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ use databend_common_meta_app::schema::SetLVTReply;
use databend_common_meta_app::schema::SetLVTReq;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq;
use databend_common_meta_app::schema::TableId;
use databend_common_meta_app::schema::TableIdHistoryIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::schema::TruncateTableReply;
Expand Down Expand Up @@ -210,8 +212,16 @@ pub trait SchemaApi: Send + Sync {

async fn get_table(&self, req: GetTableReq) -> Result<Arc<TableInfo>, KVAppError>;

async fn get_table_history(&self, req: ListTableReq)
-> Result<Vec<Arc<TableInfo>>, KVAppError>;
async fn get_table_meta_history(
&self,
database_name: &str,
table_id_history: &TableIdHistoryIdent,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, KVAppError>;

async fn get_tables_history(
&self,
req: ListTableReq,
) -> Result<Vec<Arc<TableInfo>>, KVAppError>;

async fn list_tables(&self, req: ListTableReq) -> Result<Vec<Arc<TableInfo>>, KVAppError>;

Expand Down
138 changes: 86 additions & 52 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1987,7 +1987,35 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

#[logcall::logcall]
#[fastrace::trace]
async fn get_table_history(
async fn get_table_meta_history(
&self,
database_name: &str,
table_id_history: &TableIdHistoryIdent,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, KVAppError> {
let table_name = &table_id_history.table_name;
let (meta_seq, table_id_list) = get_pb_value(self, table_id_history).await?;
if meta_seq == 0 || table_id_list.is_none() {
return Err(KVAppError::AppError(AppError::from(UnknownTable::new(
table_name,
format!("get_table_history: {}.{}", database_name, table_name),
))));
}

let now = Utc::now();
let table_id_list = table_id_list.unwrap();

let metas = get_table_meta_history(self, &now, table_id_list).await?;

debug!(
name :% =(&table_id_history);
"get_table_meta_history"
);
return Ok(metas);
}

#[logcall::logcall]
#[fastrace::trace]
async fn get_tables_history(
&self,
req: ListTableReq,
) -> Result<Vec<Arc<TableInfo>>, KVAppError> {
Expand All @@ -1999,7 +2027,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let res = get_db_or_err(
self,
tenant_dbname,
format!("get_table_history: {}", tenant_dbname.display()),
format!("get_tables_history: {}", tenant_dbname.display()),
)
.await;

Expand All @@ -2020,7 +2048,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

let table_id_list_keys = list_keys(self, &dir_name).await?;

let mut tb_info_list = vec![];
let mut tbs_info_list = vec![];
let now = Utc::now();
let keys: Vec<String> = table_id_list_keys
.iter()
Expand Down Expand Up @@ -2052,68 +2080,36 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

debug!(
name :% =(&table_id_list_key);
"get_table_history"
"get_tables_history"
);

let inner_keys: Vec<String> = tb_id_list
.id_list
.iter()
.map(|table_id| {
TableId {
table_id: *table_id,
}
.to_string_key()
})
.collect();
let mut table_id_iter = tb_id_list.id_list.into_iter();
for c in inner_keys.chunks(DEFAULT_MGET_SIZE) {
let tb_meta_vec: Vec<(u64, Option<TableMeta>)> =
mget_pb_values(self, c).await?;
for (tb_meta_seq, tb_meta) in tb_meta_vec {
let table_id = table_id_iter.next().unwrap();
if tb_meta_seq == 0 || tb_meta.is_none() {
error!("get_table_history cannot find {:?} table_meta", table_id);
continue;
}

// Safe unwrap() because: tb_meta_seq > 0
let tb_meta = tb_meta.unwrap();
if is_drop_time_out_of_retention_time(&tb_meta.drop_on, &now) {
continue;
}

let tenant_dbname_tbname: TableNameIdent = TableNameIdent {
tenant: tenant_dbname.tenant().clone(),
db_name: tenant_dbname.database_name().to_string(),
table_name: table_id_list_key.table_name.clone(),
};

let db_type = DatabaseType::NormalDB;

let tb_info = TableInfo {
let metas = get_table_meta_history(self, &now, tb_id_list).await?;
let tb_info_list: Vec<Arc<TableInfo>> = metas
.into_iter()
.map(|(table_id, seqv)| {
Arc::new(TableInfo {
ident: TableIdent {
table_id,
seq: tb_meta_seq,
table_id: table_id.table_id,
seq: seqv.seq(),
},
desc: format!(
"'{}'.'{}'",
tenant_dbname.database_name(),
tenant_dbname_tbname.table_name
table_id_list_key.table_name,
),
name: table_id_list_key.table_name.clone(),
meta: tb_meta,
name: table_id_list_key.table_name.to_string(),
meta: seqv.data,
tenant: tenant_dbname.tenant_name().to_string(),
db_type,
db_type: DatabaseType::NormalDB,
catalog_info: Default::default(),
};

tb_info_list.push(Arc::new(tb_info));
}
}
})
})
.collect();
tbs_info_list.extend(tb_info_list);
}
}

return Ok(tb_info_list);
return Ok(tbs_info_list);
}

#[logcall::logcall]
Expand Down Expand Up @@ -3959,6 +3955,44 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

async fn get_table_meta_history(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
now: &DateTime<Utc>,
tb_id_list: TableIdList,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, KVAppError> {
let mut tb_metas = vec![];
let inner_keys: Vec<String> = tb_id_list
.id_list
.iter()
.map(|table_id| {
TableId {
table_id: *table_id,
}
.to_string_key()
})
.collect();
let mut table_id_iter = tb_id_list.id_list.into_iter();
for c in inner_keys.chunks(DEFAULT_MGET_SIZE) {
let tb_meta_vec: Vec<(u64, Option<TableMeta>)> = mget_pb_values(kv_api, c).await?;
for (tb_meta_seq, tb_meta) in tb_meta_vec {
let table_id = table_id_iter.next().unwrap();
if tb_meta_seq == 0 || tb_meta.is_none() {
error!("get_table_history cannot find {:?} table_meta", table_id);
continue;
}

// Safe unwrap() because: tb_meta_seq > 0
let tb_meta = tb_meta.unwrap();
if is_drop_time_out_of_retention_time(&tb_meta.drop_on, now) {
continue;
}

tb_metas.push((TableId { table_id }, SeqV::new(tb_meta_seq, tb_meta)));
}
}
Ok(tb_metas)
}

async fn construct_drop_virtual_column_txn_operations(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_ident: &VirtualColumnIdent,
Expand Down
48 changes: 36 additions & 12 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1860,6 +1860,30 @@ impl SchemaApiTestSuite {
tbl_name
);
}

info!("--- get table history after drop");
{
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;

let got = mt
.get_table_meta_history(db_name, &TableIdHistoryIdent {
database_id: cur_db.database_id.db_id,
table_name: tbl_name.to_string(),
})
.await
.unwrap()[0]
.clone();
let want = TableInfo {
ident: tb_ident_2,
desc: format!("'{}'.'{}'", db_name, tbl_name),
name: tbl_name.into(),
meta: table_meta(created_on),
tenant: tenant_name.to_string(),
..Default::default()
};
assert_eq!(got.1.data.created_on, want.meta.created_on);
assert!(got.1.data.drop_on.is_some());
}
}

info!("--- drop table with if_exists = false again, error");
Expand Down Expand Up @@ -4018,7 +4042,7 @@ impl SchemaApiTestSuite {
assert!(table_id >= 1, "table id >= 1");

let res = mt
.get_table_history(ListTableReq::new(&tenant, db_name))
.get_tables_history(ListTableReq::new(&tenant, db_name))
.await?;

assert_eq!(res.len(), 1);
Expand All @@ -4036,7 +4060,7 @@ impl SchemaApiTestSuite {
upsert_test_data(mt.as_kv_api(), &tbid, data).await?;
// assert not return out of retention time data
let res = mt
.get_table_history(ListTableReq::new(&tenant, db_name))
.get_tables_history(ListTableReq::new(&tenant, db_name))
.await?;

assert_eq!(res.len(), 0);
Expand Down Expand Up @@ -4678,7 +4702,7 @@ impl SchemaApiTestSuite {
assert!(res.table_id >= 1, "table id >= 1");

let res = mt
.get_table_history(ListTableReq::new(&tenant, db_name))
.get_tables_history(ListTableReq::new(&tenant, db_name))
.await?;

calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
Expand Down Expand Up @@ -4714,7 +4738,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_table_history(ListTableReq::new(&tenant, db_name))
.get_tables_history(ListTableReq::new(&tenant, db_name))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: tbl_name.to_string(),
Expand All @@ -4736,7 +4760,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_table_history(ListTableReq::new(&tenant, db_name))
.get_tables_history(ListTableReq::new(&tenant, db_name))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: tbl_name.to_string(),
Expand Down Expand Up @@ -4767,7 +4791,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_table_history(ListTableReq::new(&tenant, db_name))
.get_tables_history(ListTableReq::new(&tenant, db_name))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: tbl_name.to_string(),
Expand All @@ -4794,7 +4818,7 @@ impl SchemaApiTestSuite {
assert!(res.table_id >= 1, "table id >= 1");

let res = mt
.get_table_history(ListTableReq::new(&tenant, db_name))
.get_tables_history(ListTableReq::new(&tenant, db_name))
.await?;

calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
Expand Down Expand Up @@ -4826,7 +4850,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_table_history(ListTableReq::new(&tenant, db_name))
.get_tables_history(ListTableReq::new(&tenant, db_name))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: tbl_name.to_string(),
Expand All @@ -4847,7 +4871,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_table_history(ListTableReq::new(&tenant, db_name))
.get_tables_history(ListTableReq::new(&tenant, db_name))
.await?;

calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
Expand Down Expand Up @@ -4884,7 +4908,7 @@ impl SchemaApiTestSuite {
let old_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
let _res = mt.create_table(req.clone()).await?;
let res = mt
.get_table_history(ListTableReq::new(&tenant, db_name))
.get_tables_history(ListTableReq::new(&tenant, db_name))
.await?;
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
assert!(old_db.meta.seq < cur_db.meta.seq);
Expand Down Expand Up @@ -4931,7 +4955,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_table_history(ListTableReq::new(&tenant, db_name))
.get_tables_history(ListTableReq::new(&tenant, db_name))
.await?;
calc_and_compare_drop_on_table_result(res, vec![
DroponInfo {
Expand Down Expand Up @@ -4968,7 +4992,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_table_history(ListTableReq::new(&tenant, db_name))
.get_tables_history(ListTableReq::new(&tenant, db_name))
.await?;
calc_and_compare_drop_on_table_result(res, vec![
DroponInfo {
Expand Down
8 changes: 8 additions & 0 deletions src/query/catalog/src/catalog/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,14 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
table_name: &str,
) -> Result<Arc<dyn Table>>;

// Get one table identified as dropped by db and table name.
async fn get_table_history(
&self,
tenant: &Tenant,
db_name: &str,
table_name: &str,
) -> Result<Vec<Arc<dyn Table>>>;

/// List all tables in a database.This will not list temporary tables.
async fn list_tables(&self, tenant: &Tenant, db_name: &str) -> Result<Vec<Arc<dyn Table>>>;

Expand Down
9 changes: 9 additions & 0 deletions src/query/catalog/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ pub trait Database: DynClone + Sync + Send {
)))
}

// Get one table history by db and table name.
#[async_backtrace::framed]
async fn get_table_history(&self, _table_name: &str) -> Result<Vec<Arc<dyn Table>>> {
Err(ErrorCode::Unimplemented(format!(
"UnImplement get_table in {} Database",
self.name()
)))
}

#[async_backtrace::framed]
async fn list_tables(&self) -> Result<Vec<Arc<dyn Table>>> {
Err(ErrorCode::Unimplemented(format!(
Expand Down
Loading

0 comments on commit 6212bc0

Please sign in to comment.