Skip to content

Commit 34774af

Browse files
committed
add optional flag to enable multi shard autocommit by default
Adds a new vtgate flag --default-multi-shard-autocommit which, as the name implies, opts the query engine into using multi-shard autocommit semantics by default, even if the plan does not contain the query directive MULTI_SHARD_AUTOCOMMIT. Signed-off-by: Michael Demmer <mdemmer@slack-corp.com>
1 parent de33a39 commit 34774af

File tree

9 files changed

+31
-4
lines changed

9 files changed

+31
-4
lines changed

go/vt/vtgate/engine/dml.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func allowOnlyPrimary(rss ...*srvtopo.ResolvedShard) error {
133133
}
134134

135135
func (dml *DML) execMultiShard(ctx context.Context, primitive Primitive, vcursor VCursor, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery) (*sqltypes.Result, error) {
136-
autocommit := (len(rss) == 1 || dml.MultiShardAutocommit) && vcursor.AutocommitApproval()
136+
autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || dml.MultiShardAutocommit) && vcursor.AutocommitApproval()
137137
result, errs := vcursor.ExecuteMultiShard(ctx, primitive, rss, queries, true /*rollbackOnError*/, autocommit, dml.FetchLastInsertID)
138138
return result, vterrors.Aggregate(errs)
139139
}

go/vt/vtgate/engine/fake_vcursor_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,10 @@ func (t *noopVCursor) GetLogs() ([]ExecuteEntry, error) {
414414
func (t *noopVCursor) RecordMirrorStats(sourceExecTime, targetExecTime time.Duration, targetErr error) {
415415
}
416416

417+
func (t *noopVCursor) DefaultMultiShardAutocommit() bool {
418+
return false
419+
}
420+
417421
var (
418422
_ VCursor = (*loggingVCursor)(nil)
419423
_ SessionActions = (*loggingVCursor)(nil)
@@ -845,6 +849,10 @@ func (f *loggingVCursor) SetPriority(string) {
845849
panic("implement me")
846850
}
847851

852+
func (f *loggingVCursor) DefaultMultiShardAutocommit() bool {
853+
return false
854+
}
855+
848856
func (f *loggingVCursor) FindRoutedTable(tbl sqlparser.TableName) (*vindexes.Table, error) {
849857
f.log = append(f.log, fmt.Sprintf("FindTable(%s)", sqlparser.String(tbl)))
850858
return f.tableRoutes.tbl, nil

go/vt/vtgate/engine/insert.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func (ins *Insert) executeInsertQueries(
164164
queries []*querypb.BoundQuery,
165165
insertID uint64,
166166
) (*sqltypes.Result, error) {
167-
autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
167+
autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
168168
err := allowOnlyPrimary(rss...)
169169
if err != nil {
170170
return nil, err

go/vt/vtgate/engine/insert_select.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func (ins *InsertSelect) executeInsertQueries(
202202
queries []*querypb.BoundQuery,
203203
insertID uint64,
204204
) (*sqltypes.Result, error) {
205-
autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
205+
autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
206206
err := allowOnlyPrimary(rss...)
207207
if err != nil {
208208
return nil, err

go/vt/vtgate/engine/primitive.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ type (
149149
RecordMirrorStats(time.Duration, time.Duration, error)
150150

151151
SetLastInsertID(uint64)
152+
153+
// DefaultMultiShardAutocommit returns true if multi shard autocommit semantics are enabled by default
154+
DefaultMultiShardAutocommit() bool
152155
}
153156

154157
// SessionActions gives primitives ability to interact with the session state

go/vt/vtgate/engine/send.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func (s *Send) checkAndReturnShards(ctx context.Context, vcursor VCursor) ([]*sr
146146

147147
func (s *Send) canAutoCommit(vcursor VCursor, rss []*srvtopo.ResolvedShard) bool {
148148
if s.IsDML {
149-
return (len(rss) == 1 || s.MultishardAutocommit) && vcursor.AutocommitApproval()
149+
return (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || s.MultishardAutocommit) && vcursor.AutocommitApproval()
150150
}
151151
return false
152152
}

go/vt/vtgate/executor.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ type (
115115
AllowScatter bool
116116
WarmingReadsPercent int
117117
QueryLogToFile string
118+
119+
// DefaultMultiShardAutocommit will opt into autocommit semantics even for multi shard DMLs
120+
DefaultMultiShardAutocommit bool
118121
}
119122

120123
Executor struct {
@@ -1424,6 +1427,8 @@ func (e *Executor) initVConfig(warnOnShardedOnly bool, pv plancontext.PlannerVer
14241427
WarmingReadsPercent: e.config.WarmingReadsPercent,
14251428
WarmingReadsTimeout: warmingReadsQueryTimeout,
14261429
WarmingReadsChannel: e.warmingReadsChannel,
1430+
1431+
DefaultMultiShardAutocommit: e.config.DefaultMultiShardAutocommit,
14271432
}
14281433
}
14291434

go/vt/vtgate/executorcontext/vcursor_impl.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ type (
9191
WarmingReadsPercent int
9292
WarmingReadsTimeout time.Duration
9393
WarmingReadsChannel chan bool
94+
95+
DefaultMultiShardAutocommit bool
9496
}
9597

9698
// vcursor_impl needs these facilities to be able to be able to execute queries for vindexes
@@ -1619,3 +1621,7 @@ func (vc *VCursorImpl) SetLastInsertID(id uint64) {
16191621
defer vc.SafeSession.mu.Unlock()
16201622
vc.SafeSession.LastInsertId = id
16211623
}
1624+
1625+
func (vc *VCursorImpl) DefaultMultiShardAutocommit() bool {
1626+
return vc.config.DefaultMultiShardAutocommit
1627+
}

go/vt/vtgate/vtgate.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ var (
7878
noScatter bool
7979
enableShardRouting bool
8080

81+
defaultMultiShardAutocommit bool
82+
8183
// healthCheckRetryDelay is the time to wait before retrying healthcheck
8284
healthCheckRetryDelay = 2 * time.Millisecond
8385
// healthCheckTimeout is the timeout on the RPC call to tablets
@@ -199,6 +201,7 @@ func registerFlags(fs *pflag.FlagSet) {
199201
fs.IntVar(&warmingReadsPercent, "warming-reads-percent", 0, "Percentage of reads on the primary to forward to replicas. Useful for keeping buffer pools warm")
200202
fs.IntVar(&warmingReadsConcurrency, "warming-reads-concurrency", 500, "Number of concurrent warming reads allowed")
201203
fs.DurationVar(&warmingReadsQueryTimeout, "warming-reads-query-timeout", 5*time.Second, "Timeout of warming read queries")
204+
fs.BoolVar(&defaultMultiShardAutocommit, "default-multi-shard-autocommit", defaultMultiShardAutocommit, "By default execute multi-shard DML statements with autocommit, even without the MULTI_SHARD_AUTOCOMMIT directive")
202205

203206
viperutil.BindFlags(fs,
204207
enableOnlineDDL,
@@ -358,6 +361,8 @@ func Init(
358361
AllowScatter: !noScatter,
359362
WarmingReadsPercent: warmingReadsPercent,
360363
QueryLogToFile: queryLogToFile,
364+
365+
DefaultMultiShardAutocommit: defaultMultiShardAutocommit,
361366
}
362367

363368
executor := NewExecutor(ctx, env, serv, cell, resolver, eConfig, warnShardedOnly, plans, si, pv, dynamicConfig)

0 commit comments

Comments
 (0)