Skip to content

Re-plan UNION plans to get type information #15476

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions go/vt/vtgate/engine/record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
"context"
"sync"

"vitess.io/vitess/go/sqltypes"

querypb "vitess.io/vitess/go/vt/proto/query"
)

var _ Primitive = (*Record)(nil)

type Record struct {
Input Primitive
doOnce sync.Once
record func(result *sqltypes.Result)
}

// RouteType returns a description of the query routing type used by the primitive
func (rc *Record) RouteType() string {
return rc.Input.RouteType()
}

// GetKeyspaceName specifies the Keyspace that this primitive routes to.
func (rc *Record) GetKeyspaceName() string {
return rc.Input.GetKeyspaceName()
}

// GetTableName specifies the table that this primitive routes to.
func (rc *Record) GetTableName() string {
return rc.Input.GetTableName()
}

// TryExecute satisfies the Primitive interface.
func (rc *Record) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
r, err := rc.Input.TryExecute(ctx, vcursor, bindVars, wantfields)
if err != nil {
return nil, err
}
go rc.doOnce.Do(func() {
rc.record(r)
})
return r, nil
}

// TryStreamExecute satisfies the Primitive interface.
func (rc *Record) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
panic("not implemented")
}

// GetFields implements the Primitive interface.
func (rc *Record) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return rc.Input.GetFields(ctx, vcursor, bindVars)
}

// Inputs returns the input to limit
func (rc *Record) Inputs() ([]Primitive, []map[string]any) {
return []Primitive{rc.Input}, nil
}

// NeedsTransaction implements the Primitive interface.
func (rc *Record) NeedsTransaction() bool {
return rc.Input.NeedsTransaction()
}

func (rc *Record) description() PrimitiveDescription {
other := map[string]any{}
return PrimitiveDescription{
Other: other,
}
}
88 changes: 88 additions & 0 deletions go/vt/vtgate/engine/replan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
"context"
"sync"

"vitess.io/vitess/go/sqltypes"

querypb "vitess.io/vitess/go/vt/proto/query"
)

var _ Primitive = (*RePlan)(nil)

type RePlan struct {
Input Primitive
replanOnce sync.Once
replan func()
}

// RouteType returns a description of the query routing type used by the primitive
func (rp *RePlan) RouteType() string {
return rp.Input.RouteType()
}

// GetKeyspaceName specifies the Keyspace that this primitive routes to.
func (rp *RePlan) GetKeyspaceName() string {
return rp.Input.GetKeyspaceName()
}

// GetTableName specifies the table that this primitive routes to.
func (rp *RePlan) GetTableName() string {
return rp.Input.GetTableName()
}

// TryExecute satisfies the Primitive interface.
func (rp *RePlan) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
defer func() {
go rp.replanOnce.Do(rp.replan)
}()
r, err := rp.Input.TryExecute(ctx, vcursor, bindVars, wantfields)
if err != nil {
return nil, err
}
return r, nil
}

// TryStreamExecute satisfies the Primitive interface.
func (rp *RePlan) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
panic("not implemented")
}

// GetFields implements the Primitive interface.
func (rp *RePlan) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return rp.Input.GetFields(ctx, vcursor, bindVars)
}

// Inputs returns the input to limit
func (rp *RePlan) Inputs() ([]Primitive, []map[string]any) {
return []Primitive{rp.Input}, nil
}

// NeedsTransaction implements the Primitive interface.
func (rp *RePlan) NeedsTransaction() bool {
return rp.Input.NeedsTransaction()
}

func (rp *RePlan) description() PrimitiveDescription {
other := map[string]any{}
return PrimitiveDescription{
Other: other,
}
}
31 changes: 27 additions & 4 deletions go/vt/vtgate/planbuilder/operators/offset_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,35 @@ func addColumnsToInput(ctx *plancontext.PlanningContext, root Operator) Operator
return TopDown(root, TableID, visitor, stopAtRoute)
}

