Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Feb 17, 2025
1 parent 149f8f8 commit aa4830b
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 71 deletions.
10 changes: 5 additions & 5 deletions go/vt/sqlparser/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,11 @@ func IsSimpleTuple(node Expr) bool {
return false
}

// IsLockingFunc returns true for all functions that are used to work with mysql advisory locks
func IsLockingFunc(node Expr) bool {
switch node.(type) {
case *LockingFunc:
func SupportsOptimizerHint(stmt StatementType) bool {
switch stmt {
case StmtSelect, StmtInsert, StmtUpdate, StmtDelete, StmtStream, StmtVStream:
return true
default:
return false
}
return false
}
29 changes: 29 additions & 0 deletions go/vt/vtgate/engine/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ package engine
import (
"bytes"
"encoding/json"
"fmt"
"sync/atomic"
"time"

"vitess.io/vitess/go/cache/theine"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/vt/vthash"

"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
)
Expand All @@ -38,6 +43,7 @@ type Plan struct {
BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting
Warnings []*query.QueryWarning // Warnings that need to be yielded every time this query runs
TablesUsed []string // TablesUsed is the list of tables that this plan will query
QueryHints sqlparser.QueryHints

ExecCount uint64 // Count of times this plan was executed
ExecTime uint64 // Total execution time
Expand All @@ -47,6 +53,29 @@ type Plan struct {
Errors uint64 // Total number of errors
}

type PlanKey struct {
CurrentKeyspace string
Query string
SetVarComment string
Collation collations.ID
}

func (pk PlanKey) DebugString() string {
return fmt.Sprintf("CurrentKeyspace: %s, Query: %s, SetVarComment: %s, Collation: %d", pk.CurrentKeyspace, pk.Query, pk.SetVarComment, pk.Collation)
}

func (pk PlanKey) Hash() theine.HashKey256 {
hasher := vthash.New256()
_, _ = hasher.WriteUint16(uint16(pk.Collation))
_, _ = hasher.WriteString(pk.CurrentKeyspace)
_, _ = hasher.WriteString(pk.SetVarComment)
_, _ = hasher.WriteString(pk.Query)

var planKey theine.HashKey256
hasher.Sum(planKey[:0])
return planKey
}

// AddStats updates the plan execution statistics
func (p *Plan) AddStats(execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) {
atomic.AddUint64(&p.ExecCount, execCount)
Expand Down
127 changes: 62 additions & 65 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,21 +1098,61 @@ func (e *Executor) getPlan(
return nil, vterrors.VT13001("vschema not initialized")
}

qh, err := sqlparser.BuildQueryHints(stmt)
plan, err := e.getCachedOrBuild(ctx, vcursor, query, stmt, reservedVars, bindVars, allowParameterization, comments, logStats)
if err != nil {
return nil, err
}

qh := plan.QueryHints
vcursor.SetIgnoreMaxMemoryRows(qh.IgnoreMaxMemoryRows)
vcursor.SetConsolidator(qh.Consolidator)
vcursor.SetWorkloadName(qh.Workload)
vcursor.UpdateForeignKeyChecksState(qh.ForeignKeyChecks)
vcursor.SetPriority(qh.Priority)
vcursor.SetExecQueryTimeout(qh.Timeout)

setVarComment, err := prepareSetVarComment(vcursor, stmt)
// TODO: do this after getting a plan.
// if needsReservedConn(plan.Type) {
// switch plan.Type {
// // If the statement is a transaction statement or a set no reserved connection / SET_VAR is needed
// case sqlparser.StmtBegin, sqlparser.StmtCommit, sqlparser.StmtRollback, sqlparser.StmtSavepoint,
// sqlparser.StmtSRollback, sqlparser.StmtRelease, sqlparser.StmtSet, sqlparser.StmtShow:
// case sqlparser.SupportsOptimizerHint(plan.Type):
// default:
// vc.NeedsReservedConn()
// return "", nil
// }

return plan, nil
}

func (e *Executor) hashPlan(ctx context.Context, vcursor *econtext.VCursorImpl, query string) PlanCacheKey {
hasher := vthash.New256()
vcursor.KeyForPlan(ctx, query, hasher)

var planKey PlanCacheKey
hasher.Sum(planKey[:0])
return planKey
}

func (e *Executor) getCachedOrBuild(
ctx context.Context,
vcursor *econtext.VCursorImpl,
query string,
stmt sqlparser.Statement,
reservedVars *sqlparser.ReservedVars,
bindVars map[string]*querypb.BindVariable,
allowParameterization bool,
comments sqlparser.MarginComments,
logStats *logstats.LogStats,
) (*engine.Plan, error) {
setVarComment := vcursor.PrepareSetVarComment()

qh, err := sqlparser.BuildQueryHints(stmt)
if err != nil {
return nil, err
}
vcursor.UpdateForeignKeyChecksState(qh.ForeignKeyChecks)

rewriteASTResult, err := sqlparser.Normalize(
stmt,
Expand All @@ -1138,16 +1178,26 @@ func (e *Executor) getPlan(
logStats.SQL = comments.Leading + query + comments.Trailing
logStats.BindVariables = sqltypes.CopyBindVariables(bindVars)

return e.cacheAndBuildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, logStats)
}
planCachable := sqlparser.CachePlan(stmt) && vcursor.CachePlan()
if planCachable {
// build Plan key
pk := engine.PlanKey{
CurrentKeyspace: vcursor.GetKeyspace(),
Query: query,
SetVarComment: setVarComment,
Collation: vcursor.ConnCollation(),
}

func (e *Executor) hashPlan(ctx context.Context, vcursor *econtext.VCursorImpl, query string) PlanCacheKey {
hasher := vthash.New256()
vcursor.KeyForPlan(ctx, query, hasher)
planKey := pk.Hash()

var planKey PlanCacheKey
hasher.Sum(planKey[:0])
return planKey
var plan *engine.Plan
var err error
plan, logStats.CachedPlan, err = e.plans.GetOrLoad(planKey, e.epoch.Load(), func() (*engine.Plan, error) {
return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, qh)
})
return plan, err
}
return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, qh)
}

func (e *Executor) buildStatement(
Expand All @@ -1157,73 +1207,20 @@ func (e *Executor) buildStatement(
stmt sqlparser.Statement,
reservedVars *sqlparser.ReservedVars,
bindVarNeeds *sqlparser.BindVarNeeds,
qh sqlparser.QueryHints,
) (*engine.Plan, error) {
plan, err := planbuilder.BuildFromStmt(ctx, query, stmt, reservedVars, vcursor, bindVarNeeds, e.ddlConfig)
if err != nil {
return nil, err
}

plan.Warnings = vcursor.GetAndEmptyWarnings()
plan.QueryHints = qh

err = e.checkThatPlanIsValid(stmt, plan)
return plan, err
}

func (e *Executor) cacheAndBuildStatement(
ctx context.Context,
vcursor *econtext.VCursorImpl,
query string,
stmt sqlparser.Statement,
reservedVars *sqlparser.ReservedVars,
bindVarNeeds *sqlparser.BindVarNeeds,
logStats *logstats.LogStats,
) (*engine.Plan, error) {
planCachable := sqlparser.CachePlan(stmt) && vcursor.CachePlan()
if planCachable {
planKey := e.hashPlan(ctx, vcursor, query)

var plan *engine.Plan
var err error
plan, logStats.CachedPlan, err = e.plans.GetOrLoad(planKey, e.epoch.Load(), func() (*engine.Plan, error) {
return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds)
})
return plan, err
}
return e.buildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds)
}

func (e *Executor) canNormalizeStatement(stmt sqlparser.Statement, setVarComment string) bool {
return sqlparser.CanNormalize(stmt) || setVarComment != ""
}

func prepareSetVarComment(vcursor *econtext.VCursorImpl, stmt sqlparser.Statement) (string, error) {
if vcursor == nil || vcursor.Session().InReservedConn() {
return "", nil
}

if !vcursor.Session().HasSystemVariables() {
return "", nil
}

switch stmt.(type) {
// If the statement is a transaction statement or a set no reserved connection / SET_VAR is needed
case *sqlparser.Begin, *sqlparser.Commit, *sqlparser.Rollback, *sqlparser.Savepoint,
*sqlparser.SRollback, *sqlparser.Release, *sqlparser.Set, *sqlparser.Show:
return "", nil
case sqlparser.SupportOptimizerHint:
break
default:
vcursor.NeedsReservedConn()
return "", nil
}

var res strings.Builder
vcursor.Session().GetSystemVariables(func(k, v string) {
res.WriteString(fmt.Sprintf("SET_VAR(%s = %s) ", k, v))
})
return strings.TrimSpace(res.String()), nil
}

func (e *Executor) debugCacheEntries() (items map[string]*engine.Plan) {
items = make(map[string]*engine.Plan)
e.ForEachPlan(func(plan *engine.Plan) bool {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func TestCreateTableValidTimestamp(t *testing.T) {
query := "create table aa(t timestamp default 0)"
_, err := executor.Execute(context.Background(), nil, "TestSelect", session, query, map[string]*querypb.BindVariable{})
require.NoError(t, err)
require.True(t, session.InReservedConn())
assert.True(t, session.InReservedConn())

wantQueries := []*querypb.BoundQuery{
{Sql: "set sql_mode = ALLOW_INVALID_DATES", BindVariables: map[string]*querypb.BindVariable{}},
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vtgate/executorcontext/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,38 @@ func NewVCursorImpl(
}, nil
}

func (vc *VCursorImpl) PrepareSetVarComment() string {
// if vc.Session().InReservedConn() {
// return ""
// }

// TODO: handle foreign keys as it changes the plan.
// verify it, mostly nothing to do here.

// if !vc.Session().HasSystemVariables() {
// return "", nil
// }

// TODO: do this after getting a plan.
// switch stmt.(type) {
// // If the statement is a transaction statement or a set no reserved connection / SET_VAR is needed
// case *sqlparser.Begin, *sqlparser.Commit, *sqlparser.Rollback, *sqlparser.Savepoint,
// *sqlparser.SRollback, *sqlparser.Release, *sqlparser.Set, *sqlparser.Show:
// return "", nil
// case sqlparser.SupportOptimizerHint:
// break
// default:
// vc.NeedsReservedConn()
// return "", nil
// }

var res strings.Builder
vc.Session().GetSystemVariables(func(k, v string) {
res.WriteString(fmt.Sprintf("SET_VAR(%s = %s) ", k, v))
})
return strings.TrimSpace(res.String())
}

func (vc *VCursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor {
callerId := callerid.EffectiveCallerIDFromContext(ctx)
immediateCallerId := callerid.ImmediateCallerIDFromContext(ctx)
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vthash/highway/highwayhash.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ func (d *Digest) Write(p []byte) (n int, err error) {
return
}

func (d *Digest) WriteUint16(x uint16) (int, error) {
var b [2]byte
binary.LittleEndian.PutUint16(b[:], x)
return d.Write(b[:])
}

func (d *Digest) Sum(b []byte) []byte {
state := d.state
if d.offset > 0 {
Expand Down

0 comments on commit aa4830b

Please sign in to comment.