Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: tweak logic test & code comment #11

Merged
merged 2 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl SnapshotGenerator for AppendGenerator {
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
) -> Result<TableSnapshot> {
let (snapshot_merged, expected_schema) = self.conflict_resolve_ctx()?;
Expand All @@ -133,7 +133,7 @@ impl SnapshotGenerator for AppendGenerator {
let mut new_segments = snapshot_merged.merged_segments.clone();
let mut new_summary = snapshot_merged.merged_statistics.clone();

if let Some(snapshot) = &previous {
if let Some(snapshot) = previous {
prev_timestamp = snapshot.timestamp;
prev_snapshot_id = Some((snapshot.snapshot_id, snapshot.format_version));
table_statistics_location = snapshot.table_statistics_location.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,29 @@ impl SnapshotGenerator for MutationGenerator {
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
) -> Result<TableSnapshot> {
let default_cluster_key_id = cluster_key_meta.clone().map(|v| v.0);

let previous = previous.unwrap_or_else(|| {
Arc::new(TableSnapshot::new_empty_snapshot(
schema.clone(),
prev_table_seq,
))
});
let empty_snapshot;
let previous = match previous {
Some(prev) => prev,
None => {
empty_snapshot = Arc::new(TableSnapshot::new_empty_snapshot(
schema.clone(),
prev_table_seq,
));
&empty_snapshot
}
};

match &self.conflict_resolve_ctx {
ConflictResolveContext::ModifiedSegmentExistsInLatest(ctx) => {
if let Some((removed, replaced)) =
ConflictResolveContext::is_modified_segments_exists_in_latest(
&self.base_snapshot,
&previous,
previous,
&ctx.replaced_segments,
&ctx.removed_segment_indexes,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,21 @@ pub trait SnapshotGenerator {
txn_mgr: TxnManagerRef,
table_id: u64,
) -> Result<TableSnapshot> {
let previous_of_previous = previous.as_ref().and_then(|prev| prev.prev_snapshot_id);
let mut snapshot =
self.do_generate_new_snapshot(schema, cluster_key_meta, previous, prev_table_seq)?;
let guard = txn_mgr.lock();
// If a table is updated multi times in a transaction, the previous snapshot is always the snapshot before the transaction.
if guard.is_active() && guard.get_table_from_buffer_by_id(table_id).is_some() {
self.do_generate_new_snapshot(schema, cluster_key_meta, &previous, prev_table_seq)?;

let has_pending_transactional_mutations = {
let guard = txn_mgr.lock();
// NOTE:
// When generating a new snapshot for a mutation of table for the first time,
// there is no buffered table ID inside txn_mgr for this table.
guard.is_active() && guard.get_table_from_buffer_by_id(table_id).is_some()
};

if has_pending_transactional_mutations {
// Adjust the `prev_snapshot_id` of the newly created snapshot to match the
// `prev_snapshot_id` of the table when it first appeared in the transaction.
let previous_of_previous = previous.as_ref().and_then(|prev| prev.prev_snapshot_id);
snapshot.prev_snapshot_id = previous_of_previous;
}
Ok(snapshot)
Expand All @@ -62,7 +71,7 @@ pub trait SnapshotGenerator {
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
) -> Result<TableSnapshot>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl SnapshotGenerator for TruncateGenerator {
&self,
schema: TableSchema,
cluster_key_meta: Option<ClusterKey>,
previous: Option<Arc<TableSnapshot>>,
previous: &Option<Arc<TableSnapshot>>,
prev_table_seq: Option<u64>,
) -> Result<TableSnapshot> {
let (prev_timestamp, prev_snapshot_id) = if let Some(prev_snapshot) = previous {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
statement ok
create or replace database test_txn_snapshots;

statement ok
use test_txn_snapshots;

statement ok
create or replace table t(c int);


###################################
# no snapshots left if tx aborted #
###################################

statement ok
begin;

statement ok
insert into t values(1);

statement ok
insert into t values(1);

statement ok
rollback;

query I
select count() from fuse_snapshot('test_txn_snapshots', 't');
----
0


#####################################################
# one snapshot left if table mutated multiple times #
#####################################################



statement ok
begin;

statement ok
insert into t values(1);

statement ok
insert into t values(1);

statement ok
insert into t values(1);

statement ok
commit;

query I
select count() from fuse_snapshot('test_txn_snapshots', 't');
----
1


Loading