// addColumnsToInput adds columns needed by an operator to its input.
// This happens only when the filter expression can be retrieved as an offset from the underlying mysql.
func pullDistinctFromUNION(_ *plancontext.PlanningContext, root Operator) Operator {
func expandUNION(ctx *plancontext.PlanningContext, root Operator) Operator {
visitor := func(in Operator, _ semantics.TableSet, isRoot bool) (Operator, *ApplyResult) {
union, ok := in.(*Union)
if !ok || !union.distinct {
if !ok {
return in, NoRewrite
}

var missingType bool
var newSources []Operator
for _, source := range union.Sources {
var typeNeeded []int
cols := source.GetColumns(ctx)
for colOffset, col := range cols {
if _, found := ctx.SemTable.TypeForExpr(col.Expr); !found {
typeNeeded = append(typeNeeded, colOffset)
}
}
if len(typeNeeded) > 0 {
//
missingType = true
} else {
newSources = append(newSources, source)
}
}
if missingType {
//
}

if !union.distinct {
return in, NoRewrite
}

Expand Down
12 changes: 6 additions & 6 deletions go/vt/vtgate/planbuilder/operators/phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type (
const (
physicalTransform Phase = iota
initialPlanning
pullDistinctFromUnion
expandUnion
delegateAggregation
addAggrOrdering
cleanOutPerfDistinct
Expand All @@ -49,8 +49,8 @@ func (p Phase) String() string {
return "physicalTransform"
case initialPlanning:
return "initial horizon planning optimization"
case pullDistinctFromUnion:
return "pull distinct from UNION1"
case expandUnion:
return "expand UNION"
case delegateAggregation:
return "split aggregation between vtgate and mysql"
case addAggrOrdering:
Expand All @@ -68,7 +68,7 @@ func (p Phase) String() string {

func (p Phase) shouldRun(s semantics.QuerySignature) bool {
switch p {
case pullDistinctFromUnion:
case expandUnion:
return s.Union
case delegateAggregation:
return s.Aggregation
Expand All @@ -87,8 +87,8 @@ func (p Phase) shouldRun(s semantics.QuerySignature) bool {

func (p Phase) act(ctx *plancontext.PlanningContext, op Operator) Operator {
switch p {
case pullDistinctFromUnion:
return pullDistinctFromUNION(ctx, op)
case expandUnion:
return expandUNION(ctx, op)
case delegateAggregation:
return enableDelegateAggregation(ctx, op)
case addAggrOrdering:
Expand Down
79 changes: 79 additions & 0 deletions go/vt/vtgate/planbuilder/operators/record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2022 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operators

import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
)

type Record struct {
Source Operator
}

func newRecord(src Operator) *Record {
return &Record{
Source: src,
}
}

// Clone implements the Operator interface
func (rc *Record) Clone(inputs []Operator) Operator {
newOp := *rc
newOp.Source = inputs[0]
return &newOp
}

func (rc *Record) GetOrdering(*plancontext.PlanningContext) []OrderBy {
return nil
}

// Inputs implements the Operator interface
func (rc *Record) Inputs() []Operator {
return []Operator{rc.Source}
}

// SetInputs implements the Operator interface
func (rc *Record) SetInputs(ops []Operator) {
rc.Source = ops[0]
}

func (rc *Record) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) Operator {
newSource := rc.Source.AddPredicate(ctx, expr)
rc.Source = newSource
return rc
}

func (rc *Record) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bool, expr *sqlparser.AliasedExpr) int {
return rc.Source.AddColumn(ctx, reuse, gb, expr)
}

func (rc *Record) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int {
return rc.Source.FindCol(ctx, expr, underRoute)
}

func (rc *Record) GetColumns(ctx *plancontext.PlanningContext) (result []*sqlparser.AliasedExpr) {
return rc.Source.GetColumns(ctx)
}

func (rc *Record) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs {
return rc.Source.GetSelectExprs(ctx)
}

func (rc *Record) ShortDescription() string {
return ""
}