Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent Early Ordering Pushdown to Enable Aggregation Pushdown to MySQL #16278

Merged
merged 14 commits into from
Jul 4, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -369,14 +369,14 @@ func pushAggregationThroughApplyJoin(ctx *plancontext.PlanningContext, rootAggr

columns := &applyJoinColumns{}
output, err := splitAggrColumnsToLeftAndRight(ctx, rootAggr, join, !join.JoinType.IsInner(), columns, lhs, rhs)
join.JoinColumns = columns
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bug was not related to the rest of the PR, but it caused tests to fail, so it had to go.

if err != nil {
// if we get this error, we just abort the splitting and fall back on simpler ways of solving the same query
if errors.Is(err, errAbortAggrPushing) {
return nil, nil
}
panic(err)
}
join.JoinColumns = columns

splitGroupingToLeftAndRight(ctx, rootAggr, lhs, rhs, join.JoinColumns)

Expand Down
73 changes: 34 additions & 39 deletions go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,19 +202,11 @@ func (aj *ApplyJoin) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy {
return aj.LHS.GetOrdering(ctx)
}

func joinColumnToExpr(column applyJoinColumn) sqlparser.Expr {
return column.Original
}

func (aj *ApplyJoin) getJoinColumnFor(ctx *plancontext.PlanningContext, orig *sqlparser.AliasedExpr, e sqlparser.Expr, addToGroupBy bool) (col applyJoinColumn) {
defer func() {
col.Original = orig.Expr
}()
lhs := TableID(aj.LHS)
rhs := TableID(aj.RHS)
both := lhs.Merge(rhs)
deps := ctx.SemTable.RecursiveDeps(e)
col.GroupBy = addToGroupBy

switch {
case deps.IsSolvedBy(lhs):
Expand All @@ -224,9 +216,12 @@ func (aj *ApplyJoin) getJoinColumnFor(ctx *plancontext.PlanningContext, orig *sq
case deps.IsSolvedBy(both):
col = breakExpressionInLHSandRHS(ctx, e, TableID(aj.LHS))
default:
panic(vterrors.VT13002(sqlparser.String(e)))
panic(vterrors.VT13001(fmt.Sprintf("expression depends on tables outside this join: %s", sqlparser.String(e))))
}

col.GroupBy = addToGroupBy
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another bug that was not related to the issue being found, but it caused some plans to be missing group by expressions

col.Original = orig.Expr

return
}

Expand Down Expand Up @@ -323,33 +318,15 @@ func (aj *ApplyJoin) planOffsets(ctx *plancontext.PlanningContext) Operator {
}

func (aj *ApplyJoin) planOffsetFor(ctx *plancontext.PlanningContext, col applyJoinColumn) {
if col.DTColName != nil {
// If DTColName is set, then we already pushed the parts of the expression down while planning.
// We need to use this name and ask the correct side of the join for it. Nothing else is required.
if col.IsPureLeft() {
offset := aj.LHS.AddColumn(ctx, true, col.GroupBy, aeWrap(col.DTColName))
aj.addOffset(ToLeftOffset(offset))
} else {
for _, lhsExpr := range col.LHSExprs {
offset := aj.LHS.AddColumn(ctx, true, col.GroupBy, aeWrap(lhsExpr.Expr))
aj.Vars[lhsExpr.Name] = offset
}
offset := aj.RHS.AddColumn(ctx, true, col.GroupBy, aeWrap(col.DTColName))
aj.addOffset(ToRightOffset(offset))
}
return
}
for _, lhsExpr := range col.LHSExprs {
offset := aj.LHS.AddColumn(ctx, true, col.GroupBy, aeWrap(lhsExpr.Expr))
if col.RHSExpr == nil {
// if we don't have an RHS expr, it means that this is a pure LHS expression
aj.addOffset(ToLeftOffset(offset))
} else {
if col.IsPureLeft() {
offset := aj.LHS.AddColumn(ctx, true, col.GroupBy, aeWrap(col.GetPureLeftExpr()))
aj.addOffset(ToLeftOffset(offset))
} else {
for _, lhsExpr := range col.LHSExprs {
offset := aj.LHS.AddColumn(ctx, true, col.GroupBy, aeWrap(lhsExpr.Expr))
aj.Vars[lhsExpr.Name] = offset
}
}
if col.RHSExpr != nil {
offset := aj.RHS.AddColumn(ctx, true, col.GroupBy, aeWrap(col.RHSExpr))
offset := aj.RHS.AddColumn(ctx, true, col.GroupBy, aeWrap(col.GetRHSExpr()))
aj.addOffset(ToRightOffset(offset))
}
}
Expand Down Expand Up @@ -443,17 +420,17 @@ func (aj *ApplyJoin) findOrAddColNameBindVarName(ctx *plancontext.PlanningContex
return bvName
}

func (a *ApplyJoin) LHSColumnsNeeded(ctx *plancontext.PlanningContext) (needed sqlparser.Exprs) {
func (aj *ApplyJoin) LHSColumnsNeeded(ctx *plancontext.PlanningContext) (needed sqlparser.Exprs) {
f := func(from BindVarExpr) sqlparser.Expr {
return from.Expr
}
for _, jc := range a.JoinColumns.columns {
for _, jc := range aj.JoinColumns.columns {
needed = append(needed, slice.Map(jc.LHSExprs, f)...)
}
for _, jc := range a.JoinPredicates.columns {
for _, jc := range aj.JoinPredicates.columns {
needed = append(needed, slice.Map(jc.LHSExprs, f)...)
}
needed = append(needed, slice.Map(a.ExtraLHSVars, f)...)
needed = append(needed, slice.Map(aj.ExtraLHSVars, f)...)
return ctx.SemTable.Uniquify(needed)
}

Expand All @@ -462,7 +439,11 @@ func (jc applyJoinColumn) String() string {
lhs := slice.Map(jc.LHSExprs, func(e BindVarExpr) string {
return sqlparser.String(e.Expr)
})
return fmt.Sprintf("[%s | %s | %s]", strings.Join(lhs, ", "), rhs, sqlparser.String(jc.Original))
if jc.DTColName == nil {
return fmt.Sprintf("[%s | %s | %s]", strings.Join(lhs, ", "), rhs, sqlparser.String(jc.Original))
}

return fmt.Sprintf("[%s | %s | %s | %s]", strings.Join(lhs, ", "), rhs, sqlparser.String(jc.Original), sqlparser.String(jc.DTColName))
}

func (jc applyJoinColumn) IsPureLeft() bool {
Expand All @@ -477,6 +458,20 @@ func (jc applyJoinColumn) IsMixedLeftAndRight() bool {
return len(jc.LHSExprs) > 0 && jc.RHSExpr != nil
}

func (jc applyJoinColumn) GetPureLeftExpr() sqlparser.Expr {
if jc.DTColName != nil {
return jc.DTColName
}
return jc.LHSExprs[0].Expr
}

func (jc applyJoinColumn) GetRHSExpr() sqlparser.Expr {
if jc.DTColName != nil {
return jc.DTColName
}
return jc.RHSExpr
}

func (bve BindVarExpr) String() string {
if bve.Name == "" {
return sqlparser.String(bve.Expr)
Expand Down
10 changes: 7 additions & 3 deletions go/vt/vtgate/planbuilder/operators/projection_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,11 @@ func pushProjectionInApplyJoin(
src.LHS = createProjectionWithTheseColumns(ctx, src.LHS, lhs, p.DT)
src.RHS = createProjectionWithTheseColumns(ctx, src.RHS, rhs, p.DT)

return src, Rewrote("split projection to either side of join")
message := "split projection to either side of join"
if p.DT != nil {
message += " " + p.DT.Alias
}
return src, Rewrote(message)
}

// splitProjectionAcrossJoin creates JoinPredicates for all projections,
Expand Down Expand Up @@ -396,14 +400,14 @@ func exposeColumnsThroughDerivedTable(ctx *plancontext.PlanningContext, p *Proje

lhsIDs := TableID(src.LHS)
rhsIDs := TableID(src.RHS)
rewriteColumnsForJoin(ctx, src.JoinPredicates.columns, lhsIDs, rhsIDs, lhs, rhs)
rewriteColumnsForJoin(ctx, src.JoinPredicates.columns, lhsIDs, rhsIDs, lhs)
}

func rewriteColumnsForJoin(
ctx *plancontext.PlanningContext,
columns []applyJoinColumn,
lhsIDs, rhsIDs semantics.TableSet,
lhs, rhs *projector,
lhs *projector,
) {
for colIdx, column := range columns {
for lhsIdx, bve := range column.LHSExprs {
Expand Down
27 changes: 14 additions & 13 deletions go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,24 +608,31 @@ ordering:
return true
}

// pushOrderingUnderAggr pushes the ORDER BY clause under the aggregation if possible,
// to optimize the query plan by aligning the GROUP BY and ORDER BY clauses and
// potentially removing redundant ORDER BY clauses.
func pushOrderingUnderAggr(ctx *plancontext.PlanningContext, order *Ordering, aggregator *Aggregator) (Operator, *ApplyResult) {
// If Aggregator is a derived table, then we should rewrite the ordering before pushing.
// Avoid pushing down too early to allow for aggregation pushdown to MySQL
if !reachedPhase(ctx, delegateAggregation) {
return order, NoRewrite
}

// Rewrite ORDER BY if Aggregator is a derived table
if aggregator.isDerived() {
for idx, orderExpr := range order.Order {
ti, err := ctx.SemTable.TableInfoFor(aggregator.DT.TableID)
if err != nil {
panic(err)
}
// Rewrite expressions in ORDER BY to match derived table columns
newOrderExpr := orderExpr.Map(func(expr sqlparser.Expr) sqlparser.Expr {
return semantics.RewriteDerivedTableExpression(expr, ti)
})
order.Order[idx] = newOrderExpr
}
}

// Step 1: Align the GROUP BY and ORDER BY.
// Reorder the GROUP BY columns to match the ORDER BY columns.
// Since the GB clause is a set, we can reorder these columns freely.
// Align GROUP BY with ORDER BY by reordering GROUP BY columns to match ORDER BY
var newGrouping []GroupBy
used := make([]bool, len(aggregator.Grouping))
for _, orderExpr := range order.Order {
Expand All @@ -637,11 +644,8 @@ func pushOrderingUnderAggr(ctx *plancontext.PlanningContext, order *Ordering, ag
}
}

// Step 2: Add any missing columns from the ORDER BY.
// The ORDER BY column is not a set, but we can add more elements
// to the end without changing the semantics of the query.
// Add any missing GROUP BY columns to ORDER BY
if len(newGrouping) != len(aggregator.Grouping) {
// we are missing some groupings. We need to add them both to the new groupings list, but also to the ORDER BY
for i, added := range used {
if !added {
groupBy := aggregator.Grouping[i]
Expand All @@ -654,19 +658,16 @@ func pushOrderingUnderAggr(ctx *plancontext.PlanningContext, order *Ordering, ag
aggregator.Grouping = newGrouping
aggrSource, isOrdering := aggregator.Source.(*Ordering)
if isOrdering {
// Transform the query plan tree:
// Optimize query plan by removing redundant ORDER BY
// From: Ordering(1) To: Aggregation
// | |
// Aggregation Ordering(1)
// | |
// Ordering(2) <Inputs>
// |
// <Inputs>
//
// Remove Ordering(2) from the plan tree, as it's redundant
// after pushing down the higher ordering.
order.Source = aggrSource.Source
aggrSource.Source = nil // removing from plan tree
aggrSource.Source = nil
aggregator.Source = order
return aggregator, Rewrote("push ordering under aggregation, removing extra ordering")
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/planbuilder/operators/rewriters.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func bottomUp(

newOp, treeIdentity := rewriter(root, rootID, isRoot)
anythingChanged = anythingChanged.Merge(treeIdentity)

return newOp, anythingChanged
}

Expand Down
Loading
Loading