From 278fef10e0960656600aba72da740bd090ab4f79 Mon Sep 17 00:00:00 2001 From: Pavel Tiunov Date: Sun, 15 Dec 2024 16:25:34 -0800 Subject: [PATCH] feat(tesseract): Support multiple join paths within single query --- .../src/adapter/BaseQuery.js | 13 +- .../src/adapter/BigqueryQuery.ts | 1 + .../src/adapter/PostgresQuery.ts | 1 + .../src/adapter/SnowflakeQuery.ts | 1 + .../postgres/multi-fact-join.test.ts | 134 +++++++++++++++ .../src/cube_bridge/join_item.rs | 2 +- .../cubesqlplanner/src/plan/builder/join.rs | 41 +++-- .../cubesqlplanner/src/plan/filter.rs | 18 ++ .../cubesqlplanner/src/plan/join.rs | 10 +- .../cubesqlplanner/src/planner/base_query.rs | 16 +- .../full_key_query_aggregate_planner.rs | 18 +- .../src/planner/planners/join_planner.rs | 14 +- .../multi_stage/member_query_planner.rs | 3 + .../multiplied_measures_query_planner.rs | 46 +++++- .../planner/planners/simple_query_planer.rs | 4 +- .../src/planner/query_properties.rs | 155 +++++++++++++++++- .../cubesqlplanner/src/planner/query_tools.rs | 96 +++++++++-- .../multiplied_measures_collector.rs | 16 +- .../src/planner/sql_evaluator/compiler.rs | 1 + .../src/planner/sql_templates/plan.rs | 37 ++++- 20 files changed, 555 insertions(+), 72 deletions(-) create mode 100644 packages/cubejs-schema-compiler/test/integration/postgres/multi-fact-join.test.ts diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index 34a9d2587ba26..2c71d23778ea3 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -261,7 +261,10 @@ export class BaseQuery { }).filter(R.identity).map(this.newTimeDimension.bind(this)); this.allFilters = this.timeDimensions.concat(this.segments).concat(this.filters); - this.join = this.joinGraph.buildJoin(this.allJoinHints); + if (!getEnv('nativeSqlPlanner')) { + // Tesseract doesn't require join to be prebuilt and there's a case where single join can't be built for multi-fact query + this.join = this.joinGraph.buildJoin(this.allJoinHints); + } this.cubeAliasPrefix = this.options.cubeAliasPrefix; this.preAggregationsSchemaOption = this.options.preAggregationsSchema ?? DEFAULT_PREAGGREGATIONS_SCHEMA; this.externalQueryClass = this.options.externalQueryClass; @@ -349,7 +352,8 @@ export class BaseQuery { initUngrouped() { this.ungrouped = this.options.ungrouped; if (this.ungrouped) { - if (!this.options.allowUngroupedWithoutPrimaryKey) { + // this.join is not defined for Tesseract + if (!this.options.allowUngroupedWithoutPrimaryKey && !getEnv('nativeSqlPlanner')) { const cubes = R.uniq([this.join.root].concat(this.join.joins.map(j => j.originalTo))); const primaryKeyNames = cubes.flatMap(c => this.primaryKeyNames(c)); const missingPrimaryKeys = primaryKeyNames.filter(key => !this.dimensions.find(d => d.dimension === key)); @@ -616,7 +620,6 @@ export class BaseQuery { dimensions: this.options.dimensions, timeDimensions: this.options.timeDimensions, timezone: this.options.timezone, - joinRoot: this.join.root, joinGraph: this.joinGraph, cubeEvaluator: this.cubeEvaluator, order, @@ -3312,6 +3315,7 @@ export class BaseQuery { always_true: '1 = 1' }, + operators: {}, quotes: { identifiers: '"', escape: '""' @@ -3321,7 +3325,8 @@ export class BaseQuery { }, join_types: { inner: 'INNER', - left: 'LEFT' + left: 'LEFT', + full: 'FULL', }, window_frame_types: { rows: 'ROWS', diff --git a/packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.ts b/packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.ts index 7c34a4a0d8e9a..07cc1759f6a2d 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.ts +++ b/packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.ts @@ -256,6 +256,7 @@ export class BigqueryQuery extends BaseQuery { templates.types.double = 'FLOAT64'; templates.types.decimal = 'BIGDECIMAL({{ precision }},{{ scale }})'; templates.types.binary = 'BYTES'; + templates.operators.is_not_distinct_from = 'IS NOT DISTINCT FROM'; return templates; } } diff --git a/packages/cubejs-schema-compiler/src/adapter/PostgresQuery.ts b/packages/cubejs-schema-compiler/src/adapter/PostgresQuery.ts index 01f763e4339dd..818a2f3c92d09 100644 --- a/packages/cubejs-schema-compiler/src/adapter/PostgresQuery.ts +++ b/packages/cubejs-schema-compiler/src/adapter/PostgresQuery.ts @@ -81,6 +81,7 @@ export class PostgresQuery extends BaseQuery { templates.types.float = 'REAL'; templates.types.double = 'DOUBLE PRECISION'; templates.types.binary = 'BYTEA'; + templates.operators.is_not_distinct_from = 'IS NOT DISTINCT FROM'; return templates; } diff --git a/packages/cubejs-schema-compiler/src/adapter/SnowflakeQuery.ts b/packages/cubejs-schema-compiler/src/adapter/SnowflakeQuery.ts index 8d3e5a63dc85a..a59e139f9a2df 100644 --- a/packages/cubejs-schema-compiler/src/adapter/SnowflakeQuery.ts +++ b/packages/cubejs-schema-compiler/src/adapter/SnowflakeQuery.ts @@ -115,6 +115,7 @@ export class SnowflakeQuery extends BaseQuery { templates.expressions.extract = 'EXTRACT({{ date_part }} FROM {{ expr }})'; templates.expressions.interval = 'INTERVAL \'{{ interval }}\''; templates.expressions.timestamp_literal = '\'{{ value }}\'::timestamp_tz'; + templates.operators.is_not_distinct_from = 'IS NOT DISTINCT FROM'; delete templates.types.interval; return templates; } diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/multi-fact-join.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/multi-fact-join.test.ts new file mode 100644 index 0000000000000..444118489a1ce --- /dev/null +++ b/packages/cubejs-schema-compiler/test/integration/postgres/multi-fact-join.test.ts @@ -0,0 +1,134 @@ +import { PostgresQuery } from '../../../src/adapter/PostgresQuery'; +import { prepareCompiler } from '../../unit/PrepareCompiler'; +import { dbRunner } from './PostgresDBRunner'; +import { + getEnv, +} from '@cubejs-backend/shared'; + +describe('Multi-fact join', () => { + jest.setTimeout(200000); + + const { compiler, joinGraph, cubeEvaluator } = prepareCompiler(` +cube(\`orders\`, { + sql: \` + SELECT 79 AS id, 1 AS amount, 1 AS city_id UNION ALL + SELECT 80 AS id, 2 AS amount, 1 AS city_id UNION ALL + SELECT 81 AS id, 3 AS amount, 1 AS city_id UNION ALL + SELECT 82 AS id, 4 AS amount, 2 AS city_id UNION ALL + SELECT 83 AS id, 5 AS amount, 2 AS city_id UNION ALL + SELECT 84 AS id, 6 AS amount, 3 AS city_id + \`, + + joins: { + city: { + relationship: \`many_to_one\`, + sql: \`\${orders}.city_id = \${city}.id\`, + }, + }, + + measures: { + amount: { + sql: \`amount\`, + type: 'sum' + } + }, + + dimensions: { + id: { + sql: \`id\`, + type: \`number\`, + primaryKey: true, + }, + }, +}); + +cube(\`shipments\`, { + sql: \` + SELECT 100 AS id, 1 AS foo_id, 1 AS city_id UNION ALL + SELECT 101 AS id, 2 AS foo_id, 2 AS city_id UNION ALL + SELECT 102 AS id, 3 AS foo_id, 2 AS city_id UNION ALL + SELECT 103 AS id, 4 AS foo_id, 2 AS city_id UNION ALL + SELECT 104 AS id, 5 AS foo_id, 4 AS city_id + \`, + + joins: { + city: { + relationship: \`many_to_one\`, + sql: \`\${shipments}.city_id = \${city}.id\`, + }, + }, + + measures: { + count: { + type: \`count\` + }, + }, + + dimensions: { + id: { + sql: \`id\`, + type: \`number\`, + primaryKey: true, + shown: true + }, + } +}); + +cube(\`city\`, { + sql: \` + SELECT 1 AS id, 'San Francisco' AS name UNION ALL + SELECT 2 AS id, 'New York City' AS name + \`, + + dimensions: { + id: { + sql: \`id\`, + type: \`number\`, + primaryKey: true, + }, + + name: { + sql: \`\${CUBE}.name\`, + type: \`string\`, + }, + }, +}); + `); + + async function runQueryTest(q, expectedResult) { + if (!getEnv('nativeSqlPlanner')) { + return; + } + await compiler.compile(); + const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, q); + + console.log(query.buildSqlAndParams()); + + const res = await dbRunner.testQuery(query.buildSqlAndParams()); + console.log(JSON.stringify(res)); + + expect(res).toEqual( + expectedResult + ); + } + + it('two regular sub-queries', async () => runQueryTest({ + measures: ['orders.amount', 'shipments.count'], + dimensions: [ + 'city.name' + ], + order: [{ id: 'city.name' }] + }, [{ + city__name: 'New York City', + orders__amount: '9', + shipments__count: '3', + }, { + city__name: 'San Francisco', + orders__amount: '6', + shipments__count: '1', + }, { + city__name: null, + orders__amount: '6', + shipments__count: '1', + }])); +}); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/join_item.rs b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/join_item.rs index 2cdcd912f2b44..f3130d0ec7f6f 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/join_item.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/join_item.rs @@ -11,7 +11,7 @@ use std::any::Any; use std::marker::PhantomData; use std::rc::Rc; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)] pub struct JoinItemStatic { pub from: String, pub to: String, diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/join.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/join.rs index 8b2abcb1cf587..22ea6911dcf07 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/join.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/join.rs @@ -1,3 +1,4 @@ +use crate::plan::join::JoinType; use crate::plan::{Join, JoinCondition, JoinItem, QueryPlan, Schema, Select, SingleAliasedSource}; use crate::planner::BaseCube; use std::rc::Rc; @@ -41,15 +42,19 @@ impl JoinBuilder { } pub fn left_join_subselect(&mut self, subquery: Rc, alias: String, on: JoinCondition) { - self.join_subselect(subquery, alias, on, true) + self.join_subselect(subquery, alias, on, JoinType::Inner) + } + + pub fn full_join_subselect(&mut self, subquery: Rc, alias: String, on: JoinCondition, - is_inner: bool, + join_type: JoinType, ) { let subquery = Rc::new(QueryPlan::Select(subquery)); let from = SingleAliasedSource::new_from_subquery(subquery, alias); - self.joins.push(JoinItem { from, on, is_inner }) + self.joins.push(JoinItem { + from, + on, + join_type, + }) } fn join_cube( @@ -105,10 +114,14 @@ impl JoinBuilder { cube: Rc, alias: Option, on: JoinCondition, - is_inner: bool, + join_type: JoinType, ) { let from = SingleAliasedSource::new_from_cube(cube, alias); - self.joins.push(JoinItem { from, on, is_inner }) + self.joins.push(JoinItem { + from, + on, + join_type, + }) } fn join_table_reference( @@ -117,9 +130,13 @@ impl JoinBuilder { schema: Rc, alias: Option, on: JoinCondition, - is_inner: bool, + join_type: JoinType, ) { let from = SingleAliasedSource::new_from_table_reference(reference, schema, alias); - self.joins.push(JoinItem { from, on, is_inner }) + self.joins.push(JoinItem { + from, + on, + join_type, + }) } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs index b7e59049cb373..123463e24075f 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs @@ -1,4 +1,5 @@ use crate::planner::filter::BaseFilter; +use crate::planner::sql_evaluator::MemberSymbol; use crate::planner::sql_templates::PlanSqlTemplates; use crate::planner::VisitorContext; use cubenativeutils::CubeError; @@ -79,6 +80,23 @@ impl FilterItem { }; Ok(res) } + + pub fn all_member_evaluators(&self) -> Vec> { + let mut result = Vec::new(); + self.find_all_member_evaluators(&mut result); + result + } + + pub fn find_all_member_evaluators(&self, result: &mut Vec>) { + match self { + FilterItem::Group(group) => { + for item in group.items.iter() { + item.find_all_member_evaluators(result) + } + } + FilterItem::Item(item) => result.push(item.member_evaluator().clone()), + } + } } impl Filter { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs index 3a19170443d80..96d7355544c66 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs @@ -179,7 +179,7 @@ impl JoinCondition { pub struct JoinItem { pub from: SingleAliasedSource, pub on: JoinCondition, - pub is_inner: bool, + pub join_type: JoinType, } pub struct Join { @@ -187,6 +187,12 @@ pub struct Join { pub joins: Vec, } +pub enum JoinType { + Inner, + Left, + Full, +} + impl JoinItem { pub fn to_sql( &self, @@ -197,7 +203,7 @@ impl JoinItem { let result = templates.join( &self.from.to_sql(templates, context)?, &on_sql, - self.is_inner, + &self.join_type, )?; Ok(result) } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs index 38a5001baa149..6e59585e4fbbf 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs @@ -44,8 +44,8 @@ impl BaseQuery { } pub fn build_sql_and_params(&self) -> Result, CubeError> { - let plan = self.build_sql_and_params_impl()?; let templates = PlanSqlTemplates::new(self.query_tools.templates_render()); + let plan = self.build_sql_and_params_impl(templates.clone())?; let sql = plan.to_sql(&templates)?; let (result_sql, params) = self.query_tools.build_sql_and_params(&sql, true)?; @@ -58,7 +58,7 @@ impl BaseQuery { Ok(result) } - fn build_sql_and_params_impl(&self) -> Result { + fn build_sql_and_params_impl(&self, templates: PlanSqlTemplates) -> Result { let mut nodes_factory = SqlNodesFactory::new(); if self.request.ungrouped() { @@ -73,15 +73,19 @@ impl BaseQuery { ); planner.plan() } else { + let request = self.request.clone(); let multiplied_measures_query_planner = MultipliedMeasuresQueryPlanner::new( self.query_tools.clone(), - self.request.clone(), + request.clone(), nodes_factory.clone(), ); let multi_stage_query_planner = - MultiStageQueryPlanner::new(self.query_tools.clone(), self.request.clone()); - let full_key_aggregate_planner = - FullKeyAggregateQueryPlanner::new(self.request.clone(), nodes_factory.clone()); + MultiStageQueryPlanner::new(self.query_tools.clone(), request.clone()); + let full_key_aggregate_planner = FullKeyAggregateQueryPlanner::new( + request.clone(), + nodes_factory.clone(), + templates, + ); let mut subqueries = multiplied_measures_query_planner.plan_queries()?; let (multi_stage_ctes, multi_stage_subqueries) = multi_stage_query_planner.plan_queries()?; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/full_key_query_aggregate_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/full_key_query_aggregate_planner.rs index 99ea934db4da5..4ec591e1baffc 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/full_key_query_aggregate_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/full_key_query_aggregate_planner.rs @@ -4,6 +4,7 @@ use crate::plan::{ }; use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory; use crate::planner::sql_evaluator::ReferencesBuilder; +use crate::planner::sql_templates::PlanSqlTemplates; use crate::planner::BaseMeasure; use crate::planner::BaseMemberHelper; use crate::planner::QueryProperties; @@ -16,14 +17,21 @@ pub struct FullKeyAggregateQueryPlanner { query_properties: Rc, order_planner: OrderPlanner, context_factory: SqlNodesFactory, + plan_sql_templates: PlanSqlTemplates, } impl FullKeyAggregateQueryPlanner { - pub fn new(query_properties: Rc, context_factory: SqlNodesFactory) -> Self { + pub fn new( + query_properties: Rc, + context_factory: SqlNodesFactory, + // TODO get rid of this dependency + plan_sql_templates: PlanSqlTemplates, + ) -> Self { Self { order_planner: OrderPlanner::new(query_properties.clone()), query_properties, context_factory, + plan_sql_templates, } } @@ -56,6 +64,7 @@ impl FullKeyAggregateQueryPlanner { let right_alias = format!("q_{}", i); let left_schema = joins[i - 1].schema(); let right_schema = joins[i].schema(); + // TODO every next join should join to all previous dimensions through OR: q_0.a = q_1.a, q_0.a = q_2.a OR q_1.a = q_2.a, ... let conditions = dimensions_to_select .iter() .map(|dim| { @@ -73,7 +82,12 @@ impl FullKeyAggregateQueryPlanner { }) .collect_vec(); let on = JoinCondition::new_dimension_join(conditions, true); - join_builder.inner_join_subselect(join.clone(), format!("q_{}", i), on); + let next_alias = format!("q_{}", i); + if self.plan_sql_templates.supports_is_not_distinct_from() { + join_builder.inner_join_subselect(join.clone(), next_alias, on); + } else { + join_builder.full_join_subselect(join.clone(), next_alias, on); + } } let from = From::new_from_join(join_builder.build()); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/join_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/join_planner.rs index d087448a73743..6a0bab5378db3 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/join_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/join_planner.rs @@ -21,14 +21,6 @@ impl JoinPlanner { } } - pub fn make_join_node_with_prefix( - &self, - alias_prefix: &Option, /*TODO dimensions for subqueries*/ - ) -> Result, CubeError> { - let join = self.query_tools.cached_data().join()?.clone(); - self.make_join_node_impl(alias_prefix, join) - } - pub fn make_join_node_with_prefix_and_join_hints( &self, alias_prefix: &Option, /*TODO dimensions for subqueries*/ @@ -38,11 +30,7 @@ impl JoinPlanner { self.make_join_node_impl(alias_prefix, join) } - pub fn make_join_node(&self) -> Result, CubeError> { - self.make_join_node_with_prefix(&None) - } - - fn make_join_node_impl( + pub fn make_join_node_impl( &self, alias_prefix: &Option, join: Rc, diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs index 461fbcc50bbae..5d906ecb83552 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs @@ -12,12 +12,14 @@ use crate::planner::planners::{ use crate::planner::query_tools::QueryTools; use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory; use crate::planner::sql_evaluator::ReferencesBuilder; +use crate::planner::sql_templates::PlanSqlTemplates; use crate::planner::QueryProperties; use crate::planner::{BaseDimension, BaseMeasure, BaseMember, BaseMemberHelper, BaseTimeDimension}; use cubenativeutils::CubeError; use itertools::Itertools; use std::collections::HashMap; use std::rc::Rc; + pub struct MultiStageMemberQueryPlanner { query_tools: Rc, _query_properties: Rc, @@ -381,6 +383,7 @@ impl MultiStageMemberQueryPlanner { let full_key_aggregate_planner = FullKeyAggregateQueryPlanner::new( cte_query_properties.clone(), node_factory.clone(), + PlanSqlTemplates::new(self.query_tools.templates_render()), ); let subqueries = multiplied_measures_query_planner.plan_queries()?; let result = full_key_aggregate_planner.plan(subqueries, vec![])?; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs index 1d1152d276e91..5e4dcce5ab367 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs @@ -1,4 +1,5 @@ use super::{CommonUtils, JoinPlanner}; +use crate::cube_bridge::join_definition::JoinDefinition; use crate::plan::{ Expr, From, JoinBuilder, JoinCondition, MemberExpression, QualifiedColumnName, Select, SelectBuilder, @@ -50,8 +51,21 @@ impl MultipliedMeasuresQueryPlanner { let mut joins = Vec::new(); if !measures.regular_measures.is_empty() { - let regular_subquery = self.regular_measures_subquery(&measures.regular_measures)?; - joins.push(regular_subquery); + let join_multi_fact_groups = self + .query_properties + .compute_join_multi_fact_groups_with_measures(&measures.regular_measures)?; + for (i, (join, measures)) in join_multi_fact_groups.iter().enumerate() { + let regular_subquery = self.regular_measures_subquery( + measures, + join.clone(), + if i == 0 { + "main".to_string() + } else { + format!("main_{}", i) + }, + )?; + joins.push(regular_subquery); + } } for (cube_name, measures) in measures @@ -60,7 +74,22 @@ impl MultipliedMeasuresQueryPlanner { .into_iter() .into_group_map_by(|m| m.cube_name().clone()) { - let aggregate_subquery = self.aggregate_subquery(&cube_name, &measures)?; + let join_multi_fact_groups = self + .query_properties + .compute_join_multi_fact_groups_with_measures(&measures)?; + if join_multi_fact_groups.len() != 1 { + return Err(CubeError::internal( + format!( + "Expected just one multi-fact join group for aggregate measures but got multiple: {}", + join_multi_fact_groups.into_iter().map(|(_, measures)| format!("({})", measures.iter().map(|m| m.full_name()).join(", "))).join(", ") + ) + )); + } + let aggregate_subquery = self.aggregate_subquery( + &cube_name, + &measures, + join_multi_fact_groups.into_iter().next().unwrap().0, + )?; joins.push(aggregate_subquery); } Ok(joins) @@ -70,9 +99,10 @@ impl MultipliedMeasuresQueryPlanner { &self, key_cube_name: &String, measures: &Vec>, + key_join: Rc, ) -> Result, CubeError> { let primary_keys_dimensions = self.common_utils.primary_keys_dimensions(key_cube_name)?; - let keys_query = self.key_query(&primary_keys_dimensions, key_cube_name)?; + let keys_query = self.key_query(&primary_keys_dimensions, key_join, key_cube_name)?; let keys_query_alias = format!("keys"); let should_build_join_for_measure_select = self.check_should_build_join_for_measure_select(measures, key_cube_name)?; @@ -205,6 +235,7 @@ impl MultipliedMeasuresQueryPlanner { } Ok(false) } + fn aggregate_subquery_measure_join( &self, _key_cube_name: &String, @@ -230,10 +261,12 @@ impl MultipliedMeasuresQueryPlanner { fn regular_measures_subquery( &self, measures: &Vec>, + join: Rc, + alias_prefix: String, ) -> Result, CubeError> { let source = self .join_planner - .make_join_node_with_prefix(&Some(format!("main")))?; + .make_join_node_impl(&Some(alias_prefix), join)?; let mut select_builder = SelectBuilder::new(source.clone()); let mut context_factory = self.context_factory.clone(); @@ -259,11 +292,12 @@ impl MultipliedMeasuresQueryPlanner { fn key_query( &self, dimensions: &Vec>, + key_join: Rc, key_cube_name: &String, ) -> Result, CubeError> { let source = self .join_planner - .make_join_node_with_prefix(&Some(format!("{}_key", key_cube_name)))?; + .make_join_node_impl(&Some(format!("{}_key", key_cube_name)), key_join)?; let dimensions = self .query_properties .dimensions_for_select_append(dimensions); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/simple_query_planer.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/simple_query_planer.rs index 9229314a0cc51..9ed024a96dd04 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/simple_query_planer.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/simple_query_planer.rs @@ -36,7 +36,9 @@ impl SimpleQueryPlanner { }) }; let mut context_factory = self.context_factory.clone(); - let from = self.join_planner.make_join_node()?; + let from = self + .join_planner + .make_join_node_impl(&None, self.query_properties.simple_query_join()?)?; let mut select_builder = SelectBuilder::new(from.clone()); for time_dim in self.query_properties.time_dimensions() { if let Some(granularity) = time_dim.get_granularity() { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/query_properties.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/query_properties.rs index 54b31fa40ed20..4550603762411 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/query_properties.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/query_properties.rs @@ -2,6 +2,7 @@ use super::filter::compiler::FilterCompiler; use super::query_tools::QueryTools; use super::{BaseDimension, BaseMeasure, BaseMember, BaseMemberHelper, BaseTimeDimension}; use crate::cube_bridge::base_query_options::BaseQueryOptions; +use crate::cube_bridge::join_definition::JoinDefinition; use crate::plan::{Expr, Filter, FilterItem, MemberExpression}; use crate::planner::sql_evaluator::collectors::{ collect_multiplied_measures, has_cumulative_members, has_multi_stage_members, @@ -62,6 +63,7 @@ pub struct QueryProperties { query_tools: Rc, ignore_cumulative: bool, ungrouped: bool, + multi_fact_join_groups: Vec<(Rc, Vec>)>, } impl QueryProperties { @@ -127,9 +129,6 @@ impl QueryProperties { let (dimensions_filters, time_dimensions_filters, measures_filters) = filter_compiler.extract_result(); - let all_join_hints = evaluator_compiler.join_hints()?; - let join = query_tools.join_graph().build_join(all_join_hints)?; - query_tools.cached_data_mut().set_join(join); //FIXME may be this filter should be applied on other place let time_dimensions = time_dimensions .into_iter() @@ -157,6 +156,16 @@ impl QueryProperties { }; let ungrouped = options.static_data().ungrouped.unwrap_or(false); + let multi_fact_join_groups = Self::compute_join_multi_fact_groups( + query_tools.clone(), + &measures, + &dimensions, + &time_dimensions, + &time_dimensions_filters, + &dimensions_filters, + &measures_filters, + )?; + Ok(Rc::new(Self { measures, dimensions, @@ -170,6 +179,7 @@ impl QueryProperties { query_tools, ignore_cumulative: false, ungrouped, + multi_fact_join_groups, })) } @@ -193,6 +203,16 @@ impl QueryProperties { order_by }; + let multi_fact_join_groups = Self::compute_join_multi_fact_groups( + query_tools.clone(), + &measures, + &dimensions, + &time_dimensions, + &time_dimensions_filters, + &dimensions_filters, + &measures_filters, + )?; + Ok(Rc::new(Self { measures, dimensions, @@ -206,9 +226,122 @@ impl QueryProperties { query_tools, ignore_cumulative, ungrouped, + multi_fact_join_groups, })) } + pub fn compute_join_multi_fact_groups_with_measures( + &self, + measures: &Vec>, + ) -> Result, Vec>)>, CubeError> { + Self::compute_join_multi_fact_groups( + self.query_tools.clone(), + measures, + &self.dimensions, + &self.time_dimensions, + &self.time_dimensions_filters, + &self.dimensions_filters, + &self.measures_filters, + ) + } + + pub fn compute_join_multi_fact_groups( + query_tools: Rc, + measures: &Vec>, + dimensions: &Vec>, + time_dimensions: &Vec>, + time_dimensions_filters: &Vec, + dimensions_filters: &Vec, + measures_filters: &Vec, + ) -> Result, Vec>)>, CubeError> { + let dimensions_join_hints = query_tools + .cached_data_mut() + .join_hints_for_base_member_vec(&dimensions)?; + let time_dimensions_join_hints = query_tools + .cached_data_mut() + .join_hints_for_base_member_vec(&time_dimensions)?; + let time_dimensions_filters_join_hints = query_tools + .cached_data_mut() + .join_hints_for_filter_item_vec(&time_dimensions_filters)?; + let dimensions_filters_join_hints = query_tools + .cached_data_mut() + .join_hints_for_filter_item_vec(&dimensions_filters)?; + let measures_filters_join_hints = query_tools + .cached_data_mut() + .join_hints_for_filter_item_vec(&measures_filters)?; + + let mut dimension_and_filter_join_hints_concat = Vec::new(); + + dimension_and_filter_join_hints_concat.extend(dimensions_join_hints.into_iter()); + dimension_and_filter_join_hints_concat.extend(time_dimensions_join_hints.into_iter()); + dimension_and_filter_join_hints_concat + .extend(time_dimensions_filters_join_hints.into_iter()); + dimension_and_filter_join_hints_concat.extend(dimensions_filters_join_hints.into_iter()); + // TODO This is not quite correct. Decide on how to handle it. Keeping it here just to blow up on unsupported case + dimension_and_filter_join_hints_concat.extend(measures_filters_join_hints.into_iter()); + + let measures_to_join = if measures.is_empty() { + let join = query_tools + .cached_data_mut() + .join_by_hints(dimension_and_filter_join_hints_concat.clone(), |hints| { + query_tools.join_graph().build_join(hints) + })?; + vec![(Vec::new(), join)] + } else { + measures + .iter() + .map(|m| -> Result<_, CubeError> { + let measure_join_hints = query_tools + .cached_data_mut() + .join_hints_for_member(m.member_evaluator())?; + let join = query_tools.cached_data_mut().join_by_hints( + dimension_and_filter_join_hints_concat + .clone() + .into_iter() + .chain(vec![measure_join_hints].into_iter()) + .collect::>(), + |hints| query_tools.join_graph().build_join(hints), + )?; + Ok((vec![m.clone()], join)) + }) + .collect::, _>>()? + }; + Ok(measures_to_join + .into_iter() + .into_group_map_by(|(_, (key, _))| key.clone()) + .into_values() + .map(|measures_and_join| { + ( + measures_and_join.iter().next().unwrap().1 .1.clone(), + measures_and_join + .into_iter() + .flat_map(|m| m.0) + .collect::>(), + ) + }) + .collect()) + } + + pub fn is_multi_fact_join(&self) -> bool { + self.multi_fact_join_groups.len() > 1 + } + + pub fn simple_query_join(&self) -> Result, CubeError> { + if self.multi_fact_join_groups.len() != 1 { + return Err(CubeError::internal(format!( + "Expected just one multi-fact join group for simple query but got multiple: {}", + self.multi_fact_join_groups + .iter() + .map(|(_, measures)| format!( + "({})", + measures.iter().map(|m| m.full_name()).join(", ") + )) + .join(", ") + ))); + } + Ok(self.multi_fact_join_groups.iter().next().unwrap().0.clone()) + } + pub fn measures(&self) -> &Vec> { &self.measures } @@ -390,6 +523,7 @@ impl QueryProperties { let full_aggregate_measure = self.full_key_aggregate_measures()?; if full_aggregate_measure.multiplied_measures.is_empty() && full_aggregate_measure.multi_stage_measures.is_empty() + && !self.is_multi_fact_join() { Ok(true) } else { @@ -413,9 +547,18 @@ impl QueryProperties { if has_multi_stage_members(m.member_evaluator(), self.ignore_cumulative)? { result.multi_stage_measures.push(m.clone()) } else { - for item in - collect_multiplied_measures(self.query_tools.clone(), m.member_evaluator())? - { + let join = self + .compute_join_multi_fact_groups_with_measures(&vec![m.clone()])? + .iter() + .next() + .expect("No join groups returned for single measure multi-fact join group") + .0 + .clone(); + for item in collect_multiplied_measures( + self.query_tools.clone(), + m.member_evaluator(), + join, + )? { if item.multiplied { result.multiplied_measures.push(item.measure.clone()); } else { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs index b2cb0497606c8..dfa07de467437 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs @@ -1,10 +1,13 @@ -use super::sql_evaluator::Compiler; -use super::ParamsAllocator; +use super::sql_evaluator::{Compiler, MemberSymbol}; +use super::{BaseMember, ParamsAllocator}; use crate::cube_bridge::base_tools::BaseTools; use crate::cube_bridge::evaluator::CubeEvaluator; use crate::cube_bridge::join_definition::JoinDefinition; use crate::cube_bridge::join_graph::JoinGraph; +use crate::cube_bridge::join_item::JoinItemStatic; use crate::cube_bridge::sql_templates_render::SqlTemplatesRender; +use crate::plan::FilterItem; +use crate::planner::sql_evaluator::collectors::collect_join_hints; use chrono_tz::Tz; use convert_case::{Case, Casing}; use cubenativeutils::CubeError; @@ -12,25 +15,98 @@ use itertools::Itertools; use lazy_static::lazy_static; use regex::Regex; use std::cell::{Ref, RefCell, RefMut}; +use std::collections::HashMap; use std::rc::Rc; pub struct QueryToolsCachedData { - join: Option>, + join_hints: HashMap>>, + join_hints_to_join_key: HashMap>>, Rc>, + join_key_to_join: HashMap, Rc>, } +#[derive(Hash, PartialEq, Eq)] +pub struct JoinKey(Vec); + impl QueryToolsCachedData { pub fn new() -> Self { - Self { join: None } + Self { + join_hints: HashMap::new(), + join_hints_to_join_key: HashMap::new(), + join_key_to_join: HashMap::new(), + } + } + + pub fn join_hints_for_member( + &mut self, + node: &Rc, + ) -> Result>, CubeError> { + let full_name = node.full_name(); + if let Some(val) = self.join_hints.get(&full_name) { + Ok(val.clone()) + } else { + let join_hints = Rc::new(collect_join_hints(node)?); + self.join_hints.insert(full_name, join_hints.clone()); + Ok(join_hints) + } } - pub fn join(&self) -> Result, CubeError> { - self.join.clone().ok_or(CubeError::internal( - "Join not set in QueryToolsCachedData".to_string(), - )) + pub fn join_hints_for_base_member_vec( + &mut self, + vec: &Vec>, + ) -> Result>>, CubeError> { + vec.iter() + .map(|b| self.join_hints_for_member(&b.member_evaluator())) + .collect::, _>>() } - pub fn set_join(&mut self, join: Rc) { - self.join = Some(join); + pub fn join_hints_for_member_symbol_vec( + &mut self, + vec: &Vec>, + ) -> Result>>, CubeError> { + vec.iter() + .map(|b| self.join_hints_for_member(b)) + .collect::, _>>() + } + + pub fn join_hints_for_filter_item_vec( + &mut self, + vec: &Vec, + ) -> Result>>, CubeError> { + let mut member_symbols = Vec::new(); + for i in vec.iter() { + i.find_all_member_evaluators(&mut member_symbols); + } + member_symbols + .iter() + .map(|b| self.join_hints_for_member(b)) + .collect::, _>>() + } + + pub fn join_by_hints( + &mut self, + hints: Vec>>, + join_fn: impl FnOnce(Vec) -> Result, CubeError>, + ) -> Result<(Rc, Rc), CubeError> { + if let Some(key) = self.join_hints_to_join_key.get(&hints) { + Ok((key.clone(), self.join_key_to_join.get(key).unwrap().clone())) + } else { + let join = join_fn( + hints + .iter() + .flat_map(|h| h.as_ref().iter().cloned()) + .collect(), + )?; + let join_key = Rc::new(JoinKey( + join.joins()? + .items() + .iter() + .map(|i| i.static_data().clone()) + .collect(), + )); + self.join_hints_to_join_key.insert(hints, join_key.clone()); + self.join_key_to_join.insert(join_key.clone(), join.clone()); + Ok((join_key, join)) + } } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/collectors/multiplied_measures_collector.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/collectors/multiplied_measures_collector.rs index c285bff19379f..62de3b9914d73 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/collectors/multiplied_measures_collector.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/collectors/multiplied_measures_collector.rs @@ -1,3 +1,4 @@ +use crate::cube_bridge::join_definition::JoinDefinition; use crate::planner::query_tools::QueryTools; use crate::planner::sql_evaluator::{MemberSymbol, TraversalVisitor}; use crate::planner::BaseMeasure; @@ -65,13 +66,19 @@ pub struct MultipliedMeasuresCollector { query_tools: Rc, composite_measures: HashSet, colllected_measures: Vec, + join: Rc, } impl MultipliedMeasuresCollector { - pub fn new(query_tools: Rc, composite_measures: HashSet) -> Self { + pub fn new( + query_tools: Rc, + composite_measures: HashSet, + join: Rc, + ) -> Self { Self { query_tools, composite_measures, + join, colllected_measures: vec![], } } @@ -91,8 +98,8 @@ impl TraversalVisitor for MultipliedMeasuresCollector { let res = match node.as_ref() { MemberSymbol::Measure(e) => { let full_name = e.full_name(); - let join = self.query_tools.cached_data().join()?; - let multiplied = join + let multiplied = self + .join .static_data() .multiplication_factor .get(e.cube_name()) @@ -123,11 +130,12 @@ impl TraversalVisitor for MultipliedMeasuresCollector { pub fn collect_multiplied_measures( query_tools: Rc, node: &Rc, + join: Rc, ) -> Result, CubeError> { let mut composite_collector = CompositeMeasuresCollector::new(); composite_collector.apply(node, &CompositeMeasureCollectorState::new(None))?; let composite_measures = composite_collector.extract_result(); - let mut visitor = MultipliedMeasuresCollector::new(query_tools, composite_measures); + let mut visitor = MultipliedMeasuresCollector::new(query_tools, composite_measures, join); visitor.apply(node, &())?; Ok(visitor.extract_result()) } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/compiler.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/compiler.rs index c23f085fbee99..9e87582b8c7d1 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/compiler.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/compiler.rs @@ -12,6 +12,7 @@ use std::collections::HashMap; use std::rc::Rc; pub struct Compiler { cube_evaluator: Rc, + /* (type, name) */ members: HashMap<(String, String), Rc>, } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_templates/plan.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_templates/plan.rs index 26b12acacd25a..d00ebafa4caba 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_templates/plan.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_templates/plan.rs @@ -1,10 +1,12 @@ use super::{TemplateGroupByColumn, TemplateOrderByColumn, TemplateProjectionColumn}; use crate::cube_bridge::sql_templates_render::SqlTemplatesRender; +use crate::plan::join::JoinType; use convert_case::{Case, Casing}; use cubenativeutils::CubeError; use minijinja::context; use std::rc::Rc; +#[derive(Clone)] pub struct PlanSqlTemplates { render: Rc, } @@ -74,6 +76,7 @@ impl PlanSqlTemplates { context! { expr => expr, negate => negate }, ) } + pub fn always_true(&self) -> Result { Ok(self.render.get_template("filters/always_true")?.clone()) } @@ -168,11 +171,16 @@ impl PlanSqlTemplates { ) } - pub fn join(&self, source: &str, condition: &str, is_inner: bool) -> Result { - let join_type = if is_inner { - self.render.get_template("join_types/inner")? - } else { - self.render.get_template("join_types/left")? + pub fn join( + &self, + source: &str, + condition: &str, + join_type: &JoinType, + ) -> Result { + let join_type = match join_type { + JoinType::Full => self.render.get_template("join_types/full")?, + JoinType::Inner => self.render.get_template("join_types/inner")?, + JoinType::Left => self.render.get_template("join_types/left")?, }; self.render.render_template( "statements/join", @@ -180,6 +188,13 @@ impl PlanSqlTemplates { ) } + pub fn binary_expr(&self, left: &str, op: &str, right: &str) -> Result { + self.render.render_template( + "expressions/binary", + context! { left => left, op => op, right => right }, + ) + } + pub fn join_by_dimension_conditions( &self, left_column: &String, @@ -187,6 +202,13 @@ impl PlanSqlTemplates { null_check: bool, ) -> Result { let null_check = if null_check { + if self.supports_is_not_distinct_from() { + let is_not_distinct_from_op = self + .render + .render_template("operators/is_not_distinct_from", context! {})?; + + return self.binary_expr(left_column, &is_not_distinct_from_op, right_column); + } format!( " OR ({} AND {})", self.is_null_expr(&left_column, false)?, @@ -201,4 +223,9 @@ impl PlanSqlTemplates { left_column, right_column, null_check )) } + + pub fn supports_is_not_distinct_from(&self) -> bool { + self.render + .contains_template("operators/is_not_distinct_from") + } }