diff --git a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs index bc9aad57b2909..60103da7ad933 100644 --- a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs @@ -117,7 +117,7 @@ impl SnapshotGenerator for AppendGenerator { &self, schema: TableSchema, cluster_key_meta: Option, - previous: Option>, + previous: &Option>, prev_table_seq: Option, ) -> Result { let (snapshot_merged, expected_schema) = self.conflict_resolve_ctx()?; @@ -133,7 +133,7 @@ impl SnapshotGenerator for AppendGenerator { let mut new_segments = snapshot_merged.merged_segments.clone(); let mut new_summary = snapshot_merged.merged_statistics.clone(); - if let Some(snapshot) = &previous { + if let Some(snapshot) = previous { prev_timestamp = snapshot.timestamp; prev_snapshot_id = Some((snapshot.snapshot_id, snapshot.format_version)); table_statistics_location = snapshot.table_statistics_location.clone(); diff --git a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs index 63851d4b9dcbb..c2a06a72cf194 100644 --- a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs @@ -60,23 +60,29 @@ impl SnapshotGenerator for MutationGenerator { &self, schema: TableSchema, cluster_key_meta: Option, - previous: Option>, + previous: &Option>, prev_table_seq: Option, ) -> Result { let default_cluster_key_id = cluster_key_meta.clone().map(|v| v.0); - let previous = previous.unwrap_or_else(|| { - Arc::new(TableSnapshot::new_empty_snapshot( - schema.clone(), - prev_table_seq, - )) - }); + let empty_snapshot; + let previous = match previous { + Some(prev) => prev, + None => { + empty_snapshot = Arc::new(TableSnapshot::new_empty_snapshot( + schema.clone(), + prev_table_seq, + )); + &empty_snapshot + } + }; + match &self.conflict_resolve_ctx { ConflictResolveContext::ModifiedSegmentExistsInLatest(ctx) => { if let Some((removed, replaced)) = ConflictResolveContext::is_modified_segments_exists_in_latest( &self.base_snapshot, - &previous, + previous, &ctx.replaced_segments, &ctx.removed_segment_indexes, ) diff --git a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs index e92a6a172e1d0..61928e0a7db0c 100644 --- a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs @@ -47,12 +47,21 @@ pub trait SnapshotGenerator { txn_mgr: TxnManagerRef, table_id: u64, ) -> Result { - let previous_of_previous = previous.as_ref().and_then(|prev| prev.prev_snapshot_id); let mut snapshot = - self.do_generate_new_snapshot(schema, cluster_key_meta, previous, prev_table_seq)?; - let guard = txn_mgr.lock(); - // If a table is updated multi times in a transaction, the previous snapshot is always the snapshot before the transaction. - if guard.is_active() && guard.get_table_from_buffer_by_id(table_id).is_some() { + self.do_generate_new_snapshot(schema, cluster_key_meta, &previous, prev_table_seq)?; + + let has_pending_transactional_mutations = { + let guard = txn_mgr.lock(); + // NOTE: + // When generating a new snapshot for a mutation of table for the first time, + // there is no buffered table ID inside txn_mgr for this table. + guard.is_active() && guard.get_table_from_buffer_by_id(table_id).is_some() + }; + + if has_pending_transactional_mutations { + // Adjust the `prev_snapshot_id` of the newly created snapshot to match the + // `prev_snapshot_id` of the table when it first appeared in the transaction. + let previous_of_previous = previous.as_ref().and_then(|prev| prev.prev_snapshot_id); snapshot.prev_snapshot_id = previous_of_previous; } Ok(snapshot) @@ -62,7 +71,7 @@ pub trait SnapshotGenerator { &self, schema: TableSchema, cluster_key_meta: Option, - previous: Option>, + previous: &Option>, prev_table_seq: Option, ) -> Result; } diff --git a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs index 4479aea13ebf7..750043cebb0b0 100644 --- a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs @@ -58,7 +58,7 @@ impl SnapshotGenerator for TruncateGenerator { &self, schema: TableSchema, cluster_key_meta: Option, - previous: Option>, + previous: &Option>, prev_table_seq: Option, ) -> Result { let (prev_timestamp, prev_snapshot_id) = if let Some(prev_snapshot) = previous { diff --git a/tests/sqllogictests/suites/base/14_transaction/14_0005_snapshots.test b/tests/sqllogictests/suites/base/14_transaction/14_0005_snapshots.test new file mode 100644 index 0000000000000..9d4a243413cd5 --- /dev/null +++ b/tests/sqllogictests/suites/base/14_transaction/14_0005_snapshots.test @@ -0,0 +1,59 @@ +statement ok +create or replace database test_txn_snapshots; + +statement ok +use test_txn_snapshots; + +statement ok +create or replace table t(c int); + + +################################### +# no snapshots left if tx aborted # +################################### + +statement ok +begin; + +statement ok +insert into t values(1); + +statement ok +insert into t values(1); + +statement ok +rollback; + +query I +select count() from fuse_snapshot('test_txn_snapshots', 't'); +---- +0 + + +##################################################### +# one snapshot left if table mutated multiple times # +##################################################### + + + +statement ok +begin; + +statement ok +insert into t values(1); + +statement ok +insert into t values(1); + +statement ok +insert into t values(1); + +statement ok +commit; + +query I +select count() from fuse_snapshot('test_txn_snapshots', 't'); +---- +1 + +