Skip to content

Commit 640f557

Browse files
committed
table lvt
1 parent 163fb3f commit 640f557

File tree

28 files changed

+645
-164
lines changed

28 files changed

+645
-164
lines changed

src/common/exception/src/exception_code.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ build_exceptions! {
511511
IllegalUser(2218),
512512
}
513513

514-
// Database and Catalog Management Errors [2301-2317, 2321-2323]
514+
// Database and Catalog Management Errors [2301-2317, 2321-2326]
515515
build_exceptions! {
516516
/// Database already exists
517517
DatabaseAlreadyExists(2301),

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ use databend_common_meta_app::schema::TableIdToName;
111111
use databend_common_meta_app::schema::TableIdent;
112112
use databend_common_meta_app::schema::TableIndexType;
113113
use databend_common_meta_app::schema::TableInfo;
114+
use databend_common_meta_app::schema::TableLvtCheck;
114115
use databend_common_meta_app::schema::TableMeta;
115116
use databend_common_meta_app::schema::TableNameIdent;
116117
use databend_common_meta_app::schema::TableStatistics;
@@ -1463,25 +1464,25 @@ impl SchemaApiTestSuite {
14631464

14641465
let lvt_name_ident = LeastVisibleTimeIdent::new(&tenant, table_id);
14651466

1466-
let res = mt.get_pb(&lvt_name_ident).await?;
1467+
let res = mt.get_table_lvt(&lvt_name_ident).await?;
14671468
assert!(res.is_none());
14681469

14691470
let res = mt.set_table_lvt(&lvt_name_ident, &lvt_big).await?;
14701471
assert_eq!(res.time, time_big);
1471-
let res = mt.get_pb(&lvt_name_ident).await?;
1472-
assert_eq!(res.unwrap().data.time, time_big);
1472+
let res = mt.get_table_lvt(&lvt_name_ident).await?;
1473+
assert_eq!(res.unwrap().time, time_big);
14731474

14741475
// test that the LVT never falls back to an earlier time
14751476

14761477
let res = mt.set_table_lvt(&lvt_name_ident, &lvt_small).await?;
14771478
assert_eq!(res.time, time_big);
1478-
let res = mt.get_pb(&lvt_name_ident).await?;
1479-
assert_eq!(res.unwrap().data.time, time_big);
1479+
let res = mt.get_table_lvt(&lvt_name_ident).await?;
1480+
assert_eq!(res.unwrap().time, time_big);
14801481

14811482
let res = mt.set_table_lvt(&lvt_name_ident, &lvt_bigger).await?;
14821483
assert_eq!(res.time, time_bigger);
1483-
let res = mt.get_pb(&lvt_name_ident).await?;
1484-
assert_eq!(res.unwrap().data.time, time_bigger);
1484+
let res = mt.get_table_lvt(&lvt_name_ident).await?;
1485+
assert_eq!(res.unwrap().time, time_bigger);
14851486
}
14861487

14871488
Ok(())
@@ -2642,6 +2643,7 @@ impl SchemaApiTestSuite {
26422643
mt: &MT,
26432644
) -> anyhow::Result<()> {
26442645
let tenant_name = "tenant1";
2646+
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
26452647
let db_name = "db1";
26462648
let tbl_name = "tb2";
26472649

@@ -2717,6 +2719,7 @@ impl SchemaApiTestSuite {
27172719
seq: MatchSeq::Exact(table_version),
27182720
new_table_meta: new_table_meta.clone(),
27192721
base_snapshot_location: None,
2722+
lvt_check: None,
27202723
};
27212724

27222725
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2742,6 +2745,7 @@ impl SchemaApiTestSuite {
27422745
seq: MatchSeq::Exact(table_version + 1),
27432746
new_table_meta: new_table_meta.clone(),
27442747
base_snapshot_location: None,
2748+
lvt_check: None,
27452749
};
27462750
let res = mt
27472751
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2768,6 +2772,7 @@ impl SchemaApiTestSuite {
27682772
seq: MatchSeq::Exact(table_version),
27692773
new_table_meta: new_table_meta.clone(),
27702774
base_snapshot_location: None,
2775+
lvt_check: None,
27712776
};
27722777
let res = mt
27732778
.update_multi_table_meta_with_sender(
@@ -2849,6 +2854,7 @@ impl SchemaApiTestSuite {
28492854
seq: MatchSeq::Exact(table_version),
28502855
new_table_meta: new_table_meta.clone(),
28512856
base_snapshot_location: None,
2857+
lvt_check: None,
28522858
};
28532859
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
28542860
update_table_metas: vec![(req, table.as_ref().clone())],
@@ -2899,6 +2905,7 @@ impl SchemaApiTestSuite {
28992905
seq: MatchSeq::Exact(table_version),
29002906
new_table_meta: new_table_meta.clone(),
29012907
base_snapshot_location: None,
2908+
lvt_check: None,
29022909
};
29032910
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
29042911
update_table_metas: vec![(req, table.as_ref().clone())],
@@ -2949,6 +2956,7 @@ impl SchemaApiTestSuite {
29492956
seq: MatchSeq::Exact(table_version),
29502957
new_table_meta: new_table_meta.clone(),
29512958
base_snapshot_location: None,
2959+
lvt_check: None,
29522960
};
29532961
let result = mt
29542962
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2961,6 +2969,62 @@ impl SchemaApiTestSuite {
29612969
let err = ErrorCode::from(err);
29622970
assert_eq!(ErrorCode::DUPLICATED_UPSERT_FILES, err.code());
29632971
}
2972+
2973+
info!("--- update table meta, snapshot_ts must respect LVT");
2974+
{
2975+
let table = util.get_table().await.unwrap();
2976+
let table_id = table.ident.table_id;
2977+
let lvt_ident = LeastVisibleTimeIdent::new(&tenant, table_id);
2978+
let lvt_time = DateTime::<Utc>::from_timestamp(2_000, 0).unwrap();
2979+
mt.set_table_lvt(&lvt_ident, &LeastVisibleTime::new(lvt_time))
2980+
.await?;
2981+
2982+
// LVT was changed: the stored value (2_000) no longer matches the expected one (1_000), so the update must fail.
2983+
let mut new_table_meta = table.meta.clone();
2984+
new_table_meta.comment = "lvt guard should fail".to_string();
2985+
let small_ts = DateTime::<Utc>::from_timestamp(1_000, 0).unwrap();
2986+
let req = UpdateTableMetaReq {
2987+
table_id,
2988+
seq: MatchSeq::Exact(table.ident.seq),
2989+
new_table_meta: new_table_meta.clone(),
2990+
base_snapshot_location: None,
2991+
lvt_check: Some(TableLvtCheck {
2992+
tenant: tenant.clone(),
2993+
lvt: LeastVisibleTime::new(small_ts),
2994+
}),
2995+
};
2996+
let result = mt
2997+
.update_multi_table_meta(UpdateMultiTableMetaReq {
2998+
update_table_metas: vec![(req, table.as_ref().clone())],
2999+
..Default::default()
3000+
})
3001+
.await;
3002+
assert!(result.is_err());
3003+
3004+
// LVT unchanged: the expected value equals the stored one, so the update must succeed.
3005+
let table = util.get_table().await.unwrap();
3006+
let mut ok_table_meta = table.meta.clone();
3007+
ok_table_meta.comment = "lvt guard success".to_string();
3008+
let req = UpdateTableMetaReq {
3009+
table_id,
3010+
seq: MatchSeq::Exact(table.ident.seq),
3011+
new_table_meta: ok_table_meta.clone(),
3012+
base_snapshot_location: None,
3013+
lvt_check: Some(TableLvtCheck {
3014+
tenant: tenant.clone(),
3015+
lvt: LeastVisibleTime::new(lvt_time),
3016+
}),
3017+
};
3018+
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
3019+
update_table_metas: vec![(req, table.as_ref().clone())],
3020+
..Default::default()
3021+
})
3022+
.await?
3023+
.unwrap();
3024+
3025+
let updated = util.get_table().await.unwrap();
3026+
assert_eq!(updated.meta.comment, "lvt guard success");
3027+
}
29643028
}
29653029
Ok(())
29663030
}
@@ -4302,6 +4366,7 @@ impl SchemaApiTestSuite {
43024366
seq: MatchSeq::Any,
43034367
new_table_meta: table_meta.clone(),
43044368
base_snapshot_location: None,
4369+
lvt_check: None,
43054370
};
43064371

