Commit 89ce6b7

perf(cubestore): Update DataFusion pointer to new GroupsAccumulator changes
* Replaces Accumulator usage in grouped hash aggregation with the GroupsAccumulator API
* Makes all accumulators in DF grouped hash aggregation use GroupsAccumulatorFlatAdapter
* Uses PrimitiveGroupsAccumulator and CountGroupsAccumulator for sum, min, max, and count for better performance
* Improves the memory layout and traversal order of group-by keys saved during grouped aggregation.
1 parent f61afc3 · commit 89ce6b7
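To give a sense of what the GroupsAccumulator change is about, here is a minimal, self-contained sketch of the idea: rather than keeping one boxed Accumulator per group in the hash table, a single accumulator holds flat per-group state and is updated for a whole batch of rows at once using the rows' group indices. The trait and `GroupedSum` type below are illustrative only, not DataFusion's or CubeStore's actual API; GroupsAccumulatorFlatAdapter presumably plays the role of driving existing Accumulator implementations through such a grouped interface.

```rust
// Minimal sketch of a "groups accumulator": one object with contiguous
// per-group state, updated for a whole batch of rows at once.
// Names are illustrative, not the real DataFusion/CubeStore types.
trait GroupsAccumulatorSketch {
    /// `group_indices[i]` is the group that row `i` of `values` belongs to.
    fn update_batch(&mut self, values: &[i64], group_indices: &[usize], total_num_groups: usize);
    /// Produce one output value per group.
    fn evaluate(&self) -> Vec<i64>;
}

struct GroupedSum {
    sums: Vec<i64>, // one slot per group, contiguous in memory
}

impl GroupsAccumulatorSketch for GroupedSum {
    fn update_batch(&mut self, values: &[i64], group_indices: &[usize], total_num_groups: usize) {
        // Grow the flat state to cover all groups seen so far.
        self.sums.resize(total_num_groups, 0);
        for (value, &group) in values.iter().zip(group_indices) {
            self.sums[group] += *value;
        }
    }

    fn evaluate(&self) -> Vec<i64> {
        self.sums.clone()
    }
}

fn main() {
    let mut acc = GroupedSum { sums: Vec::new() };
    // Two groups; rows 0 and 2 belong to group 0, row 1 to group 1.
    acc.update_batch(&[10, 5, 7], &[0, 1, 0], 2);
    assert_eq!(acc.evaluate(), vec![17, 5]);
    println!("{:?}", acc.evaluate());
}
```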

File tree

3 files changed (+9, -8 lines)

rust/cubestore/Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default.

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 3 additions & 3 deletions
@@ -3144,14 +3144,14 @@ async fn planning_inplace_aggregate2(service: Box<dyn SqlClient>) {
     pp_phys_plan_ext(p.router.as_ref(), &verbose),
     "Projection, [url, SUM(Data.hits)@1:hits]\
     \n AggregateTopK, limit: 10, sortBy: [2 desc null last]\
-    \n ClusterSend, partitions: [[1, 2]]"
+    \n ClusterSend, partitions: [[1, 2]], sort_order: [1]"
 );
 assert_eq!(
     pp_phys_plan_ext(p.worker.as_ref(), &verbose),
     "Projection, [url, SUM(Data.hits)@1:hits]\
     \n AggregateTopK, limit: 10, sortBy: [2 desc null last]\
-    \n Worker\
-    \n Sort, by: [SUM(hits)@1 desc nulls last]\
+    \n Worker, sort_order: [1]\
+    \n Sort, by: [SUM(hits)@1 desc nulls last], sort_order: [1]\
     \n FullInplaceAggregate, sort_order: [0]\
     \n MergeSort, single_vals: [0, 1], sort_order: [0, 1, 2]\
     \n Union, single_vals: [0, 1], sort_order: [0, 1, 2]\

rust/cubestore/cubestore/src/metastore/table.rs

Lines changed: 3 additions & 2 deletions
@@ -11,7 +11,7 @@ use byteorder::{BigEndian, WriteBytesExt};
 use chrono::DateTime;
 use chrono::Utc;
 use datafusion::arrow::datatypes::Schema as ArrowSchema;
-use datafusion::physical_plan::expressions::{Column as FusionColumn, Max, Min, Sum};
+use datafusion::physical_plan::expressions::{sum_return_type, Column as FusionColumn, Max, Min, Sum};
 use datafusion::physical_plan::{udaf, AggregateExpr, PhysicalExpr};
 use itertools::Itertools;

@@ -76,7 +76,8 @@ impl AggregateColumn {
     )?);
     let res: Arc<dyn AggregateExpr> = match self.function {
         AggregateFunction::SUM => {
-            Arc::new(Sum::new(col.clone(), col.name(), col.data_type(schema)?))
+            let input_data_type = col.data_type(schema)?;
+            Arc::new(Sum::new(col.clone(), col.name(), sum_return_type(&input_data_type)?, &input_data_type))
         }
         AggregateFunction::MAX => {
             Arc::new(Max::new(col.clone(), col.name(), col.data_type(schema)?))
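The change above threads the column's input type through sum_return_type so that Sum is constructed with an explicit, typically widened, result type instead of reusing the input type. As a rough sketch of the kind of widening such a helper performs (illustrative only; the real mapping is defined inside DataFusion and also covers decimals and other types):

```rust
// Illustrative stand-in for arrow's DataType; not the real enum.
#[derive(Debug, PartialEq)]
enum SketchType {
    Int32,
    Int64,
    UInt32,
    UInt64,
    Float64,
}

// Sketch of the widening a helper like `sum_return_type` performs:
// integer sums are widened so summing many values does not overflow.
fn sketch_sum_return_type(input: &SketchType) -> SketchType {
    match input {
        SketchType::Int32 | SketchType::Int64 => SketchType::Int64,
        SketchType::UInt32 | SketchType::UInt64 => SketchType::UInt64,
        SketchType::Float64 => SketchType::Float64,
    }
}

fn main() {
    // SUM over an Int32 column is reported as Int64.
    assert_eq!(sketch_sum_return_type(&SketchType::Int32), SketchType::Int64);
}
```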
