Skip to content

Commit e6c9e3c

Browse files
committed
tabletserver: do not consolidate streams on primary tablet when consolidator mode is notOnPrimary (#14332)
Signed-off-by: Max Englander <max@planetscale.com>
1 parent faee1cf commit e6c9e3c

File tree

5 files changed

+92
-60
lines changed

5 files changed

+92
-60
lines changed

go/vt/vttablet/endtoend/config_test.go

Lines changed: 81 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -116,64 +116,88 @@ func TestDisableConsolidator(t *testing.T) {
116116
}
117117

118118
func TestConsolidatorReplicasOnly(t *testing.T) {
119-
totalConsolidationsTag := "Waits/Histograms/Consolidations/Count"
120-
initial := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
121-
var wg sync.WaitGroup
122-
wg.Add(2)
123-
go func() {
124-
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
125-
wg.Done()
126-
}()
127-
go func() {
128-
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
129-
wg.Done()
130-
}()
131-
wg.Wait()
132-
afterOne := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
133-
assert.Equal(t, initial+1, afterOne, "expected one consolidation")
134-
135-
revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary)
136-
defer revert()
137-
138-
// primary should not do query consolidation
139-
var wg2 sync.WaitGroup
140-
wg2.Add(2)
141-
go func() {
142-
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
143-
wg2.Done()
144-
}()
145-
go func() {
146-
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
147-
wg2.Done()
148-
}()
149-
wg2.Wait()
150-
noNewConsolidations := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
151-
assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")
152-
153-
// become a replica, where query consolidation should happen
154-
client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)
155-
156-
err := client.SetServingType(topodatapb.TabletType_REPLICA)
157-
require.NoError(t, err)
158-
defer func() {
159-
err = client.SetServingType(topodatapb.TabletType_PRIMARY)
160-
require.NoError(t, err)
161-
}()
119+
type executeFn func(
120+
query string, bindvars map[string]*querypb.BindVariable,
121+
) (*sqltypes.Result, error)
122+
123+
testCases := []struct {
124+
name string
125+
getExecuteFn func(qc *framework.QueryClient) executeFn
126+
totalConsolidationsTag string
127+
}{
128+
{
129+
name: "Execute",
130+
getExecuteFn: func(qc *framework.QueryClient) executeFn { return qc.Execute },
131+
totalConsolidationsTag: "Waits/Histograms/Consolidations/Count",
132+
},
133+
{
134+
name: "StreamExecute",
135+
getExecuteFn: func(qc *framework.QueryClient) executeFn { return qc.StreamExecute },
136+
totalConsolidationsTag: "Waits/Histograms/StreamConsolidations/Count",
137+
},
138+
}
162139

163-
initial = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
164-
var wg3 sync.WaitGroup
165-
wg3.Add(2)
166-
go func() {
167-
client.Execute("select sleep(0.5) from dual", nil)
168-
wg3.Done()
169-
}()
170-
go func() {
171-
client.Execute("select sleep(0.5) from dual", nil)
172-
wg3.Done()
173-
}()
174-
wg3.Wait()
175-
afterOne = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
176-
assert.Equal(t, initial+1, afterOne, "expected another consolidation")
140+
for _, testCase := range testCases {
141+
t.Run(testCase.name, func(t *testing.T) {
142+
initial := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
143+
var wg sync.WaitGroup
144+
wg.Add(2)
145+
go func() {
146+
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
147+
wg.Done()
148+
}()
149+
go func() {
150+
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
151+
wg.Done()
152+
}()
153+
wg.Wait()
154+
afterOne := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
155+
assert.Equal(t, initial+1, afterOne, "expected one consolidation")
156+
157+
revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary)
158+
defer revert()
159+
160+
// primary should not do query consolidation
161+
var wg2 sync.WaitGroup
162+
wg2.Add(2)
163+
go func() {
164+
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
165+
wg2.Done()
166+
}()
167+
go func() {
168+
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
169+
wg2.Done()
170+
}()
171+
wg2.Wait()
172+
noNewConsolidations := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
173+
assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")
174+
175+
// become a replica, where query consolidation should happen
176+
client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)
177+
178+
err := client.SetServingType(topodatapb.TabletType_REPLICA)
179+
require.NoError(t, err)
180+
defer func() {
181+
err = client.SetServingType(topodatapb.TabletType_PRIMARY)
182+
require.NoError(t, err)
183+
}()
184+
185+
initial = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
186+
var wg3 sync.WaitGroup
187+
wg3.Add(2)
188+
go func() {
189+
testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil)
190+
wg3.Done()
191+
}()
192+
go func() {
193+
testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil)
194+
wg3.Done()
195+
}()
196+
wg3.Wait()
197+
afterOne = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
198+
assert.Equal(t, initial+1, afterOne, "expected another consolidation")
199+
})
200+
}
177201
}
178202

