From 6c42265d49440bdbdf7e8c23c236671a83ee0580 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Sun, 15 Sep 2024 08:31:04 +0800 Subject: [PATCH 01/11] update --- src/query/expression/src/kernels/mod.rs | 2 + src/query/expression/src/kernels/sort.rs | 163 ++++--------- .../expression/src/kernels/sort_compare.rs | 222 ++++++++++++++++++ src/query/expression/src/types.rs | 27 ++- src/query/expression/src/types/any.rs | 4 +- src/query/expression/src/types/binary.rs | 6 + src/query/expression/src/types/bitmap.rs | 6 + src/query/expression/src/types/boolean.rs | 6 + src/query/expression/src/types/date.rs | 6 + src/query/expression/src/types/decimal.rs | 5 + src/query/expression/src/types/empty_array.rs | 6 + src/query/expression/src/types/empty_map.rs | 6 + src/query/expression/src/types/generic.rs | 6 + src/query/expression/src/types/geography.rs | 8 +- src/query/expression/src/types/geometry.rs | 5 + src/query/expression/src/types/null.rs | 6 + src/query/expression/src/types/nullable.rs | 12 + src/query/expression/src/types/number.rs | 6 + src/query/expression/src/types/string.rs | 6 + src/query/expression/src/types/timestamp.rs | 6 + src/query/expression/src/types/variant.rs | 4 +- src/query/expression/src/utils/mod.rs | 1 + src/query/expression/src/utils/visitor.rs | 141 +++++++++++ src/query/expression/src/values.rs | 20 +- src/query/expression/tests/it/sort.rs | 129 +--------- .../transforms/transform_sort_spill.rs | 18 -- 26 files changed, 543 insertions(+), 284 deletions(-) create mode 100644 src/query/expression/src/kernels/sort_compare.rs create mode 100755 src/query/expression/src/utils/visitor.rs diff --git a/src/query/expression/src/kernels/mod.rs b/src/query/expression/src/kernels/mod.rs index c6485fb21310..1e5369e2d807 100644 --- a/src/query/expression/src/kernels/mod.rs +++ b/src/query/expression/src/kernels/mod.rs @@ -18,6 +18,7 @@ mod group_by; mod group_by_hash; mod scatter; mod sort; +mod sort_compare; mod take; mod take_chunks; mod take_compact; @@ -27,6 +28,7 @@ mod utils; pub use group_by_hash::*; pub use sort::*; +pub use sort_compare::*; pub use take_chunks::*; pub use topk::*; pub use utils::*; diff --git a/src/query/expression/src/kernels/sort.rs b/src/query/expression/src/kernels/sort.rs index 7e49e28e358b..6a25dd2d9633 100644 --- a/src/query/expression/src/kernels/sort.rs +++ b/src/query/expression/src/kernels/sort.rs @@ -20,13 +20,10 @@ use databend_common_arrow::arrow::array::ord as arrow_ord; use databend_common_arrow::arrow::array::ord::DynComparator; use databend_common_arrow::arrow::array::Array; use databend_common_arrow::arrow::array::PrimitiveArray; -use databend_common_arrow::arrow::compute::merge_sort as arrow_merge_sort; -use databend_common_arrow::arrow::compute::merge_sort::build_comparator_impl; use databend_common_arrow::arrow::compute::sort as arrow_sort; use databend_common_arrow::arrow::datatypes::DataType as ArrowType; use databend_common_arrow::arrow::error::Error as ArrowError; use databend_common_arrow::arrow::error::Result as ArrowResult; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use crate::converts::arrow2::ARROW_EXT_TYPE_EMPTY_ARRAY; @@ -34,10 +31,13 @@ use crate::converts::arrow2::ARROW_EXT_TYPE_EMPTY_MAP; use crate::converts::arrow2::ARROW_EXT_TYPE_VARIANT; use crate::types::DataType; use crate::utils::arrow::column_to_arrow_array; +use crate::visitor::ValueVisitor; use crate::Column; use crate::ColumnBuilder; use crate::DataBlock; use crate::Scalar; +use crate::SortCompare; +use crate::Value; pub type AbortChecker = Arc; @@ -80,6 +80,26 @@ impl DataBlock { block: &DataBlock, descriptions: &[SortColumnDescription], limit: Option, + ) -> Result { + let num_rows = block.num_rows(); + let mut sort_compare = SortCompare::new(descriptions.to_owned(), num_rows, limit); + + for desc in descriptions.iter() { + let array = block.get_by_offset(desc.offset).value.clone(); + sort_compare.visit_value(array)?; + sort_compare.increment_column_index(); + } + + let permutations = sort_compare.take_permutation(); + DataBlock::take(block, &permutations, &mut None) + } + + // TODO remove these + #[allow(dead_code)] + pub fn sort_old( + block: &DataBlock, + descriptions: &[SortColumnDescription], + limit: Option, ) -> Result { let num_rows = block.num_rows(); if num_rows <= 1 { @@ -106,115 +126,6 @@ impl DataBlock { arrow_sort::lexsort_to_indices_impl(&order_arrays, limit, &build_compare)?; DataBlock::take(block, indices.values(), &mut None) } - - // merge two blocks to one sorted block - // require: lhs and rhs have been `convert_to_full`. - fn two_way_merge_sort( - blocks: &[DataBlock], - descriptions: &[SortColumnDescription], - limit: Option, - ) -> Result { - assert!(blocks.len() == 2); - - let lhs = &blocks[0]; - let rhs = &blocks[1]; - - let lhs_len = lhs.num_rows(); - let rhs_len = rhs.num_rows(); - if lhs_len == 0 { - return Ok(rhs.clone()); - } - if rhs_len == 0 { - return Ok(lhs.clone()); - } - - let mut sort_options = Vec::with_capacity(descriptions.len()); - let sort_arrays = descriptions - .iter() - .map(|d| { - let left = column_to_arrow_array(lhs.get_by_offset(d.offset), lhs_len); - let right = column_to_arrow_array(rhs.get_by_offset(d.offset), rhs_len); - sort_options.push(arrow_sort::SortOptions { - descending: !d.asc, - nulls_first: d.nulls_first, - }); - vec![left, right] - }) - .collect::>(); - - let sort_dyn_arrays = sort_arrays - .iter() - .map(|f| vec![f[0].as_ref(), f[1].as_ref()]) - .collect::>(); - - let sort_options_with_arrays = sort_dyn_arrays - .iter() - .zip(sort_options.iter()) - .map(|(arrays, opt)| (arrays as &[&dyn Array], opt)) - .collect::>(); - - let comparator = build_comparator_impl(&sort_options_with_arrays, &build_compare)?; - let lhs_slice = (0, 0, lhs_len); - let rhs_slice = (1, 0, rhs_len); - - let slices = - arrow_merge_sort::merge_sort_slices(once(&lhs_slice), once(&rhs_slice), &comparator) - .to_vec(limit); - - let block = DataBlock::take_by_slices_limit_from_blocks(blocks, &slices, limit); - Ok(block) - } - - pub fn merge_sort( - blocks: &[DataBlock], - descriptions: &[SortColumnDescription], - limit: Option, - abort_checker: AbortChecker, - ) -> Result { - match blocks.len() { - 0 => Result::Err(ErrorCode::EmptyData("Can't merge empty blocks")), - 1 => Ok(blocks[0].clone()), - 2 => { - if abort_checker.is_aborting() { - return Err(ErrorCode::AbortedQuery( - "Aborted query, because the server is shutting down or the query was killed.", - )); - } - - DataBlock::two_way_merge_sort(blocks, descriptions, limit) - } - _ => { - if abort_checker.is_aborting() { - return Err(ErrorCode::AbortedQuery( - "Aborted query, because the server is shutting down or the query was killed.", - )); - } - let left = DataBlock::merge_sort( - &blocks[0..blocks.len() / 2], - descriptions, - limit, - abort_checker.clone(), - )?; - if abort_checker.is_aborting() { - return Err(ErrorCode::AbortedQuery( - "Aborted query, because the server is shutting down or the query was killed.", - )); - } - let right = DataBlock::merge_sort( - &blocks[blocks.len() / 2..blocks.len()], - descriptions, - limit, - abort_checker.clone(), - )?; - if abort_checker.is_aborting() { - return Err(ErrorCode::AbortedQuery( - "Aborted query, because the server is shutting down or the query was killed.", - )); - } - DataBlock::two_way_merge_sort(&[left, right], descriptions, limit) - } - } - } } fn compare_variant(left: &dyn Array, right: &dyn Array) -> ArrowResult { @@ -290,20 +201,26 @@ pub fn compare_scalars(rows: Vec>, data_types: &[DataType]) -> Resul let order_columns = columns .into_iter() - .map(|builder| builder.build().as_arrow()) + .map(|builder| builder.build()) .collect::>(); - let order_arrays = order_columns + + let descriptions = order_columns .iter() - .map(|array| arrow_sort::SortColumn { - values: array.as_ref(), - options: Some(arrow_sort::SortOptions { - descending: false, - nulls_first: false, - }), + .enumerate() + .map(|(idx, array)| SortColumnDescription { + offset: idx, + asc: true, + nulls_first: false, + is_nullable: array.data_type().is_nullable(), }) .collect::>(); - let indices: PrimitiveArray = - arrow_sort::lexsort_to_indices_impl(&order_arrays, None, &build_compare)?; - Ok(indices.values().to_vec()) + let mut sort_compare = SortCompare::new(descriptions, length, None); + + for array in order_columns { + sort_compare.visit_value(Value::Column(array))?; + sort_compare.increment_column_index(); + } + + Ok(sort_compare.take_permutation()) } diff --git a/src/query/expression/src/kernels/sort_compare.rs b/src/query/expression/src/kernels/sort_compare.rs new file mode 100644 index 000000000000..31373427fda7 --- /dev/null +++ b/src/query/expression/src/kernels/sort_compare.rs @@ -0,0 +1,222 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::ops::Range; + +use databend_common_arrow::arrow::bitmap::Bitmap; +use databend_common_arrow::arrow::buffer::Buffer; +use databend_common_exception::Result; +use memchr::memchr; + +use crate::types::AnyType; +use crate::types::NullableColumn; +use crate::types::Number; +use crate::types::ValueType; +use crate::visitor::ValueVisitor; +use crate::SortColumnDescription; + +pub struct SortCompare { + rows: usize, + limit: Option, + permutation: Vec, + ordering_descs: Vec, + current_column_index: usize, + validity: Option, + equality_index: Vec, +} + +macro_rules! generate_comparator { + ($value:expr, $validity:expr, $g:expr, $c:expr, $ordering_desc:expr) => { + |&a, &b| { + let ord = if let Some(valids) = &$validity { + match (valids.get_bit(a as _), valids.get_bit(b as _)) { + (true, true) => { + let left = $g($value, a); + let right = $g($value, b); + $c(left, right) + } + (true, false) => Ordering::Less, + (false, true) => Ordering::Greater, + (false, false) => Ordering::Equal, + } + } else { + let left = $g($value, a); + let right = $g($value, b); + $c(left, right) + }; + + if $ordering_desc.asc { + ord + } else { + ord.reverse() + } + } + }; +} + +impl SortCompare { + pub fn new( + ordering_descs: Vec, + rows: usize, + limit: Option, + ) -> Self { + let equality_index = if ordering_descs.len() == 1 { + vec![] + } else { + vec![1; rows as _] + }; + Self { + rows, + limit, + permutation: (0..rows as u32).collect(), + ordering_descs, + current_column_index: 0, + validity: None, + equality_index, + } + } + + pub fn increment_column_index(&mut self) { + self.current_column_index += 1; + } + + pub fn take_permutation(mut self) -> Vec { + let limit = self.limit.unwrap_or(self.rows); + self.permutation.truncate(limit); + self.permutation + } + + fn do_inner_sort(&mut self, c: C, range: Range) + where C: FnMut(&u32, &u32) -> Ordering + Copy { + let permutations = &mut self.permutation[range.start..range.end]; + + let limit = self.limit.unwrap_or(self.rows); + if limit > range.start && limit < range.end { + let (p, _, _) = permutations.select_nth_unstable_by(limit - range.start, c); + p.sort_unstable_by(c); + } else { + permutations.sort_unstable_by(c); + } + } + + fn common_sort(&mut self, value: V, g: G, c: C) + where + G: Fn(V, u32) -> T + Copy, + V: Copy, + C: Fn(T, T) -> Ordering + Copy, + { + let mut validity = self.validity.take(); + let ordering_desc = self.ordering_descs[self.current_column_index].clone(); + + // faster path for only one sort column + if self.ordering_descs.len() == 1 { + self.do_inner_sort( + generate_comparator!(value, validity, g, c, ordering_desc), + 0..self.rows, + ); + } else { + let mut current = 1; + let len = self.rows; + let need_update_equality_index = + self.current_column_index != self.ordering_descs.len() - 1; + + while current < len { + // Find the start of the next range of equal elements + let start = if let Some(pos) = memchr(1, &self.equality_index[current..len]) { + current + pos + } else { + len + }; + + if start == len { + break; + } + + // Find the end of the range of equal elements + let end = if let Some(pos) = memchr(0, &self.equality_index[start..len]) { + start + pos + } else { + len + }; + + let range = start - 1..end; + + if let Some(v) = validity.as_mut() { + v.slice(range.start, range.end - range.start); + if v.unset_bits() == 0 { + validity = None; + } + } + // Perform the inner sort on the found range + self.do_inner_sort( + generate_comparator!(value, validity, g, c, ordering_desc), + range, + ); + + if need_update_equality_index { + // Update equality_index + for i in start..end { + let is_equal = u8::from( + c( + g(value, self.permutation[i]), + g(value, self.permutation[i - 1]), + ) == Ordering::Equal, + ); + self.equality_index[i] &= is_equal; + } + } + + current = end; + } + } + } +} + +impl ValueVisitor for SortCompare { + fn visit_scalar(&mut self, _scalar: crate::Scalar) -> Result<()> { + Ok(()) + } + + // faster path for numeric + fn visit_number(&mut self, column: Buffer) -> Result<()> { + let values = column.as_slice(); + self.common_sort(values, |c, idx| c[idx as usize], |a: T, b: T| a.cmp(&b)); + Ok(()) + } + + fn visit_timestamp(&mut self, buffer: Buffer) -> Result<()> { + self.visit_number(buffer) + } + + fn visit_date(&mut self, buffer: Buffer) -> Result<()> { + self.visit_number(buffer) + } + + fn visit_typed_column(&mut self, col: T::Column) -> Result<()> { + self.common_sort( + &col, + |c, idx| -> T::ScalarRef<'_> { unsafe { T::index_column_unchecked(&c, idx as _) } }, + |a, b| T::compare(a, b), + ); + Ok(()) + } + + fn visit_nullable(&mut self, column: Box>) -> Result<()> { + if column.validity.unset_bits() > 0 { + self.validity = Some(column.validity.clone()); + } + self.visit_column(column.column.clone()) + } +} diff --git a/src/query/expression/src/types.rs b/src/query/expression/src/types.rs index b5d69f85737b..aa6b837bfa7d 100755 --- a/src/query/expression/src/types.rs +++ b/src/query/expression/src/types.rs @@ -44,22 +44,26 @@ use serde::Deserialize; use serde::Serialize; pub use self::any::AnyType; +pub use self::array::ArrayColumn; pub use self::array::ArrayType; +pub use self::binary::BinaryColumn; pub use self::binary::BinaryType; pub use self::bitmap::BitmapType; pub use self::boolean::BooleanType; pub use self::date::DateType; -pub use self::decimal::DecimalDataType; -pub use self::decimal::DecimalSize; +pub use self::decimal::*; pub use self::empty_array::EmptyArrayType; pub use self::empty_map::EmptyMapType; pub use self::generic::GenericType; +pub use self::geography::GeographyColumn; pub use self::geography::GeographyType; pub use self::map::MapType; pub use self::null::NullType; +pub use self::nullable::NullableColumn; pub use self::nullable::NullableType; pub use self::number::*; pub use self::number_class::*; +pub use self::string::StringColumn; pub use self::string::StringType; pub use self::timestamp::TimestampType; pub use self::variant::VariantType; @@ -387,46 +391,47 @@ pub trait ValueType: Debug + Clone + PartialEq + Sized + 'static { Self::column_len(col) * std::mem::size_of::() } - /// Compare two scalars and return the Ordering between them, some data types not support comparison. + /// This is default implementation yet it's not efficient. #[inline(always)] - fn compare(_: Self::ScalarRef<'_>, _: Self::ScalarRef<'_>) -> Option { - None + fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { + Self::upcast_scalar(Self::to_owned_scalar(lhs)) + .cmp(&Self::upcast_scalar(Self::to_owned_scalar(rhs))) } /// Equal comparison between two scalars, some data types not support comparison. #[inline(always)] fn equal(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool { - matches!(Self::compare(left, right), Some(Ordering::Equal)) + matches!(Self::compare(left, right), Ordering::Equal) } /// Not equal comparison between two scalars, some data types not support comparison. #[inline(always)] fn not_equal(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool { - !matches!(Self::compare(left, right), Some(Ordering::Equal)) + !matches!(Self::compare(left, right), Ordering::Equal) } /// Greater than comparison between two scalars, some data types not support comparison. #[inline(always)] fn greater_than(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool { - matches!(Self::compare(left, right), Some(Ordering::Greater)) + matches!(Self::compare(left, right), Ordering::Greater) } /// Less than comparison between two scalars, some data types not support comparison. #[inline(always)] fn less_than(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool { - matches!(Self::compare(left, right), Some(Ordering::Less)) + matches!(Self::compare(left, right), Ordering::Less) } /// Greater than or equal comparison between two scalars, some data types not support comparison. #[inline(always)] fn greater_than_equal(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool { - !matches!(Self::compare(left, right), Some(Ordering::Less)) + !matches!(Self::compare(left, right), Ordering::Less) } /// Less than or equal comparison between two scalars, some data types not support comparison. #[inline(always)] fn less_than_equal(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool { - !matches!(Self::compare(left, right), Some(Ordering::Greater)) + !matches!(Self::compare(left, right), Ordering::Greater) } } diff --git a/src/query/expression/src/types/any.rs b/src/query/expression/src/types/any.rs index 283d4e808cfb..b0ed227866ca 100755 --- a/src/query/expression/src/types/any.rs +++ b/src/query/expression/src/types/any.rs @@ -149,7 +149,7 @@ impl ValueType for AnyType { } #[inline(always)] - fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Option { - Some(lhs.cmp(&rhs)) + fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { + lhs.cmp(&rhs) } } diff --git a/src/query/expression/src/types/binary.rs b/src/query/expression/src/types/binary.rs index 8cbc2b83c225..7bd855a0ff34 100644 --- a/src/query/expression/src/types/binary.rs +++ b/src/query/expression/src/types/binary.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::iter::once; use std::marker::PhantomData; use std::ops::Range; @@ -168,6 +169,11 @@ impl ValueType for BinaryType { fn column_memory_size(col: &Self::Column) -> usize { col.data().len() + col.offsets().len() * 8 } + + #[inline(always)] + fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { + lhs.cmp(rhs) + } } impl ArgType for BinaryType { diff --git a/src/query/expression/src/types/bitmap.rs b/src/query/expression/src/types/bitmap.rs index 9ff7bd013c5c..1823941ba2b7 100644 --- a/src/query/expression/src/types/bitmap.rs +++ b/src/query/expression/src/types/bitmap.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::ops::Range; use super::binary::BinaryColumn; @@ -162,6 +163,11 @@ impl ValueType for BitmapType { fn column_memory_size(col: &Self::Column) -> usize { col.data().len() + col.offsets().len() * 8 } + + #[inline(always)] + fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { + lhs.cmp(rhs) + } } impl ArgType for BitmapType { diff --git a/src/query/expression/src/types/boolean.rs b/src/query/expression/src/types/boolean.rs index 63bbdf8b8a84..ed579f263fc9 100644 --- a/src/query/expression/src/types/boolean.rs +++ b/src/query/expression/src/types/boolean.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::ops::Range; use databend_common_arrow::arrow::bitmap::Bitmap; @@ -164,6 +165,11 @@ impl ValueType for BooleanType { builder.get(0) } + #[inline(always)] + fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { + lhs.cmp(&rhs) + } + #[inline(always)] fn equal(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool { left == right diff --git a/src/query/expression/src/types/date.rs b/src/query/expression/src/types/date.rs index dc7b8d6611f0..4e171a329bca 100644 --- a/src/query/expression/src/types/date.rs +++ b/src/query/expression/src/types/date.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::fmt::Display; use std::io::Cursor; use std::ops::Range; @@ -185,6 +186,11 @@ impl ValueType for DateType { builder[0] } + #[inline(always)] + fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { + lhs.cmp(&rhs) + } + #[inline(always)] fn equal(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool { left == right diff --git a/src/query/expression/src/types/decimal.rs b/src/query/expression/src/types/decimal.rs index 99ada93822e5..0271d2d7fcc8 100644 --- a/src/query/expression/src/types/decimal.rs +++ b/src/query/expression/src/types/decimal.rs @@ -181,6 +181,11 @@ impl ValueType for DecimalType { builder[0] } + #[inline(always)] + fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { + lhs.cmp(&rhs) + } + #[inline(always)] fn equal(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool { left == right diff --git a/src/query/expression/src/types/empty_array.rs b/src/query/expression/src/types/empty_array.rs index 7cb868244058..15818aaa9742 100644 --- a/src/query/expression/src/types/empty_array.rs +++ b/src/query/expression/src/types/empty_array.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::ops::Range; use crate::property::Domain; @@ -161,6 +162,11 @@ impl ValueType for EmptyArrayType { std::mem::size_of::() } + #[inline(always)] + fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { + lhs.cmp(&rhs) + } + #[inline(always)] fn equal(_left: Self::ScalarRef<'_>, _right: Self::ScalarRef<'_>) -> bool { true diff --git a/src/query/expression/src/types/empty_map.rs b/src/query/expression/src/types/empty_map.rs index c7c07698ea4e..019cdc2d3705 100644 --- a/src/query/expression/src/types/empty_map.rs +++ b/src/query/expression/src/types/empty_map.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::ops::Range; use crate::property::Domain; @@ -160,6 +161,11 @@ impl ValueType for EmptyMapType { fn column_memory_size(_: &Self::Column) -> usize { std::mem::size_of::() } + + #[inline(always)] + fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { + lhs.cmp(&rhs) + } } impl ArgType for EmptyMapType { diff --git a/src/query/expression/src/types/generic.rs b/src/query/expression/src/types/generic.rs index 7c32f0480011..91f5e9fd9de7 100755 --- a/src/query/expression/src/types/generic.rs +++ b/src/query/expression/src/types/generic.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::ops::Range; use crate::property::Domain; @@ -149,6 +150,11 @@ impl ValueType for GenericType { fn column_memory_size(col: &Self::Column) -> usize { col.memory_size() } + + #[inline(always)] + fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { + lhs.cmp(&rhs) + } } impl ArgType for GenericType { diff --git a/src/query/expression/src/types/geography.rs b/src/query/expression/src/types/geography.rs index 492fdd734ac6..8c0d95e92c4e 100644 --- a/src/query/expression/src/types/geography.rs +++ b/src/query/expression/src/types/geography.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::fmt::Debug; use std::hash::Hash; use std::ops::Range; @@ -63,7 +64,7 @@ impl Geography { } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Hash)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct GeographyRef<'a>(pub &'a [u8]); impl<'a> GeographyRef<'a> { @@ -233,8 +234,9 @@ impl ValueType for GeographyType { col.memory_size() } - fn compare(a: Self::ScalarRef<'_>, b: Self::ScalarRef<'_>) -> Option { - a.partial_cmp(&b) + #[inline(always)] + fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { + lhs.cmp(&rhs) } } diff --git a/src/query/expression/src/types/geometry.rs b/src/query/expression/src/types/geometry.rs index 7c2cb11b251a..31756163fbe7 100644 --- a/src/query/expression/src/types/geometry.rs +++ b/src/query/expression/src/types/geometry.rs @@ -167,6 +167,11 @@ impl ValueType for GeometryType { fn column_memory_size(col: &Self::Column) -> usize { col.data().len() + col.offsets().len() * 8 } + + #[inline(always)] + fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { + lhs.cmp(&rhs) + } } impl ArgType for GeometryType { diff --git a/src/query/expression/src/types/null.rs b/src/query/expression/src/types/null.rs index 60bf6906281d..91bb41203993 100644 --- a/src/query/expression/src/types/null.rs +++ b/src/query/expression/src/types/null.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::ops::Range; use super::nullable::NullableDomain; @@ -167,6 +168,11 @@ impl ValueType for NullType { fn column_memory_size(_: &Self::Column) -> usize { std::mem::size_of::() } + + #[inline(always)] + fn compare(_: Self::ScalarRef<'_>, _: Self::ScalarRef<'_>) -> Ordering { + Ordering::Equal + } } impl ArgType for NullType { diff --git a/src/query/expression/src/types/nullable.rs b/src/query/expression/src/types/nullable.rs index 63b6c1ddd805..a5c291136dde 100755 --- a/src/query/expression/src/types/nullable.rs +++ b/src/query/expression/src/types/nullable.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::marker::PhantomData; use std::ops::Range; @@ -222,6 +223,17 @@ impl ValueType for NullableType { fn column_memory_size(col: &Self::Column) -> usize { col.memory_size() } + + // Null default lastly + #[inline(always)] + fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { + match (lhs, rhs) { + (Some(lhs), Some(rhs)) => T::compare(lhs, rhs), + (Some(_), None) => Ordering::Greater, + (None, Some(_)) => Ordering::Less, + (None, None) => Ordering::Equal, + } + } } impl ArgType for NullableType { diff --git a/src/query/expression/src/types/number.rs b/src/query/expression/src/types/number.rs index 0afe13311c63..73f9e8922ad0 100644 --- a/src/query/expression/src/types/number.rs +++ b/src/query/expression/src/types/number.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::fmt::Debug; use std::marker::PhantomData; use std::ops::Range; @@ -218,6 +219,11 @@ impl ValueType for NumberType { builder[0] } + #[inline(always)] + fn compare(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> Ordering { + left.cmp(&right) + } + #[inline(always)] fn equal(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool { left == right diff --git a/src/query/expression/src/types/string.rs b/src/query/expression/src/types/string.rs index 11efef686b14..e7a62e9f1e3b 100644 --- a/src/query/expression/src/types/string.rs +++ b/src/query/expression/src/types/string.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::iter::once; use std::ops::Range; @@ -168,6 +169,11 @@ impl ValueType for StringType { col.data().len() + col.offsets().len() * 8 } + #[inline(always)] + fn compare(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> Ordering { + left.cmp(right) + } + #[inline(always)] fn equal(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool { left == right diff --git a/src/query/expression/src/types/timestamp.rs b/src/query/expression/src/types/timestamp.rs index b2970b5eb862..dc903c346721 100644 --- a/src/query/expression/src/types/timestamp.rs +++ b/src/query/expression/src/types/timestamp.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::fmt::Display; use std::io::Cursor; use std::ops::Range; @@ -192,6 +193,11 @@ impl ValueType for TimestampType { builder[0] } + #[inline(always)] + fn compare(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> Ordering { + left.cmp(&right) + } + #[inline(always)] fn equal(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool { left == right diff --git a/src/query/expression/src/types/variant.rs b/src/query/expression/src/types/variant.rs index 557fab4004ee..1a3c8fac1845 100644 --- a/src/query/expression/src/types/variant.rs +++ b/src/query/expression/src/types/variant.rs @@ -179,8 +179,8 @@ impl ValueType for VariantType { } #[inline(always)] - fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Option { - Some(jsonb::compare(lhs, rhs).expect("unable to parse jsonb value")) + fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { + jsonb::compare(lhs, rhs).expect("unable to parse jsonb value") } } diff --git a/src/query/expression/src/utils/mod.rs b/src/query/expression/src/utils/mod.rs index 3d65daf97acd..bb0ba0cd7967 100644 --- a/src/query/expression/src/utils/mod.rs +++ b/src/query/expression/src/utils/mod.rs @@ -23,6 +23,7 @@ pub mod filter_helper; pub mod serialize; pub mod udf_client; pub mod variant_transform; +pub mod visitor; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_ast::Span; diff --git a/src/query/expression/src/utils/visitor.rs b/src/query/expression/src/utils/visitor.rs new file mode 100755 index 000000000000..c3f7929b4f08 --- /dev/null +++ b/src/query/expression/src/utils/visitor.rs @@ -0,0 +1,141 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_arrow::arrow::bitmap::Bitmap; +use databend_common_arrow::arrow::buffer::Buffer; +use databend_common_exception::Result; +use decimal::DecimalType; +use geometry::GeometryType; + +use crate::types::*; +use crate::*; + +pub trait ValueVisitor { + fn visit_scalar(&mut self, _scalar: Scalar) -> Result<()>; + + fn visit_null(&mut self, len: usize) -> Result<()> { + self.visit_typed_column::(len) + } + + fn visit_empty_array(&mut self, len: usize) -> Result<()> { + self.visit_typed_column::(len) + } + + fn visit_empty_map(&mut self, len: usize) -> Result<()> { + self.visit_typed_column::(len) + } + + fn visit_number( + &mut self, + column: as ValueType>::Column, + ) -> Result<()> { + self.visit_typed_column::>(column) + } + + fn visit_decimal(&mut self, column: Buffer) -> Result<()> { + self.visit_typed_column::>(column) + } + + fn visit_boolean(&mut self, bitmap: Bitmap) -> Result<()> { + self.visit_typed_column::(bitmap) + } + + fn visit_binary(&mut self, column: BinaryColumn) -> Result<()> { + self.visit_typed_column::(column) + } + + fn visit_string(&mut self, column: StringColumn) -> Result<()> { + self.visit_typed_column::(column) + } + + fn visit_timestamp(&mut self, buffer: Buffer) -> Result<()> { + self.visit_typed_column::(buffer) + } + + fn visit_date(&mut self, buffer: Buffer) -> Result<()> { + self.visit_typed_column::(buffer) + } + + fn visit_array(&mut self, column: Box>) -> Result<()> { + self.visit_typed_column::(Column::Array(column)) + } + + fn visit_map(&mut self, column: Box>) -> Result<()> { + self.visit_typed_column::(Column::Map(column)) + } + + fn visit_tuple(&mut self, columns: Vec) -> Result<()> { + self.visit_typed_column::(Column::Tuple(columns)) + } + + fn visit_bitmap(&mut self, column: BinaryColumn) -> Result<()> { + self.visit_typed_column::(column) + } + + fn visit_nullable(&mut self, column: Box>) -> Result<()> { + self.visit_typed_column::(Column::Nullable(column)) + } + + fn visit_variant(&mut self, column: BinaryColumn) -> Result<()> { + self.visit_typed_column::(column) + } + + fn visit_geometry(&mut self, column: BinaryColumn) -> Result<()> { + self.visit_typed_column::(column) + } + + fn visit_geography(&mut self, column: GeographyColumn) -> Result<()> { + self.visit_typed_column::(column) + } + + fn visit_typed_column(&mut self, column: ::Column) -> Result<()>; + + fn visit_value(&mut self, value: Value) -> Result<()> { + match value { + Value::Scalar(c) => self.visit_scalar(c), + Value::Column(c) => self.visit_column(c), + } + } + + fn visit_column(&mut self, column: Column) -> Result<()> { + match column { + Column::Null { len } => self.visit_null(len), + Column::EmptyArray { len } => self.visit_empty_array(len), + Column::EmptyMap { len } => self.visit_empty_map(len), + Column::Number(column) => { + with_number_type!(|NUM_TYPE| match column { + NumberColumn::NUM_TYPE(b) => self.visit_number(b), + }) + } + Column::Decimal(column) => { + with_decimal_type!(|DECIMAL_TYPE| match column { + DecimalColumn::DECIMAL_TYPE(b, _) => self.visit_decimal(b), + }) + } + Column::Boolean(bitmap) => self.visit_boolean(bitmap), + Column::Binary(column) => self.visit_binary(column), + Column::String(column) => self.visit_string(column), + Column::Timestamp(buffer) => self.visit_timestamp(buffer), + Column::Date(buffer) => self.visit_date(buffer), + Column::Array(column) => self.visit_array(column), + Column::Map(column) => self.visit_map(column), + Column::Tuple(columns) => self.visit_tuple(columns), + Column::Bitmap(column) => self.visit_bitmap(column), + Column::Nullable(column) => self.visit_nullable(column), + Column::Variant(column) => self.visit_variant(column), + Column::Geometry(column) => self.visit_geometry(column), + Column::Geography(column) => self.visit_geography(column), + } + } +} diff --git a/src/query/expression/src/values.rs b/src/query/expression/src/values.rs index 3c0813785305..685d948684a8 100755 --- a/src/query/expression/src/values.rs +++ b/src/query/expression/src/values.rs @@ -884,11 +884,29 @@ impl PartialOrd for Column { (Column::Geography(col1), Column::Geography(col2)) => { col1.iter().partial_cmp(col2.iter()) } - _ => None, + (a, b) => { + if a.len() != b.len() { + a.len().partial_cmp(&b.len()) + } else { + for (l, r) in AnyType::iter_column(a).zip(AnyType::iter_column(b)) { + match l.partial_cmp(&r) { + Some(Ordering::Equal) => {} + other => return other, + } + } + Some(Ordering::Equal) + } + } } } } +impl Ord for Column { + fn cmp(&self, other: &Self) -> Ordering { + self.partial_cmp(other).unwrap_or(Ordering::Equal) + } +} + impl PartialEq for Column { fn eq(&self, other: &Self) -> bool { self.partial_cmp(other) == Some(Ordering::Equal) diff --git a/src/query/expression/tests/it/sort.rs b/src/query/expression/tests/it/sort.rs index fcc148c38b8c..2ae2a783340a 100644 --- a/src/query/expression/tests/it/sort.rs +++ b/src/query/expression/tests/it/sort.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use std::vec; use databend_common_exception::Result; +use databend_common_expression::block_debug::assert_block_value_eq; use databend_common_expression::types::decimal::*; use databend_common_expression::types::number::*; use databend_common_expression::types::StringType; @@ -200,131 +201,11 @@ fn test_block_sort() -> Result<()> { entry.value ); } - } - - Ok(()) -} - -#[test] -fn test_blocks_merge_sort() -> Result<()> { - let blocks = vec![ - new_block(&[ - Int64Type::from_data(vec![4i64, 6]), - StringType::from_data(vec!["b2", "b1"]), - ]), - new_block(&[ - Int64Type::from_data(vec![2i64, 3]), - StringType::from_data(vec!["b4", "b3"]), - ]), - new_block(&[ - Int64Type::from_data(vec![1i64, 1]), - StringType::from_data(vec!["b6", "b5"]), - ]), - ]; - - // test cast: - // - name - // - sort descriptions - // - limit - // - expected cols - #[allow(clippy::type_complexity)] - let test_cases: Vec<( - String, - Vec, - Option, - Vec, - )> = vec![ - ( - "order by col1".to_string(), - vec![SortColumnDescription { - offset: 0, - asc: true, - nulls_first: false, - is_nullable: false, - }], - None, - vec![ - Int64Type::from_data(vec![1_i64, 1, 2, 3, 4, 6]), - StringType::from_data(vec!["b6", "b5", "b4", "b3", "b2", "b1"]), - ], - ), - ( - "order by col1 limit 4".to_string(), - vec![SortColumnDescription { - offset: 0, - asc: true, - nulls_first: false, - is_nullable: false, - }], - Some(4), - vec![ - Int64Type::from_data(vec![1_i64, 1, 2, 3]), - StringType::from_data(vec!["b6", "b5", "b4", "b3"]), - ], - ), - ( - "order by col2 desc".to_string(), - vec![SortColumnDescription { - offset: 1, - asc: false, - nulls_first: false, - is_nullable: false, - }], - None, - vec![ - Int64Type::from_data(vec![1_i64, 1, 2, 3, 4, 6]), - StringType::from_data(vec!["b6", "b5", "b4", "b3", "b2", "b1"]), - ], - ), - ( - "order by col1, col2 desc".to_string(), - vec![ - SortColumnDescription { - offset: 0, - asc: true, - nulls_first: false, - is_nullable: false, - }, - SortColumnDescription { - offset: 1, - asc: false, - nulls_first: false, - is_nullable: false, - }, - ], - None, - vec![ - Int64Type::from_data(vec![1_i64, 1, 2, 3, 4, 6]), - StringType::from_data(vec!["b6", "b5", "b4", "b3", "b2", "b1"]), - ], - ), - ]; - - struct NeverAbort; - impl CheckAbort for NeverAbort { - fn is_aborting(&self) -> bool { - false - } - fn try_check_aborting(&self) -> Result<()> { - Ok(()) - } - } - let aborting: AbortChecker = Arc::new(NeverAbort); - - for (name, sort_descs, limit, expected) in test_cases { - let res = DataBlock::merge_sort(&blocks, &sort_descs, limit, aborting.clone())?; - - for (entry, expect) in res.columns().iter().zip(expected.iter()) { - assert_eq!( - entry.value.as_column().unwrap(), - expect, - "{}: the column after sort is wrong, expect: {:?}, got: {:?}", - name, - expect, - entry.value - ); - } + // test new sort algorithm + let res = DataBlock::sort_old(&decimal_block, &sort_descs, Some(decimal_block.num_rows()))?; + let res_new = DataBlock::sort(&decimal_block, &sort_descs, None)?; + assert_block_value_eq(&res, &res_new); } Ok(()) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs index 38a7c1541f73..bebf2b390202 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs @@ -622,24 +622,6 @@ mod tests { Ok(()) } - #[tokio::test(flavor = "multi_thread")] - async fn test_two_way_merge_sort() -> Result<()> { - let fixture = TestFixture::setup().await?; - let ctx = fixture.new_query_ctx().await?; - let (input, expected) = basic_test_data(None); - - test(ctx, input, expected, 4, 2, false, None).await - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_two_way_merge_sort_with_memory_block() -> Result<()> { - let fixture = TestFixture::setup().await?; - let ctx = fixture.new_query_ctx().await?; - let (input, expected) = basic_test_data(None); - - test(ctx, input, expected, 4, 2, true, None).await - } - async fn basic_test( ctx: Arc, batch_rows: usize, From c5b91a3d63419e93b3ac3c8dd1d1b6f4a0349364 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Sun, 15 Sep 2024 09:28:02 +0800 Subject: [PATCH 02/11] update --- src/query/expression/src/kernels/sort.rs | 1 - src/query/expression/tests/it/sort.rs | 3 --- 2 files changed, 4 deletions(-) diff --git a/src/query/expression/src/kernels/sort.rs b/src/query/expression/src/kernels/sort.rs index 6a25dd2d9633..1684bce7af55 100644 --- a/src/query/expression/src/kernels/sort.rs +++ b/src/query/expression/src/kernels/sort.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::cmp::Ordering; -use std::iter::once; use std::sync::Arc; use databend_common_arrow::arrow::array::ord as arrow_ord; diff --git a/src/query/expression/tests/it/sort.rs b/src/query/expression/tests/it/sort.rs index 2ae2a783340a..f9fc04d2dc78 100644 --- a/src/query/expression/tests/it/sort.rs +++ b/src/query/expression/tests/it/sort.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; use std::vec; use databend_common_exception::Result; @@ -20,8 +19,6 @@ use databend_common_expression::block_debug::assert_block_value_eq; use databend_common_expression::types::decimal::*; use databend_common_expression::types::number::*; use databend_common_expression::types::StringType; -use databend_common_expression::AbortChecker; -use databend_common_expression::CheckAbort; use databend_common_expression::Column; use databend_common_expression::DataBlock; use databend_common_expression::FromData; From b5cd2e49673ec3e43b93f2fc26fe2dee37f35a63 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Sun, 15 Sep 2024 09:53:37 +0800 Subject: [PATCH 03/11] update --- .../expression/src/kernels/sort_compare.rs | 80 ++++++++++++------- src/query/expression/src/types/geometry.rs | 2 +- 2 files changed, 51 insertions(+), 31 deletions(-) diff --git a/src/query/expression/src/kernels/sort_compare.rs b/src/query/expression/src/kernels/sort_compare.rs index 31373427fda7..64dbaee97a2d 100644 --- a/src/query/expression/src/kernels/sort_compare.rs +++ b/src/query/expression/src/kernels/sort_compare.rs @@ -37,30 +37,57 @@ pub struct SortCompare { equality_index: Vec, } -macro_rules! generate_comparator { - ($value:expr, $validity:expr, $g:expr, $c:expr, $ordering_desc:expr) => { - |&a, &b| { - let ord = if let Some(valids) = &$validity { - match (valids.get_bit(a as _), valids.get_bit(b as _)) { - (true, true) => { +macro_rules! do_sorter { + ($self: expr, $value:expr, $validity:expr, $g:expr, $c:expr, $ordering_desc:expr, $range: expr) => { + if let Some(valids) = &$validity { + if $ordering_desc.asc { + $self.do_inner_sort( + |&a, &b| match (valids.get_bit(a as _), valids.get_bit(b as _)) { + (true, true) => { + let left = $g($value, a); + let right = $g($value, b); + $c(left, right) + } + (true, false) => Ordering::Less, + (false, true) => Ordering::Greater, + (false, false) => Ordering::Equal, + }, + $range, + ); + } else { + $self.do_inner_sort( + |&a, &b| match (valids.get_bit(a as _), valids.get_bit(b as _)) { + (true, true) => { + let left = $g($value, a); + let right = $g($value, b); + $c(right, left) + } + (true, false) => Ordering::Greater, + (false, true) => Ordering::Less, + (false, false) => Ordering::Equal, + }, + $range, + ); + } + } else { + if $ordering_desc.asc { + $self.do_inner_sort( + |&a, &b| { let left = $g($value, a); let right = $g($value, b); $c(left, right) - } - (true, false) => Ordering::Less, - (false, true) => Ordering::Greater, - (false, false) => Ordering::Equal, - } - } else { - let left = $g($value, a); - let right = $g($value, b); - $c(left, right) - }; - - if $ordering_desc.asc { - ord + }, + $range, + ); } else { - ord.reverse() + $self.do_inner_sort( + |&a, &b| { + let left = $g($value, a); + let right = $g($value, b); + $c(right, left) + }, + $range, + ); } } }; @@ -122,10 +149,7 @@ impl SortCompare { // faster path for only one sort column if self.ordering_descs.len() == 1 { - self.do_inner_sort( - generate_comparator!(value, validity, g, c, ordering_desc), - 0..self.rows, - ); + do_sorter!(self, value, validity, g, c, ordering_desc, 0..self.rows); } else { let mut current = 1; let len = self.rows; @@ -152,7 +176,6 @@ impl SortCompare { }; let range = start - 1..end; - if let Some(v) = validity.as_mut() { v.slice(range.start, range.end - range.start); if v.unset_bits() == 0 { @@ -160,10 +183,7 @@ impl SortCompare { } } // Perform the inner sort on the found range - self.do_inner_sort( - generate_comparator!(value, validity, g, c, ordering_desc), - range, - ); + do_sorter!(self, value, validity, g, c, ordering_desc, range); if need_update_equality_index { // Update equality_index @@ -207,7 +227,7 @@ impl ValueVisitor for SortCompare { fn visit_typed_column(&mut self, col: T::Column) -> Result<()> { self.common_sort( &col, - |c, idx| -> T::ScalarRef<'_> { unsafe { T::index_column_unchecked(&c, idx as _) } }, + |c, idx| -> T::ScalarRef<'_> { unsafe { T::index_column_unchecked(c, idx as _) } }, |a, b| T::compare(a, b), ); Ok(()) diff --git a/src/query/expression/src/types/geometry.rs b/src/query/expression/src/types/geometry.rs index 31756163fbe7..67f1afc95ec9 100644 --- a/src/query/expression/src/types/geometry.rs +++ b/src/query/expression/src/types/geometry.rs @@ -170,7 +170,7 @@ impl ValueType for GeometryType { #[inline(always)] fn compare(lhs: Self::ScalarRef<'_>, rhs: Self::ScalarRef<'_>) -> Ordering { - lhs.cmp(&rhs) + lhs.cmp(rhs) } } From 57870a33f9e66be83aa2bf48ec8f14bcc83a3406 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Sun, 15 Sep 2024 22:52:29 +0800 Subject: [PATCH 04/11] update --- .../expression/src/kernels/sort_compare.rs | 62 +++++++++++-------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/src/query/expression/src/kernels/sort_compare.rs b/src/query/expression/src/kernels/sort_compare.rs index 64dbaee97a2d..4bed9cbe2c27 100644 --- a/src/query/expression/src/kernels/sort_compare.rs +++ b/src/query/expression/src/kernels/sort_compare.rs @@ -40,35 +40,38 @@ pub struct SortCompare { macro_rules! do_sorter { ($self: expr, $value:expr, $validity:expr, $g:expr, $c:expr, $ordering_desc:expr, $range: expr) => { if let Some(valids) = &$validity { - if $ordering_desc.asc { - $self.do_inner_sort( - |&a, &b| match (valids.get_bit(a as _), valids.get_bit(b as _)) { + $self.do_inner_sort( + |&a, &b| { + let order = match (valids.get_bit(a as _), valids.get_bit(b as _)) { (true, true) => { let left = $g($value, a); let right = $g($value, b); - $c(left, right) + + if $ordering_desc.asc { + $c(left, right) + } else { + $c(left, right).reverse() + } } - (true, false) => Ordering::Less, - (false, true) => Ordering::Greater, - (false, false) => Ordering::Equal, - }, - $range, - ); - } else { - $self.do_inner_sort( - |&a, &b| match (valids.get_bit(a as _), valids.get_bit(b as _)) { - (true, true) => { - let left = $g($value, a); - let right = $g($value, b); - $c(right, left) + (true, false) => { + if $ordering_desc.nulls_first { + Ordering::Greater + } else { + Ordering::Less + } + } + (false, true) => { + if $ordering_desc.nulls_first { + Ordering::Less + } else { + Ordering::Greater + } } - (true, false) => Ordering::Greater, - (false, true) => Ordering::Less, (false, false) => Ordering::Equal, - }, - $range, - ); - } + }; + }, + $range, + ); } else { if $ordering_desc.asc { $self.do_inner_sort( @@ -176,14 +179,19 @@ impl SortCompare { }; let range = start - 1..end; + + // If there are no unset bits, we don't need compare with nulls + let mut temp_v = None; if let Some(v) = validity.as_mut() { - v.slice(range.start, range.end - range.start); - if v.unset_bits() == 0 { - validity = None; + let mut temp_v2 = v.clone(); + temp_v2.slice(range.start, range.end - range.start); + if temp_v2.unset_bits() > 0 { + temp_v = validity.clone(); } } + // Perform the inner sort on the found range - do_sorter!(self, value, validity, g, c, ordering_desc, range); + do_sorter!(self, value, temp_v, g, c, ordering_desc, range); if need_update_equality_index { // Update equality_index From a154c4b2d5dfa7dca7a5bbe74cea6d5069834f2f Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Sun, 15 Sep 2024 22:52:55 +0800 Subject: [PATCH 05/11] update --- src/query/expression/src/kernels/sort_compare.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/expression/src/kernels/sort_compare.rs b/src/query/expression/src/kernels/sort_compare.rs index 4bed9cbe2c27..188c27430f26 100644 --- a/src/query/expression/src/kernels/sort_compare.rs +++ b/src/query/expression/src/kernels/sort_compare.rs @@ -50,7 +50,7 @@ macro_rules! do_sorter { if $ordering_desc.asc { $c(left, right) } else { - $c(left, right).reverse() + $c(right, left) } } (true, false) => { From 6f5d1634b0c9419ecf449489e3df5c63180a3bf9 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Sun, 15 Sep 2024 22:54:27 +0800 Subject: [PATCH 06/11] update --- .../expression/src/kernels/sort_compare.rs | 48 +++++++++---------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/src/query/expression/src/kernels/sort_compare.rs b/src/query/expression/src/kernels/sort_compare.rs index 188c27430f26..7e982a6e8865 100644 --- a/src/query/expression/src/kernels/sort_compare.rs +++ b/src/query/expression/src/kernels/sort_compare.rs @@ -41,34 +41,32 @@ macro_rules! do_sorter { ($self: expr, $value:expr, $validity:expr, $g:expr, $c:expr, $ordering_desc:expr, $range: expr) => { if let Some(valids) = &$validity { $self.do_inner_sort( - |&a, &b| { - let order = match (valids.get_bit(a as _), valids.get_bit(b as _)) { - (true, true) => { - let left = $g($value, a); - let right = $g($value, b); - - if $ordering_desc.asc { - $c(left, right) - } else { - $c(right, left) - } + |&a, &b| match (valids.get_bit(a as _), valids.get_bit(b as _)) { + (true, true) => { + let left = $g($value, a); + let right = $g($value, b); + + if $ordering_desc.asc { + $c(left, right) + } else { + $c(right, left) } - (true, false) => { - if $ordering_desc.nulls_first { - Ordering::Greater - } else { - Ordering::Less - } + } + (true, false) => { + if $ordering_desc.nulls_first { + Ordering::Greater + } else { + Ordering::Less } - (false, true) => { - if $ordering_desc.nulls_first { - Ordering::Less - } else { - Ordering::Greater - } + } + (false, true) => { + if $ordering_desc.nulls_first { + Ordering::Less + } else { + Ordering::Greater } - (false, false) => Ordering::Equal, - }; + } + (false, false) => Ordering::Equal, }, $range, ); From 60540a53ccb5d835fea6ce5b76abfcc39dd7f679 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 16 Sep 2024 09:16:25 +0800 Subject: [PATCH 07/11] update --- src/query/expression/src/kernels/sort.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/query/expression/src/kernels/sort.rs b/src/query/expression/src/kernels/sort.rs index 1684bce7af55..867bd8c9c9ad 100644 --- a/src/query/expression/src/kernels/sort.rs +++ b/src/query/expression/src/kernels/sort.rs @@ -81,6 +81,9 @@ impl DataBlock { limit: Option, ) -> Result { let num_rows = block.num_rows(); + if num_rows <= 1 || block.num_columns() == 0 { + return Ok(block.clone()); + } let mut sort_compare = SortCompare::new(descriptions.to_owned(), num_rows, limit); for desc in descriptions.iter() { From d8bc92448c3b9f9c74fd73f52e4418b09a15a346 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 16 Sep 2024 12:12:10 +0800 Subject: [PATCH 08/11] update --- .../expression/src/kernels/sort_compare.rs | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/query/expression/src/kernels/sort_compare.rs b/src/query/expression/src/kernels/sort_compare.rs index 7e982a6e8865..6abf8ebb50db 100644 --- a/src/query/expression/src/kernels/sort_compare.rs +++ b/src/query/expression/src/kernels/sort_compare.rs @@ -178,29 +178,29 @@ impl SortCompare { let range = start - 1..end; - // If there are no unset bits, we don't need compare with nulls - let mut temp_v = None; - if let Some(v) = validity.as_mut() { - let mut temp_v2 = v.clone(); - temp_v2.slice(range.start, range.end - range.start); - if temp_v2.unset_bits() > 0 { - temp_v = validity.clone(); - } - } - // Perform the inner sort on the found range do_sorter!(self, value, temp_v, g, c, ordering_desc, range); - if need_update_equality_index { // Update equality_index for i in start..end { - let is_equal = u8::from( + let is_equal = if let Some(ref v) = validity { + let va = v.get_bit(self.permutation[i] as _); + let vb = v.get_bit(self.permutation[i - 1] as _); + if va && vb { + c( + g(value, self.permutation[i]), + g(value, self.permutation[i - 1]), + ) == Ordering::Equal + } else { + !va && !vb + } + } else { c( g(value, self.permutation[i]), g(value, self.permutation[i - 1]), - ) == Ordering::Equal, - ); - self.equality_index[i] &= is_equal; + ) == Ordering::Equal + }; + self.equality_index[i] &= u8::from(is_equal); } } From 6246046142f24adcb3f800c76d5212a06352bbd5 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 16 Sep 2024 12:12:47 +0800 Subject: [PATCH 09/11] update --- src/query/expression/src/kernels/sort_compare.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/expression/src/kernels/sort_compare.rs b/src/query/expression/src/kernels/sort_compare.rs index 6abf8ebb50db..a7c2ddb5c167 100644 --- a/src/query/expression/src/kernels/sort_compare.rs +++ b/src/query/expression/src/kernels/sort_compare.rs @@ -179,7 +179,7 @@ impl SortCompare { let range = start - 1..end; // Perform the inner sort on the found range - do_sorter!(self, value, temp_v, g, c, ordering_desc, range); + do_sorter!(self, value, validity, g, c, ordering_desc, range); if need_update_equality_index { // Update equality_index for i in start..end { From 68deba5235bdd9f92effa702622d42588b1f0fe4 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Wed, 18 Sep 2024 07:35:48 +0800 Subject: [PATCH 10/11] update --- src/query/expression/src/kernels/sort.rs | 20 ++- .../expression/src/kernels/sort_compare.rs | 117 +++++++++++++++--- src/query/expression/tests/it/sort.rs | 13 +- 3 files changed, 127 insertions(+), 23 deletions(-) diff --git a/src/query/expression/src/kernels/sort.rs b/src/query/expression/src/kernels/sort.rs index 867bd8c9c9ad..d26a1b7d3404 100644 --- a/src/query/expression/src/kernels/sort.rs +++ b/src/query/expression/src/kernels/sort.rs @@ -53,6 +53,22 @@ pub struct SortColumnDescription { pub is_nullable: bool, } +#[derive(Copy, Clone, Debug)] +pub enum LimitType { + None, + LimitRows(usize), + LimitRank(usize), +} + +impl LimitType { + pub fn limit_rows(&self, rows: usize) -> usize { + match self { + LimitType::LimitRows(limit) => *limit, + _ => rows, + } + } +} + #[derive(Clone, Debug)] pub struct SortField { pub data_type: DataType, @@ -78,7 +94,7 @@ impl DataBlock { pub fn sort( block: &DataBlock, descriptions: &[SortColumnDescription], - limit: Option, + limit: LimitType, ) -> Result { let num_rows = block.num_rows(); if num_rows <= 1 || block.num_columns() == 0 { @@ -217,7 +233,7 @@ pub fn compare_scalars(rows: Vec>, data_types: &[DataType]) -> Resul }) .collect::>(); - let mut sort_compare = SortCompare::new(descriptions, length, None); + let mut sort_compare = SortCompare::new(descriptions, length, LimitType::None); for array in order_columns { sort_compare.visit_value(Value::Column(array))?; diff --git a/src/query/expression/src/kernels/sort_compare.rs b/src/query/expression/src/kernels/sort_compare.rs index a7c2ddb5c167..35f265e8f4b5 100644 --- a/src/query/expression/src/kernels/sort_compare.rs +++ b/src/query/expression/src/kernels/sort_compare.rs @@ -25,11 +25,12 @@ use crate::types::NullableColumn; use crate::types::Number; use crate::types::ValueType; use crate::visitor::ValueVisitor; +use crate::LimitType; use crate::SortColumnDescription; pub struct SortCompare { rows: usize, - limit: Option, + limit: LimitType, permutation: Vec, ordering_descs: Vec, current_column_index: usize, @@ -95,16 +96,13 @@ macro_rules! do_sorter { } impl SortCompare { - pub fn new( - ordering_descs: Vec, - rows: usize, - limit: Option, - ) -> Self { - let equality_index = if ordering_descs.len() == 1 { - vec![] - } else { - vec![1; rows as _] - }; + pub fn new(ordering_descs: Vec, rows: usize, limit: LimitType) -> Self { + let equality_index = + if ordering_descs.len() == 1 && matches!(limit, LimitType::LimitRank(_)) { + vec![] + } else { + vec![1; rows as _] + }; Self { rows, limit, @@ -121,16 +119,59 @@ impl SortCompare { } pub fn take_permutation(mut self) -> Vec { - let limit = self.limit.unwrap_or(self.rows); - self.permutation.truncate(limit); - self.permutation + match self.limit { + LimitType::None => self.permutation, + LimitType::LimitRows(rows) => { + self.permutation.truncate(rows); + return self.permutation; + } + LimitType::LimitRank(rank_number) => { + let mut unique_count = 0; + + let mut start = 0; + // the index of last zero sign + let mut zero_index: isize = -1; + while start < self.rows { + // Find the first occurrence of 1 in the equality_index using memchr + if let Some(pos) = memchr(1, &self.equality_index[start..self.rows]) { + start += pos; + } else { + start = self.rows; + } + unique_count += (start as isize - zero_index) as usize; + + if unique_count > rank_number { + start -= unique_count - rank_number; + break; + } + + if start == self.rows { + break; + } + + // Find the first occurrence of 0 after the start position using memchr + if let Some(pos) = memchr(0, &self.equality_index[start..self.rows]) { + start += pos; + } else { + start = self.rows; + } + if unique_count == rank_number { + break; + } + zero_index = start as _; + } + + self.permutation.truncate(start); + self.permutation + } + } } fn do_inner_sort(&mut self, c: C, range: Range) where C: FnMut(&u32, &u32) -> Ordering + Copy { let permutations = &mut self.permutation[range.start..range.end]; - let limit = self.limit.unwrap_or(self.rows); + let limit = self.limit.limit_rows(self.rows); if limit > range.start && limit < range.end { let (p, _, _) = permutations.select_nth_unstable_by(limit - range.start, c); p.sort_unstable_by(c); @@ -139,13 +180,14 @@ impl SortCompare { } } - fn common_sort(&mut self, value: V, g: G, c: C) + // sort the value using generic G and C + fn generic_sort(&mut self, value: V, g: G, c: C) where G: Fn(V, u32) -> T + Copy, V: Copy, C: Fn(T, T) -> Ordering + Copy, { - let mut validity = self.validity.take(); + let validity = self.validity.take(); let ordering_desc = self.ordering_descs[self.current_column_index].clone(); // faster path for only one sort column @@ -177,7 +219,6 @@ impl SortCompare { }; let range = start - 1..end; - // Perform the inner sort on the found range do_sorter!(self, value, validity, g, c, ordering_desc, range); if need_update_equality_index { @@ -218,7 +259,7 @@ impl ValueVisitor for SortCompare { // faster path for numeric fn visit_number(&mut self, column: Buffer) -> Result<()> { let values = column.as_slice(); - self.common_sort(values, |c, idx| c[idx as usize], |a: T, b: T| a.cmp(&b)); + self.generic_sort(values, |c, idx| c[idx as usize], |a: T, b: T| a.cmp(&b)); Ok(()) } @@ -231,7 +272,7 @@ impl ValueVisitor for SortCompare { } fn visit_typed_column(&mut self, col: T::Column) -> Result<()> { - self.common_sort( + self.generic_sort( &col, |c, idx| -> T::ScalarRef<'_> { unsafe { T::index_column_unchecked(c, idx as _) } }, |a, b| T::compare(a, b), @@ -246,3 +287,39 @@ impl ValueVisitor for SortCompare { self.visit_column(column.column.clone()) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_take_permutation() { + let test_cases1 = vec![ + (12, LimitType::None, 0..12), + (12, LimitType::LimitRows(5), 0..5), + ]; + + let test_cases2 = vec![ + (12, LimitType::LimitRank(5), 0..11), + (12, LimitType::LimitRank(3), 0..6), + (12, LimitType::LimitRank(4), 0..7), + (12, LimitType::LimitRank(5), 0..11), + ]; + + for (c, limit, range) in test_cases1 { + let sort_compare = SortCompare::new(vec![], c, limit); + + let permutation = sort_compare.take_permutation(); + let result: Vec = range.map(|c| c as u32).collect(); + assert_eq!(permutation, result); + } + + for (c, limit, range) in test_cases2 { + let mut sort_compare = SortCompare::new(vec![], c, limit); + sort_compare.equality_index = vec![1, 0, 0, 1, 1, 1, 0, 0, 1, 1, 1, 0]; + let permutation = sort_compare.take_permutation(); + let result: Vec = range.map(|c| c as u32).collect(); + assert_eq!(permutation, result); + } + } +} diff --git a/src/query/expression/tests/it/sort.rs b/src/query/expression/tests/it/sort.rs index f9fc04d2dc78..578036b78be4 100644 --- a/src/query/expression/tests/it/sort.rs +++ b/src/query/expression/tests/it/sort.rs @@ -22,6 +22,7 @@ use databend_common_expression::types::StringType; use databend_common_expression::Column; use databend_common_expression::DataBlock; use databend_common_expression::FromData; +use databend_common_expression::LimitType; use databend_common_expression::SortColumnDescription; use crate::common::new_block; @@ -101,6 +102,11 @@ fn test_block_sort() -> Result<()> { ]; for (sort_descs, limit, expected) in test_cases { + let limit = if let Some(l) = limit { + LimitType::LimitRows(l) + } else { + LimitType::None + }; let res = DataBlock::sort(&block, &sort_descs, limit)?; for (entry, expect) in res.columns().iter().zip(expected.iter()) { @@ -187,6 +193,11 @@ fn test_block_sort() -> Result<()> { ]; for (sort_descs, limit, expected) in test_cases { + let limit = if let Some(l) = limit { + LimitType::LimitRows(l) + } else { + LimitType::None + }; let res = DataBlock::sort(&decimal_block, &sort_descs, limit)?; for (entry, expect) in res.columns().iter().zip(expected.iter()) { @@ -201,7 +212,7 @@ fn test_block_sort() -> Result<()> { // test new sort algorithm let res = DataBlock::sort_old(&decimal_block, &sort_descs, Some(decimal_block.num_rows()))?; - let res_new = DataBlock::sort(&decimal_block, &sort_descs, None)?; + let res_new = DataBlock::sort(&decimal_block, &sort_descs, LimitType::None)?; assert_block_value_eq(&res, &res_new); } From 2e3be84aa16b5f4f99fe367711d1fba2ae890772 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Wed, 18 Sep 2024 08:26:00 +0800 Subject: [PATCH 11/11] update --- src/query/expression/src/kernels/sort.rs | 14 ++++++++++++++ src/query/expression/src/kernels/sort_compare.rs | 2 +- src/query/expression/tests/it/sort.rs | 13 +------------ 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/query/expression/src/kernels/sort.rs b/src/query/expression/src/kernels/sort.rs index d26a1b7d3404..fece7246bd69 100644 --- a/src/query/expression/src/kernels/sort.rs +++ b/src/query/expression/src/kernels/sort.rs @@ -92,6 +92,20 @@ impl SortField { impl DataBlock { pub fn sort( + block: &DataBlock, + descriptions: &[SortColumnDescription], + limit: Option, + ) -> Result { + let limit = if let Some(l) = limit { + LimitType::LimitRows(l) + } else { + LimitType::None + }; + + Self::sort_with_type(block, descriptions, limit) + } + + pub fn sort_with_type( block: &DataBlock, descriptions: &[SortColumnDescription], limit: LimitType, diff --git a/src/query/expression/src/kernels/sort_compare.rs b/src/query/expression/src/kernels/sort_compare.rs index 35f265e8f4b5..a1df50931ed6 100644 --- a/src/query/expression/src/kernels/sort_compare.rs +++ b/src/query/expression/src/kernels/sort_compare.rs @@ -123,7 +123,7 @@ impl SortCompare { LimitType::None => self.permutation, LimitType::LimitRows(rows) => { self.permutation.truncate(rows); - return self.permutation; + self.permutation } LimitType::LimitRank(rank_number) => { let mut unique_count = 0; diff --git a/src/query/expression/tests/it/sort.rs b/src/query/expression/tests/it/sort.rs index 578036b78be4..f9fc04d2dc78 100644 --- a/src/query/expression/tests/it/sort.rs +++ b/src/query/expression/tests/it/sort.rs @@ -22,7 +22,6 @@ use databend_common_expression::types::StringType; use databend_common_expression::Column; use databend_common_expression::DataBlock; use databend_common_expression::FromData; -use databend_common_expression::LimitType; use databend_common_expression::SortColumnDescription; use crate::common::new_block; @@ -102,11 +101,6 @@ fn test_block_sort() -> Result<()> { ]; for (sort_descs, limit, expected) in test_cases { - let limit = if let Some(l) = limit { - LimitType::LimitRows(l) - } else { - LimitType::None - }; let res = DataBlock::sort(&block, &sort_descs, limit)?; for (entry, expect) in res.columns().iter().zip(expected.iter()) { @@ -193,11 +187,6 @@ fn test_block_sort() -> Result<()> { ]; for (sort_descs, limit, expected) in test_cases { - let limit = if let Some(l) = limit { - LimitType::LimitRows(l) - } else { - LimitType::None - }; let res = DataBlock::sort(&decimal_block, &sort_descs, limit)?; for (entry, expect) in res.columns().iter().zip(expected.iter()) { @@ -212,7 +201,7 @@ fn test_block_sort() -> Result<()> { // test new sort algorithm let res = DataBlock::sort_old(&decimal_block, &sort_descs, Some(decimal_block.num_rows()))?; - let res_new = DataBlock::sort(&decimal_block, &sort_descs, LimitType::None)?; + let res_new = DataBlock::sort(&decimal_block, &sort_descs, None)?; assert_block_value_eq(&res, &res_new); }