fix(cubesql): Calculate proper limit and offset for CubeScan in nested limits case
mcheshkov committed Jan 10, 2025
1 parent 9bdd638 commit 17bdb11
Showing 3 changed files with 303 additions and 27 deletions.
102 changes: 75 additions & 27 deletions rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs
@@ -240,8 +240,8 @@ impl RewriteRules for MemberRules {
"?members",
"?filters",
"?orders",
"?cube_fetch",
"?offset",
"?inner_fetch",
"?inner_skip",
"?split",
"?can_pushdown_join",
"CubeScanWrapped:false",
@@ -260,7 +260,14 @@
"CubeScanWrapped:false",
"?ungrouped",
),
-                self.push_down_limit("?skip", "?fetch", "?new_skip", "?new_fetch"),
+                self.push_down_limit(
+                    "?skip",
+                    "?fetch",
+                    "?inner_skip",
+                    "?inner_fetch",
+                    "?new_skip",
+                    "?new_fetch",
+                ),
),
// MOD function to binary expr
transforming_rewrite_with_root(
@@ -1597,47 +1604,88 @@ impl MemberRules {
&self,
skip_var: &'static str,
fetch_var: &'static str,
+        inner_skip_var: &'static str,
+        inner_fetch_var: &'static str,
new_skip_var: &'static str,
new_fetch_var: &'static str,
) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool {
let skip_var = var!(skip_var);
let fetch_var = var!(fetch_var);
+        let inner_skip_var = var!(inner_skip_var);
+        let inner_fetch_var = var!(inner_fetch_var);
let new_skip_var = var!(new_skip_var);
let new_fetch_var = var!(new_fetch_var);
move |egraph, subst| {
+            // This transform expects only a single value in every (eclass, kind) pair:
+            // no two different values of fetch or skip should ever get unified
+
let mut skip_value = None;
for skip in var_iter!(egraph[subst[skip_var]], LimitSkip) {
-                if skip.unwrap_or_default() > 0 {
-                    skip_value = *skip;
-                    break;
-                }
+                skip_value = *skip;
+                break;
}
let mut fetch_value = None;
for fetch in var_iter!(egraph[subst[fetch_var]], LimitFetch) {
-                if fetch.unwrap_or_default() > 0 {
-                    fetch_value = *fetch;
-                    break;
-                }
+                fetch_value = *fetch;
+                break;
}
+            // TODO support this case
+            if fetch_value == Some(0) {
+                // Broken and unsupported case for now
+                return false;
+            }
+
-            if skip_value.is_some() || fetch_value.is_some() {
-                subst.insert(
-                    new_skip_var,
-                    egraph.add(LogicalPlanLanguage::CubeScanOffset(CubeScanOffset(
-                        skip_value,
-                    ))),
-                );
-                subst.insert(
-                    new_fetch_var,
-                    egraph.add(LogicalPlanLanguage::CubeScanLimit(CubeScanLimit(
-                        fetch_value,
-                    ))),
-                );
-
-                return true;
-            }
+            let mut inner_skip_value = None;
+            for inner_skip in var_iter!(egraph[subst[inner_skip_var]], CubeScanOffset) {
+                inner_skip_value = *inner_skip;
+                break;
+            }

-            false
+            let mut inner_fetch_value = None;
+            for inner_fetch in var_iter!(egraph[subst[inner_fetch_var]], CubeScanLimit) {
+                inner_fetch_value = *inner_fetch;
+                break;
+            }

+            let new_skip = match (skip_value, inner_skip_value) {
+                (None, None) => None,
+                (Some(skip), None) | (None, Some(skip)) => Some(skip),
+                (Some(outer_skip), Some(inner_skip)) => Some(outer_skip + inner_skip),
+            };
+            let new_fetch = match (fetch_value, inner_fetch_value) {
+                (None, None) => None,
+                // The inner node has no limit (maybe just an offset), so the result
+                // limit is the same as the outer node's
+                (Some(outer_fetch), None) => Some(outer_fetch),
+                // The outer node has no limit, but may have an offset
+                // First, the inner offset applies
+                // Then the inner node limits rows
+                // Then the outer offset applies, which yields no more than
+                // `inner_fetch - outer_skip` rows
+                (None, Some(inner_fetch)) => {
+                    Some(inner_fetch.saturating_sub(skip_value.unwrap_or(0)))
+                }
+                // Both nodes have a limit
+                // First, the inner offset applies
+                // Then the inner node limits rows
+                // Then the outer offset applies, which yields no more than
+                // `inner_fetch - outer_skip` rows
+                // Then the outer limit applies, which yields no more than the
+                // minimum of the two
+                (Some(outer_fetch), Some(inner_fetch)) => Some(usize::min(
+                    inner_fetch.saturating_sub(skip_value.unwrap_or(0)),
+                    outer_fetch,
+                )),
+            };
+
+            subst.insert(
+                new_skip_var,
+                egraph.add(LogicalPlanLanguage::CubeScanOffset(CubeScanOffset(
+                    new_skip,
+                ))),
+            );
+            subst.insert(
+                new_fetch_var,
+                egraph.add(LogicalPlanLanguage::CubeScanLimit(CubeScanLimit(new_fetch))),
+            );
+
+            true
}
}
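Taken together, the new arms fold the outer Limit into a CubeScan that may already carry a limit and offset: offsets add, while the resulting limit is the minimum of the outer limit and `inner_fetch - outer_skip`, saturating at zero. A minimal standalone sketch of that composition rule in plain Rust, outside the egraph machinery (the free-standing `fold_limits` function is illustrative, not part of this commit):

/// Fold an outer (skip, fetch) pair into an inner one, mirroring the match
/// arms above. `None` means "no offset" / "no limit".
fn fold_limits(
    outer_skip: Option<usize>,
    outer_fetch: Option<usize>,
    inner_skip: Option<usize>,
    inner_fetch: Option<usize>,
) -> (Option<usize>, Option<usize>) {
    let new_skip = match (outer_skip, inner_skip) {
        (None, None) => None,
        (Some(skip), None) | (None, Some(skip)) => Some(skip),
        // Both offsets apply, one after the other
        (Some(outer), Some(inner)) => Some(outer + inner),
    };
    let new_fetch = match (outer_fetch, inner_fetch) {
        (None, None) => None,
        (Some(outer), None) => Some(outer),
        // The inner limit applies first, then the outer offset drops rows
        (None, Some(inner)) => Some(inner.saturating_sub(outer_skip.unwrap_or(0))),
        (Some(outer), Some(inner)) => Some(usize::min(
            inner.saturating_sub(outer_skip.unwrap_or(0)),
            outer,
        )),
    };
    (new_skip, new_fetch)
}

fn main() {
    // Inner LIMIT 3 OFFSET 3 under outer LIMIT 2 OFFSET 2 folds to
    // LIMIT 1 OFFSET 5, matching the tests below.
    assert_eq!(
        fold_limits(Some(2), Some(2), Some(3), Some(3)),
        (Some(5), Some(1))
    );
}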

2 changes: 2 additions & 0 deletions rust/cubesql/cubesql/src/compile/test/mod.rs
@@ -30,6 +30,8 @@ pub mod test_bi_workarounds;
#[cfg(test)]
pub mod test_cube_join;
#[cfg(test)]
+pub mod test_cube_scan;
+#[cfg(test)]
pub mod test_df_execution;
#[cfg(test)]
pub mod test_introspection;
226 changes: 226 additions & 0 deletions rust/cubesql/cubesql/src/compile/test/test_cube_scan.rs
@@ -0,0 +1,226 @@
use cubeclient::models::V1LoadRequestQuery;
use pretty_assertions::assert_eq;

use crate::compile::{
test::{convert_select_to_query_plan, init_testing_logger, utils::LogicalPlanTestUtils},
DatabaseProtocol,
};

/// LIMIT n OFFSET m should be pushed to CubeScan
#[tokio::test]
async fn cubescan_limit_offset() {
init_testing_logger();

let query_plan = convert_select_to_query_plan(
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
GROUP BY
1
LIMIT 2
OFFSET 3
"#
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await;

let logical_plan = query_plan.as_logical_plan();
assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]),
segments: Some(vec![]),
order: Some(vec![]),
limit: Some(2),
offset: Some(3),
..Default::default()
}
);
}

/// LIMIT over LIMIT should be pushed down to a single CubeScan
#[tokio::test]
async fn cubescan_limit_limit() {
init_testing_logger();

let variants = vec![
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM (
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
GROUP BY
1
LIMIT 3
) scan
LIMIT 2
"#,
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM (
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
GROUP BY
1
LIMIT 2
) scan
LIMIT 3
"#,
];

for variant in variants {
let query_plan =
convert_select_to_query_plan(variant.to_string(), DatabaseProtocol::PostgreSQL).await;

let logical_plan = query_plan.as_logical_plan();
assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]),
segments: Some(vec![]),
order: Some(vec![]),
limit: Some(2),
..Default::default()
}
);
}
}
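// With no offsets involved, the fold rule reduces to min(inner_limit, outer_limit):
// min(3, 2) == min(2, 3) == 2, which is why both variants expect `limit: Some(2)`.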

/// OFFSET over OFFSET should be pushed down to a single CubeScan
#[tokio::test]
async fn cubescan_offset_offset() {
init_testing_logger();

let variants = vec![
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM (
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
GROUP BY
1
OFFSET 3
) scan
OFFSET 2
"#,
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM (
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
GROUP BY
1
OFFSET 2
) scan
OFFSET 3
"#,
];

for variant in variants {
let query_plan =
convert_select_to_query_plan(variant.to_string(), DatabaseProtocol::PostgreSQL).await;

let logical_plan = query_plan.as_logical_plan();
assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]),
segments: Some(vec![]),
order: Some(vec![]),
offset: Some(5),
..Default::default()
}
);
}
}
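// Offsets compose additively: the inner OFFSET drops rows first, then the outer
// OFFSET drops more from what remains, so both variants skip 3 + 2 == 5 rows in
// total and expect `offset: Some(5)`.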

/// LIMIT OFFSET over LIMIT OFFSET should be pushed down to a single CubeScan with the proper values
#[tokio::test]
async fn cubescan_limit_offset_limit_offset() {
init_testing_logger();

let variants = vec![
(
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM (
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
GROUP BY
1
LIMIT 3
OFFSET 3
) scan
LIMIT 2
OFFSET 2
"#,
1,
),
(
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM (
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
GROUP BY
1
LIMIT 10
OFFSET 3
) scan
LIMIT 2
OFFSET 2
"#,
2,
),
];

for (variant, limit) in variants {
let query_plan =
convert_select_to_query_plan(variant.to_string(), DatabaseProtocol::PostgreSQL).await;

let logical_plan = query_plan.as_logical_plan();
assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]),
segments: Some(vec![]),
order: Some(vec![]),
limit: Some(limit),
offset: Some(5),
..Default::default()
}
);
}
}
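// Checking the expected values against the fold rule
// (offset = inner_skip + outer_skip, limit = min(inner_fetch - outer_skip, outer_fetch)):
// variant 1: offset = 3 + 2 = 5, limit = min(3 - 2, 2) = 1 (the inner scan yields
// at most 3 rows and the outer OFFSET 2 drops two of them);
// variant 2: offset = 3 + 2 = 5, limit = min(10 - 2, 2) = 2.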