179203
func TestQueryPlanCache(t *testing.T) {

go/vt/vttablet/tabletserver/query_executor.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -337,7 +337,7 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error {
337337

338338
if consolidator := qre.tsv.qe.streamConsolidator; consolidator != nil {
339339
if qre.connID == 0 && qre.plan.PlanID == p.PlanSelectStream && qre.shouldConsolidate() {
340-
return consolidator.Consolidate(qre.logStats, sqlWithoutComments, callback,
340+
return consolidator.Consolidate(qre.tsv.stats.WaitTimings, qre.logStats, sqlWithoutComments, callback,
341341
func(callback StreamCallback) error {
342342
dbConn, err := qre.getStreamConn()
343343
if err != nil {

go/vt/vttablet/tabletserver/stream_consolidator.go

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,9 +19,11 @@ package tabletserver
1919
import (
2020
"sync"
2121
"sync/atomic"
22+
"time"
2223

2324
"vitess.io/vitess/go/sqltypes"
2425
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
26+
"vitess.io/vitess/go/vt/servenv"
2527
"vitess.io/vitess/go/vt/vterrors"
2628
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
2729
)
@@ -70,7 +72,7 @@ func (sc *StreamConsolidator) SetBlocking(block bool) {
7072
// `callback`. A `leaderCallback` must also be supplied: this function must perform the actual
7173
// query in the upstream MySQL server, yielding results into the modified callback that it receives
7274
// as an argument.
73-
func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error {
75+
func (sc *StreamConsolidator) Consolidate(waitTimings *servenv.TimingsWrapper, logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error {
7476
var (
7577
inflight *streamInFlight
7678
catchup []*sqltypes.Result
@@ -100,9 +102,11 @@ func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql stri
100102

101103
// if we have a followChan, we're following up on a query that is already being served
102104
if followChan != nil {
105+
startTime := time.Now()
103106
defer func() {
104107
memchange := inflight.unfollow(followChan, sc.cleanup)
105108
atomic.AddInt64(&sc.memory, memchange)
109+
waitTimings.Record("StreamConsolidations", startTime)
106110
}()
107111

108112
logStats.QuerySources |= tabletenv.QuerySourceConsolidator

go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ import (
2828

2929
"github.com/stretchr/testify/require"
3030

31+
"vitess.io/vitess/go/vt/servenv"
3132
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
3233

3334
"vitess.io/vitess/go/sqltypes"
@@ -123,10 +124,12 @@ func (ct *consolidationTest) run(workers int, generateCallback func(int) (string
123124

124125
go func(worker int) {
125126
defer wg.Done()
127+
exporter := servenv.NewExporter("ConsolidatorTest", "")
128+
timings := exporter.NewTimings("ConsolidatorWaits", "", "StreamConsolidations")
126129
logStats := tabletenv.NewLogStats(context.Background(), "StreamConsolidation")
127130
query, callback := generateCallback(worker)
128131
start := time.Now()
129-
err := ct.cc.Consolidate(logStats, query, func(result *sqltypes.Result) error {
132+
err := ct.cc.Consolidate(timings, logStats, query, func(result *sqltypes.Result) error {
130133
cr := ct.results[worker]
131134
cr.items = append(cr.items, result)
132135
atomic.AddInt64(&cr.count, 1)

go/vt/vttablet/tabletserver/tabletserver.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -914,6 +914,7 @@ func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Targ
914914
ctx: ctx,
915915
logStats: logStats,
916916
tsv: tsv,
917+
tabletType: target.GetTabletType(),
917918
setting: connSetting,
918919
}
919920
return qre.Stream(callback)

0 commit comments

Comments
 (0)