Skip to content

Commit

Permalink
Merge pull request #11 from dantengsky/sky_fan_snapshot2
Browse files Browse the repository at this point in the history
chore: tweak logic test & code comment
  • Loading branch information
SkyFan2002 authored Jul 15, 2024
2 parents a8d0cba + c143bf3 commit 7ed1b33
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl SnapshotGenerator for AppendGenerator {
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
) -> Result<TableSnapshot> {
let (snapshot_merged, expected_schema) = self.conflict_resolve_ctx()?;
Expand All @@ -133,7 +133,7 @@ impl SnapshotGenerator for AppendGenerator {
let mut new_segments = snapshot_merged.merged_segments.clone();
let mut new_summary = snapshot_merged.merged_statistics.clone();

if let Some(snapshot) = &previous {
if let Some(snapshot) = previous {
prev_timestamp = snapshot.timestamp;
prev_snapshot_id = Some((snapshot.snapshot_id, snapshot.format_version));
table_statistics_location = snapshot.table_statistics_location.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,29 @@ impl SnapshotGenerator for MutationGenerator {
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
) -> Result<TableSnapshot> {
let default_cluster_key_id = cluster_key_meta.clone().map(|v| v.0);

let previous = previous.unwrap_or_else(|| {
Arc::new(TableSnapshot::new_empty_snapshot(
schema.clone(),
prev_table_seq,
))
});
let empty_snapshot;
let previous = match previous {
Some(prev) => prev,
None => {
empty_snapshot = Arc::new(TableSnapshot::new_empty_snapshot(
schema.clone(),
prev_table_seq,
));
&empty_snapshot
}
};

match &self.conflict_resolve_ctx {
ConflictResolveContext::ModifiedSegmentExistsInLatest(ctx) => {
if let Some((removed, replaced)) =
ConflictResolveContext::is_modified_segments_exists_in_latest(
&self.base_snapshot,
&previous,
previous,
&ctx.replaced_segments,
&ctx.removed_segment_indexes,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,21 @@ pub trait SnapshotGenerator {
txn_mgr: TxnManagerRef,
table_id: u64,
) -> Result<TableSnapshot> {
let previous_of_previous = previous.as_ref().and_then(|prev| prev.prev_snapshot_id);
let mut snapshot =
self.do_generate_new_snapshot(schema, cluster_key_meta, previous, prev_table_seq)?;
let guard = txn_mgr.lock();
// If a table is updated multi times in a transaction, the previous snapshot is always the snapshot before the transaction.
if guard.is_active() && guard.get_table_from_buffer_by_id(table_id).is_some() {
self.do_generate_new_snapshot(schema, cluster_key_meta, &previous, prev_table_seq)?;

let has_pending_transactional_mutations = {
let guard = txn_mgr.lock();
// NOTE:
// When generating a new snapshot for a mutation of table for the first time,
// there is no buffered table ID inside txn_mgr for this table.
guard.is_active() && guard.get_table_from_buffer_by_id(table_id).is_some()
};

if has_pending_transactional_mutations {
// Adjust the `prev_snapshot_id` of the newly created snapshot to match the
// `prev_snapshot_id` of the table when it first appeared in the transaction.
let previous_of_previous = previous.as_ref().and_then(|prev| prev.prev_snapshot_id);
snapshot.prev_snapshot_id = previous_of_previous;
}
Ok(snapshot)
Expand All @@ -62,7 +71,7 @@ pub trait SnapshotGenerator {
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
) -> Result<TableSnapshot>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl SnapshotGenerator for TruncateGenerator {
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
) -> Result<TableSnapshot> {
let (prev_timestamp, prev_snapshot_id) = if let Some(prev_snapshot) = previous {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
statement ok
create or replace database test_txn_snapshots;

statement ok
use test_txn_snapshots;

statement ok
create or replace table t(c int);


###################################
# no snapshots left if tx aborted #
###################################

statement ok
begin;

statement ok
insert into t values(1);

statement ok
insert into t values(1);

statement ok
rollback;

query I
select count() from fuse_snapshot('test_txn_snapshots', 't');
----
0


#####################################################
# one snapshot left if table mutated multiple times #
#####################################################



statement ok
begin;

statement ok
insert into t values(1);

statement ok
insert into t values(1);

statement ok
insert into t values(1);

statement ok
commit;

query I
select count() from fuse_snapshot('test_txn_snapshots', 't');
----
1


0 comments on commit 7ed1b33

Please sign in to comment.