Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(query): sort kernel optimization #16458

Merged
merged 11 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/query/expression/src/kernels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod group_by;
mod group_by_hash;
mod scatter;
mod sort;
mod sort_compare;
mod take;
mod take_chunks;
mod take_compact;
Expand All @@ -27,6 +28,7 @@ mod utils;

pub use group_by_hash::*;
pub use sort::*;
pub use sort_compare::*;
pub use take_chunks::*;
pub use topk::*;
pub use utils::*;
197 changes: 73 additions & 124 deletions src/query/expression/src/kernels/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,30 @@
// limitations under the License.

use std::cmp::Ordering;
use std::iter::once;
use std::sync::Arc;

use databend_common_arrow::arrow::array::ord as arrow_ord;
use databend_common_arrow::arrow::array::ord::DynComparator;
use databend_common_arrow::arrow::array::Array;
use databend_common_arrow::arrow::array::PrimitiveArray;
use databend_common_arrow::arrow::compute::merge_sort as arrow_merge_sort;
use databend_common_arrow::arrow::compute::merge_sort::build_comparator_impl;
use databend_common_arrow::arrow::compute::sort as arrow_sort;
use databend_common_arrow::arrow::datatypes::DataType as ArrowType;
use databend_common_arrow::arrow::error::Error as ArrowError;
use databend_common_arrow::arrow::error::Result as ArrowResult;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;

use crate::converts::arrow2::ARROW_EXT_TYPE_EMPTY_ARRAY;
use crate::converts::arrow2::ARROW_EXT_TYPE_EMPTY_MAP;
use crate::converts::arrow2::ARROW_EXT_TYPE_VARIANT;
use crate::types::DataType;
use crate::utils::arrow::column_to_arrow_array;
use crate::visitor::ValueVisitor;
use crate::Column;
use crate::ColumnBuilder;
use crate::DataBlock;
use crate::Scalar;
use crate::SortCompare;
use crate::Value;

pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;

Expand All @@ -54,6 +53,22 @@ pub struct SortColumnDescription {
pub is_nullable: bool,
}

/// Describes how the output of a sort is bounded.
#[derive(Copy, Clone, Debug)]
pub enum LimitType {
    /// No bound is applied.
    None,
    /// The bound is expressed as a number of rows.
    LimitRows(usize),
    /// The bound is expressed as a number of ranks.
    // NOTE(review): presumably rows that tie on the sort keys share one rank —
    // confirm against the `SortCompare` implementation.
    LimitRank(usize),
}

impl LimitType {
    /// Returns the row bound to use for an input that has `rows` rows.
    ///
    /// Only [`LimitType::LimitRows`] carries a row bound; it is returned as-is
    /// (it is not clamped to `rows`). Every other variant yields `rows`.
    pub fn limit_rows(&self, rows: usize) -> usize {
        match *self {
            LimitType::LimitRows(limit) => limit,
            LimitType::None | LimitType::LimitRank(_) => rows,
        }
    }
}

