Skip to content

Commit

Permalink
Cherry-pick 4bc68db with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] committed Jan 15, 2024
1 parent 7aa5104 commit efea818
Show file tree
Hide file tree
Showing 23 changed files with 1,028 additions and 137 deletions.
8 changes: 8 additions & 0 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,19 @@ func transformAggregator(ctx *plancontext.PlanningContext, op *operators.Aggrega
oa.aggregates = append(oa.aggregates, aggrParam)
}
for _, groupBy := range op.Grouping {
<<<<<<< HEAD
typ, col, _ := ctx.SemTable.TypeForExpr(groupBy.SimplifiedExpr)
oa.groupByKeys = append(oa.groupByKeys, &engine.GroupByParams{
KeyCol: groupBy.ColOffset,
WeightStringCol: groupBy.WSOffset,
Expr: groupBy.AsAliasedExpr().Expr,
=======
typ, _ := ctx.SemTable.TypeForExpr(groupBy.Inner)
oa.groupByKeys = append(oa.groupByKeys, &engine.GroupByParams{
KeyCol: groupBy.ColOffset,
WeightStringCol: groupBy.WSOffset,
Expr: groupBy.Inner,
>>>>>>> 4bc68db9d3 (bugfix: Columns alias expanding (#14935))
Type: typ,
CollationID: col,
})
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/SQL_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func buildAggregation(op *Aggregator, qb *queryBuilder) error {

for _, by := range op.Grouping {
qb.addGroupBy(by.Inner)
simplified := by.SimplifiedExpr
simplified := by.Inner
if by.WSOffset != -1 {
qb.addGroupBy(weightStringFor(simplified))
}
Expand Down
117 changes: 108 additions & 9 deletions go/vt/vtgate/planbuilder/operators/aggregation_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,11 @@ func pushAggregations(ctx *plancontext.PlanningContext, aggregator *Aggregator,
// doing the aggregating on the vtgate level instead
// Adding to group by can be done only once even though there are multiple distinct aggregation with same expression.
if !distinctAggrGroupByAdded {
<<<<<<< HEAD
groupBy := NewGroupBy(distinctExpr, distinctExpr, aeDistinctExpr)
=======
groupBy := NewGroupBy(distinctExprs[0])
>>>>>>> 4bc68db9d3 (bugfix: Columns alias expanding (#14935))
groupBy.ColOffset = aggr.ColOffset
aggrBelowRoute.Grouping = append(aggrBelowRoute.Grouping, groupBy)
distinctAggrGroupByAdded = true
Expand Down Expand Up @@ -256,7 +260,7 @@ func pushAggregationThroughFilter(
withNextColumn:
for _, col := range columnsNeeded {
for _, gb := range pushedAggr.Grouping {
if ctx.SemTable.EqualsExpr(col, gb.SimplifiedExpr) {
if ctx.SemTable.EqualsExpr(col, gb.Inner) {
continue withNextColumn
}
}
Expand Down Expand Up @@ -299,7 +303,7 @@ func collectColNamesNeeded(ctx *plancontext.PlanningContext, f *Filter) (columns

func overlappingUniqueVindex(ctx *plancontext.PlanningContext, groupByExprs []GroupBy) bool {
for _, groupByExpr := range groupByExprs {
if exprHasUniqueVindex(ctx, groupByExpr.SimplifiedExpr) {
if exprHasUniqueVindex(ctx, groupByExpr.Inner) {
return true
}
}
Expand Down Expand Up @@ -408,9 +412,67 @@ func pushAggregationThroughJoin(ctx *plancontext.PlanningContext, rootAggr *Aggr

// We need to add any columns coming from the lhs of the join to the group by on that side
// If we don't, the LHS will not be able to return the column, and it can't be used to send down to the RHS
<<<<<<< HEAD
err = addColumnsFromLHSInJoinPredicates(ctx, rootAggr, join, lhs)
if err != nil {
return nil, nil, err
=======
addColumnsFromLHSInJoinPredicates(ctx, join, lhs)

join.LHS, join.RHS = lhs.pushed, rhs.pushed

if !rootAggr.Original {
// we only keep the root aggregation, if this aggregator was created
// by splitting one and pushing under a join, we can get rid of this one
return output, Rewrote("push Aggregation under join - keep original")
}

rootAggr.aggregateTheAggregates()
rootAggr.Source = output
return rootAggr, Rewrote("push Aggregation under join")
}

// pushAggregationThroughHashJoin pushes aggregation through a hash-join in a similar way to pushAggregationThroughApplyJoin
func pushAggregationThroughHashJoin(ctx *plancontext.PlanningContext, rootAggr *Aggregator, join *HashJoin) (Operator, *ApplyResult) {
lhs := createJoinPusher(rootAggr, join.LHS)
rhs := createJoinPusher(rootAggr, join.RHS)

columns := &hashJoinColumns{}
output, err := splitAggrColumnsToLeftAndRight(ctx, rootAggr, join, join.LeftJoin, columns, lhs, rhs)
if err != nil {
// if we get this error, we just abort the splitting and fall back on simpler ways of solving the same query
if errors.Is(err, errAbortAggrPushing) {
return nil, nil
}
panic(err)
}

// The two sides of the hash comparisons are added as grouping expressions
for _, cmp := range join.JoinComparisons {
lhs.addGrouping(ctx, NewGroupBy(cmp.LHS))
columns.addLeft(cmp.LHS)

rhs.addGrouping(ctx, NewGroupBy(cmp.RHS))
columns.addRight(cmp.RHS)
}

// The grouping columns need to be pushed down as grouping columns on the respective sides
for _, groupBy := range rootAggr.Grouping {
deps := ctx.SemTable.RecursiveDeps(groupBy.Inner)
switch {
case deps.IsSolvedBy(lhs.tableID):
lhs.addGrouping(ctx, groupBy)
columns.addLeft(groupBy.Inner)
case deps.IsSolvedBy(rhs.tableID):
rhs.addGrouping(ctx, groupBy)
columns.addRight(groupBy.Inner)
case deps.IsSolvedBy(lhs.tableID.Merge(rhs.tableID)):
// TODO: Support this as well
return nil, nil
default:
panic(vterrors.VT13001(fmt.Sprintf("grouping with bad dependencies %s", groupBy.Inner)))
}
>>>>>>> 4bc68db9d3 (bugfix: Columns alias expanding (#14935))
}

join.LHS, join.RHS = lhs.pushed, rhs.pushed
Expand All @@ -429,6 +491,7 @@ func pushAggregationThroughJoin(ctx *plancontext.PlanningContext, rootAggr *Aggr

var errAbortAggrPushing = fmt.Errorf("abort aggregation pushing")

<<<<<<< HEAD
func addColumnsFromLHSInJoinPredicates(ctx *plancontext.PlanningContext, rootAggr *Aggregator, join *ApplyJoin, lhs *joinPusher) error {
for _, pred := range join.JoinPredicates {
for _, bve := range pred.LHSExprs {
Expand All @@ -438,23 +501,40 @@ func addColumnsFromLHSInJoinPredicates(ctx *plancontext.PlanningContext, rootAgg
return err
}
idx, found := canReuseColumn(ctx, lhs.pushed.Columns, expr, extractExpr)
=======
func createJoinPusher(rootAggr *Aggregator, operator Operator) *joinPusher {
return &joinPusher{
orig: rootAggr,
pushed: &Aggregator{
Source: operator,
QP: rootAggr.QP,
},
columns: initColReUse(len(rootAggr.Columns)),
tableID: TableID(operator),
}
}

func addColumnsFromLHSInJoinPredicates(ctx *plancontext.PlanningContext, join *ApplyJoin, lhs *joinPusher) {
for _, pred := range join.JoinPredicates.columns {
for _, bve := range pred.LHSExprs {
idx, found := canReuseColumn(ctx, lhs.pushed.Columns, bve.Expr, extractExpr)
>>>>>>> 4bc68db9d3 (bugfix: Columns alias expanding (#14935))
if !found {
idx = len(lhs.pushed.Columns)
lhs.pushed.Columns = append(lhs.pushed.Columns, aeWrap(expr))
lhs.pushed.Columns = append(lhs.pushed.Columns, aeWrap(bve.Expr))
}
_, found = canReuseColumn(ctx, lhs.pushed.Grouping, wexpr, func(by GroupBy) sqlparser.Expr {
return by.SimplifiedExpr
_, found = canReuseColumn(ctx, lhs.pushed.Grouping, bve.Expr, func(by GroupBy) sqlparser.Expr {
return by.Inner
})

if found {
continue
}

lhs.pushed.Grouping = append(lhs.pushed.Grouping, GroupBy{
Inner: expr,
SimplifiedExpr: wexpr,
ColOffset: idx,
WSOffset: -1,
Inner: bve.Expr,
ColOffset: idx,
WSOffset: -1,
})
}
}
Expand All @@ -466,6 +546,7 @@ func splitGroupingToLeftAndRight(ctx *plancontext.PlanningContext, rootAggr *Agg

for _, groupBy := range rootAggr.Grouping {
deps := ctx.SemTable.RecursiveDeps(groupBy.Inner)
<<<<<<< HEAD
expr := groupBy.Inner
switch {
case deps.IsSolvedBy(lhs.tableID):
Expand All @@ -492,6 +573,24 @@ func splitGroupingToLeftAndRight(ctx *plancontext.PlanningContext, rootAggr *Agg
rhs.addGrouping(ctx, NewGroupBy(jc.RHSExpr, jc.RHSExpr, aeWrap(jc.RHSExpr)))
default:
return nil, vterrors.VT13001(fmt.Sprintf("grouping with bad dependencies %s", groupBy.SimplifiedExpr))
=======
switch {
case deps.IsSolvedBy(lhs.tableID):
lhs.addGrouping(ctx, groupBy)
columns.addLeft(groupBy.Inner)
case deps.IsSolvedBy(rhs.tableID):
rhs.addGrouping(ctx, groupBy)
columns.addRight(groupBy.Inner)
case deps.IsSolvedBy(lhs.tableID.Merge(rhs.tableID)):
jc := breakExpressionInLHSandRHSForApplyJoin(ctx, groupBy.Inner, lhs.tableID)
for _, lhsExpr := range jc.LHSExprs {
e := lhsExpr.Expr
lhs.addGrouping(ctx, NewGroupBy(e))
}
rhs.addGrouping(ctx, NewGroupBy(jc.RHSExpr))
default:
panic(vterrors.VT13001(fmt.Sprintf("grouping with bad dependencies %s", groupBy.Inner)))
>>>>>>> 4bc68db9d3 (bugfix: Columns alias expanding (#14935))
}
}
return groupingJCs, nil
Expand Down
24 changes: 20 additions & 4 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,20 @@ func (a *Aggregator) AddPredicate(ctx *plancontext.PlanningContext, expr sqlpars
}, nil
}

<<<<<<< HEAD
func (a *Aggregator) addColumnWithoutPushing(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, addToGroupBy bool) (int, error) {
=======
func (a *Aggregator) addColumnWithoutPushing(_ *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, addToGroupBy bool) int {
>>>>>>> 4bc68db9d3 (bugfix: Columns alias expanding (#14935))
offset := len(a.Columns)
a.Columns = append(a.Columns, expr)

if addToGroupBy {
<<<<<<< HEAD
groupBy := NewGroupBy(expr.Expr, expr.Expr, expr)
=======
groupBy := NewGroupBy(expr.Expr)
>>>>>>> 4bc68db9d3 (bugfix: Columns alias expanding (#14935))
groupBy.ColOffset = offset
a.Grouping = append(a.Grouping, groupBy)
} else {
Expand Down Expand Up @@ -167,7 +175,7 @@ func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gro
// This process also sets the weight string column offset, eliminating the need for a later addition in the aggregator operator's planOffset.
if wsExpr, isWS := rewritten.(*sqlparser.WeightStringFuncExpr); isWS {
idx := slices.IndexFunc(a.Grouping, func(by GroupBy) bool {
return ctx.SemTable.EqualsExprWithDeps(wsExpr.Expr, by.SimplifiedExpr)
return ctx.SemTable.EqualsExprWithDeps(wsExpr.Expr, by.Inner)
})
if idx >= 0 {
a.Grouping[idx].WSOffset = len(a.Columns)
Expand Down Expand Up @@ -273,7 +281,7 @@ func (a *Aggregator) ShortDescription() string {

var grouping []string
for _, gb := range a.Grouping {
grouping = append(grouping, sqlparser.String(gb.SimplifiedExpr))
grouping = append(grouping, sqlparser.String(gb.Inner))
}

return fmt.Sprintf("%s%s group by %s", org, strings.Join(columns, ", "), strings.Join(grouping, ","))
Expand Down Expand Up @@ -306,14 +314,18 @@ func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) error {
}
a.Grouping[idx].ColOffset = offset
}
if gb.WSOffset != -1 || !ctx.SemTable.NeedsWeightString(gb.SimplifiedExpr) {
if gb.WSOffset != -1 || !ctx.SemTable.NeedsWeightString(gb.Inner) {
continue
}

<<<<<<< HEAD
offset, err := a.internalAddColumn(ctx, aeWrap(weightStringFor(gb.SimplifiedExpr)), true)
if err != nil {
return err
}
=======
offset := a.internalAddColumn(ctx, aeWrap(weightStringFor(gb.Inner)), true)
>>>>>>> 4bc68db9d3 (bugfix: Columns alias expanding (#14935))
a.Grouping[idx].WSOffset = offset
}

Expand Down Expand Up @@ -428,14 +440,18 @@ func (a *Aggregator) pushRemainingGroupingColumnsAndWeightStrings(ctx *planconte
a.Grouping[idx].ColOffset = offset
}

if gb.WSOffset != -1 || !ctx.SemTable.NeedsWeightString(gb.SimplifiedExpr) {
if gb.WSOffset != -1 || !ctx.SemTable.NeedsWeightString(gb.Inner) {
continue
}

<<<<<<< HEAD
offset, err := a.internalAddColumn(ctx, aeWrap(weightStringFor(gb.SimplifiedExpr)), false)
if err != nil {
return err
}
=======
offset := a.internalAddColumn(ctx, aeWrap(weightStringFor(gb.Inner)), false)
>>>>>>> 4bc68db9d3 (bugfix: Columns alias expanding (#14935))
a.Grouping[idx].WSOffset = offset
}
for idx, aggr := range a.Aggregations {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,15 @@ func (d *Distinct) planOffsets(ctx *plancontext.PlanningContext) error {
return err
}
for idx, col := range columns {
<<<<<<< HEAD
e, err := d.QP.GetSimplifiedExpr(ctx, col.Expr)
if err != nil {
// ambiguous columns are not a problem for DISTINCT
e = col.Expr
}
=======
e := col.Expr
>>>>>>> 4bc68db9d3 (bugfix: Columns alias expanding (#14935))
var wsCol *int
typ, coll, _ := ctx.SemTable.TypeForExpr(e)

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operators/horizon_expanding.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ outer:
}
addedToCol := false
for idx, groupBy := range a.Grouping {
if ctx.SemTable.EqualsExprWithDeps(groupBy.SimplifiedExpr, ae.Expr) {
if ctx.SemTable.EqualsExprWithDeps(groupBy.Inner, ae.Expr) {
if !addedToCol {
a.Columns = append(a.Columns, ae)
addedToCol = true
Expand Down Expand Up @@ -243,7 +243,7 @@ func createProjectionForComplexAggregation(a *Aggregator, qp *QueryProjection) (
}
for i, by := range a.Grouping {
a.Grouping[i].ColOffset = len(a.Columns)
a.Columns = append(a.Columns, aeWrap(by.SimplifiedExpr))
a.Columns = append(a.Columns, aeWrap(by.Inner))
}
for i, aggregation := range a.Aggregations {
a.Aggregations[i].ColOffset = len(a.Columns)
Expand Down
6 changes: 5 additions & 1 deletion go/vt/vtgate/planbuilder/operators/phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func addOrderBysForAggregations(ctx *plancontext.PlanningContext, root ops.Opera

func needsOrdering(ctx *plancontext.PlanningContext, in *Aggregator) (bool, error) {
requiredOrder := slice.Map(in.Grouping, func(from GroupBy) sqlparser.Expr {
return from.SimplifiedExpr
return from.Inner
})
if in.DistinctExpr != nil {
requiredOrder = append(requiredOrder, in.DistinctExpr)
Expand Down Expand Up @@ -201,7 +201,11 @@ func addLiteralGroupingToRHS(in *ApplyJoin) (ops.Operator, *rewrite.ApplyResult,
}
if len(aggr.Grouping) == 0 {
gb := sqlparser.NewIntLiteral(".0")
<<<<<<< HEAD
aggr.Grouping = append(aggr.Grouping, NewGroupBy(gb, gb, aeWrap(gb)))
=======
aggr.Grouping = append(aggr.Grouping, NewGroupBy(gb))
>>>>>>> 4bc68db9d3 (bugfix: Columns alias expanding (#14935))
}
return nil
})
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ func overlaps(ctx *plancontext.PlanningContext, order []ops.OrderBy, grouping []
ordering:
for _, orderBy := range order {
for _, groupBy := range grouping {
if ctx.SemTable.EqualsExprWithDeps(orderBy.SimplifiedExpr, groupBy.SimplifiedExpr) {
if ctx.SemTable.EqualsExprWithDeps(orderBy.SimplifiedExpr, groupBy.Inner) {
continue ordering
}
}
Expand Down Expand Up @@ -630,7 +630,7 @@ func pushOrderingUnderAggr(ctx *plancontext.PlanningContext, order *Ordering, ag
used := make([]bool, len(aggregator.Grouping))
for _, orderExpr := range order.Order {
for grpIdx, by := range aggregator.Grouping {
if !used[grpIdx] && ctx.SemTable.EqualsExprWithDeps(by.SimplifiedExpr, orderExpr.SimplifiedExpr) {
if !used[grpIdx] && ctx.SemTable.EqualsExprWithDeps(by.Inner, orderExpr.SimplifiedExpr) {
newGrouping = append(newGrouping, by)
used[grpIdx] = true
}
Expand Down
Loading

0 comments on commit efea818

Please sign in to comment.