Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: at most one snapshot is written for one table in multi statement transaction #16011

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use databend_common_storage::StorageMetrics;
use databend_storages_common_table_meta::meta::SnapshotId;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::table::ChangeType;
use databend_storages_common_txn::TxnManagerRef;

use crate::plan::DataSourceInfo;
use crate::plan::DataSourcePlan;
Expand Down Expand Up @@ -298,6 +299,7 @@ pub trait Table: Sync + Send {
&self,
navigation: &TimeNavigation,
abort_checker: AbortChecker,
_txn_mgr: TxnManagerRef,
) -> Result<Arc<dyn Table>> {
let _ = navigation;
let _ = abort_checker;
Expand Down
1 change: 1 addition & 0 deletions src/query/ee/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ databend-query = { workspace = true }
databend-storages-common-cache = { workspace = true }
databend-storages-common-pruner = { workspace = true }
databend-storages-common-table-meta = { workspace = true }
databend-storages-common-txn = { workspace = true }
derive-visitor = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion src/query/ee/src/attach_table/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use databend_storages_common_cache::LoadParams;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::meta::Versioned;
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use databend_storages_common_txn::TxnManager;

pub struct RealAttachTableHandler {}
#[async_trait::async_trait]
Expand All @@ -43,7 +44,7 @@ impl AttachTableHandler for RealAttachTableHandler {
let sp = plan.storage_params.as_ref().unwrap();
let operator = DataOperator::try_create(sp).await?;
let operator = operator.operator();
let reader = MetaReaders::table_snapshot_reader(operator.clone());
let reader = MetaReaders::table_snapshot_reader(operator.clone(), TxnManager::init());
let hint = format!("{}/{}", storage_prefix, FUSE_TBL_LAST_SNAPSHOT_HINT);
let snapshot_loc = operator.read(&hint).await?.to_vec();
let snapshot_loc = String::from_utf8(snapshot_loc)?;
Expand Down
9 changes: 6 additions & 3 deletions src/query/ee/src/fail_safe/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ use databend_enterprise_fail_safe::FailSafeHandlerWrapper;
use databend_storages_common_cache::LoadParams;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
use databend_storages_common_table_meta::meta::Location;
use databend_storages_common_txn::TxnManager;
use log::info;
use log::warn;
use opendal::ErrorKind;
use opendal::Operator;

pub struct RealFailSafeHandler {}

impl RealFailSafeHandler {}
Expand Down Expand Up @@ -142,7 +142,10 @@ impl Amender {
}

async fn recover_snapshot(&self, table: Box<FuseTable>) -> Result<()> {
match table.read_table_snapshot_without_cache().await {
match table
.read_table_snapshot_without_cache(TxnManager::init())
.await
{
Ok(Some(snapshot)) => {
let schema = table.schema();
let operator = table.get_operator();
Expand All @@ -154,7 +157,7 @@ impl Amender {
if e.code() == ErrorCode::STORAGE_NOT_FOUND {
let snapshot_location = table.snapshot_loc().await?.unwrap();
self.recover_object(&snapshot_location).await?;
let snapshot = table.read_table_snapshot().await?;
let snapshot = table.read_table_snapshot(TxnManager::init()).await?;
let schema = table.schema();
let operator = table.get_operator();
if let Some(snapshot) = snapshot {
Expand Down
2 changes: 1 addition & 1 deletion src/query/ee/src/storages/fuse/operations/vacuum_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub async fn get_snapshot_referenced_files(
}

let root_snapshot_location = root_snapshot_location_op.unwrap();
let reader = MetaReaders::table_snapshot_reader(fuse_table.get_operator());
let reader = MetaReaders::table_snapshot_reader(fuse_table.get_operator(), ctx.txn_mgr());
let ver = TableMetaLocationGenerator::snapshot_version(root_snapshot_location.as_str());
let params = LoadParams {
location: root_snapshot_location.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn do_refresh_virtual_column(
return Ok(());
}

let snapshot_opt = fuse_table.read_table_snapshot().await?;
let snapshot_opt = fuse_table.read_table_snapshot(ctx.txn_mgr()).await?;
let snapshot = if let Some(val) = snapshot_opt {
val
} else {
Expand Down
3 changes: 2 additions & 1 deletion src/query/ee/src/stream/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,14 @@ impl StreamHandler for RealStreamHandler {
}

let table = FuseTable::try_from_table(table.as_ref())?;
let abort_checker = ctx.get_abort_checker();
let abort_checker = ctx.clone().get_abort_checker();
let change_desc = table
.get_change_descriptor(
plan.append_only,
"".to_string(),
plan.navigation.as_ref(),
abort_checker,
ctx.txn_mgr(),
)
.await?;
table.check_changes_valid(&table.get_table_info().desc, change_desc.seq)?;
Expand Down
5 changes: 4 additions & 1 deletion src/query/ee/tests/it/inverted_index/index_refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use databend_query::interpreters::RefreshTableIndexInterpreter;
use databend_query::test_kits::append_string_sample_data;
use databend_query::test_kits::*;
use databend_storages_common_cache::LoadParams;
use databend_storages_common_txn::TxnManager;
use tantivy::schema::Field;
use tantivy::tokenizer::LowerCaser;
use tantivy::tokenizer::SimpleTokenizer;
Expand Down Expand Up @@ -95,7 +96,9 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> {
let table_schema = new_fuse_table.schema();

// get index location from new table snapshot
let new_snapshot = new_fuse_table.read_table_snapshot().await?;
let new_snapshot = new_fuse_table
.read_table_snapshot(TxnManager::init())
.await?;
assert!(new_snapshot.is_some());
let new_snapshot = new_snapshot.unwrap();

Expand Down
3 changes: 2 additions & 1 deletion src/query/ee/tests/it/inverted_index/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use databend_storages_common_pruner::BlockMetaIndex;
use databend_storages_common_table_meta::meta::BlockMeta;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use databend_storages_common_txn::TxnManager;
use opendal::Operator;

async fn apply_block_pruning(
Expand Down Expand Up @@ -531,7 +532,7 @@ async fn test_block_pruner() -> Result<()> {
let new_table = table.refresh(ctx.as_ref()).await?;
let fuse_table = FuseTable::do_create(new_table.get_table_info().clone())?;

let snapshot = fuse_table.read_table_snapshot().await?;
let snapshot = fuse_table.read_table_snapshot(TxnManager::init()).await?;
assert!(snapshot.is_some());
let snapshot = snapshot.unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use databend_common_storages_fuse::io::MetaReaders;
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
use databend_common_storages_fuse::FuseStorageFormat;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;
use databend_enterprise_query::storages::fuse::operations::virtual_columns::do_refresh_virtual_column;
use databend_query::test_kits::*;
use databend_storages_common_cache::LoadParams;

#[tokio::test(flavor = "multi_thread")]
async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
let fixture = TestFixture::setup().await?;
Expand All @@ -45,7 +45,7 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
let virtual_columns = vec!["v['a']".to_string(), "v[0]".to_string()];
let table_ctx = fixture.new_query_ctx().await?;

let snapshot_opt = fuse_table.read_table_snapshot().await?;
let snapshot_opt = fuse_table.read_table_snapshot(table_ctx.txn_mgr()).await?;
let snapshot = snapshot_opt.unwrap();

let projection = Projection::Columns(vec![]);
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl Interpreter for DeleteInterpreter {
let mut build_res = PipelineBuildResult::create();

// check if table is empty
let Some(snapshot) = fuse_table.read_table_snapshot().await? else {
let Some(snapshot) = fuse_table.read_table_snapshot(self.ctx.txn_mgr()).await? else {
// no snapshot, no deletion
return Ok(build_res);
};
Expand Down
15 changes: 9 additions & 6 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,15 @@ impl MergeIntoInterpreter {

// Prepare MergeIntoBuildInfo for PhysicalPlanBuilder to build MergeInto physical plan.
let table_info = fuse_table.get_table_info();
let table_snapshot = fuse_table.read_table_snapshot().await?.unwrap_or_else(|| {
Arc::new(TableSnapshot::new_empty_snapshot(
fuse_table.schema().as_ref().clone(),
Some(table_info.ident.seq),
))
});
let table_snapshot = fuse_table
.read_table_snapshot(self.ctx.txn_mgr())
.await?
.unwrap_or_else(|| {
Arc::new(TableSnapshot::new_empty_snapshot(
fuse_table.schema().as_ref().clone(),
Some(table_info.ident.seq),
))
});
let update_stream_meta =
dml_build_update_stream_req(self.ctx.clone(), &merge_into.meta_data).await?;
let merge_into_build_info = MergeIntoBuildInfo {
Expand Down
15 changes: 9 additions & 6 deletions src/query/service/src/interpreters/interpreter_replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,15 @@ impl ReplaceInterpreter {
})?;

let table_info = fuse_table.get_table_info();
let base_snapshot = fuse_table.read_table_snapshot().await?.unwrap_or_else(|| {
Arc::new(TableSnapshot::new_empty_snapshot(
schema.as_ref().clone(),
Some(table_info.ident.seq),
))
});
let base_snapshot = fuse_table
.read_table_snapshot(self.ctx.txn_mgr())
.await?
.unwrap_or_else(|| {
Arc::new(TableSnapshot::new_empty_snapshot(
schema.as_ref().clone(),
Some(table_info.ident.seq),
))
});

let is_multi_node = !self.ctx.get_cluster().is_empty();
let is_value_source = matches!(self.plan.source, InsertInputSource::Values(_));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl Interpreter for AddTableColumnInterpreter {
let table_id = table_info.ident.table_id;
let table_version = table_info.ident.seq;

generate_new_snapshot(table.as_ref(), &mut new_table_meta).await?;
generate_new_snapshot(table.as_ref(), &mut new_table_meta, self.ctx.as_ref()).await?;

let req = UpdateTableMetaReq {
table_id,
Expand All @@ -140,9 +140,10 @@ impl Interpreter for AddTableColumnInterpreter {
pub(crate) async fn generate_new_snapshot(
table: &dyn Table,
new_table_meta: &mut TableMeta,
ctx: &dyn TableContext,
) -> Result<()> {
if let Ok(fuse_table) = FuseTable::try_from_table(table) {
if let Some(snapshot) = fuse_table.read_table_snapshot().await? {
if let Some(snapshot) = fuse_table.read_table_snapshot(ctx.txn_mgr()).await? {
let mut new_snapshot = TableSnapshot::from_previous(
snapshot.as_ref(),
Some(fuse_table.get_table_info().ident.seq),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Interpreter for AnalyzeTableInterpreter {
Err(_) => return Ok(PipelineBuildResult::create()),
};

let r = table.read_table_snapshot().await;
let r = table.read_table_snapshot(self.ctx.txn_mgr()).await;
let snapshot_opt = match r {
Err(e) => return Err(e),
Ok(v) => v,
Expand All @@ -94,11 +94,12 @@ impl Interpreter for AnalyzeTableInterpreter {
table_statistics.snapshot_id.simple().to_string(),
),
self.ctx.clone().get_abort_checker(),
self.ctx.txn_mgr(),
)
.await
{
Ok(t) => !t
.read_table_snapshot()
.read_table_snapshot(self.ctx.txn_mgr())
.await
.is_ok_and(|s| s.is_some_and(|s| s.prev_table_seq.is_some())),
Err(_) => true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ impl CreateTableInterpreter {
// using application level data operator is a temp workaround
// please see discussions https://github.com/datafuselabs/databend/pull/10424
let operator = self.ctx.get_application_level_data_operator()?.operator();
let reader = MetaReaders::table_snapshot_reader(operator);
let reader = MetaReaders::table_snapshot_reader(operator, self.ctx.txn_mgr());

let params = LoadParams {
location: snapshot_loc.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Interpreter for DropTableColumnInterpreter {
let table_id = table_info.ident.table_id;
let table_version = table_info.ident.seq;

generate_new_snapshot(table.as_ref(), &mut new_table_meta).await?;
generate_new_snapshot(table.as_ref(), &mut new_table_meta, self.ctx.as_ref()).await?;

let req = UpdateTableMetaReq {
table_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl ModifyTableColumnInterpreter {

let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let prev_snapshot_id = fuse_table
.read_table_snapshot()
.read_table_snapshot(self.ctx.txn_mgr())
.await
.map_or(None, |v| v.map(|snapshot| snapshot.snapshot_id));

Expand Down
25 changes: 25 additions & 0 deletions src/query/service/src/interpreters/interpreter_txn_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;
use databend_storages_common_txn::TxnManagerRef;
use log::error;
Expand Down Expand Up @@ -58,6 +59,30 @@ impl Interpreter for CommitInterpreter {
if is_active {
let catalog = self.ctx.get_default_catalog()?;

{
let mutated_tables = self.ctx.txn_mgr().lock().mutated_tables();
for table_info in mutated_tables.values() {
let table = catalog.get_table_by_info(table_info)?;
match table.engine().to_uppercase().as_str() {
"FUSE" => {
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let dal = fuse_table.get_operator();
if let Some(location) = fuse_table.snapshot_loc().await? {
let snapshot = self
.ctx
.txn_mgr()
.lock()
.get_table_snapshot_by_location(&location);
if let Some(snapshot) = snapshot {
dal.write(&location, snapshot.to_bytes()?).await?;
}
}
}
"STREAM" => {}
_ => unreachable!(),
}
}
}
let req = self.ctx.txn_mgr().lock().req();

let update_summary = {
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/test_kits/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use databend_common_storages_fuse::FUSE_TBL_SEGMENT_PREFIX;
use databend_common_storages_fuse::FUSE_TBL_SNAPSHOT_PREFIX;
use databend_common_storages_fuse::FUSE_TBL_SNAPSHOT_STATISTICS_PREFIX;
use databend_common_storages_fuse::FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;
use databend_storages_common_txn::TxnManager;
use futures::TryStreamExt;
use walkdir::WalkDir;

Expand Down Expand Up @@ -174,7 +175,7 @@ pub async fn check_data_dir(
if check_table_statistic_file.is_some() {
let table = fixture.latest_default_table().await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let snapshot_opt = fuse_table.read_table_snapshot().await?;
let snapshot_opt = fuse_table.read_table_snapshot(TxnManager::init()).await?;
assert!(snapshot_opt.is_some());
let snapshot = snapshot_opt.unwrap();
let ts_location_opt = snapshot.table_statistics_location.clone();
Expand Down
6 changes: 5 additions & 1 deletion src/query/service/src/test_kits/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use databend_storages_common_table_meta::meta::SegmentInfo;
use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::meta::Versioned;
use databend_storages_common_txn::TxnManager;
use futures_util::TryStreamExt;
use opendal::Operator;
use serde::Serialize;
Expand All @@ -63,7 +64,10 @@ pub async fn generate_snapshot_with_segments(
segment_locations: Vec<Location>,
time_stamp: Option<DateTime<Utc>>,
) -> Result<String> {
let current_snapshot = fuse_table.read_table_snapshot().await?.unwrap();
let current_snapshot = fuse_table
.read_table_snapshot(TxnManager::init())
.await?
.unwrap();
let operator = fuse_table.get_operator();
let location_gen = fuse_table.meta_location_generator();
let mut new_snapshot = TableSnapshot::from_previous(
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ impl TableContext for CtxDelegation {
}

fn txn_mgr(&self) -> TxnManagerRef {
todo!()
self.ctx.txn_mgr()
}

fn incr_total_scan_value(&self, _value: ProgressValues) {
Expand Down
Loading
Loading