#[derive(Clone, Debug)]
pub struct SortField {
pub data_type: DataType,
Expand All @@ -80,6 +95,43 @@ impl DataBlock {
block: &DataBlock,
descriptions: &[SortColumnDescription],
limit: Option<usize>,
) -> Result<DataBlock> {
let limit = if let Some(l) = limit {
LimitType::LimitRows(l)
} else {
LimitType::None
};

Self::sort_with_type(block, descriptions, limit)
}

/// Sorts `block` by the columns named in `descriptions`, bounded by `limit`.
///
/// A block with at most one row, or with no columns, is returned as a clone
/// without sorting. Otherwise each described column is fed, in order, to a
/// `SortCompare`, and the permutation it produces is used to take rows from
/// `block`.
///
/// # Errors
///
/// Propagates any error from visiting a column or from `DataBlock::take`.
pub fn sort_with_type(
    block: &DataBlock,
    descriptions: &[SortColumnDescription],
    limit: LimitType,
) -> Result<DataBlock> {
    let row_count = block.num_rows();
    // Nothing to reorder: hand back the input unchanged.
    if row_count <= 1 || block.num_columns() == 0 {
        return Ok(block.clone());
    }

    let mut comparer = SortCompare::new(descriptions.to_vec(), row_count, limit);
    for description in descriptions {
        let sort_key = block.get_by_offset(description.offset).value.clone();
        comparer.visit_value(sort_key)?;
        // Move on to the next sort column before visiting it.
        comparer.increment_column_index();
    }

    let order = comparer.take_permutation();
    DataBlock::take(block, &order, &mut None)
}

// TODO remove these
#[allow(dead_code)]
pub fn sort_old(
block: &DataBlock,
descriptions: &[SortColumnDescription],
limit: Option<usize>,
) -> Result<DataBlock> {
let num_rows = block.num_rows();
if num_rows <= 1 {
Expand All @@ -106,115 +158,6 @@ impl DataBlock {
arrow_sort::lexsort_to_indices_impl(&order_arrays, limit, &build_compare)?;
DataBlock::take(block, indices.values(), &mut None)
}

/// Merges exactly two blocks into a single sorted block.
///
/// Requires that both blocks have already been through `convert_to_full`.
/// If either block is empty, a clone of the other one is returned directly.
///
/// # Panics
///
/// Panics if `blocks` does not contain exactly two elements.
fn two_way_merge_sort(
    blocks: &[DataBlock],
    descriptions: &[SortColumnDescription],
    limit: Option<usize>,
) -> Result<DataBlock> {
    assert!(blocks.len() == 2);

    let (lhs, rhs) = (&blocks[0], &blocks[1]);
    let (lhs_len, rhs_len) = (lhs.num_rows(), rhs.num_rows());

    if lhs_len == 0 {
        return Ok(rhs.clone());
    }
    if rhs_len == 0 {
        return Ok(lhs.clone());
    }

    // One set of sort options per sort column, in description order.
    let sort_options = descriptions
        .iter()
        .map(|d| arrow_sort::SortOptions {
            descending: !d.asc,
            nulls_first: d.nulls_first,
        })
        .collect::<Vec<_>>();

    // For every sort column, the arrow arrays of the left and right block.
    let sort_arrays = descriptions
        .iter()
        .map(|d| {
            let left = column_to_arrow_array(lhs.get_by_offset(d.offset), lhs_len);
            let right = column_to_arrow_array(rhs.get_by_offset(d.offset), rhs_len);
            vec![left, right]
        })
        .collect::<Vec<_>>();

    let sort_dyn_arrays = sort_arrays
        .iter()
        .map(|pair| vec![pair[0].as_ref(), pair[1].as_ref()])
        .collect::<Vec<_>>();

    let sort_options_with_arrays = sort_dyn_arrays
        .iter()
        .zip(sort_options.iter())
        .map(|(arrays, opt)| (arrays as &[&dyn Array], opt))
        .collect::<Vec<_>>();

    let comparator = build_comparator_impl(&sort_options_with_arrays, &build_compare)?;

    // Each slice is (array index, start offset, length): the whole of each block.
    let lhs_slice = (0, 0, lhs_len);
    let rhs_slice = (1, 0, rhs_len);
    let merged =
        arrow_merge_sort::merge_sort_slices(once(&lhs_slice), once(&rhs_slice), &comparator);
    let slices = merged.to_vec(limit);

    Ok(DataBlock::take_by_slices_limit_from_blocks(
        blocks, &slices, limit,
    ))
}

/// Merges already-sorted `blocks` into one sorted block by recursive halving.
///
/// * No blocks: returns an `EmptyData` error.
/// * One block: returns a clone of it.
/// * Two blocks: merges them with `two_way_merge_sort`.
/// * More: merges each half recursively, then merges the two results.
///
/// `abort_checker` is consulted before every merge step (but not for the
/// zero- and one-block cases).
///
/// # Errors
///
/// Returns `AbortedQuery` if the checker reports that the query is aborting,
/// and propagates any error from the underlying merges.
pub fn merge_sort(
    blocks: &[DataBlock],
    descriptions: &[SortColumnDescription],
    limit: Option<usize>,
    abort_checker: AbortChecker,
) -> Result<DataBlock> {
    // Single place that turns an abort request into the error callers expect.
    let ensure_running = || -> Result<()> {
        if abort_checker.is_aborting() {
            Err(ErrorCode::AbortedQuery(
                "Aborted query, because the server is shutting down or the query was killed.",
            ))
        } else {
            Ok(())
        }
    };

    match blocks {
        [] => Err(ErrorCode::EmptyData("Can't merge empty blocks")),
        [only] => Ok(only.clone()),
        [_, _] => {
            ensure_running()?;
            DataBlock::two_way_merge_sort(blocks, descriptions, limit)
        }
        _ => {
            let (front, back) = blocks.split_at(blocks.len() / 2);

            ensure_running()?;
            let left = DataBlock::merge_sort(front, descriptions, limit, abort_checker.clone())?;

            ensure_running()?;
            let right = DataBlock::merge_sort(back, descriptions, limit, abort_checker.clone())?;

            ensure_running()?;
            DataBlock::two_way_merge_sort(&[left, right], descriptions, limit)
        }
    }
}
}

fn compare_variant(left: &dyn Array, right: &dyn Array) -> ArrowResult<DynComparator> {
Expand Down Expand Up @@ -290,20 +233,26 @@ pub fn compare_scalars(rows: Vec<Vec<Scalar>>, data_types: &[DataType]) -> Resul

let order_columns = columns
.into_iter()
.map(|builder| builder.build().as_arrow())
.map(|builder| builder.build())
.collect::<Vec<_>>();
let order_arrays = order_columns

let descriptions = order_columns
.iter()
.map(|array| arrow_sort::SortColumn {
values: array.as_ref(),
options: Some(arrow_sort::SortOptions {
descending: false,
nulls_first: false,
}),
.enumerate()
.map(|(idx, array)| SortColumnDescription {
offset: idx,
asc: true,
nulls_first: false,
is_nullable: array.data_type().is_nullable(),
})
.collect::<Vec<_>>();
let indices: PrimitiveArray<u32> =
arrow_sort::lexsort_to_indices_impl(&order_arrays, None, &build_compare)?;

Ok(indices.values().to_vec())
let mut sort_compare = SortCompare::new(descriptions, length, LimitType::None);

for array in order_columns {
sort_compare.visit_value(Value::Column(array))?;
sort_compare.increment_column_index();
}

Ok(sort_compare.take_permutation())
}
Loading
Loading