43074372
let table = mt
@@ -4442,6 +4507,7 @@ impl SchemaApiTestSuite {
44424507
seq: MatchSeq::Any,
44434508
new_table_meta: create_table_meta.clone(),
44444509
base_snapshot_location: None,
4510+
lvt_check: None,
44454511
};
44464512

44474513
let table = mt
@@ -6239,6 +6305,7 @@ impl SchemaApiTestSuite {
62396305
seq: MatchSeq::Any,
62406306
new_table_meta: table_meta(created_on),
62416307
base_snapshot_location: None,
6308+
lvt_check: None,
62426309
};
62436310

62446311
let table = mt
@@ -6290,6 +6357,7 @@ impl SchemaApiTestSuite {
62906357
seq: MatchSeq::Any,
62916358
new_table_meta: table_meta(created_on),
62926359
base_snapshot_location: None,
6360+
lvt_check: None,
62936361
};
62946362

62956363
let table = mt
@@ -7803,6 +7871,7 @@ impl SchemaApiTestSuite {
78037871
seq: MatchSeq::Any,
78047872
new_table_meta: table_meta(created_on),
78057873
base_snapshot_location: None,
7874+
lvt_check: None,
78067875
};
78077876

78087877
let table = mt
@@ -7862,6 +7931,7 @@ impl SchemaApiTestSuite {
78627931
seq: MatchSeq::Any,
78637932
new_table_meta: table_meta(created_on),
78647933
base_snapshot_location: None,
7934+
lvt_check: None,
78657935
};
78667936

