Skip to content

Commit

Permalink
refactor(query): sort kernel optimization (#16458)
Browse files Browse the repository at this point in the history
* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update
  • Loading branch information
sundy-li authored Sep 18, 2024
1 parent f613f1c commit 3605978
Show file tree
Hide file tree
Showing 26 changed files with 679 additions and 288 deletions.
2 changes: 2 additions & 0 deletions src/query/expression/src/kernels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod group_by;
mod group_by_hash;
mod scatter;
mod sort;
mod sort_compare;
mod take;
mod take_chunks;
mod take_compact;
Expand All @@ -27,6 +28,7 @@ mod utils;

pub use group_by_hash::*;
pub use sort::*;
pub use sort_compare::*;
pub use take_chunks::*;
pub use topk::*;
pub use utils::*;
197 changes: 73 additions & 124 deletions src/query/expression/src/kernels/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,30 @@
// limitations under the License.

use std::cmp::Ordering;
use std::iter::once;
use std::sync::Arc;

use databend_common_arrow::arrow::array::ord as arrow_ord;
use databend_common_arrow::arrow::array::ord::DynComparator;
use databend_common_arrow::arrow::array::Array;
use databend_common_arrow::arrow::array::PrimitiveArray;
use databend_common_arrow::arrow::compute::merge_sort as arrow_merge_sort;
use databend_common_arrow::arrow::compute::merge_sort::build_comparator_impl;
use databend_common_arrow::arrow::compute::sort as arrow_sort;
use databend_common_arrow::arrow::datatypes::DataType as ArrowType;
use databend_common_arrow::arrow::error::Error as ArrowError;
use databend_common_arrow::arrow::error::Result as ArrowResult;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;

use crate::converts::arrow2::ARROW_EXT_TYPE_EMPTY_ARRAY;
use crate::converts::arrow2::ARROW_EXT_TYPE_EMPTY_MAP;
use crate::converts::arrow2::ARROW_EXT_TYPE_VARIANT;
use crate::types::DataType;
use crate::utils::arrow::column_to_arrow_array;
use crate::visitor::ValueVisitor;
use crate::Column;
use crate::ColumnBuilder;
use crate::DataBlock;
use crate::Scalar;
use crate::SortCompare;
use crate::Value;

pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;

Expand All @@ -54,6 +53,22 @@ pub struct SortColumnDescription {
pub is_nullable: bool,
}

/// Row-limit semantics applied while sorting a block.
#[derive(Copy, Clone, Debug)]
pub enum LimitType {
    /// No limit: keep every row.
    None,
    /// Keep at most this many rows of the sorted output.
    LimitRows(usize),
    /// Rank-based limit (not applied by `limit_rows`; handled by the sort kernel).
    LimitRank(usize),
}

impl LimitType {
    /// Number of rows the sorted output should keep, given `rows` input rows.
    ///
    /// Only `LimitRows` caps the count; `None` and `LimitRank` leave it
    /// unchanged here.
    pub fn limit_rows(&self, rows: usize) -> usize {
        if let LimitType::LimitRows(cap) = *self {
            cap
        } else {
            rows
        }
    }
}

#[derive(Clone, Debug)]
pub struct SortField {
pub data_type: DataType,
Expand All @@ -80,6 +95,43 @@ impl DataBlock {
block: &DataBlock,
descriptions: &[SortColumnDescription],
limit: Option<usize>,
) -> Result<DataBlock> {
let limit = if let Some(l) = limit {
LimitType::LimitRows(l)
} else {
LimitType::None
};

Self::sort_with_type(block, descriptions, limit)
}

/// Sort `block` by the given sort-column `descriptions`, honouring `limit`.
///
/// Blocks with at most one row, or with no columns, are returned unchanged.
/// The sort key columns are fed one by one into a `SortCompare`, which yields
/// the row permutation that `DataBlock::take` then applies.
pub fn sort_with_type(
    block: &DataBlock,
    descriptions: &[SortColumnDescription],
    limit: LimitType,
) -> Result<DataBlock> {
    let rows = block.num_rows();
    // Nothing to reorder: zero/one row, or a column-less block.
    if rows <= 1 || block.num_columns() == 0 {
        return Ok(block.clone());
    }

    let mut comparator = SortCompare::new(descriptions.to_owned(), rows, limit);
    for desc in descriptions {
        let value = block.get_by_offset(desc.offset).value.clone();
        comparator.visit_value(value)?;
        comparator.increment_column_index();
    }

    // Apply the computed permutation to produce the sorted block.
    let permutation = comparator.take_permutation();
    DataBlock::take(block, &permutation, &mut None)
}

// TODO remove these
#[allow(dead_code)]
pub fn sort_old(
block: &DataBlock,
descriptions: &[SortColumnDescription],
limit: Option<usize>,
) -> Result<DataBlock> {
let num_rows = block.num_rows();
if num_rows <= 1 {
Expand All @@ -106,115 +158,6 @@ impl DataBlock {
arrow_sort::lexsort_to_indices_impl(&order_arrays, limit, &build_compare)?;
DataBlock::take(block, indices.values(), &mut None)
}

// Merge two blocks (each assumed already sorted) into one sorted block.
// require: lhs and rhs have been `convert_to_full`.
fn two_way_merge_sort(
    blocks: &[DataBlock],
    descriptions: &[SortColumnDescription],
    limit: Option<usize>,
) -> Result<DataBlock> {
    // Exactly two inputs are expected; callers split larger sets beforehand.
    assert!(blocks.len() == 2);

    let lhs = &blocks[0];
    let rhs = &blocks[1];

    let lhs_len = lhs.num_rows();
    let rhs_len = rhs.num_rows();
    // If one side is empty, the other block is already the merged result.
    if lhs_len == 0 {
        return Ok(rhs.clone());
    }
    if rhs_len == 0 {
        return Ok(lhs.clone());
    }

    // For each sort key, build the (left, right) arrow arrays and record the
    // matching sort options (direction and null placement) in parallel.
    let mut sort_options = Vec::with_capacity(descriptions.len());
    let sort_arrays = descriptions
        .iter()
        .map(|d| {
            let left = column_to_arrow_array(lhs.get_by_offset(d.offset), lhs_len);
            let right = column_to_arrow_array(rhs.get_by_offset(d.offset), rhs_len);
            sort_options.push(arrow_sort::SortOptions {
                descending: !d.asc,
                nulls_first: d.nulls_first,
            });
            vec![left, right]
        })
        .collect::<Vec<_>>();

    // Re-borrow the owned arrays as `&dyn Array` pairs for the comparator.
    let sort_dyn_arrays = sort_arrays
        .iter()
        .map(|f| vec![f[0].as_ref(), f[1].as_ref()])
        .collect::<Vec<_>>();

    // Pair each (left, right) array slice with its sort options.
    let sort_options_with_arrays = sort_dyn_arrays
        .iter()
        .zip(sort_options.iter())
        .map(|(arrays, opt)| (arrays as &[&dyn Array], opt))
        .collect::<Vec<_>>();

    // `build_compare` supplies the per-type comparator (including the
    // extension types handled elsewhere in this file).
    let comparator = build_comparator_impl(&sort_options_with_arrays, &build_compare)?;
    // Slices are (block_index, start_row, length) over the two inputs.
    let lhs_slice = (0, 0, lhs_len);
    let rhs_slice = (1, 0, rhs_len);

    // Compute the merged run layout; `to_vec(limit)` truncates the slice
    // list to at most `limit` rows when a limit is set.
    let slices =
        arrow_merge_sort::merge_sort_slices(once(&lhs_slice), once(&rhs_slice), &comparator)
            .to_vec(limit);

    // Materialize the merged block by gathering the slices from both inputs.
    let block = DataBlock::take_by_slices_limit_from_blocks(blocks, &slices, limit);
    Ok(block)
}

/// Recursively merge a list of blocks into a single sorted block.
///
/// The list is split in half, each half merged recursively, and the two
/// results combined with `two_way_merge_sort`. `abort_checker` is polled
/// before every merge step so a killed query stops promptly instead of
/// finishing a potentially large merge.
pub fn merge_sort(
    blocks: &[DataBlock],
    descriptions: &[SortColumnDescription],
    limit: Option<usize>,
    abort_checker: AbortChecker,
) -> Result<DataBlock> {
    match blocks.len() {
        // Merging nothing is treated as an error, not an empty block.
        0 => Result::Err(ErrorCode::EmptyData("Can't merge empty blocks")),
        // A single block is already "merged".
        1 => Ok(blocks[0].clone()),
        2 => {
            if abort_checker.is_aborting() {
                return Err(ErrorCode::AbortedQuery(
                    "Aborted query, because the server is shutting down or the query was killed.",
                ));
            }

            DataBlock::two_way_merge_sort(blocks, descriptions, limit)
        }
        _ => {
            if abort_checker.is_aborting() {
                return Err(ErrorCode::AbortedQuery(
                    "Aborted query, because the server is shutting down or the query was killed.",
                ));
            }
            // Merge the first half…
            let left = DataBlock::merge_sort(
                &blocks[0..blocks.len() / 2],
                descriptions,
                limit,
                abort_checker.clone(),
            )?;
            if abort_checker.is_aborting() {
                return Err(ErrorCode::AbortedQuery(
                    "Aborted query, because the server is shutting down or the query was killed.",
                ));
            }
            // …then the second half…
            let right = DataBlock::merge_sort(
                &blocks[blocks.len() / 2..blocks.len()],
                descriptions,
                limit,
                abort_checker.clone(),
            )?;
            if abort_checker.is_aborting() {
                return Err(ErrorCode::AbortedQuery(
                    "Aborted query, because the server is shutting down or the query was killed.",
                ));
            }
            // …and combine the two sorted halves.
            DataBlock::two_way_merge_sort(&[left, right], descriptions, limit)
        }
    }
}
}

fn compare_variant(left: &dyn Array, right: &dyn Array) -> ArrowResult<DynComparator> {
Expand Down Expand Up @@ -290,20 +233,26 @@ pub fn compare_scalars(rows: Vec<Vec<Scalar>>, data_types: &[DataType]) -> Resul

let order_columns = columns
.into_iter()
.map(|builder| builder.build().as_arrow())
.map(|builder| builder.build())
.collect::<Vec<_>>();
let order_arrays = order_columns

let descriptions = order_columns
.iter()
.map(|array| arrow_sort::SortColumn {
values: array.as_ref(),
options: Some(arrow_sort::SortOptions {
descending: false,
nulls_first: false,
}),
.enumerate()
.map(|(idx, array)| SortColumnDescription {
offset: idx,
asc: true,
nulls_first: false,
is_nullable: array.data_type().is_nullable(),
})
.collect::<Vec<_>>();
let indices: PrimitiveArray<u32> =
arrow_sort::lexsort_to_indices_impl(&order_arrays, None, &build_compare)?;

Ok(indices.values().to_vec())
let mut sort_compare = SortCompare::new(descriptions, length, LimitType::None);

for array in order_columns {
sort_compare.visit_value(Value::Column(array))?;
sort_compare.increment_column_index();
}

Ok(sort_compare.take_permutation())
}
Loading

0 comments on commit 3605978

Please sign in to comment.