From 16d44ca1bd6e0e348659d5aab13c7d8c28e25a6b Mon Sep 17 00:00:00 2001 From: zhyass Date: Mon, 8 Dec 2025 01:31:20 +0800 Subject: [PATCH 1/3] fix: query error when stream_consume_batch_size_hint is not 0 --- src/query/service/src/sessions/query_ctx.rs | 31 +++++++++---------- src/query/sql/src/planner/planner_cache.rs | 2 +- .../ee/06_ee_stream/06_0000_stream.test | 22 ++++++------- .../06_ee_stream/06_0001_stream_status.test | 16 +++++----- .../06_0002_stream_txn_consume.test | 22 ++++++------- .../06_0003_stream_multi_table_insert.test | 6 ++-- ..._into_location_from_strea_issue_15876.test | 25 ++++++++------- .../06_0006_stream_batch_limit.test | 23 +++++++++----- .../06_0009_stream_issue_18827.test | 6 ++-- .../06_0010_stream_issue_19037.test | 4 +-- 10 files changed, 82 insertions(+), 75 deletions(-) diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 953de6f0b0da4..bf9115e883efb 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -1440,8 +1440,7 @@ impl TableContext for QueryContext { } } - let batch_size = self.get_settings().get_stream_consume_batch_size_hint()?; - self.get_table_from_shared(catalog, database, table, batch_size) + self.get_table_from_shared(catalog, database, table, None) .await } @@ -1457,34 +1456,32 @@ impl TableContext for QueryContext { table: &str, max_batch_size: Option, ) -> Result> { - let max_batch_size = { - match max_batch_size { - Some(v) => { - // use the batch size specified in the statement + let final_batch_size = match max_batch_size { + Some(v) => { + // use the batch size specified in the statement + Some(v) + } + None => { + if let Some(v) = self.get_settings().get_stream_consume_batch_size_hint()? { + info!("Overriding stream max_batch_size with setting value: {}", v); Some(v) - } - None => { - if let Some(v) = self.get_settings().get_stream_consume_batch_size_hint()? { - info!("Overriding stream max_batch_size with setting value: {}", v); - Some(v) - } else { - None - } + } else { + None } } }; let table = self - .get_table_from_shared(catalog, database, table, max_batch_size) + .get_table_from_shared(catalog, database, table, final_batch_size) .await?; if table.is_stream() { let stream = StreamTable::try_from_table(table.as_ref())?; let actual_batch_limit = stream.max_batch_size(); - if actual_batch_limit != max_batch_size { + if actual_batch_limit != final_batch_size { return Err(ErrorCode::StorageUnsupported( format!( "Stream batch size must be consistent within transaction: actual={:?}, requested={:?}", - actual_batch_limit, max_batch_size + actual_batch_limit, final_batch_size ) )); } diff --git a/src/query/sql/src/planner/planner_cache.rs b/src/query/sql/src/planner/planner_cache.rs index cc3453cb432cd..25870fd137a23 100644 --- a/src/query/sql/src/planner/planner_cache.rs +++ b/src/query/sql/src/planner/planner_cache.rs @@ -205,7 +205,7 @@ impl TableRefVisitor { databend_common_base::runtime::block_on(async move { if let Ok(table_meta) = self .ctx - .get_table(&catalog_name, &database_name, &table_name) + .get_table_with_batch(&catalog_name, &database_name, &table_name, None) .await { if !table_meta.is_temp() diff --git a/tests/sqllogictests/suites/ee/06_ee_stream/06_0000_stream.test b/tests/sqllogictests/suites/ee/06_ee_stream/06_0000_stream.test index af9db2c4abf5a..344699014c8c0 100644 --- a/tests/sqllogictests/suites/ee/06_ee_stream/06_0000_stream.test +++ b/tests/sqllogictests/suites/ee/06_ee_stream/06_0000_stream.test @@ -172,7 +172,7 @@ drop stream s3 statement ok drop table t2 all -query TTTTT +query TTTTTT select catalog, database, name, mode, table_name, comment from system.streams where database='test_stream' order by name ---- default test_stream s append_only test_stream.t (empty) @@ -194,7 +194,7 @@ drop stream t1 statement ok drop table t1 all -query T +query TTTTAA show columns from s2 ---- a int YES (empty) NULL NULL @@ -210,12 +210,12 @@ s2 Change tracking is not enabled on table 'test_stream'.'t' statement ok drop table t all -query TTT +query TT select name, invalid_reason from system.streams where database='test_stream' order by name ---- s2 Unknown table 't' -query T +query TTTTAA show columns from s2 ---- @@ -535,7 +535,7 @@ statement ok create stream s8_1 on table t7 # test merge into insert only -query T +query I merge into t7 using (select a, b from t8) as s on t7.a=s.a when not matched then insert * ---- 1 @@ -546,7 +546,7 @@ select a, b, change$action, change$is_update from s8_1 order by a, b 3 3 INSERT 0 # test merge into matched only -query TT +query II merge into t7 using (select a, b+1 as b from t8) as s on t7.a=s.a when matched and s.a=2 then update * when matched and s.a=4 then delete ---- 1 1 @@ -572,7 +572,7 @@ statement ok create stream s9_1 on table t7 # test merge into full operation -query TTT +query III merge into t7 using t8 on t7.a=t8.a when matched and t8.a=2 then update set t7.b=t8.b when matched and t8.a=3 then delete when not matched then insert (a,b) values(t8.a,t8.b) ---- 1 1 1 @@ -673,7 +673,7 @@ merge into t9 a using (select 10,'a') b(id,c1) on a.id=b.id when matched then up ---- 1 0 -query T +query ITTB select id, c1, change$action, change$is_update from stream_t9; ---- 10 a INSERT 0 @@ -693,7 +693,7 @@ merge into t9 a using (select 10,'a') b(id,c1) on a.id=b.id when matched then up ---- 0 1 -query T +query ITTTB select * from stream_t9; ---- @@ -810,7 +810,7 @@ insert into t1 values(1,1),(2,3),(3,3); statement ok explain merge into t1 using t2 on t1.a=t2.a when matched then update set t1.b=t2.b when not matched then insert *; -query TT +query II merge into t1 using t2 on t1.a=t2.a when matched then update set t1.b=t2.b when not matched then insert *; ---- 1 2 @@ -818,7 +818,7 @@ merge into t1 using t2 on t1.a=t2.a when matched then update set t1.b=t2.b when statement ok alter table t1 rename to t1_1 -query T +query TT select name, invalid_reason from system.streams where database='test_stream' order by name ---- s1 (empty) diff --git a/tests/sqllogictests/suites/ee/06_ee_stream/06_0001_stream_status.test b/tests/sqllogictests/suites/ee/06_ee_stream/06_0001_stream_status.test index 006c2171834da..1f45b369b7f83 100644 --- a/tests/sqllogictests/suites/ee/06_ee_stream/06_0001_stream_status.test +++ b/tests/sqllogictests/suites/ee/06_ee_stream/06_0001_stream_status.test @@ -39,32 +39,32 @@ alter table t set options(change_tracking=true) statement ok create stream if not exists s on table t -query I +query B select * from stream_status('s') ---- 0 -query I +query B select * from stream_status('test_stream_status.s') ---- 0 -query I +query B select * from stream_status('default.test_stream_status.s') ---- 0 -query I +query B call system$stream_status('s') ---- 0 -query I +query B call system$stream_status('test_stream_status.s') ---- 0 -query I +query B call system$stream_status('default.test_stream_status.s') ---- 0 @@ -72,12 +72,12 @@ call system$stream_status('default.test_stream_status.s') statement ok insert into t values(2) -query I +query B select * from stream_status('s') ---- 1 -query I +query B call system$stream_status('s') ---- 1 diff --git a/tests/sqllogictests/suites/ee/06_ee_stream/06_0002_stream_txn_consume.test b/tests/sqllogictests/suites/ee/06_ee_stream/06_0002_stream_txn_consume.test index 27ba4a9699766..a7396c6b5128e 100644 --- a/tests/sqllogictests/suites/ee/06_ee_stream/06_0002_stream_txn_consume.test +++ b/tests/sqllogictests/suites/ee/06_ee_stream/06_0002_stream_txn_consume.test @@ -371,7 +371,7 @@ insert into t_1 (str) values ('a'), ('b'); statement ok begin; -query I +query T select str from s_1 with consume order by str; ---- a @@ -383,7 +383,7 @@ select 1/0; statement ok commit; -query I +query T select str from s_1 order by str; ---- a @@ -394,20 +394,20 @@ b statement ok begin; -query I +query T select str from s_1 with consume order by str; ---- a b # inside txn, s_1 is consumed, expects empty result set -query I +query T select str from s_1; statement ok rollback; -query I +query T select str from s_1 order by str; ---- a @@ -428,7 +428,7 @@ statement ok insert into tmp_sink select str from s_1; # changes should not be consumed -query I +query T select str from s_1 order by str; ---- a @@ -459,7 +459,7 @@ select count() from tmp_sink statement ok commit; -query I +query T select str from s_1 order by str; ---- @@ -486,7 +486,7 @@ statement ok explain select str from s_1; # explain should not consume the stream -query I +query T select str from s_1 order by str; ---- a @@ -508,7 +508,7 @@ begin; statement ok insert into target_1 select str from s_1; -query I +query T select str from s_1 with consume order by str; ---- a @@ -520,13 +520,13 @@ insert into target_2 select str from s_1; statement ok commit; -query I +query T select str from target_1 order by str; ---- a b -query I +query T select str from target_2 order by str; ---- diff --git a/tests/sqllogictests/suites/ee/06_ee_stream/06_0003_stream_multi_table_insert.test b/tests/sqllogictests/suites/ee/06_ee_stream/06_0003_stream_multi_table_insert.test index 0887f21f89e87..34293e6702406 100644 --- a/tests/sqllogictests/suites/ee/06_ee_stream/06_0003_stream_multi_table_insert.test +++ b/tests/sqllogictests/suites/ee/06_ee_stream/06_0003_stream_multi_table_insert.test @@ -14,10 +14,10 @@ # multi table INSERT statement ok -create or replace database test_txn_stream; +create or replace database test_txn_stream_1; statement ok -use test_txn_stream; +use test_txn_stream_1; statement ok CREATE TABLE t_append_only(a INT); @@ -194,4 +194,4 @@ SELECT * FROM t_consume_append_only_6_1; -- empty ---- statement ok -drop database test_txn_stream; \ No newline at end of file +drop database test_txn_stream_1; diff --git a/tests/sqllogictests/suites/ee/06_ee_stream/06_0005_copy_into_location_from_strea_issue_15876.test b/tests/sqllogictests/suites/ee/06_ee_stream/06_0005_copy_into_location_from_strea_issue_15876.test index 7998ee47f1d9f..41dcb1d172d4e 100644 --- a/tests/sqllogictests/suites/ee/06_ee_stream/06_0005_copy_into_location_from_strea_issue_15876.test +++ b/tests/sqllogictests/suites/ee/06_ee_stream/06_0005_copy_into_location_from_strea_issue_15876.test @@ -42,13 +42,13 @@ remove @test_stage_15876; statement ok copy into @test_stage_15876/case1 from s; -query T +query I select c from @test_stage_15876/case1; ---- 1 # expects empty result set -query TT +query I select c from s; ---- @@ -59,12 +59,12 @@ insert into t values(1); statement ok copy into @test_stage_15876/empty from (select c from s where 1 = 0); -query T +query TITTT list @test_stage_15876/empty; ---- # stream should be consumed -query TT +query I select c from s; ---- @@ -81,7 +81,7 @@ statement ok begin; # check stream -query T +query I select c from s; ---- 2 @@ -90,7 +90,7 @@ statement ok copy into @test_stage_15876/case2 from s; # stream should NOT be consumed yet (txn not ended) -query T +query I select c from s; ---- 2 @@ -99,7 +99,7 @@ statement ok commit; # stream should be consumed now -query ok +query ITBT select * from s; ---- @@ -110,7 +110,7 @@ select * from s; statement ok insert into t values(1); -query TT +query I select c from s; ---- 1 @@ -119,7 +119,7 @@ statement error copy into @test_stage_15876 from (select c/0 from s); # stream should NOT be consumed now -query TT +query I select c from s; ---- 1 @@ -131,7 +131,7 @@ select c from s; # check that stream contains changes -query TT +query I select c from s; ---- 1 @@ -147,7 +147,10 @@ statement ok commit; # stream should NOT be consumed -query TT +query I select c from s; ---- 1 + +statement ok +drop database issue_15876; diff --git a/tests/sqllogictests/suites/ee/06_ee_stream/06_0006_stream_batch_limit.test b/tests/sqllogictests/suites/ee/06_ee_stream/06_0006_stream_batch_limit.test index 956f1d84eecf9..45c1eb7c8c36f 100644 --- a/tests/sqllogictests/suites/ee/06_ee_stream/06_0006_stream_batch_limit.test +++ b/tests/sqllogictests/suites/ee/06_ee_stream/06_0006_stream_batch_limit.test @@ -141,7 +141,7 @@ statement ok copy into @test_stage_stream_06/case1 from s with (max_batch_size = 1); # there should be only 1 row copied into stage -query T +query I select a from @test_stage_stream_06/case1; ---- 2 @@ -154,13 +154,13 @@ statement ok INSERT INTO t1 values(4); # recall: t2 contains one row {1} -query T +query I select a from t2; ---- 1 # recall: s contains 2 rows {3, 4} -query T +query I select a from s order by a; ---- 3 @@ -209,21 +209,21 @@ statement ok insert into t values(4); # expect size of the result set is larger than hint (1) -query T +query I select a from s with (consume = true, max_batch_size = 1) order by a; ---- 1 2 # expect 2 rows -query T +query I select a from s with (consume = false, max_batch_size = 2) order by a; ---- 3 4 # expect 2 rows -query T +query I select a from s with (consume = true, max_batch_size = 2) order by a; ---- 3 @@ -264,7 +264,7 @@ statement ok commit; # expects {1,2} have been inserted twice -query T +query I select a from t1 order by a; ---- 1 @@ -273,7 +273,7 @@ select a from t1 order by a; 2 # expects {3,4} left in stream -query T +query I select a from s order by a; ---- 3 @@ -325,6 +325,13 @@ select c from s_t_settings order by c; 1 2 +query I +select c from t_settings order by c; +---- +1 +2 +3 + ####################################################### # max_batch_size specified in query has high priority # ####################################################### diff --git a/tests/sqllogictests/suites/ee/06_ee_stream/06_0009_stream_issue_18827.test b/tests/sqllogictests/suites/ee/06_ee_stream/06_0009_stream_issue_18827.test index fa3538d0bd75c..3c28b30f8fef9 100644 --- a/tests/sqllogictests/suites/ee/06_ee_stream/06_0009_stream_issue_18827.test +++ b/tests/sqllogictests/suites/ee/06_ee_stream/06_0009_stream_issue_18827.test @@ -35,17 +35,17 @@ alter table t_18827 modify column a binary; statement error 1132 alter table t_18827 add column c date default today(); -query T +query AI select a, b from t_18827; ---- 61 1 62 2 -query T +query AITB select a, b, change$action, change$is_update from s1_18827; ---- -query T +query AITB select a, b, change$action, change$is_update from s_18827 order by change$action, a; ---- diff --git a/tests/sqllogictests/suites/ee/06_ee_stream/06_0010_stream_issue_19037.test b/tests/sqllogictests/suites/ee/06_ee_stream/06_0010_stream_issue_19037.test index b2af24b87bc99..593ec6f390e5a 100644 --- a/tests/sqllogictests/suites/ee/06_ee_stream/06_0010_stream_issue_19037.test +++ b/tests/sqllogictests/suites/ee/06_ee_stream/06_0010_stream_issue_19037.test @@ -27,7 +27,7 @@ create or replace stream s on table t APPEND_ONLY = false; statement ok insert into t values(1, BUILD_BITMAP([3, 7, 9])); -query T +query I select count() from s with consume; ---- 1 @@ -35,7 +35,7 @@ select count() from s with consume; statement ok update t set b = BUILD_BITMAP([3, 7, 10]) where a = 1; -query T +query I select count() from s with consume; ---- 2 From d8c9d99fbc3ed3cf102a652b3d086a2ef201d5d1 Mon Sep 17 00:00:00 2001 From: zhyass Date: Mon, 8 Dec 2025 20:10:54 +0800 Subject: [PATCH 2/3] fix --- src/query/service/src/sessions/query_ctx.rs | 3 ++- src/query/sql/src/planner/planner_cache.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index bf9115e883efb..c13af21deedc1 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -1440,7 +1440,8 @@ impl TableContext for QueryContext { } } - self.get_table_from_shared(catalog, database, table, None) + let batch_size = self.get_settings().get_stream_consume_batch_size_hint()?; + self.get_table_from_shared(catalog, database, table, batch_size) .await } diff --git a/src/query/sql/src/planner/planner_cache.rs b/src/query/sql/src/planner/planner_cache.rs index 25870fd137a23..cc3453cb432cd 100644 --- a/src/query/sql/src/planner/planner_cache.rs +++ b/src/query/sql/src/planner/planner_cache.rs @@ -205,7 +205,7 @@ impl TableRefVisitor { databend_common_base::runtime::block_on(async move { if let Ok(table_meta) = self .ctx - .get_table_with_batch(&catalog_name, &database_name, &table_name, None) + .get_table(&catalog_name, &database_name, &table_name) .await { if !table_meta.is_temp() From de0d52e7dec7247b98a85ed2513c1409ff751bbc Mon Sep 17 00:00:00 2001 From: dantengsky Date: Mon, 8 Dec 2025 23:16:23 +0800 Subject: [PATCH 3/3] add logic test for issue #19073 --- .../06_0010_stream_issue_19073.test | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 tests/sqllogictests/suites/ee/06_ee_stream/06_0010_stream_issue_19073.test diff --git a/tests/sqllogictests/suites/ee/06_ee_stream/06_0010_stream_issue_19073.test b/tests/sqllogictests/suites/ee/06_ee_stream/06_0010_stream_issue_19073.test new file mode 100644 index 0000000000000..a5ce8809b5b71 --- /dev/null +++ b/tests/sqllogictests/suites/ee/06_ee_stream/06_0010_stream_issue_19073.test @@ -0,0 +1,39 @@ +## Copyright 2023 Databend Cloud +## +## Licensed under the Elastic License, Version 2.0 (the "License"); +## you may not use this file except in compliance with the License. +## You may obtain a copy of the License at +## +## https://www.elastic.co/licensing/elastic-license +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. + +statement ok +create or replace database issue_19073; + +statement ok +use issue_19073; + +statement ok +create table t(a int) + +statement ok +set stream_consume_batch_size_hint =2; + +statement ok +insert into t values(1); + +statement ok +insert into t values(2),(3); + + +query I +select * from t order by a +---- +1 +2 +3