Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1457,34 +1457,32 @@ impl TableContext for QueryContext {
table: &str,
max_batch_size: Option<u64>,
) -> Result<Arc<dyn Table>> {
let max_batch_size = {
match max_batch_size {
Some(v) => {
// use the batch size specified in the statement
let final_batch_size = match max_batch_size {
Some(v) => {
// use the batch size specified in the statement
Some(v)
}
None => {
if let Some(v) = self.get_settings().get_stream_consume_batch_size_hint()? {
info!("Overriding stream max_batch_size with setting value: {}", v);
Some(v)
}
None => {
if let Some(v) = self.get_settings().get_stream_consume_batch_size_hint()? {
info!("Overriding stream max_batch_size with setting value: {}", v);
Some(v)
} else {
None
}
} else {
None
}
}
};

let table = self
.get_table_from_shared(catalog, database, table, max_batch_size)
.get_table_from_shared(catalog, database, table, final_batch_size)
.await?;
if table.is_stream() {
let stream = StreamTable::try_from_table(table.as_ref())?;
let actual_batch_limit = stream.max_batch_size();
if actual_batch_limit != max_batch_size {
if actual_batch_limit != final_batch_size {
return Err(ErrorCode::StorageUnsupported(
format!(
"Stream batch size must be consistent within transaction: actual={:?}, requested={:?}",
actual_batch_limit, max_batch_size
actual_batch_limit, final_batch_size
)
));
}
Expand Down
22 changes: 11 additions & 11 deletions tests/sqllogictests/suites/ee/06_ee_stream/06_0000_stream.test
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ drop stream s3
statement ok
drop table t2 all

query TTTTT
query TTTTTT
select catalog, database, name, mode, table_name, comment from system.streams where database='test_stream' order by name
----
default test_stream s append_only test_stream.t (empty)
Expand All @@ -194,7 +194,7 @@ drop stream t1
statement ok
drop table t1 all

query T
query TTTTAA
show columns from s2
----
a int YES (empty) NULL NULL
Expand All @@ -210,12 +210,12 @@ s2 Change tracking is not enabled on table 'test_stream'.'t'
statement ok
drop table t all

query TTT
query TT
select name, invalid_reason from system.streams where database='test_stream' order by name
----
s2 Unknown table 't'

query T
query TTTTAA
show columns from s2
----

Expand Down Expand Up @@ -535,7 +535,7 @@ statement ok
create stream s8_1 on table t7

# test merge into insert only
query T
query I
merge into t7 using (select a, b from t8) as s on t7.a=s.a when not matched then insert *
----
1
Expand All @@ -546,7 +546,7 @@ select a, b, change$action, change$is_update from s8_1 order by a, b
3 3 INSERT 0

# test merge into matched only
query TT
query II
merge into t7 using (select a, b+1 as b from t8) as s on t7.a=s.a when matched and s.a=2 then update * when matched and s.a=4 then delete
----
1 1
Expand All @@ -572,7 +572,7 @@ statement ok
create stream s9_1 on table t7

# test merge into full operation
query TTT
query III
merge into t7 using t8 on t7.a=t8.a when matched and t8.a=2 then update set t7.b=t8.b when matched and t8.a=3 then delete when not matched then insert (a,b) values(t8.a,t8.b)
----
1 1 1
Expand Down Expand Up @@ -673,7 +673,7 @@ merge into t9 a using (select 10,'a') b(id,c1) on a.id=b.id when matched then up
----
1 0

query T
query ITTB
select id, c1, change$action, change$is_update from stream_t9;
----
10 a INSERT 0
Expand All @@ -693,7 +693,7 @@ merge into t9 a using (select 10,'a') b(id,c1) on a.id=b.id when matched then up
----
0 1

query T
query ITTTB
select * from stream_t9;
----

Expand Down Expand Up @@ -810,15 +810,15 @@ insert into t1 values(1,1),(2,3),(3,3);
statement ok
explain merge into t1 using t2 on t1.a=t2.a when matched then update set t1.b=t2.b when not matched then insert *;

query TT
query II
merge into t1 using t2 on t1.a=t2.a when matched then update set t1.b=t2.b when not matched then insert *;
----
1 2

statement ok
alter table t1 rename to t1_1

query T
query TT
select name, invalid_reason from system.streams where database='test_stream' order by name
----
s1 (empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,45 +39,45 @@ alter table t set options(change_tracking=true)
statement ok
create stream if not exists s on table t

query I
query B
select * from stream_status('s')
----
0

query I
query B
select * from stream_status('test_stream_status.s')
----
0

query I
query B
select * from stream_status('default.test_stream_status.s')
----
0

query I
query B
call system$stream_status('s')
----
0

query I
query B
call system$stream_status('test_stream_status.s')
----
0

query I
query B
call system$stream_status('default.test_stream_status.s')
----
0

statement ok
insert into t values(2)

query I
query B
select * from stream_status('s')
----
1

query I
query B
call system$stream_status('s')
----
1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ insert into t_1 (str) values ('a'), ('b');
statement ok
begin;

query I
query T
select str from s_1 with consume order by str;
----
a
Expand All @@ -383,7 +383,7 @@ select 1/0;
statement ok
commit;

query I
query T
select str from s_1 order by str;
----
a
Expand All @@ -394,20 +394,20 @@ b
statement ok
begin;

query I
query T
select str from s_1 with consume order by str;
----
a
b

# inside txn, s_1 is consumed, expects empty result set
query I
query T
select str from s_1;

statement ok
rollback;

query I
query T
select str from s_1 order by str;
----
a
Expand All @@ -428,7 +428,7 @@ statement ok
insert into tmp_sink select str from s_1;

# changes should not be consumed
query I
query T
select str from s_1 order by str;
----
a
Expand Down Expand Up @@ -459,7 +459,7 @@ select count() from tmp_sink
statement ok
commit;

query I
query T
select str from s_1 order by str;
----

Expand All @@ -486,7 +486,7 @@ statement ok
explain select str from s_1;

# explain should not consume the stream
query I
query T
select str from s_1 order by str;
----
a
Expand All @@ -508,7 +508,7 @@ begin;
statement ok
insert into target_1 select str from s_1;

query I
query T
select str from s_1 with consume order by str;
----
a
Expand All @@ -520,13 +520,13 @@ insert into target_2 select str from s_1;
statement ok
commit;

query I
query T
select str from target_1 order by str;
----
a
b

query I
query T
select str from target_2 order by str;
----

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

# multi table INSERT
statement ok
create or replace database test_txn_stream;
create or replace database test_txn_stream_1;

statement ok
use test_txn_stream;
use test_txn_stream_1;

statement ok
CREATE TABLE t_append_only(a INT);
Expand Down Expand Up @@ -194,4 +194,4 @@ SELECT * FROM t_consume_append_only_6_1; -- empty
----

statement ok
drop database test_txn_stream;
drop database test_txn_stream_1;
Loading
Loading