diff --git a/src/query/expression/src/kernels/sort.rs b/src/query/expression/src/kernels/sort.rs
index fece7246bd69..b4cb346a4917 100644
--- a/src/query/expression/src/kernels/sort.rs
+++ b/src/query/expression/src/kernels/sort.rs
@@ -61,6 +61,12 @@ pub enum LimitType {
 }
 
 impl LimitType {
+    pub fn from_limit_rows(limit: Option<usize>) -> Self {
+        match limit {
+            Some(limit) => LimitType::LimitRows(limit),
+            None => LimitType::None,
+        }
+    }
     pub fn limit_rows(&self, rows: usize) -> usize {
         match self {
             LimitType::LimitRows(limit) => *limit,
diff --git a/src/query/expression/src/kernels/sort_compare.rs b/src/query/expression/src/kernels/sort_compare.rs
index a1df50931ed6..f492ba7178c9 100644
--- a/src/query/expression/src/kernels/sort_compare.rs
+++ b/src/query/expression/src/kernels/sort_compare.rs
@@ -98,7 +98,7 @@ macro_rules! do_sorter {
 impl SortCompare {
     pub fn new(ordering_descs: Vec<SortColumnDescription>, rows: usize, limit: LimitType) -> Self {
         let equality_index =
-            if ordering_descs.len() == 1 && matches!(limit, LimitType::LimitRank(_)) {
+            if ordering_descs.len() == 1 && !matches!(limit, LimitType::LimitRank(_)) {
                 vec![]
             } else {
                 vec![1; rows as _]
@@ -114,6 +114,11 @@ impl SortCompare {
         }
     }
 
+    fn need_update_equality_index(&self) -> bool {
+        self.current_column_index != self.ordering_descs.len() - 1
+            || matches!(self.limit, LimitType::LimitRank(_))
+    }
+
     pub fn increment_column_index(&mut self) {
         self.current_column_index += 1;
     }
@@ -196,8 +201,7 @@ impl SortCompare {
         } else {
             let mut current = 1;
             let len = self.rows;
-            let need_update_equality_index =
-                self.current_column_index != self.ordering_descs.len() - 1;
+            let need_update_equality_index = self.need_update_equality_index();
 
             while current < len {
                 // Find the start of the next range of equal elements
diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_partial.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_partial.rs
index 2a4b7f9261e2..664880d42fad 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_partial.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_partial.rs
@@ -16,6 +16,7 @@ use std::sync::Arc;
 
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
+use databend_common_expression::LimitType;
 use databend_common_expression::SortColumnDescription;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
@@ -25,13 +26,13 @@ use crate::processors::transforms::Transform;
 use crate::processors::transforms::Transformer;
 
 pub struct TransformSortPartial {
-    limit: Option<usize>,
+    limit: LimitType,
     sort_columns_descriptions: Arc<Vec<SortColumnDescription>>,
 }
 
 impl TransformSortPartial {
     pub fn new(
-        limit: Option<usize>,
+        limit: LimitType,
         sort_columns_descriptions: Arc<Vec<SortColumnDescription>>,
     ) -> Self {
         Self {
@@ -43,7 +44,7 @@ impl TransformSortPartial {
     pub fn try_create(
         input: Arc<InputPort>,
         output: Arc<OutputPort>,
-        limit: Option<usize>,
+        limit: LimitType,
         sort_columns_descriptions: Arc<Vec<SortColumnDescription>>,
     ) -> Result<Box<dyn Processor>> {
         Ok(Transformer::create(input, output, TransformSortPartial {
@@ -58,6 +59,6 @@ impl Transform for TransformSortPartial {
     const NAME: &'static str = "SortPartialTransform";
 
     fn transform(&mut self, block: DataBlock) -> Result<DataBlock> {
-        DataBlock::sort(&block, &self.sort_columns_descriptions, self.limit)
+        DataBlock::sort_with_type(&block, &self.sort_columns_descriptions, self.limit)
     }
 }
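Note: the key semantic here is that `LimitType::LimitRank(n)` keeps every row whose dense rank over the sort keys is at most `n`, rather than keeping the first `n` rows outright — which is also why `SortCompare` must keep maintaining the equality index for rank limits even with a single sort column. A minimal sketch of the idea, using a hypothetical standalone helper rather than the real `SortCompare` machinery:

    /// Keep the prefix of an already-sorted slice whose dense rank is <= `rank_limit`.
    /// Equal keys share a rank, so whole groups survive or are dropped together.
    fn rank_limit_filter<T: PartialEq + Clone>(sorted_keys: &[T], rank_limit: usize) -> Vec<T> {
        let mut rank = 0;
        let mut kept = Vec::new();
        for (i, key) in sorted_keys.iter().enumerate() {
            if i == 0 || sorted_keys[i - 1] != *key {
                rank += 1; // a new distinct key starts the next rank
            }
            if rank > rank_limit {
                break; // later rows can never belong to the top-N groups
            }
            kept.push(key.clone());
        }
        kept
    }

    fn main() {
        // Dense ranks over [1, 1, 2, 3, 3] are [1, 1, 2, 3, 3]; rank_limit = 2 keeps [1, 1, 2].
        assert_eq!(rank_limit_filter(&[1, 1, 2, 3, 3], 2), vec![1, 1, 2]);
    }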
diff --git a/src/query/service/src/interpreters/interpreter_table_analyze.rs b/src/query/service/src/interpreters/interpreter_table_analyze.rs
index 74e9f2a40925..f3a5d2b53bcc 100644
--- a/src/query/service/src/interpreters/interpreter_table_analyze.rs
+++ b/src/query/service/src/interpreters/interpreter_table_analyze.rs
@@ -298,6 +298,7 @@ fn remove_exchange(plan: PhysicalPlan) -> PhysicalPlan {
                 input: Box::new(traverse(*plan.input)),
                 group_by: plan.group_by,
                 agg_funcs: plan.agg_funcs,
+                rank_limit: plan.rank_limit,
                 enable_experimental_aggregate_hashtable: plan
                     .enable_experimental_aggregate_hashtable,
                 group_by_display: plan.group_by_display,
@@ -310,7 +311,6 @@ fn remove_exchange(plan: PhysicalPlan) -> PhysicalPlan {
                 group_by: plan.group_by,
                 agg_funcs: plan.agg_funcs,
                 before_group_by_schema: plan.before_group_by_schema,
-                limit: plan.limit,
                 group_by_display: plan.group_by_display,
                 stat_info: plan.stat_info,
             }),
diff --git a/src/query/service/src/pipelines/builders/builder_aggregate.rs b/src/query/service/src/pipelines/builders/builder_aggregate.rs
index 1ecf01b6b0f8..001d1bafa83a 100644
--- a/src/query/service/src/pipelines/builders/builder_aggregate.rs
+++ b/src/query/service/src/pipelines/builders/builder_aggregate.rs
@@ -23,10 +23,13 @@ use databend_common_expression::DataBlock;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::HashMethodKind;
 use databend_common_expression::HashTableConfig;
+use databend_common_expression::LimitType;
+use databend_common_expression::SortColumnDescription;
 use databend_common_functions::aggregates::AggregateFunctionFactory;
 use databend_common_pipeline_core::processors::ProcessorPtr;
 use databend_common_pipeline_core::query_spill_prefix;
 use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
+use databend_common_pipeline_transforms::processors::TransformSortPartial;
 use databend_common_sql::executor::physical_plans::AggregateExpand;
 use databend_common_sql::executor::physical_plans::AggregateFinal;
 use databend_common_sql::executor::physical_plans::AggregateFunctionDesc;
@@ -111,7 +114,6 @@ impl PipelineBuilder {
             enable_experimental_aggregate_hashtable,
             self.is_exchange_neighbor,
             max_block_size as usize,
-            None,
             max_spill_io_requests as usize,
         )?;
 
@@ -125,7 +127,7 @@ impl PipelineBuilder {
         let group_cols = &params.group_columns;
         let schema_before_group_by = params.input_schema.clone();
-        let sample_block = DataBlock::empty_with_schema(schema_before_group_by);
+        let sample_block = DataBlock::empty_with_schema(schema_before_group_by.clone());
         let method = DataBlock::choose_hash_method(&sample_block, group_cols, efficiently_memory)?;
 
         // Need a global atomic to read the max current radix bits hint
@@ -136,6 +138,28 @@ impl PipelineBuilder {
                 .cluster_with_partial(true, self.ctx.get_cluster().nodes.len())
         };
 
+        // For a rank limit, we can pre-filter the data with a rank-aware sort before the partial aggregation.
+        if let Some(rank_limit) = &aggregate.rank_limit {
+            let sort_desc = rank_limit
+                .0
+                .iter()
+                .map(|desc| {
+                    let offset = schema_before_group_by.index_of(&desc.order_by.to_string())?;
+                    Ok(SortColumnDescription {
+                        offset,
+                        asc: desc.asc,
+                        nulls_first: desc.nulls_first,
+                        is_nullable: schema_before_group_by.field(offset).is_nullable(), // This information is not needed here.
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+            let sort_desc = Arc::new(sort_desc);
+
+            self.main_pipeline.add_transformer(|| {
+                TransformSortPartial::new(LimitType::LimitRank(rank_limit.1), sort_desc.clone())
+            });
+        }
+
         self.main_pipeline.add_transform(|input, output| {
             Ok(ProcessorPtr::create(
                 match params.aggregate_functions.is_empty() {
@@ -225,7 +249,6 @@ impl PipelineBuilder {
             enable_experimental_aggregate_hashtable,
             self.is_exchange_neighbor,
             max_block_size as usize,
-            aggregate.limit,
             max_spill_io_requests as usize,
         )?;
 
@@ -292,7 +315,6 @@ impl PipelineBuilder {
         enable_experimental_aggregate_hashtable: bool,
         cluster_aggregator: bool,
         max_block_size: usize,
-        limit: Option<usize>,
        max_spill_io_requests: usize,
     ) -> Result<Arc<AggregatorParams>> {
         let mut agg_args = Vec::with_capacity(agg_funcs.len());
@@ -335,7 +357,6 @@ impl PipelineBuilder {
             enable_experimental_aggregate_hashtable,
             cluster_aggregator,
             max_block_size,
-            limit,
             max_spill_io_requests,
         )?;
diff --git a/src/query/service/src/pipelines/builders/builder_insert_multi_table.rs b/src/query/service/src/pipelines/builders/builder_insert_multi_table.rs
index 11d3d0378c97..a5164b49cb3a 100644
--- a/src/query/service/src/pipelines/builders/builder_insert_multi_table.rs
+++ b/src/query/service/src/pipelines/builders/builder_insert_multi_table.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
 use databend_common_catalog::catalog::CatalogManager;
 use databend_common_exception::Result;
 use databend_common_expression::DataSchema;
+use databend_common_expression::LimitType;
 use databend_common_expression::SortColumnDescription;
 use databend_common_pipeline_core::processors::ProcessorPtr;
 use databend_common_pipeline_core::DynTransformBuilder;
@@ -230,7 +231,7 @@ impl PipelineBuilder {
                 Ok(ProcessorPtr::create(TransformSortPartial::try_create(
                     transform_input_port,
                     transform_output_port,
-                    None,
+                    LimitType::None,
                     sort_desc.clone(),
                 )?))
             },
diff --git a/src/query/service/src/pipelines/builders/builder_sort.rs b/src/query/service/src/pipelines/builders/builder_sort.rs
index 5ae5aaa1b345..6c933b165b0b 100644
--- a/src/query/service/src/pipelines/builders/builder_sort.rs
+++ b/src/query/service/src/pipelines/builders/builder_sort.rs
@@ -16,6 +16,7 @@ use std::sync::Arc;
 
 use databend_common_exception::Result;
 use databend_common_expression::DataSchemaRef;
+use databend_common_expression::LimitType;
 use databend_common_expression::SortColumnDescription;
 use databend_common_pipeline_core::processors::ProcessorPtr;
 use databend_common_pipeline_core::query_spill_prefix;
@@ -197,7 +198,12 @@ impl SortPipelineBuilder {
 
     pub fn build_full_sort_pipeline(self, pipeline: &mut Pipeline) -> Result<()> {
         // Partial sort
-        pipeline.add_transformer(|| TransformSortPartial::new(self.limit, self.sort_desc.clone()));
+        pipeline.add_transformer(|| {
+            TransformSortPartial::new(
+                LimitType::from_limit_rows(self.limit),
+                self.sort_desc.clone(),
+            )
+        });
 
         self.build_merge_sort_pipeline(pipeline, false)
     }
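Note: `build_full_sort_pipeline` still carries an `Option<usize>` limit; the new `LimitType::from_limit_rows` adapter from sort.rs above is what bridges the two representations. A self-contained mirror of that adapter for illustration (the real enum also has a `LimitRank` variant, omitted here):

    #[derive(Debug, PartialEq)]
    enum LimitType {
        None,
        LimitRows(usize),
    }

    impl LimitType {
        fn from_limit_rows(limit: Option<usize>) -> Self {
            match limit {
                Some(n) => LimitType::LimitRows(n),
                None => LimitType::None,
            }
        }
    }

    fn main() {
        assert_eq!(LimitType::from_limit_rows(Some(10)), LimitType::LimitRows(10));
        assert_eq!(LimitType::from_limit_rows(None), LimitType::None);
    }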
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs
index 9c1466184a77..f1dfb320c3d0 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs
@@ -44,8 +44,6 @@ pub struct AggregatorParams {
     pub enable_experimental_aggregate_hashtable: bool,
     pub cluster_aggregator: bool,
     pub max_block_size: usize,
-    // Limit is push down to AggregatorTransform
-    pub limit: Option<usize>,
     pub max_spill_io_requests: usize,
 }
 
@@ -59,7 +57,6 @@ impl AggregatorParams {
         enable_experimental_aggregate_hashtable: bool,
         cluster_aggregator: bool,
         max_block_size: usize,
-        limit: Option<usize>,
         max_spill_io_requests: usize,
     ) -> Result<Arc<AggregatorParams>> {
         let mut states_offsets: Vec<usize> = Vec::with_capacity(agg_funcs.len());
@@ -80,7 +77,6 @@ impl AggregatorParams {
             enable_experimental_aggregate_hashtable,
             cluster_aggregator,
             max_block_size,
-            limit,
             max_spill_io_requests,
         }))
     }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
index 482bca8238f2..23598d0515c5 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
@@ -46,7 +46,6 @@ pub struct TransformFinalAggregate<Method: HashMethodBounds> {
     method: Method,
     params: Arc<AggregatorParams>,
     flush_state: PayloadFlushState,
-    reach_limit: bool,
 }
 
 impl<Method: HashMethodBounds> TransformFinalAggregate<Method> {
@@ -63,7 +62,6 @@ impl<Method: HashMethodBounds> TransformFinalAggregate<Method> {
                 method,
                 params,
                 flush_state: PayloadFlushState::default(),
-                reach_limit: false,
             },
         ))
     }
@@ -124,23 +122,11 @@ impl<Method: HashMethodBounds> TransformFinalAggregate<Method> {
         let mut blocks = vec![];
         self.flush_state.clear();
 
-        let mut rows = 0;
         loop {
             if ht.merge_result(&mut self.flush_state)? {
                 let mut cols = self.flush_state.take_aggregate_results();
                 cols.extend_from_slice(&self.flush_state.take_group_columns());
-                rows += cols[0].len();
                 blocks.push(DataBlock::new_from_columns(cols));
-
-                if rows >= self.params.limit.unwrap_or(usize::MAX) {
-                    log::info!(
-                        "reach limit optimization in flush agg hashtable, current {}, total {}",
-                        rows,
-                        ht.len(),
-                    );
-                    self.reach_limit = true;
-                    break;
-                }
             } else {
                 break;
             }
@@ -162,10 +148,6 @@ where Method: HashMethodBounds
     const NAME: &'static str = "TransformFinalAggregate";
 
     fn transform(&mut self, meta: AggregateMeta<Method, usize>) -> Result<Vec<DataBlock>> {
-        if self.reach_limit {
-            return Ok(vec![self.params.empty_result_block()]);
-        }
-
         if self.params.enable_experimental_aggregate_hashtable {
             return Ok(vec![self.transform_agg_hashtable(meta)?]);
         }
@@ -196,18 +178,8 @@ where Method: HashMethodBounds
                     let (len, _) = keys_iter.size_hint();
                     let mut places = Vec::with_capacity(len);
 
-                    let mut current_len = hash_cell.hashtable.len();
                     unsafe {
                         for key in keys_iter {
-                            if self.reach_limit {
-                                let entry = hash_cell.hashtable.entry(key);
-                                if let Some(entry) = entry {
-                                    let place = Into::<StateAddr>::into(*entry.get());
-                                    places.push(place);
-                                }
-                                continue;
-                            }
-
                             match hash_cell.hashtable.insert_and_entry(key) {
                                 Ok(mut entry) => {
                                     let place =
@@ -215,13 +187,6 @@ where Method: HashMethodBounds
                                     places.push(place);
 
                                     *entry.get_mut() = place.addr();
-
-                                    if let Some(limit) = self.params.limit {
-                                        current_len += 1;
-                                        if current_len >= limit {
-                                            self.reach_limit = true;
-                                        }
-                                    }
                                 }
                                 Err(entry) => {
                                     let place = Into::<StateAddr>::into(*entry.get());
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
index 5a21e7d3c4f9..2065d6e7c4bb 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
@@ -40,7 +40,6 @@ pub struct TransformFinalGroupBy<Method: HashMethodBounds> {
     method: Method,
     params: Arc<AggregatorParams>,
     flush_state: PayloadFlushState,
-    reach_limit: bool,
 }
 
 impl<Method: HashMethodBounds> TransformFinalGroupBy<Method> {
@@ -57,7 +56,6 @@ impl<Method: HashMethodBounds> TransformFinalGroupBy<Method> {
                 method,
                 params,
                 flush_state: PayloadFlushState::default(),
-                reach_limit: false,
             },
         ))
     }
@@ -118,22 +116,10 @@ impl<Method: HashMethodBounds> TransformFinalGroupBy<Method> {
         let mut blocks = vec![];
         self.flush_state.clear();
 
-        let mut rows = 0;
         loop {
             if ht.merge_result(&mut self.flush_state)? {
                 let cols = self.flush_state.take_group_columns();
-                rows += cols[0].len();
                 blocks.push(DataBlock::new_from_columns(cols));
-
-                if rows >= self.params.limit.unwrap_or(usize::MAX) {
-                    log::info!(
-                        "reach limit optimization in flush agg hashtable, current {}, total {}",
-                        rows,
-                        ht.len(),
-                    );
-                    self.reach_limit = true;
-                    break;
-                }
             } else {
                 break;
             }
@@ -155,10 +141,6 @@ where Method: HashMethodBounds
     const NAME: &'static str = "TransformFinalGroupBy";
 
     fn transform(&mut self, meta: AggregateMeta<Method, ()>) -> Result<Vec<DataBlock>> {
-        if self.reach_limit {
-            return Ok(vec![self.params.empty_result_block()]);
-        }
-
         if self.params.enable_experimental_aggregate_hashtable {
             return Ok(vec![self.transform_agg_hashtable(meta)?]);
         }
@@ -167,7 +149,7 @@ where Method: HashMethodBounds
             let arena = Arc::new(Bump::new());
             let mut hashtable = self.method.create_hash_table::<()>(arena)?;
 
-            'merge_hashtable: for bucket_data in data {
+            for bucket_data in data {
                 match bucket_data {
                     AggregateMeta::Spilled(_) => unreachable!(),
                     AggregateMeta::BucketSpilled(_) => unreachable!(),
@@ -182,13 +164,6 @@ where Method: HashMethodBounds
                         for key in keys_iter.iter() {
                             let _ = hashtable.insert_and_entry(key);
                         }
-
-                        if let Some(limit) = self.params.limit {
-                            if hashtable.len() >= limit {
-                                self.reach_limit = true;
-                                break 'merge_hashtable;
-                            }
-                        }
                     }
                 }
                 AggregateMeta::HashTable(payload) => unsafe {
@@ -197,12 +172,6 @@ where Method: HashMethodBounds
                     for key in payload.cell.hashtable.iter() {
                         let _ = hashtable.insert_and_entry(key.key());
                     }
-
-                    if let Some(limit) = self.params.limit {
-                        if hashtable.len() >= limit {
-                            break 'merge_hashtable;
-                        }
-                    }
                 },
                 AggregateMeta::AggregatePayload(_) => unreachable!(),
                 AggregateMeta::AggregateSpilling(_) => unreachable!(),
diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs
index b0dda598d991..3ece5cf2e064 100644
--- a/src/query/sql/src/executor/format.rs
+++ b/src/query/sql/src/executor/format.rs
@@ -1089,6 +1089,10 @@ fn aggregate_partial_to_format_tree(
         children.extend(items);
     }
 
+    if let Some((_, r)) = &plan.rank_limit {
+        children.push(FormatTreeNode::new(format!("rank limit: {r}")));
+    }
+
     append_profile_info(&mut children, profs, plan.plan_id);
 
     children.push(to_format_tree(&plan.input, metadata, profs)?);
@@ -1130,11 +1134,6 @@ fn aggregate_final_to_format_tree(
         FormatTreeNode::new(format!("aggregate functions: [{agg_funcs}]")),
     ];
 
-    if let Some(limit) = &plan.limit {
-        let items = FormatTreeNode::new(format!("limit: {limit}"));
-        children.push(items);
-    }
-
     if let Some(info) = &plan.stat_info {
         let items = plan_stats_info_to_format_tree(info);
         children.extend(items);
diff --git a/src/query/sql/src/executor/mod.rs b/src/query/sql/src/executor/mod.rs
index 4b3712a1bb65..2b1ae9d2fd28 100644
--- a/src/query/sql/src/executor/mod.rs
+++ b/src/query/sql/src/executor/mod.rs
@@ -16,7 +16,6 @@ mod explain;
 mod format;
 mod physical_plan;
 mod physical_plan_builder;
-mod physical_plan_display;
 mod physical_plan_visitor;
 pub mod physical_plans;
 mod util;
diff --git a/src/query/sql/src/executor/physical_plan_display.rs b/src/query/sql/src/executor/physical_plan_display.rs
deleted file mode 100644
index 953f3aa45725..000000000000
--- a/src/query/sql/src/executor/physical_plan_display.rs
+++ /dev/null
@@ -1,589 +0,0 @@
-// Copyright 2021 Datafuse Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::fmt::Display;
-use std::fmt::Formatter;
-
-use databend_common_functions::BUILTIN_FUNCTIONS;
-use itertools::Itertools;
-
-use crate::executor::physical_plan::PhysicalPlan;
-use crate::executor::physical_plans::AddStreamColumn;
-use crate::executor::physical_plans::AggregateExpand;
-use crate::executor::physical_plans::AggregateFinal;
-use crate::executor::physical_plans::AggregatePartial;
-use crate::executor::physical_plans::AsyncFunction;
-use crate::executor::physical_plans::CacheScan;
-use crate::executor::physical_plans::ColumnMutation;
-use crate::executor::physical_plans::CommitSink;
-use crate::executor::physical_plans::CompactSource;
-use crate::executor::physical_plans::ConstantTableScan;
-use crate::executor::physical_plans::CopyIntoLocation;
-use crate::executor::physical_plans::CopyIntoTable;
-use crate::executor::physical_plans::CteScan;
-use crate::executor::physical_plans::DistributedInsertSelect;
-use crate::executor::physical_plans::EvalScalar;
-use crate::executor::physical_plans::Exchange;
-use crate::executor::physical_plans::ExchangeSink;
-use crate::executor::physical_plans::ExchangeSource;
-use crate::executor::physical_plans::ExpressionScan;
-use crate::executor::physical_plans::Filter;
-use crate::executor::physical_plans::HashJoin;
-use crate::executor::physical_plans::Limit;
-use crate::executor::physical_plans::MaterializedCte;
-use crate::executor::physical_plans::Mutation;
-use crate::executor::physical_plans::MutationManipulate;
-use crate::executor::physical_plans::MutationOrganize;
-use crate::executor::physical_plans::MutationSource;
-use crate::executor::physical_plans::MutationSplit;
-use crate::executor::physical_plans::ProjectSet;
-use crate::executor::physical_plans::RangeJoin;
-use crate::executor::physical_plans::Recluster;
-use crate::executor::physical_plans::ReplaceAsyncSourcer;
-use crate::executor::physical_plans::ReplaceDeduplicate;
-use crate::executor::physical_plans::ReplaceInto;
-use crate::executor::physical_plans::RowFetch;
-use crate::executor::physical_plans::Sort;
-use crate::executor::physical_plans::TableScan;
-use crate::executor::physical_plans::Udf;
-use crate::executor::physical_plans::UnionAll;
-use crate::executor::physical_plans::Window;
-use crate::executor::physical_plans::WindowPartition;
-use crate::plans::CacheSource;
-use crate::plans::JoinType;
-
-impl PhysicalPlan {
-    pub fn format_indent(&self, indent: usize) -> impl std::fmt::Display + '_ {
-        PhysicalPlanIndentFormatDisplay { indent, node: self }
-    }
-}
-
-pub struct PhysicalPlanIndentFormatDisplay<'a> {
-    indent: usize,
-    node: &'a PhysicalPlan,
-}
-
-impl<'a> Display for PhysicalPlanIndentFormatDisplay<'a> {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "{}", "    ".repeat(self.indent))?;
-
-        match self.node {
-            PhysicalPlan::TableScan(scan) => write!(f, "{}", scan)?,
-            PhysicalPlan::Filter(filter) => write!(f, "{}", filter)?,
-            PhysicalPlan::EvalScalar(eval_scalar) => write!(f, "{}", eval_scalar)?,
-            PhysicalPlan::AggregateExpand(aggregate) => write!(f, "{}", aggregate)?,
-            PhysicalPlan::AggregatePartial(aggregate) => write!(f, "{}", aggregate)?,
-            PhysicalPlan::AggregateFinal(aggregate) => write!(f, "{}", aggregate)?,
-            PhysicalPlan::Window(window) => write!(f, "{}", window)?,
-            PhysicalPlan::WindowPartition(window_partition) => write!(f, "{}", window_partition)?,
-            PhysicalPlan::Sort(sort) => write!(f, "{}", sort)?,
-            PhysicalPlan::Limit(limit) => write!(f, "{}", limit)?,
-            PhysicalPlan::RowFetch(row_fetch) => write!(f, "{}", row_fetch)?,
-            PhysicalPlan::HashJoin(join) => write!(f, "{}", join)?,
-            PhysicalPlan::Exchange(exchange) => write!(f, "{}", exchange)?,
-            PhysicalPlan::ExchangeSource(source) => write!(f, "{}", source)?,
-            PhysicalPlan::ExchangeSink(sink) => write!(f, "{}", sink)?,
-            PhysicalPlan::UnionAll(union_all) => write!(f, "{}", union_all)?,
-            PhysicalPlan::DistributedInsertSelect(insert_select) => write!(f, "{}", insert_select)?,
-            PhysicalPlan::CompactSource(compact) => write!(f, "{}", compact)?,
-            PhysicalPlan::CommitSink(commit) => write!(f, "{}", commit)?,
-            PhysicalPlan::ProjectSet(unnest) => write!(f, "{}", unnest)?,
-            PhysicalPlan::RangeJoin(plan) => write!(f, "{}", plan)?,
-            PhysicalPlan::CopyIntoTable(copy_into_table) => write!(f, "{}", copy_into_table)?,
-            PhysicalPlan::CopyIntoLocation(copy_into_location) => {
-                write!(f, "{}", copy_into_location)?
-            }
-            PhysicalPlan::MutationSource(mutation_source) => write!(f, "{}", mutation_source)?,
-            PhysicalPlan::ReplaceAsyncSourcer(async_sourcer) => write!(f, "{}", async_sourcer)?,
-            PhysicalPlan::ReplaceDeduplicate(deduplicate) => write!(f, "{}", deduplicate)?,
-            PhysicalPlan::ReplaceInto(replace) => write!(f, "{}", replace)?,
-            PhysicalPlan::ColumnMutation(column_mutation) => write!(f, "{}", column_mutation)?,
-            PhysicalPlan::Mutation(merge_into) => write!(f, "{}", merge_into)?,
-            PhysicalPlan::MutationSplit(merge_into_split) => write!(f, "{}", merge_into_split)?,
-            PhysicalPlan::MutationManipulate(merge_into_manipulate) => {
-                write!(f, "{}", merge_into_manipulate)?
-            }
-            PhysicalPlan::MutationOrganize(merge_into_organize) => {
-                write!(f, "{}", merge_into_organize)?
-            }
-            PhysicalPlan::AddStreamColumn(add_stream_column) => write!(f, "{}", add_stream_column)?,
-            PhysicalPlan::CteScan(cte_scan) => write!(f, "{}", cte_scan)?,
-            PhysicalPlan::RecursiveCteScan(recursive_cte_scan) => {
-                write!(f, "{}", recursive_cte_scan)?
-            }
-            PhysicalPlan::MaterializedCte(plan) => write!(f, "{}", plan)?,
-            PhysicalPlan::ConstantTableScan(scan) => write!(f, "{}", scan)?,
-            PhysicalPlan::ExpressionScan(scan) => write!(f, "{}", scan)?,
-            PhysicalPlan::CacheScan(scan) => write!(f, "{}", scan)?,
-            PhysicalPlan::Recluster(plan) => write!(f, "{}", plan)?,
-            PhysicalPlan::Udf(udf) => write!(f, "{}", udf)?,
-            PhysicalPlan::Duplicate(_) => "Duplicate".fmt(f)?,
-            PhysicalPlan::Shuffle(_) => "Shuffle".fmt(f)?,
-            PhysicalPlan::ChunkFilter(_) => "ChunkFilter".fmt(f)?,
-            PhysicalPlan::ChunkEvalScalar(_) => "ChunkEvalScalar".fmt(f)?,
-            PhysicalPlan::ChunkCastSchema(_) => "ChunkCastSchema".fmt(f)?,
-            PhysicalPlan::ChunkFillAndReorder(_) => "ChunkFillAndReorder".fmt(f)?,
-            PhysicalPlan::ChunkAppendData(_) => "ChunkAppendData".fmt(f)?,
-            PhysicalPlan::ChunkMerge(_) => "ChunkMerge".fmt(f)?,
-            PhysicalPlan::ChunkCommitInsert(_) => "ChunkCommitInsert".fmt(f)?,
-            PhysicalPlan::AsyncFunction(_) => "AsyncFunction".fmt(f)?,
-        }
-
-        for node in self.node.children() {
-            writeln!(f)?;
-            write!(f, "{}", node.format_indent(self.indent + 1))?;
-        }
-
-        Ok(())
-    }
-}
-
-impl Display for TableScan {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "TableScan: [{}]", self.source.source_info.desc())
-    }
-}
-
-impl Display for CteScan {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "CteScan: [{}]", self.cte_idx.0)
-    }
-}
-
-impl Display for MaterializedCte {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "MaterializedCte")
-    }
-}
-
-impl Display for ConstantTableScan {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        let columns = self
-            .values
-            .iter()
-            .enumerate()
-            .map(|(i, value)| {
-                let column = value.iter().map(|val| format!("{val}")).join(", ");
-                format!("column {}: [{}]", i, column)
-            })
-            .collect::<Vec<String>>();
-
-        write!(f, "{}: {}", self.name(), columns.join(", "))
-    }
-}
-
-impl Display for ExpressionScan {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let columns = self
-            .values
-            .iter()
-            .enumerate()
-            .map(|(i, value)| {
-                let column = value
-                    .iter()
-                    .map(|val| val.as_expr(&BUILTIN_FUNCTIONS).sql_display())
-                    .join(", ");
-                format!("column {}: [{}]", i, column)
-            })
-            .collect::<Vec<String>>();
-
-        write!(f, "ExpressionScan: {}", columns.join(", "))
-    }
-}
-
-impl Display for CacheScan {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        match &self.cache_source {
-            CacheSource::HashJoinBuild((cache_index, column_indexes)) => {
-                write!(
-                    f,
-                    "CacheScan: [cache_index: {}, column_indexes: {:?}]",
-                    cache_index, column_indexes
-                )
-            }
-        }
-    }
-}
-
-impl Display for Filter {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        let predicates = self
-            .predicates
-            .iter()
-            .map(|pred| pred.as_expr(&BUILTIN_FUNCTIONS).sql_display())
-            .join(", ");
-
-        write!(f, "Filter: [{predicates}]")
-    }
-}
-
-impl Display for Sort {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        let scalars = self
-            .order_by
-            .iter()
-            .map(|item| {
-                format!(
-                    "{} {}",
-                    item.order_by,
-                    if item.asc { "ASC" } else { "DESC" }
-                )
-            })
-            .collect::<Vec<String>>();
-        let limit = self.limit.as_ref().cloned().unwrap_or(0);
-        write!(f, "Sort: [{}], Limit: [{}]", scalars.join(", "), limit)
-    }
-}
-
-impl Display for EvalScalar {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        let scalars = self
-            .exprs
-            .iter()
-            .map(|(expr, _)| expr.as_expr(&BUILTIN_FUNCTIONS).to_string())
-            .collect::<Vec<String>>();
-
-        write!(f, "EvalScalar: [{}]", scalars.join(", "))
-    }
-}
-
-impl Display for AggregateExpand {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        let sets = self
-            .grouping_sets
-            .sets
-            .iter()
-            .map(|set| {
-                set.iter()
-                    .map(|index| index.to_string())
-                    .collect::<Vec<String>>()
-                    .join(", ")
-            })
-            .map(|s| format!("[{}]", s))
-            .collect::<Vec<String>>()
-            .join(", ");
-        write!(f, "Aggregate(Expand): grouping sets: [{}]", sets)
-    }
-}
-
-impl Display for AggregateFinal {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        let group_items = self
-            .group_by
-            .iter()
-            .map(|v| v.to_string())
-            .collect::<Vec<String>>()
-            .join(", ");
-
-        let agg_funcs = self
-            .agg_funcs
-            .iter()
-            .map(|item| {
-                format!(
-                    "{}({})",
-                    item.sig.name,
-                    item.arg_indices
-                        .iter()
-                        .map(|index| index.to_string())
-                        .collect::<Vec<String>>()
-                        .join(", ")
-                )
-            })
-            .join(", ");
-
-        write!(
-            f,
-            "Aggregate(Final): group items: [{}], aggregate functions: [{}]",
-            group_items, agg_funcs
-        )
-    }
-}
-
-impl Display for AggregatePartial {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        let group_items = self
-            .group_by
-            .iter()
-            .map(|v| v.to_string())
-            .collect::<Vec<String>>()
-            .join(", ");
-
-        let agg_funcs = self
-            .agg_funcs
-            .iter()
-            .map(|item| {
-                format!(
-                    "{}({})",
-                    item.sig.name,
-                    item.arg_indices
-                        .iter()
-                        .map(|index| index.to_string())
-                        .collect::<Vec<String>>()
-                        .join(", ")
-                )
-            })
-            .join(", ");
-
-        write!(
-            f,
-            "Aggregate(Partial): group items: [{}], aggregate functions: [{}]",
-            group_items, agg_funcs
-        )
-    }
-}
-
-impl Display for Window {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        let window_id = self.plan_id;
-        write!(f, "Window: [{}]", window_id)
-    }
-}
-
-impl Display for WindowPartition {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        let window_partition_id = self.plan_id;
-        write!(f, "WindowPartition: [{}]", window_partition_id)
-    }
-}
-
-impl Display for Limit {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        let limit = self.limit.as_ref().cloned().unwrap_or(0);
-        write!(f, "Limit: [{}], Offset: [{}]", limit, self.offset)
-    }
-}
-
-impl Display for RowFetch {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "RowFetch: [{:?}]", self.cols_to_fetch)
-    }
-}
-
-impl Display for HashJoin {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        match self.join_type {
-            JoinType::Cross => {
-                write!(f, "CrossJoin")
-            }
-            _ => {
-                let build_keys = self
-                    .build_keys
-                    .iter()
-                    .map(|scalar| scalar.as_expr(&BUILTIN_FUNCTIONS).sql_display())
-                    .collect::<Vec<String>>()
-                    .join(", ");
-
-                let probe_keys = self
-                    .probe_keys
-                    .iter()
-                    .map(|scalar| scalar.as_expr(&BUILTIN_FUNCTIONS).sql_display())
-                    .collect::<Vec<String>>()
-                    .join(", ");
-
-                let join_filters = self
-                    .non_equi_conditions
-                    .iter()
-                    .map(|scalar| scalar.as_expr(&BUILTIN_FUNCTIONS).sql_display())
-                    .collect::<Vec<String>>()
-                    .join(", ");
-
-                write!(
-                    f,
-                    "HashJoin: {}, build keys: [{}], probe keys: [{}], join filters: [{}]",
-                    &self.join_type, build_keys, probe_keys, join_filters,
-                )
-            }
-        }
-    }
-}
-
-impl Display for RangeJoin {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "IEJoin: {}", &self.join_type)
-    }
-}
-
-impl Display for Exchange {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        let keys = self
-            .keys
-            .iter()
-            .map(|key| key.as_expr(&BUILTIN_FUNCTIONS).sql_display())
-            .join(", ");
-
-        write!(f, "Exchange: [kind: {:?}, keys: {}]", self.kind, keys)
-    }
-}
-
-impl Display for ExchangeSource {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(
-            f,
-            "Exchange Source: fragment id: [{:?}]",
-            self.source_fragment_id
-        )
-    }
-}
-
-impl Display for ExchangeSink {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(
-            f,
-            "Exchange Sink: fragment id: [{:?}]",
-            self.destination_fragment_id
-        )
-    }
-}
-
-impl Display for UnionAll {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "UnionAll")
-    }
-}
-
-impl Display for DistributedInsertSelect {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "DistributedInsertSelect")
-    }
-}
-
-impl Display for CompactSource {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "CompactSource")
-    }
-}
-
-impl Display for CommitSink {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "CommitSink")
-    }
-}
-impl Display for CopyIntoTable {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "CopyIntoTable")
-    }
-}
-
-impl Display for CopyIntoLocation {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "CopyIntoLocation")
-    }
-}
-
-impl Display for ProjectSet {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        let scalars = self
-            .srf_exprs
-            .iter()
-            .map(|(expr, _)| expr.as_expr(&BUILTIN_FUNCTIONS).to_string())
-            .collect::<Vec<String>>();
-
-        write!(
-            f,
-            "ProjectSet: set-returning functions : {}",
-            scalars.join(", ")
-        )
-    }
-}
-
-impl Display for ReplaceAsyncSourcer {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "AsyncSourcer")
-    }
-}
-
-impl Display for ReplaceDeduplicate {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "Deduplicate")
-    }
-}
-
-impl Display for ReplaceInto {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "Replace")
-    }
-}
-
-impl Display for MutationSource {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "MutationSource")
-    }
-}
-
-impl Display for ColumnMutation {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "ColumnMutation")
-    }
-}
-
-impl Display for Mutation {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "Mutation")
-    }
-}
-
-impl Display for MutationSplit {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "MutationSplit")
-    }
-}
-
-impl Display for MutationManipulate {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "MutationManipulate")
-    }
-}
-
-impl Display for MutationOrganize {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "MutationOrganize")
-    }
-}
-
-impl Display for AddStreamColumn {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "AddStreamColumn")
-    }
-}
-
-impl Display for Recluster {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "Recluster")
-    }
-}
-
-impl Display for Udf {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        let scalars = self
-            .udf_funcs
-            .iter()
-            .map(|func| {
-                let arg_exprs = func.arg_exprs.join(", ");
-                format!("{}({})", func.func_name, arg_exprs)
-            })
-            .collect::<Vec<String>>();
-        write!(f, "Udf functions: {}", scalars.join(", "))
-    }
-}
-
-impl Display for AsyncFunction {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        let scalars = self
-            .async_func_descs
-            .iter()
-            .map(|func| func.display_name.clone())
-            .collect::<Vec<String>>();
-        write!(f, "Async functions: {}", scalars.join(", "))
-    }
-}
diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs
index 354c275cb95a..6dcab582c7f3 100644
--- a/src/query/sql/src/executor/physical_plan_visitor.rs
+++ b/src/query/sql/src/executor/physical_plan_visitor.rs
@@ -200,6 +200,7 @@ pub trait PhysicalPlanReplacer {
             group_by_display: plan.group_by_display.clone(),
             agg_funcs: plan.agg_funcs.clone(),
             stat_info: plan.stat_info.clone(),
+            rank_limit: plan.rank_limit.clone(),
         }))
     }
 
@@ -214,7 +215,6 @@ pub trait PhysicalPlanReplacer {
             agg_funcs: plan.agg_funcs.clone(),
             group_by_display: plan.group_by_display.clone(),
             stat_info: plan.stat_info.clone(),
-            limit: plan.limit,
         }))
     }
diff --git a/src/query/sql/src/executor/physical_plans/physical_aggregate_final.rs b/src/query/sql/src/executor/physical_plans/physical_aggregate_final.rs
index 0e8d2e5aa891..dc8286f9c4c7 100644
--- a/src/query/sql/src/executor/physical_plans/physical_aggregate_final.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_aggregate_final.rs
@@ -22,6 +22,7 @@ use databend_common_expression::DataSchemaRef;
 use databend_common_expression::DataSchemaRefExt;
 use databend_common_expression::RemoteExpr;
 
+use super::SortDesc;
 use crate::executor::explain::PlanStatsInfo;
 use crate::executor::physical_plans::AggregateExpand;
 use crate::executor::physical_plans::AggregateFunctionDesc;
@@ -45,8 +46,6 @@ pub struct AggregateFinal {
     pub group_by: Vec<IndexType>,
     pub agg_funcs: Vec<AggregateFunctionDesc>,
     pub before_group_by_schema: DataSchemaRef,
-    pub limit: Option<usize>,
-
     pub group_by_display: Vec<String>,
 
     // Only used for explain
@@ -105,7 +104,7 @@ impl PhysicalPlanBuilder {
             aggregate_functions: used,
             from_distinct: agg.from_distinct,
             mode: agg.mode,
-            limit: agg.limit,
+            rank_limit: agg.rank_limit.clone(),
             grouping_sets: agg.grouping_sets.clone(),
         };
 
@@ -177,6 +176,19 @@ impl PhysicalPlanBuilder {
             }
         }
 
+        let rank_limit = agg.rank_limit.map(|(item, limit)| {
+            let desc = item
+                .iter()
+                .map(|v| SortDesc {
+                    asc: v.asc,
+                    nulls_first: v.nulls_first,
+                    order_by: v.index,
+                    display_name: self.metadata.read().column(v.index).name(),
+                })
+                .collect::<Vec<_>>();
+            (desc, limit)
+        });
+
         match input {
             PhysicalPlan::Exchange(Exchange { input, kind, .. })
                 if group_by_shuffle_mode == "before_merge" =>
@@ -197,6 +209,7 @@ impl PhysicalPlanBuilder {
                         group_by_display,
                         group_by: group_items,
                         stat_info: Some(stat_info),
+                        rank_limit: None,
                     }
                 } else {
                     AggregatePartial {
@@ -207,6 +220,7 @@ impl PhysicalPlanBuilder {
                         group_by_display,
                         group_by: group_items,
                         stat_info: Some(stat_info),
+                        rank_limit,
                     }
                 };
@@ -275,6 +289,7 @@ impl PhysicalPlanBuilder {
                     group_by: group_items,
                     input: Box::new(PhysicalPlan::AggregateExpand(expand)),
                     stat_info: Some(stat_info),
+                    rank_limit: None,
                 })
             } else {
                 PhysicalPlan::AggregatePartial(AggregatePartial {
@@ -285,6 +300,7 @@ impl PhysicalPlanBuilder {
                     group_by: group_items,
                     input: Box::new(input),
                     stat_info: Some(stat_info),
+                    rank_limit,
                 })
             }
         }
@@ -358,7 +374,7 @@ impl PhysicalPlanBuilder {
         match input {
             PhysicalPlan::AggregatePartial(ref partial) => {
                 let before_group_by_schema = partial.input.output_schema()?;
-                let limit = agg.limit;
+
                 PhysicalPlan::AggregateFinal(AggregateFinal {
                     plan_id: 0,
                     group_by_display: partial.group_by_display.clone(),
@@ -368,7 +384,6 @@ impl PhysicalPlanBuilder {
                     before_group_by_schema,
 
                     stat_info: Some(stat_info),
-                    limit,
                 })
             }
 
@@ -377,7 +392,6 @@ impl PhysicalPlanBuilder {
                 ..
             }) => {
                 let before_group_by_schema = partial.input.output_schema()?;
-                let limit = agg.limit;
 
                 PhysicalPlan::AggregateFinal(AggregateFinal {
                     plan_id: 0,
@@ -388,7 +402,6 @@ impl PhysicalPlanBuilder {
                     before_group_by_schema,
 
                     stat_info: Some(stat_info),
-                    limit,
                 })
             }
diff --git a/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs b/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs
index e6e1dca9cd4e..b47f9f00d031 100644
--- a/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs
@@ -20,6 +20,7 @@ use databend_common_expression::DataField;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::DataSchemaRefExt;
 
+use super::SortDesc;
 use crate::executor::explain::PlanStatsInfo;
 use crate::executor::physical_plans::common::AggregateFunctionDesc;
 use crate::executor::PhysicalPlan;
@@ -35,6 +36,8 @@ pub struct AggregatePartial {
     pub enable_experimental_aggregate_hashtable: bool,
     pub group_by_display: Vec<String>,
 
+    // If the ORDER BY keys are a subset of the GROUP BY keys, we can use a rank limit to pre-filter data in the partial stage.
+    pub rank_limit: Option<(Vec<SortDesc>, usize)>,
     // Only used for explain
     pub stat_info: Option<PlanStatsInfo>,
 }
diff --git a/src/query/sql/src/planner/binder/aggregate.rs b/src/query/sql/src/planner/binder/aggregate.rs
index 2c5e7be098d4..71d715dcfa68 100644
--- a/src/query/sql/src/planner/binder/aggregate.rs
+++ b/src/query/sql/src/planner/binder/aggregate.rs
@@ -471,7 +471,8 @@ impl Binder {
             group_items: agg_info.group_items.clone(),
             aggregate_functions: agg_info.aggregate_functions.clone(),
             from_distinct: false,
-            limit: None,
+            rank_limit: None,
+
             grouping_sets: agg_info.grouping_sets.as_ref().map(|g| GroupingSets {
                 grouping_id_index: g.grouping_id_column.index,
                 sets: g.sets.clone(),
diff --git a/src/query/sql/src/planner/binder/bind_query/bind_value.rs b/src/query/sql/src/planner/binder/bind_query/bind_value.rs
index 70b49d7de73f..08aee00a7cb5 100644
--- a/src/query/sql/src/planner/binder/bind_query/bind_value.rs
+++ b/src/query/sql/src/planner/binder/bind_query/bind_value.rs
@@ -346,10 +346,7 @@ impl Binder {
                 Aggregate {
                     mode: AggregateMode::Initial,
                     group_items,
-                    aggregate_functions: vec![],
-                    from_distinct: false,
-                    limit: None,
-                    grouping_sets: None,
+                    ..Default::default()
                 }
                 .into(),
             ),
diff --git a/src/query/sql/src/planner/binder/distinct.rs b/src/query/sql/src/planner/binder/distinct.rs
index 82d829f63759..621bc1ea0ca6 100644
--- a/src/query/sql/src/planner/binder/distinct.rs
+++ b/src/query/sql/src/planner/binder/distinct.rs
@@ -83,10 +83,8 @@ impl Binder {
         let distinct_plan = Aggregate {
             mode: AggregateMode::Initial,
             group_items,
-            aggregate_functions: vec![],
             from_distinct: true,
-            limit: None,
-            grouping_sets: None,
+            ..Default::default()
         };
 
         Ok(SExpr::create_unary(
diff --git a/src/query/sql/src/planner/optimizer/aggregate/normalize_aggregate.rs b/src/query/sql/src/planner/optimizer/aggregate/normalize_aggregate.rs
index 96514c2e5e89..bad2a93f5dd6 100644
--- a/src/query/sql/src/planner/optimizer/aggregate/normalize_aggregate.rs
+++ b/src/query/sql/src/planner/optimizer/aggregate/normalize_aggregate.rs
@@ -119,7 +119,7 @@ impl RuleNormalizeAggregateOptimizer {
             group_items: aggregate.group_items,
             aggregate_functions: new_aggregate_functions,
             from_distinct: aggregate.from_distinct,
-            limit: aggregate.limit,
+            rank_limit: aggregate.rank_limit,
             grouping_sets: aggregate.grouping_sets,
         };
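Note: the binder and rewriter call sites above can collapse their field lists into `..Default::default()` because this patch gives `Aggregate` a manual `Default` impl (see plans/aggregate.rs further down). A minimal sketch of the pattern on a simplified stand-in struct; the real impl sets `mode: AggregateMode::Initial` explicitly instead of deriving:

    #[derive(Default)]
    struct Aggregate {
        group_items: Vec<usize>,                 // stand-in for Vec<ScalarItem>
        from_distinct: bool,
        rank_limit: Option<(Vec<usize>, usize)>, // stand-in for (Vec<SortItem>, usize)
    }

    fn main() {
        // Only the fields that differ from the defaults are spelled out.
        let distinct_plan = Aggregate {
            from_distinct: true,
            ..Default::default()
        };
        assert!(distinct_plan.rank_limit.is_none());
        assert!(distinct_plan.group_items.is_empty());
    }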
diff --git a/src/query/sql/src/planner/optimizer/decorrelate/flatten_plan.rs b/src/query/sql/src/planner/optimizer/decorrelate/flatten_plan.rs
index 74bd88267650..ac7743020a0f 100644
--- a/src/query/sql/src/planner/optimizer/decorrelate/flatten_plan.rs
+++ b/src/query/sql/src/planner/optimizer/decorrelate/flatten_plan.rs
@@ -148,10 +148,7 @@ impl SubqueryRewriter {
                     Aggregate {
                         mode: AggregateMode::Initial,
                         group_items,
-                        aggregate_functions: vec![],
-                        from_distinct: false,
-                        limit: None,
-                        grouping_sets: None,
+                        ..Default::default()
                     }
                     .into(),
                 ),
@@ -623,7 +620,7 @@ impl SubqueryRewriter {
                     group_items,
                     aggregate_functions: agg_items,
                     from_distinct: aggregate.from_distinct,
-                    limit: aggregate.limit,
+                    rank_limit: aggregate.rank_limit.clone(),
                     grouping_sets: aggregate.grouping_sets.clone(),
                 }
                 .into(),
diff --git a/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs b/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs
index 90d1b16f50f3..26edfa3624f2 100644
--- a/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs
+++ b/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs
@@ -459,10 +459,7 @@ impl SubqueryRewriter {
                     .into(),
                     index: agg_func_index,
                 }],
-                from_distinct: false,
-                mode: AggregateMode::Initial,
-                limit: None,
-                grouping_sets: None,
+                ..Default::default()
             };
 
             let compare = FunctionCall {
@@ -692,9 +689,7 @@ impl SubqueryRewriter {
                             index: any_idx,
                         },
                     ],
-                    from_distinct: false,
-                    limit: None,
-                    grouping_sets: None,
+                    ..Default::default()
                 }
                 .into(),
             ),
diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs
index 262bf06900f2..94cf8dc7009d 100644
--- a/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs
+++ b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs
@@ -127,8 +127,6 @@ fn create_count_aggregate(mode: AggregateMode) -> Aggregate {
             }),
             index: 0,
         }],
-        from_distinct: false,
-        limit: None,
-        grouping_sets: None,
+        ..Default::default()
     }
 }
diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs
index 625ce0bd0dd2..b8bbca939a4f 100644
--- a/src/query/sql/src/planner/optimizer/optimizer.rs
+++ b/src/query/sql/src/planner/optimizer/optimizer.rs
@@ -377,7 +377,6 @@ pub async fn optimize_query(opt_ctx: &mut OptimizerContext, mut s_expr: SExpr) -
     // After join reorder, Convert some single join to inner join.
     s_expr = SingleToInnerOptimizer::new().run(&s_expr)?;
 
-    // Deduplicate join conditions.
     s_expr = DeduplicateJoinConditionOptimizer::new().run(&s_expr)?;
diff --git a/src/query/sql/src/planner/optimizer/rule/factory.rs b/src/query/sql/src/planner/optimizer/rule/factory.rs
index e3b322b39e17..a71e52f8c281 100644
--- a/src/query/sql/src/planner/optimizer/rule/factory.rs
+++ b/src/query/sql/src/planner/optimizer/rule/factory.rs
@@ -22,9 +22,9 @@ use super::rewrite::RulePushDownFilterAggregate;
 use super::rewrite::RulePushDownFilterEvalScalar;
 use super::rewrite::RulePushDownFilterJoin;
 use super::rewrite::RulePushDownFilterWindow;
-use super::rewrite::RulePushDownLimitAggregate;
 use super::rewrite::RulePushDownLimitEvalScalar;
 use super::rewrite::RulePushDownPrewhere;
+use super::rewrite::RulePushDownRankLimitAggregate;
 use super::rewrite::RulePushDownSortEvalScalar;
 use super::rewrite::RuleTryApplyAggIndex;
 use crate::optimizer::rule::rewrite::RuleEliminateFilter;
@@ -80,7 +80,9 @@ impl RuleFactory {
             RuleID::PushDownLimitWindow => {
                 Ok(Box::new(RulePushDownLimitWindow::new(MAX_PUSH_DOWN_LIMIT)))
             }
-            RuleID::PushDownLimitAggregate => Ok(Box::new(RulePushDownLimitAggregate::new())),
+            RuleID::RulePushDownRankLimitAggregate => {
+                Ok(Box::new(RulePushDownRankLimitAggregate::new()))
+            }
             RuleID::PushDownFilterAggregate => Ok(Box::new(RulePushDownFilterAggregate::new())),
             RuleID::PushDownFilterWindow => Ok(Box::new(RulePushDownFilterWindow::new())),
             RuleID::EliminateFilter => Ok(Box::new(RuleEliminateFilter::new(metadata))),
diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/mod.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/mod.rs
index 1e55479e3a93..bd7dede200d7 100644
--- a/src/query/sql/src/planner/optimizer/rule/rewrite/mod.rs
+++ b/src/query/sql/src/planner/optimizer/rule/rewrite/mod.rs
@@ -63,7 +63,7 @@ pub use rule_push_down_filter_sort::RulePushDownFilterSort;
 pub use rule_push_down_filter_union::RulePushDownFilterUnion;
 pub use rule_push_down_filter_window::RulePushDownFilterWindow;
 pub use rule_push_down_limit::RulePushDownLimit;
-pub use rule_push_down_limit_aggregate::RulePushDownLimitAggregate;
+pub use rule_push_down_limit_aggregate::RulePushDownRankLimitAggregate;
 pub use rule_push_down_limit_expression::RulePushDownLimitEvalScalar;
 pub use rule_push_down_limit_join::RulePushDownLimitOuterJoin;
 pub use rule_push_down_limit_scan::RulePushDownLimitScan;
diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs
index d73fc600f3de..12e74010eb41 100644
--- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs
+++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::cmp;
 use std::sync::Arc;
 
 use crate::optimizer::extract::Matcher;
@@ -21,47 +20,75 @@ use crate::optimizer::rule::TransformResult;
 use crate::optimizer::RuleID;
 use crate::optimizer::SExpr;
 use crate::plans::Aggregate;
+use crate::plans::AggregateMode;
 use crate::plans::Limit;
+use crate::plans::Operator;
 use crate::plans::RelOp;
 use crate::plans::RelOperator;
+use crate::plans::Sort;
+use crate::plans::SortItem;
 
-/// Input:  Limit
+/// Input:  Limit | Sort
 ///           \
 ///          Aggregate
 ///             \
 ///              *
 ///
-/// Output: Limit
+/// Output: Limit | Sort
 ///           \
-///          Aggregate(padding limit)
+///          Aggregate(padding limit | rank_limit)
 ///             \
 ///              *
-pub struct RulePushDownLimitAggregate {
+pub struct RulePushDownRankLimitAggregate {
     id: RuleID,
     matchers: Vec<Matcher>,
 }
 
-impl RulePushDownLimitAggregate {
+impl RulePushDownRankLimitAggregate {
     pub fn new() -> Self {
         Self {
-            id: RuleID::PushDownLimitAggregate,
-            matchers: vec![Matcher::MatchOp {
-                op_type: RelOp::Limit,
-                children: vec![Matcher::MatchOp {
-                    op_type: RelOp::Aggregate,
-                    children: vec![Matcher::Leaf],
-                }],
-            }],
+            id: RuleID::RulePushDownRankLimitAggregate,
+            matchers: vec![
+                Matcher::MatchOp {
+                    op_type: RelOp::Limit,
+                    children: vec![Matcher::MatchOp {
+                        op_type: RelOp::Aggregate,
+                        children: vec![Matcher::MatchOp {
+                            op_type: RelOp::Aggregate,
+                            children: vec![Matcher::Leaf],
+                        }],
+                    }],
+                },
+                Matcher::MatchOp {
+                    op_type: RelOp::Sort,
+                    children: vec![Matcher::MatchOp {
+                        op_type: RelOp::Aggregate,
+                        children: vec![Matcher::MatchOp {
+                            op_type: RelOp::Aggregate,
+                            children: vec![Matcher::Leaf],
+                        }],
+                    }],
+                },
+                Matcher::MatchOp {
+                    op_type: RelOp::Sort,
+                    children: vec![Matcher::MatchOp {
+                        op_type: RelOp::EvalScalar,
+                        children: vec![Matcher::MatchOp {
+                            op_type: RelOp::Aggregate,
+                            children: vec![Matcher::MatchOp {
+                                op_type: RelOp::Aggregate,
+                                children: vec![Matcher::Leaf],
+                            }],
+                        }],
+                    }],
+                },
+            ],
         }
     }
-}
-
-impl Rule for RulePushDownLimitAggregate {
-    fn id(&self) -> RuleID {
-        self.id
-    }
 
-    fn apply(
+    // There is no ORDER BY here, so the order of the result does not matter.
+    // To make the query produce a consistent result more efficiently, we inject an ORDER BY before the LIMIT.
+    fn apply_limit(
         &self,
         s_expr: &SExpr,
         state: &mut TransformResult,
@@ -69,21 +96,133 @@ impl Rule for RulePushDownLimitAggregate {
         let limit: Limit = s_expr.plan().clone().try_into()?;
         if let Some(mut count) = limit.limit {
             count += limit.offset;
-            let agg = s_expr.child(0)?;
-            let mut agg_limit: Aggregate = agg.plan().clone().try_into()?;
+            let agg_final = s_expr.child(0)?;
+            let agg_partial = agg_final.child(0)?;
+
+            let mut agg_limit: Aggregate = agg_partial.plan().clone().try_into()?;
+
+            let sort_items = agg_limit
+                .group_items
+                .iter()
+                .map(|g| SortItem {
+                    index: g.index,
+                    asc: true,
+                    nulls_first: false,
+                })
+                .collect::<Vec<_>>();
+            agg_limit.rank_limit = Some((sort_items.clone(), count));
+
+            let sort = Sort {
+                items: sort_items.clone(),
+                limit: Some(count),
+                after_exchange: None,
+                pre_projection: None,
+                window_partition: vec![],
+            };
+
+            let agg_partial = SExpr::create_unary(
+                Arc::new(RelOperator::Aggregate(agg_limit)),
+                Arc::new(agg_partial.child(0)?.clone()),
+            );
+            let agg_final = agg_final.replace_children(vec![agg_partial.into()]);
+            let sort = SExpr::create_unary(Arc::new(RelOperator::Sort(sort)), agg_final.into());
+            let mut result = s_expr.replace_children(vec![Arc::new(sort)]);
+
+            result.set_applied_rule(&self.id);
+            state.add_result(result);
+        }
+        Ok(())
+    }
+
+    fn apply_sort(
+        &self,
+        s_expr: &SExpr,
+        state: &mut TransformResult,
+    ) -> databend_common_exception::Result<()> {
+        let sort: Sort = s_expr.plan().clone().try_into()?;
+        let mut has_eval_scalar = false;
+        let agg = match s_expr.child(0)?.plan().rel_op() {
+            RelOp::Aggregate => s_expr.child(0)?,
+            RelOp::EvalScalar => {
+                has_eval_scalar = true;
+                s_expr.child(0)?.child(0)?
+            }
+            _ => return Ok(()),
+        };
+
+        let agg_limit_expr = agg.child(0)?;
+        let mut agg_limit: Aggregate = agg_limit_expr.plan().clone().try_into()?;
+
+        if agg_limit.mode != AggregateMode::Partial {
+            return Ok(());
+        }
+
+        if let Some(limit) = sort.limit {
+            let is_order_subset = sort
+                .items
+                .iter()
+                .all(|k| agg_limit.group_items.iter().any(|g| g.index == k.index));
 
-            agg_limit.limit = Some(agg_limit.limit.map_or(count, |c| cmp::max(c, count)));
-            let agg = SExpr::create_unary(
+            if !is_order_subset {
+                return Ok(());
+            }
+            let mut sort_items = Vec::with_capacity(agg_limit.group_items.len());
+            let mut not_found_sort_items = vec![];
+            for i in 0..agg_limit.group_items.len() {
+                let group_item = &agg_limit.group_items[i];
+                if let Some(sort_item) = sort.items.iter().find(|k| k.index == group_item.index) {
+                    sort_items.push(SortItem {
+                        index: group_item.index,
+                        asc: sort_item.asc,
+                        nulls_first: sort_item.nulls_first,
+                    });
+                } else {
+                    not_found_sort_items.push(SortItem {
+                        index: group_item.index,
+                        asc: true,
+                        nulls_first: false,
+                    });
+                }
+            }
+            sort_items.extend(not_found_sort_items);
+
+            agg_limit.rank_limit = Some((sort_items, limit));
+
+            let agg_partial = SExpr::create_unary(
                 Arc::new(RelOperator::Aggregate(agg_limit)),
-                Arc::new(agg.child(0)?.clone()),
+                Arc::new(agg_limit_expr.child(0)?.clone()),
             );
-            let mut result = s_expr.replace_children(vec![Arc::new(agg)]);
+            let agg = agg.replace_children(vec![Arc::new(agg_partial)]);
+            let mut result = if has_eval_scalar {
+                let eval_scalar = s_expr.child(0)?.replace_children(vec![Arc::new(agg)]);
+                s_expr.replace_children(vec![Arc::new(eval_scalar)])
+            } else {
+                s_expr.replace_children(vec![Arc::new(agg)])
+            };
 
             result.set_applied_rule(&self.id);
             state.add_result(result);
         }
         Ok(())
     }
+}
+
+impl Rule for RulePushDownRankLimitAggregate {
+    fn id(&self) -> RuleID {
+        self.id
+    }
+
+    fn apply(
+        &self,
+        s_expr: &SExpr,
+        state: &mut TransformResult,
+    ) -> databend_common_exception::Result<()> {
+        match s_expr.plan().rel_op() {
+            RelOp::Limit => self.apply_limit(s_expr, state),
+            RelOp::Sort | RelOp::EvalScalar => self.apply_sort(s_expr, state),
+            _ => Ok(()),
+        }
+    }
 
     fn matchers(&self) -> &[Matcher] {
         &self.matchers
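Note: in `apply_sort` above, the rank keys are assembled by scanning the group-by keys in order: keys that also appear in the ORDER BY keep their direction, and the remaining group keys are appended ascending so that ranking still covers every group key. A condensed sketch of that reordering over plain `(index, asc)` pairs (hypothetical helper; `nulls_first` elided):

    fn rank_keys(group_keys: &[usize], order_by: &[(usize, bool)]) -> Vec<(usize, bool)> {
        let mut found = Vec::new();
        let mut not_found = Vec::new();
        for &g in group_keys {
            match order_by.iter().copied().find(|&(index, _)| index == g) {
                // Group key participates in the ORDER BY: keep its direction.
                Some((index, asc)) => found.push((index, asc)),
                // Extra group key: any consistent direction works; default to ascending.
                None => not_found.push((g, true)),
            }
        }
        found.extend(not_found);
        found
    }

    fn main() {
        // GROUP BY (a=0, b=1, c=2) with ORDER BY b DESC -> rank keys: b DESC, a ASC, c ASC.
        assert_eq!(rank_keys(&[0, 1, 2], &[(1, false)]), vec![(1, false), (0, true), (2, true)]);
    }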
"PushDownLimitScan"), RuleID::PushDownSortScan => write!(f, "PushDownSortScan"), diff --git a/src/query/sql/src/planner/plans/aggregate.rs b/src/query/sql/src/planner/plans/aggregate.rs index cdec39872124..89b623aa19f3 100644 --- a/src/query/sql/src/planner/plans/aggregate.rs +++ b/src/query/sql/src/planner/plans/aggregate.rs @@ -27,6 +27,7 @@ use crate::optimizer::RelationalProperty; use crate::optimizer::RequiredProperty; use crate::optimizer::StatInfo; use crate::optimizer::Statistics; +use crate::plans::sort::SortItem; use crate::plans::Operator; use crate::plans::RelOp; use crate::plans::ScalarItem; @@ -63,10 +64,24 @@ pub struct Aggregate { pub aggregate_functions: Vec, // True if the plan is generated from distinct, else the plan is a normal aggregate; pub from_distinct: bool, - pub limit: Option, + pub rank_limit: Option<(Vec, usize)>, + pub grouping_sets: Option, } +impl Default for Aggregate { + fn default() -> Self { + Self { + mode: AggregateMode::Initial, + group_items: vec![], + aggregate_functions: vec![], + from_distinct: false, + rank_limit: None, + grouping_sets: None, + } + } +} + impl Aggregate { pub fn used_columns(&self) -> Result { let mut used_columns = ColumnSet::new(); diff --git a/src/query/sql/src/planner/semantic/type_check.rs b/src/query/sql/src/planner/semantic/type_check.rs index 0f5bedf86789..057048e69516 100644 --- a/src/query/sql/src/planner/semantic/type_check.rs +++ b/src/query/sql/src/planner/semantic/type_check.rs @@ -4337,10 +4337,7 @@ impl<'a> TypeChecker<'a> { }), index: self.metadata.read().columns().len() - 1, }], - aggregate_functions: vec![], - from_distinct: false, - limit: None, - grouping_sets: None, + ..Default::default() } .into(), ), diff --git a/src/query/storages/fuse/src/operations/append.rs b/src/query/storages/fuse/src/operations/append.rs index b6ce19faf4be..453b77d734ef 100644 --- a/src/query/storages/fuse/src/operations/append.rs +++ b/src/query/storages/fuse/src/operations/append.rs @@ -22,6 +22,7 @@ use databend_common_expression::BlockThresholds; use databend_common_expression::DataField; use databend_common_expression::DataSchema; use databend_common_expression::Expr; +use databend_common_expression::LimitType; use databend_common_expression::SortColumnDescription; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_pipeline_core::Pipeline; @@ -109,7 +110,12 @@ impl FuseTable { let sort_desc = Arc::new(sort_desc); let mut builder = pipeline.try_create_transform_pipeline_builder_with_len( - || Ok(TransformSortPartial::new(None, sort_desc.clone())), + || { + Ok(TransformSortPartial::new( + LimitType::None, + sort_desc.clone(), + )) + }, transform_len, )?; if need_match { @@ -152,7 +158,8 @@ impl FuseTable { }) .collect(); let sort_desc = Arc::new(sort_desc); - pipeline.add_transformer(|| TransformSortPartial::new(None, sort_desc.clone())); + pipeline + .add_transformer(|| TransformSortPartial::new(LimitType::None, sort_desc.clone())); } Ok(cluster_stats_gen) } diff --git a/tests/sqllogictests/suites/mode/cluster/create_table.test b/tests/sqllogictests/suites/mode/cluster/create_table.test index ace5b3c2b23d..71a9d6078291 100644 --- a/tests/sqllogictests/suites/mode/cluster/create_table.test +++ b/tests/sqllogictests/suites/mode/cluster/create_table.test @@ -12,33 +12,36 @@ EvalScalar ├── limit: 3 ├── offset: 0 ├── estimated rows: 3.00 - └── Exchange + └── Sort ├── output columns: [max(number) (#6), numbers.number (#4)] - ├── exchange type: Merge - └── Limit - ├── output columns: [max(number) (#6), 
diff --git a/tests/sqllogictests/suites/mode/cluster/create_table.test b/tests/sqllogictests/suites/mode/cluster/create_table.test
index ace5b3c2b23d..71a9d6078291 100644
--- a/tests/sqllogictests/suites/mode/cluster/create_table.test
+++ b/tests/sqllogictests/suites/mode/cluster/create_table.test
@@ -12,33 +12,36 @@ EvalScalar
 ├── limit: 3
 ├── offset: 0
 ├── estimated rows: 3.00
-└── Exchange
+└── Sort
     ├── output columns: [max(number) (#6), numbers.number (#4)]
-    ├── exchange type: Merge
-    └── Limit
-        ├── output columns: [max(number) (#6), numbers.number (#4)]
-        ├── limit: 3
-        ├── offset: 0
-        ├── estimated rows: 3.00
-        └── AggregateFinal
-            ├── output columns: [max(number) (#6), numbers.number (#4)]
-            ├── group by: [number]
-            ├── aggregate functions: [max(number)]
-            ├── limit: 3
+    ├── sort keys: [number ASC NULLS LAST]
+    ├── estimated rows: 10000000.00
+    └── Exchange
+        ├── output columns: [max(number) (#6), numbers.number (#4), #_order_col]
+        ├── exchange type: Merge
+        └── Sort
+            ├── output columns: [max(number) (#6), numbers.number (#4), #_order_col]
+            ├── sort keys: [number ASC NULLS LAST]
             ├── estimated rows: 10000000.00
-            └── Exchange
+            └── AggregateFinal
                 ├── output columns: [max(number) (#6), numbers.number (#4)]
-                ├── exchange type: Hash(0)
-                └── AggregatePartial
-                    ├── group by: [number]
-                    ├── aggregate functions: [max(number)]
-                    ├── estimated rows: 10000000.00
-                    └── TableScan
-                        ├── table: default.system.numbers
-                        ├── output columns: [number (#4)]
-                        ├── read rows: 10000000
-                        ├── read size: 76.29 MiB
-                        ├── partitions total: 153
-                        ├── partitions scanned: 153
-                        ├── push downs: [filters: [], limit: NONE]
-                        └── estimated rows: 10000000.00
+                ├── group by: [number]
+                ├── aggregate functions: [max(number)]
+                ├── estimated rows: 10000000.00
+                └── Exchange
+                    ├── output columns: [max(number) (#6), numbers.number (#4)]
+                    ├── exchange type: Hash(0)
+                    └── AggregatePartial
+                        ├── group by: [number]
+                        ├── aggregate functions: [max(number)]
+                        ├── estimated rows: 10000000.00
+                        ├── rank limit: 3
+                        └── TableScan
+                            ├── table: default.system.numbers
+                            ├── output columns: [number (#4)]
+                            ├── read rows: 10000000
+                            ├── read size: 76.29 MiB
+                            ├── partitions total: 153
+                            ├── partitions scanned: 153
+                            ├── push downs: [filters: [], limit: NONE]
+                            └── estimated rows: 10000000.00
diff --git a/tests/sqllogictests/suites/mode/standalone/explain/aggregate.test b/tests/sqllogictests/suites/mode/standalone/explain/aggregate.test
index 3ed683ff898e..de694e021e65 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain/aggregate.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain/aggregate.test
@@ -451,6 +451,70 @@ EvalScalar
             ├── push downs: [filters: [], limit: NONE]
             └── estimated rows: 0.00

+query T
+EXPLAIN SELECT referer, isrefresh, count() FROM t GROUP BY referer, isrefresh order by referer, isrefresh desc limit 10;
+----
+Limit
+├── output columns: [count() (#2), t.referer (#0), t.isrefresh (#1)]
+├── limit: 10
+├── offset: 0
+├── estimated rows: 0.00
+└── Sort
+    ├── output columns: [count() (#2), t.referer (#0), t.isrefresh (#1)]
+    ├── sort keys: [referer ASC NULLS LAST, isrefresh DESC NULLS LAST]
+    ├── estimated rows: 0.00
+    └── AggregateFinal
+        ├── output columns: [count() (#2), t.referer (#0), t.isrefresh (#1)]
+        ├── group by: [referer, isrefresh]
+        ├── aggregate functions: [count()]
+        ├── estimated rows: 0.00
+        └── AggregatePartial
+            ├── group by: [referer, isrefresh]
+            ├── aggregate functions: [count()]
+            ├── estimated rows: 0.00
+            ├── rank limit: 10
+            └── TableScan
+                ├── table: default.default.t
+                ├── output columns: [referer (#0), isrefresh (#1)]
+                ├── read rows: 0
+                ├── read size: 0
+                ├── partitions total: 0
+                ├── partitions scanned: 0
+                ├── push downs: [filters: [], limit: NONE]
+                └── estimated rows: 0.00
+
+query T
+EXPLAIN SELECT referer, isrefresh, count() FROM t GROUP BY referer, isrefresh limit 3, 10;
+----
+Limit
+├── output columns: [count() (#2), t.referer (#0), t.isrefresh (#1)]
+├── limit: 10
+├── offset: 3
+├── estimated rows: 0.00
+└── Sort
+    ├── output columns: [count() (#2), t.referer (#0), t.isrefresh (#1)]
+    ├── sort keys: [referer ASC NULLS LAST, isrefresh ASC NULLS LAST]
+    ├── estimated rows: 0.00
+    └── AggregateFinal
+        ├── output columns: [count() (#2), t.referer (#0), t.isrefresh (#1)]
+        ├── group by: [referer, isrefresh]
+        ├── aggregate functions: [count()]
+        ├── estimated rows: 0.00
+        └── AggregatePartial
+            ├── group by: [referer, isrefresh]
+            ├── aggregate functions: [count()]
+            ├── estimated rows: 0.00
+            ├── rank limit: 13
+            └── TableScan
+                ├── table: default.default.t
+                ├── output columns: [referer (#0), isrefresh (#1)]
+                ├── read rows: 0
+                ├── read size: 0
+                ├── partitions total: 0
+                ├── partitions scanned: 0
+                ├── push downs: [filters: [], limit: NONE]
+                └── estimated rows: 0.00
+
 statement ok
 DROP TABLE IF EXISTS t;
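In the `LIMIT 3, 10` plan, the `rank limit: 13` is the offset folded into the limit: after skipping 3 rows, rows 4 through 13 are needed, so the partial aggregate must keep the first 13 groups in sort order. A sketch of that arithmetic (the helper name is illustrative, not from this patch):

    // LIMIT <offset>, <count> needs the first offset + count rows from
    // below, so that sum is what gets pushed down as the rank limit.
    fn pushed_rank_limit(limit: usize, offset: usize) -> usize {
        limit + offset
    }

    fn main() {
        assert_eq!(pushed_rank_limit(10, 3), 13); // LIMIT 3, 10 -> rank limit: 13
        assert_eq!(pushed_rank_limit(10, 0), 10); // LIMIT 10    -> rank limit: 10
    }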
diff --git a/tests/sqllogictests/suites/mode/standalone/explain/limit.test b/tests/sqllogictests/suites/mode/standalone/explain/limit.test
index 0b8bc375d636..0fe71b4a5d92 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain/limit.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain/limit.test
@@ -94,6 +94,7 @@ Limit
             ├── group by: [number]
             ├── aggregate functions: []
             ├── estimated rows: 0.20
+            ├── rank limit: 3
             └── Filter
                 ├── output columns: [t.number (#0)]
                 ├── filters: [is_true(CAST(t.number (#0) AS UInt64 NULL) = if(true, TRY_CAST(scalar_subquery_4 (#4) AS UInt64 NULL), 0))]
diff --git a/tests/sqllogictests/suites/mode/standalone/explain/prune_column.test b/tests/sqllogictests/suites/mode/standalone/explain/prune_column.test
index 3fe741eb2724..91b532ed9e7d 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain/prune_column.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain/prune_column.test
@@ -54,6 +54,7 @@ Limit
             ├── group by: [number, number, number, number]
             ├── aggregate functions: []
             ├── estimated rows: 0.00
+            ├── rank limit: 1
             └── Filter
                 ├── output columns: [numbers.number (#0)]
                 ├── filters: [numbers.number (#0) > 1]
@@ -132,12 +133,12 @@ HashJoin
│   ├── output columns: [_count_scalar_subquery (#13), _any_scalar_subquery (#14)]
│   ├── group by: []
│   ├── aggregate functions: [count(), any(COUNT(*))]
-│   ├── limit: 1
│   ├── estimated rows: 1.00
│   └── AggregatePartial
│       ├── group by: []
│       ├── aggregate functions: [count(), any(COUNT(*))]
│       ├── estimated rows: 1.00
+│       ├── rank limit: 1
│       └── AggregateFinal
│           ├── output columns: [COUNT(*) (#12)]
│           ├── group by: []
diff --git a/tests/sqllogictests/suites/mode/standalone/explain/subquery.test b/tests/sqllogictests/suites/mode/standalone/explain/subquery.test
index c62b0c4be869..d2d102470f9f 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain/subquery.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain/subquery.test
@@ -179,12 +179,12 @@ HashJoin
│   ├── output columns: [_count_scalar_subquery (#2), _any_scalar_subquery (#3)]
│   ├── group by: []
│   ├── aggregate functions: [count(), any(number)]
-│   ├── limit: 1
│   ├── estimated rows: 1.00
│   └── AggregatePartial
│       ├── group by: []
│       ├── aggregate functions: [count(), any(number)]
│       ├── estimated rows: 1.00
+│       ├── rank limit: 1
│       └── Filter
│           ├── output columns: [numbers.number (#1)]
│           ├── filters: [numbers.number (#1) = 0]
diff --git a/tests/sqllogictests/suites/mode/standalone/explain_native/limit.test b/tests/sqllogictests/suites/mode/standalone/explain_native/limit.test
index d4b71378cce6..651e7a5cae7c 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain_native/limit.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain_native/limit.test
@@ -94,6 +94,7 @@ Limit
             ├── group by: [number]
             ├── aggregate functions: []
             ├── estimated rows: 0.20
+            ├── rank limit: 3
             └── Filter
                 ├── output columns: [t.number (#0)]
                 ├── filters: [is_true(CAST(t.number (#0) AS UInt64 NULL) = if(true, TRY_CAST(scalar_subquery_4 (#4) AS UInt64 NULL), 0))]
diff --git a/tests/sqllogictests/suites/mode/standalone/explain_native/prune_column.test b/tests/sqllogictests/suites/mode/standalone/explain_native/prune_column.test
index 023c7447b009..7766a562e3dc 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain_native/prune_column.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain_native/prune_column.test
@@ -54,6 +54,7 @@ Limit
             ├── group by: [number, number, number, number]
             ├── aggregate functions: []
             ├── estimated rows: 0.00
+            ├── rank limit: 1
             └── Filter
                 ├── output columns: [numbers.number (#0)]
                 ├── filters: [numbers.number (#0) > 1]
@@ -132,12 +133,12 @@ HashJoin
│   ├── output columns: [_count_scalar_subquery (#13), _any_scalar_subquery (#14)]
│   ├── group by: []
│   ├── aggregate functions: [count(), any(COUNT(*))]
-│   ├── limit: 1
│   ├── estimated rows: 1.00
│   └── AggregatePartial
│       ├── group by: []
│       ├── aggregate functions: [count(), any(COUNT(*))]
│       ├── estimated rows: 1.00
+│       ├── rank limit: 1
│       └── AggregateFinal
│           ├── output columns: [COUNT(*) (#12)]
│           ├── group by: []
diff --git a/tests/sqllogictests/suites/mode/standalone/explain_native/subquery.test b/tests/sqllogictests/suites/mode/standalone/explain_native/subquery.test
index 66a0c0a68a6a..ae17ac46c8f6 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain_native/subquery.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain_native/subquery.test
@@ -179,12 +179,12 @@ HashJoin
│   ├── output columns: [_count_scalar_subquery (#2), _any_scalar_subquery (#3)]
│   ├── group by: []
│   ├── aggregate functions: [count(), any(number)]
-│   ├── limit: 1
│   ├── estimated rows: 1.00
│   └── AggregatePartial
│       ├── group by: []
│       ├── aggregate functions: [count(), any(number)]
│       ├── estimated rows: 1.00
+│       ├── rank limit: 1
│       └── Filter
│           ├── output columns: [numbers.number (#1)]
│           ├── filters: [numbers.number (#1) = 0]
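Across all of these plans the old `limit` annotation on AggregateFinal becomes a `rank limit` on AggregatePartial, i.e. the cut now happens before the final merge. Using a rank rather than a plain row count matters at the boundary: under the assumed semantics of LimitType::LimitRank (a reading of this patch, not stated in it), rows whose sort keys tie at the cut all survive, so no group that could still reach the final top-N is dropped early. A toy illustration of rank-based truncation:

    // Assumed semantics: input is sorted, equal keys share a rank, and
    // every row with rank <= limit is kept, so boundary ties survive.
    fn rank_limit<T: Ord + Clone>(sorted: &[T], limit: usize) -> Vec<T> {
        let mut kept = Vec::new();
        let mut rank = 0usize;
        let mut prev: Option<&T> = None;
        for value in sorted {
            if prev != Some(value) {
                rank += 1; // a new distinct key starts the next rank
                prev = Some(value);
            }
            if rank > limit {
                break;
            }
            kept.push(value.clone());
        }
        kept
    }

    fn main() {
        let keys = vec![1, 1, 2, 2, 3, 4];
        // With limit 2, both rows tied on key 2 are kept; 3 and 4 are cut.
        assert_eq!(rank_limit(&keys, 2), vec![1, 1, 2, 2]);
    }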