78677937
let table = mt
@@ -7918,6 +7988,7 @@ impl SchemaApiTestSuite {
79187988
seq: MatchSeq::Any,
79197989
new_table_meta: table_meta(created_on),
79207990
base_snapshot_location: None,
7991+
lvt_check: None,
79217992
};
79227993

79237994
let table = mt
@@ -8372,6 +8443,7 @@ where MT: SchemaApi + kvapi::KVApi<Error = MetaError>
83728443
seq: MatchSeq::Any,
83738444
new_table_meta: self.table_meta(),
83748445
base_snapshot_location: None,
8446+
lvt_check: None,
83758447
};
83768448

83778449
let req = UpdateMultiTableMetaReq {

src/meta/api/src/table_api.rs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ use databend_common_meta_types::MatchSeqExt;
9999
use databend_common_meta_types::MetaError;
100100
use databend_common_meta_types::MetaId;
101101
use databend_common_meta_types::SeqV;
102+
use databend_common_meta_types::TxnCondition;
102103
use databend_common_meta_types::TxnGetRequest;
103104
use databend_common_meta_types::TxnGetResponse;
104105
use databend_common_meta_types::TxnOp;
@@ -1201,19 +1202,19 @@ where
12011202
})
12021203
.collect::<Vec<_>>();
12031204
let mut tb_meta_vec: Vec<(u64, Option<TableMeta>)> = mget_pb_values(self, &tid_vec).await?;
1204-
for (req, (tb_meta_seq, table_meta)) in
1205+
for ((req, _), (tb_meta_seq, table_meta)) in
12051206
update_table_metas.iter().zip(tb_meta_vec.iter_mut())
12061207
{
1207-
let req_seq = req.0.seq;
1208+
let req_seq = req.seq;
12081209

12091210
if *tb_meta_seq == 0 || table_meta.is_none() {
12101211
return Err(KVAppError::AppError(AppError::UnknownTableId(
1211-
UnknownTableId::new(req.0.table_id, "update_multi_table_meta"),
1212+
UnknownTableId::new(req.table_id, "update_multi_table_meta"),
12121213
)));
12131214
}
12141215
if req_seq.match_seq(tb_meta_seq).is_err() {
12151216
mismatched_tbs.push((
1216-
req.0.table_id,
1217+
req.table_id,
12171218
*tb_meta_seq,
12181219
std::mem::take(table_meta).unwrap(),
12191220
));
@@ -1225,27 +1226,37 @@ where
12251226
}
12261227

12271228
let mut new_table_meta_map: BTreeMap<u64, TableMeta> = BTreeMap::new();
1228-
for (req, (tb_meta_seq, table_meta)) in
1229+
for ((req, _), (tb_meta_seq, table_meta)) in
12291230
update_table_metas.iter_mut().zip(tb_meta_vec.iter())
12301231
{
12311232
let tbid = TableId {
1232-
table_id: req.0.table_id,
1233+
table_id: req.table_id,
12331234
};
12341235
// `update_table_meta` MUST NOT modify `shared_by` field
12351236
let table_meta = table_meta.as_ref().unwrap();
12361237

1237-
let mut new_table_meta = req.0.new_table_meta.clone();
1238+
let mut new_table_meta = req.new_table_meta.clone();
12381239
new_table_meta.shared_by = table_meta.shared_by.clone();
12391240

1240-
tbl_seqs.insert(req.0.table_id, *tb_meta_seq);
1241+
tbl_seqs.insert(req.table_id, *tb_meta_seq);
12411242
txn.condition.push(txn_cond_seq(&tbid, Eq, *tb_meta_seq));
1243+
1244+
// Add LVT check if provided
1245+
if let Some(check) = req.lvt_check.as_ref() {
1246+
let lvt_ident = LeastVisibleTimeIdent::new(&check.tenant, req.table_id);
1247+
txn.condition.push(TxnCondition::eq_value(
1248+
lvt_ident.to_string_key(),
1249+
serialize_struct(&check.lvt)?,
1250+
));
1251+
}
1252+
12421253
txn.if_then
12431254
.push(txn_op_put(&tbid, serialize_struct(&new_table_meta)?));
12441255
txn.else_then.push(TxnOp {
12451256
request: Some(Request::Get(TxnGetRequest::new(tbid.to_string_key()))),
12461257
});
12471258

1248-
new_table_meta_map.insert(req.0.table_id, new_table_meta);
1259+
new_table_meta_map.insert(req.table_id, new_table_meta);
12491260
}
12501261

12511262
// `remove_table_copied_files` and `upsert_table_copied_file_info`
@@ -1898,6 +1909,17 @@ where
18981909

18991910
return Ok(transition.unwrap().result.into_value().unwrap_or_default());
19001911
}
1912+
1913+
/// Reads the least visible time (LVT) stored under `name_ident`.
///
/// Delegates to `get_pb` and keeps only the `data` payload of the returned
/// entry, discarding the rest of the wrapper.
///
/// Returns `Ok(None)` when `get_pb` yields no entry for the key, and
/// propagates any error from `get_pb` via `?`.
#[logcall::logcall]
1914+
#[fastrace::trace]
1915+
async fn get_table_lvt(
1916+
&self,
1917+
name_ident: &LeastVisibleTimeIdent,
1918+
) -> Result<Option<LeastVisibleTime>, KVAppError> {
1919+
debug!(req :? =(&name_ident); "TableApi: {}", func_name!());
1920+
let res = self.get_pb(name_ident).await?;
1921+
// Strip the wrapper returned by `get_pb`; callers only receive the LVT itself.
Ok(res.map(|v| v.data))
1922+
}
19011923
}
19021924

