Skip to content

Commit 231b8cb

Browse files
committed
refactor: move vcursorImpl to another package - wip
Signed-off-by: Andres Taylor <andres@planetscale.com>
1 parent 79162c5 commit 231b8cb

11 files changed

+646
-543
lines changed

go/vt/vtgate/executor.go

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ import (
6868
)
6969

7070
var (
71-
errNoKeyspace = vterrors.VT09005()
7271
defaultTabletType = topodatapb.TabletType_PRIMARY
7372

7473
// TODO: @rafael - These two counters should be deprecated in favor of the ByTable ones in v17+. They are kept for now for backwards compatibility.
@@ -168,6 +167,7 @@ func NewExecutor(
168167
pv plancontext.PlannerVersion,
169168
warmingReadsPercent int,
170169
) *Executor {
170+
warnings.Add("WarnUnshardedOnly", 1)
171171
e := &Executor{
172172
env: env,
173173
serv: serv,
@@ -301,7 +301,7 @@ func (e *Executor) StreamExecute(
301301
srr := &streaminResultReceiver{callback: callback}
302302
var err error
303303

304-
resultHandler := func(ctx context.Context, plan *engine.Plan, vc *vcursorImpl, bindVars map[string]*querypb.BindVariable, execStart time.Time) error {
304+
resultHandler := func(ctx context.Context, plan *engine.Plan, vc *econtext.VCursorImpl, bindVars map[string]*querypb.BindVariable, execStart time.Time) error {
305305
var seenResults atomic.Bool
306306
var resultMu sync.Mutex
307307
result := &sqltypes.Result{}
@@ -369,7 +369,7 @@ func (e *Executor) StreamExecute(
369369
logStats.TablesUsed = plan.TablesUsed
370370
logStats.TabletType = vc.TabletType().String()
371371
logStats.ExecuteTime = time.Since(execStart)
372-
logStats.ActiveKeyspace = vc.keyspace
372+
logStats.ActiveKeyspace = vc.GetKeyspace()
373373

374374
e.updateQueryCounts(plan.Instructions.RouteType(), plan.Instructions.GetKeyspaceName(), plan.Instructions.GetTableName(), int64(logStats.ShardQueries))
375375

@@ -435,7 +435,7 @@ func (e *Executor) execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConn
435435
var err error
436436
var qr *sqltypes.Result
437437
var stmtType sqlparser.StatementType
438-
err = e.newExecute(ctx, mysqlCtx, safeSession, sql, bindVars, logStats, func(ctx context.Context, plan *engine.Plan, vc *vcursorImpl, bindVars map[string]*querypb.BindVariable, time time.Time) error {
438+
err = e.newExecute(ctx, mysqlCtx, safeSession, sql, bindVars, logStats, func(ctx context.Context, plan *engine.Plan, vc *econtext.VCursorImpl, bindVars map[string]*querypb.BindVariable, time time.Time) error {
439439
stmtType = plan.Type
440440
qr, err = e.executePlan(ctx, safeSession, plan, vc, bindVars, logStats, time)
441441
return err
@@ -449,7 +449,7 @@ func (e *Executor) execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConn
449449
}
450450

451451
// addNeededBindVars adds bind vars that are needed by the plan
452-
func (e *Executor) addNeededBindVars(vcursor *vcursorImpl, bindVarNeeds *sqlparser.BindVarNeeds, bindVars map[string]*querypb.BindVariable, session *econtext.SafeSession) error {
452+
func (e *Executor) addNeededBindVars(vcursor *econtext.VCursorImpl, bindVarNeeds *sqlparser.BindVarNeeds, bindVars map[string]*querypb.BindVariable, session *econtext.SafeSession) error {
453453
for _, funcName := range bindVarNeeds.NeedFunctionResult {
454454
switch funcName {
455455
case sqlparser.DBVarName:
@@ -542,7 +542,7 @@ func (e *Executor) addNeededBindVars(vcursor *vcursorImpl, bindVarNeeds *sqlpars
542542
}
543543

544544
evalExpr, err := evalengine.Translate(expr, &evalengine.Config{
545-
Collation: vcursor.collation,
545+
Collation: vcursor.ConnCollation(),
546546
Environment: e.env,
547547
SQLMode: evalengine.ParseSQLMode(vcursor.SQLMode()),
548548
})
@@ -553,7 +553,7 @@ func (e *Executor) addNeededBindVars(vcursor *vcursorImpl, bindVarNeeds *sqlpars
553553
if err != nil {
554554
return err
555555
}
556-
bindVars[key] = sqltypes.ValueBindVariable(evaluated.Value(vcursor.collation))
556+
bindVars[key] = sqltypes.ValueBindVariable(evaluated.Value(vcursor.ConnCollation()))
557557
}
558558
}
559559
}
@@ -723,7 +723,7 @@ func (e *Executor) CloseSession(ctx context.Context, safeSession *econtext.SafeS
723723
return e.txConn.ReleaseAll(ctx, safeSession)
724724
}
725725

726-
func (e *Executor) setVitessMetadata(ctx context.Context, name, value string) error {
726+
func (e *Executor) SetVitessMetadata(ctx context.Context, name, value string) error {
727727
// TODO(kalfonso): move to its own acl check and consolidate into an acl component that can handle multiple operations (vschema, metadata)
728728
user := callerid.ImmediateCallerIDFromContext(ctx)
729729
allowed := vschemaacl.Authorized(user)
@@ -742,7 +742,7 @@ func (e *Executor) setVitessMetadata(ctx context.Context, name, value string) er
742742
return ts.UpsertMetadata(ctx, name, value)
743743
}
744744

745-
func (e *Executor) showVitessMetadata(ctx context.Context, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
745+
func (e *Executor) ShowVitessMetadata(ctx context.Context, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
746746
ts, err := e.serv.GetTopoServer()
747747
if err != nil {
748748
return nil, err
@@ -775,7 +775,7 @@ func (e *Executor) showVitessMetadata(ctx context.Context, filter *sqlparser.Sho
775775

776776
type tabletFilter func(tablet *topodatapb.Tablet, servingState string, primaryTermStartTime int64) bool
777777

778-
func (e *Executor) showShards(ctx context.Context, filter *sqlparser.ShowFilter, destTabletType topodatapb.TabletType) (*sqltypes.Result, error) {
778+
func (e *Executor) ShowShards(ctx context.Context, filter *sqlparser.ShowFilter, destTabletType topodatapb.TabletType) (*sqltypes.Result, error) {
779779
showVitessShardsFilters := func(filter *sqlparser.ShowFilter) ([]func(string) bool, []func(string, *topodatapb.ShardReference) bool) {
780780
keyspaceFilters := []func(string) bool{}
781781
shardFilters := []func(string, *topodatapb.ShardReference) bool{}
@@ -859,7 +859,7 @@ func (e *Executor) showShards(ctx context.Context, filter *sqlparser.ShowFilter,
859859
}, nil
860860
}
861861

862-
func (e *Executor) showTablets(filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
862+
func (e *Executor) ShowTablets(filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
863863
getTabletFilters := func(filter *sqlparser.ShowFilter) []tabletFilter {
864864
var filters []tabletFilter
865865

@@ -932,7 +932,7 @@ func (e *Executor) showTablets(filter *sqlparser.ShowFilter) (*sqltypes.Result,
932932
}, nil
933933
}
934934

935-
func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
935+
func (e *Executor) ShowVitessReplicationStatus(ctx context.Context, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
936936
ctx, cancel := context.WithTimeout(ctx, healthCheckTimeout)
937937
defer cancel()
938938
rows := [][]sqltypes.Value{}
@@ -1093,7 +1093,7 @@ func (e *Executor) ParseDestinationTarget(targetString string) (string, topodata
10931093
// the cache, it reuses it.
10941094
func (e *Executor) getPlan(
10951095
ctx context.Context,
1096-
vcursor *vcursorImpl,
1096+
vcursor *econtext.VCursorImpl,
10971097
query string,
10981098
stmt sqlparser.Statement,
10991099
comments sqlparser.MarginComments,
@@ -1131,10 +1131,10 @@ func (e *Executor) getPlan(
11311131
reservedVars,
11321132
bindVars,
11331133
parameterize,
1134-
vcursor.keyspace,
1135-
vcursor.safeSession.GetSelectLimit(),
1134+
vcursor.GetKeyspace(),
1135+
vcursor.GetSelectLimit(),
11361136
setVarComment,
1137-
vcursor.safeSession.SystemVariables,
1137+
vcursor.GetSystemVariablesCopy(),
11381138
vcursor.GetForeignKeyChecksState(),
11391139
vcursor,
11401140
)
@@ -1153,9 +1153,9 @@ func (e *Executor) getPlan(
11531153
return e.cacheAndBuildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, logStats)
11541154
}
11551155

1156-
func (e *Executor) hashPlan(ctx context.Context, vcursor *vcursorImpl, query string) PlanCacheKey {
1156+
func (e *Executor) hashPlan(ctx context.Context, vcursor *econtext.VCursorImpl, query string) PlanCacheKey {
11571157
hasher := vthash.New256()
1158-
vcursor.keyForPlan(ctx, query, hasher)
1158+
vcursor.KeyForPlan(ctx, query, hasher)
11591159

11601160
var planKey PlanCacheKey
11611161
hasher.Sum(planKey[:0])
@@ -1164,7 +1164,7 @@ func (e *Executor) hashPlan(ctx context.Context, vcursor *vcursorImpl, query str
11641164

11651165
func (e *Executor) buildStatement(
11661166
ctx context.Context,
1167-
vcursor *vcursorImpl,
1167+
vcursor *econtext.VCursorImpl,
11681168
query string,
11691169
stmt sqlparser.Statement,
11701170
reservedVars *sqlparser.ReservedVars,
@@ -1175,23 +1175,22 @@ func (e *Executor) buildStatement(
11751175
return nil, err
11761176
}
11771177

1178-
plan.Warnings = vcursor.warnings
1179-
vcursor.warnings = nil
1178+
plan.Warnings = vcursor.GetAndEmptyWarnings()
11801179

11811180
err = e.checkThatPlanIsValid(stmt, plan)
11821181
return plan, err
11831182
}
11841183

11851184
func (e *Executor) cacheAndBuildStatement(
11861185
ctx context.Context,
1187-
vcursor *vcursorImpl,
1186+
vcursor *econtext.VCursorImpl,
11881187
query string,
11891188
stmt sqlparser.Statement,
11901189
reservedVars *sqlparser.ReservedVars,
11911190
bindVarNeeds *sqlparser.BindVarNeeds,
11921191
logStats *logstats.LogStats,
11931192
) (*engine.Plan, error) {
1194-
planCachable := sqlparser.CachePlan(stmt) && vcursor.safeSession.CachePlan()
1193+
planCachable := sqlparser.CachePlan(stmt) && vcursor.CachePlan()
11951194
if planCachable {
11961195
planKey := e.hashPlan(ctx, vcursor, query)
11971196

@@ -1209,7 +1208,7 @@ func (e *Executor) canNormalizeStatement(stmt sqlparser.Statement, setVarComment
12091208
return sqlparser.CanNormalize(stmt) || setVarComment != ""
12101209
}
12111210

1212-
func prepareSetVarComment(vcursor *vcursorImpl, stmt sqlparser.Statement) (string, error) {
1211+
func prepareSetVarComment(vcursor *econtext.VCursorImpl, stmt sqlparser.Statement) (string, error) {
12131212
if vcursor == nil || vcursor.Session().InReservedConn() {
12141213
return "", nil
12151214
}
@@ -1405,9 +1404,29 @@ func (e *Executor) prepare(ctx context.Context, safeSession *econtext.SafeSessio
14051404
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unrecognized prepare statement: %s", sql)
14061405
}
14071406

1407+
func (e *Executor) getVCursorConfig() econtext.VCursorConfig {
1408+
connCollation := collations.Unknown
1409+
if gw, isTabletGw := e.resolver.resolver.GetGateway().(*TabletGateway); isTabletGw {
1410+
connCollation = gw.DefaultConnCollation()
1411+
}
1412+
return econtext.VCursorConfig{
1413+
WarmingReadsPercent: warmingReadsPercent,
1414+
Collation: connCollation,
1415+
MaxMemoryRows: 0,
1416+
EnableShardRouting: false,
1417+
DefaultTabletType: 0,
1418+
QueryTimeout: 0,
1419+
DBDDLPlugin: "",
1420+
ForeignKeyMode: 0,
1421+
SetVarEnabled: false,
1422+
EnableViews: false,
1423+
}
1424+
}
1425+
14081426
func (e *Executor) handlePrepare(ctx context.Context, safeSession *econtext.SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *logstats.LogStats) ([]*querypb.Field, error) {
14091427
query, comments := sqlparser.SplitMarginComments(sql)
1410-
vcursor, _ := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv)
1428+
1429+
vcursor, _ := econtext.NewVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, e.getVCursorConfig())
14111430

14121431
stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser())
14131432
if err != nil {
@@ -1456,12 +1475,12 @@ func parseAndValidateQuery(query string, parser *sqlparser.Parser) (sqlparser.St
14561475
}
14571476

14581477
// ExecuteMultiShard implements the IExecutor interface
1459-
func (e *Executor) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *econtext.SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver resultsObserver) (qr *sqltypes.Result, errs []error) {
1478+
func (e *Executor) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *econtext.SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver econtext.ResultsObserver) (qr *sqltypes.Result, errs []error) {
14601479
return e.scatterConn.ExecuteMultiShard(ctx, primitive, rss, queries, session, autocommit, ignoreMaxMemoryRows, resultsObserver)
14611480
}
14621481

14631482
// StreamExecuteMulti implements the IExecutor interface
1464-
func (e *Executor) StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, session *econtext.SafeSession, autocommit bool, callback func(reply *sqltypes.Result) error, resultsObserver resultsObserver) []error {
1483+
func (e *Executor) StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, session *econtext.SafeSession, autocommit bool, callback func(reply *sqltypes.Result) error, resultsObserver econtext.ResultsObserver) []error {
14651484
return e.scatterConn.StreamExecuteMulti(ctx, primitive, query, rss, vars, session, autocommit, callback, resultsObserver)
14661485
}
14671486

@@ -1581,21 +1600,21 @@ func (e *Executor) ReleaseLock(ctx context.Context, session *econtext.SafeSessio
15811600
return e.txConn.ReleaseLock(ctx, session)
15821601
}
15831602

1584-
// planPrepareStmt implements the IExecutor interface
1585-
func (e *Executor) planPrepareStmt(ctx context.Context, vcursor *vcursorImpl, query string) (*engine.Plan, sqlparser.Statement, error) {
1603+
// PlanPrepareStmt implements the IExecutor interface
1604+
func (e *Executor) PlanPrepareStmt(ctx context.Context, vcursor *econtext.VCursorImpl, query string) (*engine.Plan, sqlparser.Statement, error) {
15861605
stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser())
15871606
if err != nil {
15881607
return nil, nil, err
15891608
}
15901609

15911610
// creating this log stats to not interfere with the original log stats.
1592-
lStats := logstats.NewLogStats(ctx, "prepare", query, vcursor.safeSession.SessionUUID, nil)
1611+
lStats := logstats.NewLogStats(ctx, "prepare", query, vcursor.Session().GetSessionUUID(), nil)
15931612
plan, err := e.getPlan(
15941613
ctx,
15951614
vcursor,
15961615
query,
15971616
sqlparser.Clone(stmt),
1598-
vcursor.marginComments,
1617+
vcursor.GetMarginComments(),
15991618
map[string]*querypb.BindVariable{},
16001619
reservedVars, /* normalize */
16011620
false,
@@ -1617,7 +1636,7 @@ func (e *Executor) Close() {
16171636
e.plans.Close()
16181637
}
16191638

1620-
func (e *Executor) environment() *vtenv.Environment {
1639+
func (e *Executor) Environment() *vtenv.Environment {
16211640
return e.env
16221641
}
16231642

@@ -1629,6 +1648,10 @@ func (e *Executor) UnresolvedTransactions(ctx context.Context, targets []*queryp
16291648
return e.txConn.UnresolvedTransactions(ctx, targets)
16301649
}
16311650

1651+
func (e *Executor) AddWarningCount(name string, count int64) {
1652+
warnings.Add(name, count)
1653+
}
1654+
16321655
type (
16331656
errorTransformer interface {
16341657
TransformError(err error) error

0 commit comments

Comments
 (0)