Skip to content

Commit cc5aa8e

Browse files
feat: snapshots generated in multi txn has same prev_snapshot_id (#16044)
* feat: snapshots generated in multi txn has same prev_snapshot_id * chore: logic test of snapshot generation inside explicit txn * chore: tweak code comment --------- Co-authored-by: dantengsky <dantengsky@gmail.com>
1 parent 8f1f08f commit cc5aa8e

File tree

13 files changed

+248
-17
lines changed

13 files changed

+248
-17
lines changed

โ€ŽCargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

โ€Žsrc/query/service/tests/it/storages/fuse/conflict.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use databend_common_storages_fuse::operations::SnapshotChanges;
2424
use databend_common_storages_fuse::operations::SnapshotGenerator;
2525
use databend_storages_common_table_meta::meta::Statistics;
2626
use databend_storages_common_table_meta::meta::TableSnapshot;
27+
use databend_storages_common_txn::TxnManager;
2728

2829
#[test]
2930
/// base snapshot contains segments 1, 2, 3,
@@ -62,6 +63,8 @@ fn test_unresolvable_delete_conflict() {
6263
None,
6364
Some(Arc::new(latest_snapshot)),
6465
None,
66+
TxnManager::init(),
67+
0,
6568
);
6669
assert!(result.is_err());
6770
}
@@ -151,6 +154,8 @@ fn test_resolvable_delete_conflict() {
151154
None,
152155
Some(Arc::new(latest_snapshot)),
153156
None,
157+
TxnManager::init(),
158+
0,
154159
);
155160
let snapshot = result.unwrap();
156161
let expected = vec![("8".to_string(), 1), ("4".to_string(), 1)];
@@ -255,6 +260,8 @@ fn test_resolvable_replace_conflict() {
255260
None,
256261
Some(Arc::new(latest_snapshot)),
257262
None,
263+
TxnManager::init(),
264+
0,
258265
);
259266
let snapshot = result.unwrap();
260267
let expected = vec![

โ€Žsrc/query/storages/fuse/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ databend-storages-common-index = { workspace = true }
4747
databend-storages-common-io = { workspace = true }
4848
databend-storages-common-pruner = { workspace = true }
4949
databend-storages-common-table-meta = { workspace = true }
50+
databend-storages-common-txn = { workspace = true }
5051
enum-as-inner = "0.5"
5152
futures = { workspace = true }
5253
futures-util = { workspace = true }

โ€Žsrc/query/storages/fuse/src/operations/common/generators/append_generator.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,11 @@ impl SnapshotGenerator for AppendGenerator {
113113
Ok(())
114114
}
115115

116-
fn generate_new_snapshot(
116+
fn do_generate_new_snapshot(
117117
&self,
118118
schema: TableSchema,
119119
cluster_key_meta: Option<ClusterKey>,
120-
previous: Option<Arc<TableSnapshot>>,
120+
previous: &Option<Arc<TableSnapshot>>,
121121
prev_table_seq: Option<u64>,
122122
) -> Result<TableSnapshot> {
123123
let (snapshot_merged, expected_schema) = self.conflict_resolve_ctx()?;
@@ -133,7 +133,7 @@ impl SnapshotGenerator for AppendGenerator {
133133
let mut new_segments = snapshot_merged.merged_segments.clone();
134134
let mut new_summary = snapshot_merged.merged_statistics.clone();
135135

136-
if let Some(snapshot) = &previous {
136+
if let Some(snapshot) = previous {
137137
prev_timestamp = snapshot.timestamp;
138138
prev_snapshot_id = Some((snapshot.snapshot_id, snapshot.format_version));
139139
table_statistics_location = snapshot.table_statistics_location.clone();

โ€Žsrc/query/storages/fuse/src/operations/common/generators/mutation_generator.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,27 +56,33 @@ impl SnapshotGenerator for MutationGenerator {
5656
self.conflict_resolve_ctx = ctx;
5757
}
5858

59-
fn generate_new_snapshot(
59+
fn do_generate_new_snapshot(
6060
&self,
6161
schema: TableSchema,
6262
cluster_key_meta: Option<ClusterKey>,
63-
previous: Option<Arc<TableSnapshot>>,
63+
previous: &Option<Arc<TableSnapshot>>,
6464
prev_table_seq: Option<u64>,
6565
) -> Result<TableSnapshot> {
6666
let default_cluster_key_id = cluster_key_meta.clone().map(|v| v.0);
6767

68-
let previous = previous.unwrap_or_else(|| {
69-
Arc::new(TableSnapshot::new_empty_snapshot(
70-
schema.clone(),
71-
prev_table_seq,
72-
))
73-
});
68+
let empty_snapshot;
69+
let previous = match previous {
70+
Some(prev) => prev,
71+
None => {
72+
empty_snapshot = Arc::new(TableSnapshot::new_empty_snapshot(
73+
schema.clone(),
74+
prev_table_seq,
75+
));
76+
&empty_snapshot
77+
}
78+
};
79+
7480
match &self.conflict_resolve_ctx {
7581
ConflictResolveContext::ModifiedSegmentExistsInLatest(ctx) => {
7682
if let Some((removed, replaced)) =
7783
ConflictResolveContext::is_modified_segments_exists_in_latest(
7884
&self.base_snapshot,
79-
&previous,
85+
previous,
8086
&ctx.replaced_segments,
8187
&ctx.removed_segment_indexes,
8288
)

โ€Žsrc/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use databend_common_exception::Result;
1919
use databend_common_expression::TableSchema;
2020
use databend_storages_common_table_meta::meta::ClusterKey;
2121
use databend_storages_common_table_meta::meta::TableSnapshot;
22+
use databend_storages_common_txn::TxnManagerRef;
2223

2324
use crate::operations::common::ConflictResolveContext;
2425

@@ -43,5 +44,34 @@ pub trait SnapshotGenerator {
4344
cluster_key_meta: Option<ClusterKey>,
4445
previous: Option<Arc<TableSnapshot>>,
4546
prev_table_seq: Option<u64>,
47+
txn_mgr: TxnManagerRef,
48+
table_id: u64,
49+
) -> Result<TableSnapshot> {
50+
let mut snapshot =
51+
self.do_generate_new_snapshot(schema, cluster_key_meta, &previous, prev_table_seq)?;
52+
53+
let has_pending_transactional_mutations = {
54+
let guard = txn_mgr.lock();
55+
// NOTE:
56+
// When generating a new snapshot for a mutation of table for the first time,
57+
// there is no buffered table ID inside txn_mgr for this table.
58+
guard.is_active() && guard.get_table_from_buffer_by_id(table_id).is_some()
59+
};
60+
61+
if has_pending_transactional_mutations {
62+
// Adjust the `prev_snapshot_id` of the newly created snapshot to match the
63+
// `prev_snapshot_id` of the table when it first appeared in the transaction.
64+
let previous_of_previous = previous.as_ref().and_then(|prev| prev.prev_snapshot_id);
65+
snapshot.prev_snapshot_id = previous_of_previous;
66+
}
67+
Ok(snapshot)
68+
}
69+
70+
fn do_generate_new_snapshot(
71+
&self,
72+
schema: TableSchema,
73+
cluster_key_meta: Option<ClusterKey>,
74+
previous: &Option<Arc<TableSnapshot>>,
75+
prev_table_seq: Option<u64>,
4676
) -> Result<TableSnapshot>;
4777
}

โ€Žsrc/query/storages/fuse/src/operations/common/generators/truncate_generator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,11 @@ impl SnapshotGenerator for TruncateGenerator {
5454
self
5555
}
5656

57-
fn generate_new_snapshot(
57+
fn do_generate_new_snapshot(
5858
&self,
5959
schema: TableSchema,
6060
cluster_key_meta: Option<ClusterKey>,
61-
previous: Option<Arc<TableSnapshot>>,
61+
previous: &Option<Arc<TableSnapshot>>,
6262
prev_table_seq: Option<u64>,
6363
) -> Result<TableSnapshot> {
6464
let (prev_timestamp, prev_snapshot_id) = if let Some(prev_snapshot) = previous {

โ€Žsrc/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use databend_common_meta_types::MatchSeq;
3434
use databend_common_pipeline_sinks::AsyncSink;
3535
use databend_storages_common_table_meta::meta::TableSnapshot;
3636
use databend_storages_common_table_meta::meta::Versioned;
37+
use databend_storages_common_txn::TxnManagerRef;
3738
use log::debug;
3839
use log::error;
3940
use log::info;
@@ -90,7 +91,12 @@ impl AsyncSink for CommitMultiTableInsert {
9091
snapshot_generator.set_conflict_resolve_context(commit_meta.conflict_resolve_context);
9192
let table = self.tables.get(&table_id).unwrap();
9293
update_table_metas.push((
93-
build_update_table_meta_req(table.as_ref(), &snapshot_generator).await?,
94+
build_update_table_meta_req(
95+
table.as_ref(),
96+
&snapshot_generator,
97+
self.ctx.txn_mgr(),
98+
)
99+
.await?,
94100
table.get_table_info().clone(),
95101
));
96102
snapshot_generators.insert(table_id, snapshot_generator);
@@ -173,6 +179,7 @@ impl AsyncSink for CommitMultiTableInsert {
173179
*req = build_update_table_meta_req(
174180
table.as_ref(),
175181
snapshot_generators.get(&tid).unwrap(),
182+
self.ctx.txn_mgr(),
176183
)
177184
.await?;
178185
break;
@@ -227,6 +234,7 @@ impl AsyncSink for CommitMultiTableInsert {
227234
async fn build_update_table_meta_req(
228235
table: &dyn Table,
229236
snapshot_generator: &AppendGenerator,
237+
txn_mgr: TxnManagerRef,
230238
) -> Result<UpdateTableMetaReq> {
231239
let fuse_table = FuseTable::try_from_table(table)?;
232240
let previous = fuse_table.read_table_snapshot().await?;
@@ -235,6 +243,8 @@ async fn build_update_table_meta_req(
235243
fuse_table.cluster_key_meta.clone(),
236244
previous,
237245
Some(fuse_table.table_info.ident.seq),
246+
txn_mgr,
247+
table.get_id(),
238248
)?;
239249

240250
// write snapshot

โ€Žsrc/query/storages/fuse/src/operations/common/processors/sink_commit.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,8 @@ where F: SnapshotGenerator + Send + 'static
286286
cluster_key_meta,
287287
previous,
288288
Some(table_info.ident.seq),
289+
self.ctx.txn_mgr(),
290+
table_info.ident.table_id,
289291
) {
290292
Ok(snapshot) => {
291293
self.state = State::TryCommit {

โ€Žtests/sqllogictests/suites/base/09_fuse_engine/09_0100_insert_multi_table.sql

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,11 @@ begin transaction;
325325
statement ok
326326
insert into s values(3,4),(1,2),(5,6);
327327

328+
query I
329+
select count(*) from fuse_snapshot('default', 's');
330+
----
331+
1
332+
328333
query II
329334
INSERT FIRST
330335
WHEN c3 = 5 THEN
@@ -335,6 +340,16 @@ SELECT * from s;
335340
----
336341
1 2
337342

343+
query I
344+
select count(*) from fuse_snapshot('default', 't1');
345+
----
346+
1
347+
348+
query I
349+
select count(*) from fuse_snapshot('default', 't2');
350+
----
351+
1
352+
338353
query II
339354
select * from t1 order by c1;
340355
----
@@ -349,6 +364,22 @@ select * from t2;
349364
statement ok
350365
rollback;
351366

367+
query I
368+
select count(*) from fuse_snapshot('default', 's');
369+
----
370+
0
371+
372+
query I
373+
select count(*) from fuse_snapshot('default', 't1');
374+
----
375+
0
376+
377+
query I
378+
select count(*) from fuse_snapshot('default', 't2');
379+
----
380+
0
381+
382+
352383
query II
353384
select * from t1 order by c1;
354385
----

โ€Žtests/sqllogictests/suites/base/14_transaction/14_0003_merge_into.test

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,21 @@ INSERT INTO employees VALUES
3030
(3, 'Charlie', 'Finance'),
3131
(4, 'David', 'HR');
3232

33+
query I
34+
select count(*) from fuse_snapshot('test_txn_merge_into','employees');
35+
----
36+
1
37+
3338
statement ok
3439
INSERT INTO salaries VALUES
3540
(1, 50000.00),
3641
(2, 60000.00);
3742

43+
query I
44+
select count(*) from fuse_snapshot('test_txn_merge_into','salaries');
45+
----
46+
1
47+
3848
statement ok
3949
MERGE INTO salaries
4050
USING (SELECT * FROM employees) AS employees
@@ -49,6 +59,11 @@ MERGE INTO salaries
4959
INSERT (employee_id, salary)
5060
VALUES (employees.employee_id, 55000.00);
5161

62+
query I
63+
select count(*) from fuse_snapshot('test_txn_merge_into','salaries');
64+
----
65+
1
66+
5267
query IF
5368
SELECT employee_id, salary FROM salaries order by employee_id;
5469
----
@@ -60,6 +75,16 @@ SELECT employee_id, salary FROM salaries order by employee_id;
6075
statement ok
6176
COMMIT;
6277

78+
query I
79+
select count(*) from fuse_snapshot('test_txn_merge_into','salaries');
80+
----
81+
1
82+
83+
query I
84+
select count(*) from fuse_snapshot('test_txn_merge_into','employees');
85+
----
86+
1
87+
6388
query IF
6489
SELECT employee_id, salary FROM salaries order by employee_id;
6590
----
@@ -145,4 +170,4 @@ SELECT employee_id, salary FROM salaries order by employee_id;
145170

146171

147172
statement ok
148-
drop database test_txn_merge_into;
173+
drop database test_txn_merge_into;
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
statement ok
2+
create or replace database test_txn_snapshots;
3+
4+
statement ok
5+
use test_txn_snapshots;
6+
7+
statement ok
8+
create or replace table t(c int);
9+
10+
11+
###################################
12+
# no snapshots left if tx aborted #
13+
###################################
14+
15+
statement ok
16+
begin;
17+
18+
statement ok
19+
insert into t values(1);
20+
21+
statement ok
22+
insert into t values(1);
23+
24+
statement ok
25+
rollback;
26+
27+
query I
28+
select count() from fuse_snapshot('test_txn_snapshots', 't');
29+
----
30+
0
31+
32+
33+
#####################################################
34+
# one snapshot left if table mutated multiple times #
35+
#####################################################
36+
37+
38+
39+
statement ok
40+
begin;
41+
42+
statement ok
43+
insert into t values(1);
44+
45+
statement ok
46+
insert into t values(1);
47+
48+
statement ok
49+
insert into t values(1);
50+
51+
statement ok
52+
commit;
53+
54+
query I
55+
select count() from fuse_snapshot('test_txn_snapshots', 't');
56+
----
57+
1
58+
59+

0 commit comments

Comments
ย (0)