19031925
#[async_trait::async_trait]

src/meta/app/src/schema/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ pub use table::TableIdent;
139139
pub use table::TableIndex;
140140
pub use table::TableIndexType;
141141
pub use table::TableInfo;
142+
pub use table::TableLvtCheck;
142143
pub use table::TableMeta;
143144
pub use table::TableNameIdent;
144145
pub use table::TablePartition;

src/meta/app/src/schema/table/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use super::CatalogInfo;
4141
use super::CreateOption;
4242
use super::DatabaseId;
4343
use super::MarkedDeletedIndexMeta;
44+
use crate::schema::LeastVisibleTime;
4445
use crate::schema::constraint::Constraint;
4546
use crate::schema::database_name_ident::DatabaseNameIdent;
4647
use crate::schema::table_niv::TableNIV;
@@ -815,6 +816,15 @@ pub struct UpdateTableMetaReq {
815816
pub seq: MatchSeq,
816817
pub new_table_meta: TableMeta,
817818
pub base_snapshot_location: Option<String>,
819+
/// Optional optimistic LVT check.
820+
/// When set, the table LVT must be equal to the provided value.
821+
pub lvt_check: Option<TableLvtCheck>,
822+
}
823+
824+
/// Expected least visible time (LVT) of a table, used as an optimistic guard
/// on a table meta update (see `UpdateTableMetaReq::lvt_check`).
#[derive(Clone, Debug, PartialEq, Eq)]
825+
pub struct TableLvtCheck {
826+
/// Tenant used, together with the table id, to build the LVT key that is checked.
pub tenant: Tenant,
827+
/// The LVT value the stored table LVT must be equal to.
pub lvt: LeastVisibleTime,
818828
}
819829

820830
#[derive(Clone, Debug, PartialEq, Eq)]

src/query/ast/src/parser/statement.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4569,6 +4569,10 @@ pub fn alter_table_action(i: Input) -> IResult<AlterTableAction> {
45694569
},
45704570
);
45714571

4572+
// NOTE: `AT (BRANCH|TAG => ...)` travel-point syntax is only supported when
4573+
// creating a branch/tag via `ALTER TABLE ... CREATE`. It is intentionally not
4574+
// available for SELECT or other query statements, so keep the parsing rule scoped
4575+
// here to avoid implying broader support.
45724576
let create_snapshot_ref = map(
45734577
rule! {
45744578
CREATE ~ ( BRANCH | TAG ) ~ #ident ~ ( AT ~ ^(#travel_point | #at_table_ref) )? ~ (RETAIN ~ #literal_duration)?

src/query/catalog/src/catalog/interface.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,13 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
622622
unimplemented!()
623623
}
624624

625+
/// Catalog-level lookup of a table's least visible time (LVT).
///
/// NOTE(review): presumably returns the LVT stored under `_name_ident`, or
/// `None` when none is set; confirm against the overriding implementations.
///
/// # Panics
///
/// The default body is `unimplemented!()`, so calling this on a catalog
/// that does not override it panics.
async fn get_table_lvt(
626+
&self,
627+
_name_ident: &LeastVisibleTimeIdent,
628+
) -> Result<Option<LeastVisibleTime>> {
629+
unimplemented!()
630+
}
631+
625632
async fn rename_dictionary(&self, req: RenameDictionaryReq) -> Result<()>;
626633

627634
fn transform_udtf_as_table_function(

0 commit comments

Comments
 (0)