Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import org.apache.doris.common.Pair;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.edge.Edge;
import org.apache.doris.nereids.rules.exploration.mv.StructInfo;
import org.apache.doris.nereids.rules.exploration.mv.StructInfo.PlanCheckContext;
import org.apache.doris.nereids.rules.exploration.mv.StructInfo.PredicateCollectorContext;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.Plan;
Expand All @@ -27,6 +30,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
import org.apache.doris.nereids.util.Utils;

Expand All @@ -39,26 +43,42 @@
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;

/**
* HyperGraph Node.
*/
public class StructInfoNode extends AbstractNode {
private final List<Set<Expression>> expressions;
private final Pair<List<Set<Expression>>, List<Set<Expression>>> expressions;
private final Set<CatalogRelation> relationSet;

/**
* the constructor of StructInfoNode
*/
public StructInfoNode(int index, Plan plan, List<Edge> edges) {
super(extractPlan(plan), index, edges);
relationSet = plan.collect(CatalogRelation.class::isInstance);
expressions = collectExpressions(plan);
// check the node pattern is valid
PlanCheckContext checkContext = PlanCheckContext.of(ImmutableSet.of());
Boolean checkResult = plan.accept(StructInfo.PLAN_PATTERN_CHECKER, checkContext);
if (checkResult && !checkContext.isWindowUnderAggregate() && checkContext.getTopAggregateNum() <= 1
&& checkContext.getTopWindowNum() <= 1) {
expressions = collectExpressions(plan);
} else {
expressions = null;
}
}

public StructInfoNode(int index, Plan plan) {
this(index, plan, new ArrayList<>());
}

private @Nullable List<Set<Expression>> collectExpressions(Plan plan) {
/**
* The key of pair is the expression which could be moved around anywhere,
* the value of pair is the expression which could not be moved around anywhere.
* */
private @Nullable Pair<List<Set<Expression>>, List<Set<Expression>>> collectExpressions(Plan plan) {

Pair<Boolean, Builder<Set<Expression>>> collector = Pair.of(true, ImmutableList.builder());
plan.accept(new DefaultPlanVisitor<Void, Pair<Boolean, ImmutableList.Builder<Set<Expression>>>>() {
Expand All @@ -69,7 +89,6 @@ public Void visitLogicalAggregate(LogicalAggregate<? extends Plan> aggregate,
return null;
}
collector.value().add(ImmutableSet.copyOf(aggregate.getExpressions()));
collector.value().add(ImmutableSet.copyOf(((LogicalAggregate<?>) plan).getGroupByExpressions()));
return super.visit(aggregate, collector);
}

Expand All @@ -93,6 +112,13 @@ public Void visitGroupPlan(GroupPlan groupPlan,
return groupActualPlan.accept(this, collector);
}

@Override
public Void visitLogicalWindow(LogicalWindow<? extends Plan> window,
Pair<Boolean, Builder<Set<Expression>>> context) {
collector.value().add(ImmutableSet.copyOf(window.getActualWindowExpressions()));
return super.visit(window, context);
}

@Override
public Void visit(Plan plan, Pair<Boolean, ImmutableList.Builder<Set<Expression>>> context) {
if (!isValidNodePlan(plan)) {
Expand All @@ -102,24 +128,52 @@ public Void visit(Plan plan, Pair<Boolean, ImmutableList.Builder<Set<Expression>
return super.visit(plan, context);
}
}, collector);
return collector.key() ? collector.value().build() : null;

if (!collector.key()) {
return null;
}
PredicateCollectorContext predicateCollectorContext = new PredicateCollectorContext();
plan.accept(StructInfo.PREDICATE_COLLECTOR, predicateCollectorContext);
collector.value().add(ImmutableSet.copyOf(predicateCollectorContext.getCouldPullUpPredicates()));
if (predicateCollectorContext.getCouldNotPullUpPredicates().isEmpty()) {
return Pair.of(collector.value().build(), ImmutableList.of());
}
return Pair.of(collector.value().build(),
ImmutableList.of(predicateCollectorContext.getCouldNotPullUpPredicates()));
}

private boolean isValidNodePlan(Plan plan) {
return plan instanceof LogicalProject || plan instanceof LogicalAggregate
|| plan instanceof LogicalFilter || plan instanceof LogicalCatalogRelation;
|| plan instanceof LogicalFilter || plan instanceof LogicalCatalogRelation
|| plan instanceof LogicalWindow;
}

/**
* get all expressions of nodes
*/
public @Nullable List<Expression> getExpressions() {
return expressions == null ? null : expressions.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
if (expressions == null) {
return null;
}
return Stream.concat(expressions.key().stream().flatMap(Collection::stream),
expressions.value().stream().flatMap(Collection::stream)).collect(Collectors.toList());
}

public @Nullable List<Expression> getCouldMoveExpressions() {
if (expressions == null) {
return null;
}
return expressions.key().stream().flatMap(Collection::stream).collect(Collectors.toList());
}

public @Nullable List<Expression> getCouldNotMoveExpressions() {
if (expressions == null) {
return null;
}
return expressions.value().stream().flatMap(Collection::stream).collect(Collectors.toList());
}

public @Nullable List<Set<Expression>> getExprSetList() {
public @Nullable Pair<List<Set<Expression>>, List<Set<Expression>>> getExprSetList() {
return expressions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewProjectFilterScanRule;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewProjectJoinRule;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewProjectScanRule;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewWindowAggregateRule;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewWindowJoinRule;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewWindowScanRule;
import org.apache.doris.nereids.rules.expression.ExpressionNormalizationAndOptimization;
import org.apache.doris.nereids.rules.implementation.AggregateStrategies;
import org.apache.doris.nereids.rules.implementation.LogicalAssertNumRowsToPhysicalAssertNumRows;
Expand Down Expand Up @@ -268,6 +271,9 @@ public class RuleSet {
.add(MaterializedViewProjectFilterScanRule.INSTANCE)
.add(MaterializedViewAggregateOnNoneAggregateRule.INSTANCE)
.add(MaterializedViewOnlyScanRule.INSTANCE)
.add(MaterializedViewWindowScanRule.INSTANCE)
.add(MaterializedViewWindowJoinRule.INSTANCE)
.add(MaterializedViewWindowAggregateRule.INSTANCE)
.build();

public static final List<Rule> MATERIALIZED_VIEW_IN_RBO_RULES = planRuleFactories()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,27 @@ public enum RuleType {
MATERIALIZED_VIEW_FILTER_PROJECT_JOIN(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_ONLY_JOIN(RuleTypeClass.MATERIALIZE_VIEW),

// window above join without aggregate
MATERIALIZED_VIEW_PROJECT_WINDOW_JOIN(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_FILTER_WINDOW_JOIN(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_PROJECT_FILTER_WINDOW_JOIN(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_FILTER_PROJECT_WINDOW_JOIN(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_ONLY_WINDOW_JOIN(RuleTypeClass.MATERIALIZE_VIEW),

// window above scan without aggregate
MATERIALIZED_VIEW_FILTER_WINDOW_SCAN(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_PROJECT_WINDOW_SCAN(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_FILTER_PROJECT_WINDOW_SCAN(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_PROJECT_FILTER_WINDOW_SCAN(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_ONLY_WINDOW_SCAN(RuleTypeClass.MATERIALIZE_VIEW),

// window with aggregate
MATERIALIZED_VIEW_PROJECT_WINDOW_AGGREGATE(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_FILTER_WINDOW_AGGREGATE(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_PROJECT_FILTER_WINDOW_AGGREGATE(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_FILTER_PROJECT_WINDOW_AGGREGATE(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_ONLY_WINDOW_AGGREGATE(RuleTypeClass.MATERIALIZE_VIEW),

MATERIALIZED_VIEW_PROJECT_AGGREGATE(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_FILTER_AGGREGATE(RuleTypeClass.MATERIALIZE_VIEW),
MATERIALIZED_VIEW_PROJECT_FILTER_AGGREGATE(RuleTypeClass.MATERIALIZE_VIEW),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,8 @@ protected boolean checkQueryPattern(StructInfo structInfo, CascadesContext casca
PlanCheckContext checkContext = PlanCheckContext.of(SUPPORTED_JOIN_TYPE_SET);
// if query or mv contains more then one top aggregate, should fail
return structInfo.getTopPlan().accept(StructInfo.PLAN_PATTERN_CHECKER, checkContext)
&& checkContext.isContainsTopAggregate() && checkContext.getTopAggregateNum() <= 1;
&& checkContext.isContainsTopAggregate() && checkContext.getTopAggregateNum() <= 1
&& !checkContext.isContainsTopWindow();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,6 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
protected boolean checkQueryPattern(StructInfo structInfo, CascadesContext cascadesContext) {
PlanCheckContext checkContext = PlanCheckContext.of(SUPPORTED_JOIN_TYPE_SET);
return structInfo.getTopPlan().accept(StructInfo.PLAN_PATTERN_CHECKER, checkContext)
&& !checkContext.isContainsTopAggregate();
&& !checkContext.isContainsTopAggregate() && !checkContext.isContainsTopWindow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
rewrittenPlan = new LogicalFilter<>(Sets.newLinkedHashSet(rewriteCompensatePredicates), mvScan);
}
boolean checkResult = rewriteQueryByViewPreCheck(matchMode, queryStructInfo,
viewStructInfo, viewToQuerySlotMapping, rewrittenPlan, materializationContext);
viewStructInfo, viewToQuerySlotMapping, rewrittenPlan, materializationContext,
comparisonResult);
if (!checkResult) {
continue;
}
Expand Down Expand Up @@ -517,7 +518,7 @@ protected Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>>
*/
protected boolean rewriteQueryByViewPreCheck(MatchMode matchMode, StructInfo queryStructInfo,
StructInfo viewStructInfo, SlotMapping viewToQuerySlotMapping, Plan tempRewritedPlan,
MaterializationContext materializationContext) {
MaterializationContext materializationContext, ComparisonResult comparisonResult) {
if (materializationContext instanceof SyncMaterializationContext
&& queryStructInfo.getBottomPlan() instanceof LogicalOlapScan) {
LogicalOlapScan olapScan = (LogicalOlapScan) queryStructInfo.getBottomPlan();
Expand Down Expand Up @@ -741,13 +742,13 @@ protected SplitPredicate predicatesCompensate(
// set pulled up expression to queryStructInfo predicates and update related predicates
if (!queryPulledUpExpressions.isEmpty()) {
queryStructInfo = queryStructInfo.withPredicates(
queryStructInfo.getPredicates().merge(queryPulledUpExpressions));
queryStructInfo.getPredicates().mergePulledUpPredicates(queryPulledUpExpressions));
}
List<Expression> viewPulledUpExpressions = ImmutableList.copyOf(comparisonResult.getViewExpressions());
// set pulled up expression to viewStructInfo predicates and update related predicates
if (!viewPulledUpExpressions.isEmpty()) {
viewStructInfo = viewStructInfo.withPredicates(
viewStructInfo.getPredicates().merge(viewPulledUpExpressions));
viewStructInfo.getPredicates().mergePulledUpPredicates(viewPulledUpExpressions));
}
// if the join type in query and mv plan is different, we should check query is have the
// filters which rejects null
Expand All @@ -760,8 +761,8 @@ protected SplitPredicate predicatesCompensate(
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping, queryStructInfo,
viewStructInfo, cascadesContext);
if (!valid) {
queryStructInfo = queryStructInfo.withPredicates(
queryStructInfo.getPredicates().merge(comparisonResult.getQueryAllPulledUpExpressions()));
queryStructInfo = queryStructInfo.withPredicates(queryStructInfo.getPredicates()
.mergePulledUpPredicates(comparisonResult.getQueryAllPulledUpExpressions()));
valid = containsNullRejectSlot(requireNoNullableViewSlot,
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping,
queryStructInfo, viewStructInfo, cascadesContext);
Expand All @@ -770,6 +771,13 @@ protected SplitPredicate predicatesCompensate(
return SplitPredicate.INVALID_INSTANCE;
}
}
// compensate couldNot PulledUp Conjunctions
Map<Expression, ExpressionInfo> couldNotPulledUpCompensateConjunctions =
Predicates.compensateCouldNotPullUpPredicates(queryStructInfo, viewStructInfo,
viewToQuerySlotMapping, comparisonResult);
if (couldNotPulledUpCompensateConjunctions == null) {
return SplitPredicate.INVALID_INSTANCE;
}
// viewEquivalenceClass to query based
// equal predicate compensate
final Map<Expression, ExpressionInfo> equalCompensateConjunctions = Predicates.compensateEquivalence(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,6 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
protected boolean checkQueryPattern(StructInfo structInfo, CascadesContext cascadesContext) {
PlanCheckContext checkContext = PlanCheckContext.of(ImmutableSet.of());
return structInfo.getTopPlan().accept(StructInfo.SCAN_PLAN_PATTERN_CHECKER, checkContext)
&& !checkContext.isContainsTopAggregate();
&& !checkContext.isContainsTopAggregate() && !checkContext.isContainsTopWindow();
}
}
Loading
Loading