Skip to content

Commit 9857538

Browse files
committed
Add global throttled count and e2e tests
Signed-off-by: Matt Lord <mattalord@gmail.com>
1 parent 0a5d94f commit 9857538

File tree

5 files changed

+54
-4
lines changed

5 files changed

+54
-4
lines changed

go/test/endtoend/vreplication/vreplication_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"io"
2323
"net/http"
2424
"runtime"
25+
"strconv"
2526
"strings"
2627
"sync"
2728
"testing"
@@ -1184,6 +1185,32 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
11841185
// we expect the additional rows to **not appear** in the materialized view
11851186
for _, tab := range customerTablets {
11861187
waitForRowCountInTablet(t, tab, keyspace, workflow, 5)
1188+
// Confirm that we updated the stats on the throttled target tablets as expected.
1189+
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
1190+
require.NoError(t, err)
1191+
require.NotEqual(t, "{}", jsVal)
1192+
// Value looks like this: {"cproduct.4.tablet.vstreamer": 2}
1193+
require.Greater(t, gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int(), int64(0))
1194+
// We only need to do this once.
1195+
val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
1196+
require.NoError(t, err)
1197+
throttledCount, err := strconv.Atoi(val)
1198+
require.NoError(t, err)
1199+
require.Greater(t, throttledCount, 0)
1200+
}
1201+
// Now confirm that we updated the stats on the throttled source tablets as expected.
1202+
for _, tab := range productTablets {
1203+
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
1204+
require.NoError(t, err)
1205+
require.NotEqual(t, "{}", jsVal)
1206+
// Value looks like this: {"rollup.1.tablet.vstreamer": 2, "rollup.1.tablet.rowstreamer": 1}
1207+
require.Greater(t, gjson.Get(jsVal, `*\.*\.tablet\.vstreamer`).Int(), int64(0))
1208+
// We only need to do this once.
1209+
val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
1210+
require.NoError(t, err)
1211+
throttledCount, err := strconv.Atoi(val)
1212+
require.NoError(t, err)
1213+
require.Greater(t, throttledCount, 0)
11871214
}
11881215
})
11891216
t.Run("unthrottle-app-product", func(t *testing.T) {
@@ -1217,6 +1244,12 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
12171244
// rows to **not appear** in the materialized view.
12181245
for _, tab := range customerTablets {
12191246
waitForRowCountInTablet(t, tab, keyspace, workflow, 8)
1247+
// Confirm that we updated the stats on the throttled target tablets as expected.
1248+
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
1249+
require.NoError(t, err)
1250+
require.NotEqual(t, "{}", jsVal)
1251+
// Value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
1252+
require.Greater(t, gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int(), int64(0))
12201253
}
12211254
})
12221255
t.Run("unthrottle-app-customer", func(t *testing.T) {

go/vt/binlog/binlogplayer/binlog_player.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ type Stats struct {
106106
PartialQueryCount *stats.CountersWithMultiLabels
107107
PartialQueryCacheSize *stats.CountersWithMultiLabels
108108

109-
ThrottledCounts *stats.CountersWithMultiLabels
109+
ThrottledCounts *stats.CountersWithMultiLabels // By throttler and component
110110
}
111111

112112
// RecordHeartbeat updates the time the last heartbeat from vstreamer was seen

go/vt/vttablet/tabletmanager/vreplication/stats.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,12 @@ type vrStats struct {
5959
mu sync.Mutex
6060
isOpen bool
6161
controllers map[int32]*controller
62+
63+
ThrottledCount *stats.Counter
6264
}
6365

6466
func (st *vrStats) register() {
65-
67+
st.ThrottledCount = stats.NewCounter("", "")
6668
stats.NewGaugeFunc("VReplicationStreamCount", "Number of vreplication streams", st.numControllers)
6769
stats.NewGaugeFunc("VReplicationLagSecondsMax", "Max vreplication seconds behind primary", st.maxReplicationLagSeconds)
6870
stats.NewStringMapFuncWithMultiLabels(
@@ -502,6 +504,14 @@ func (st *vrStats) register() {
502504
return result
503505
})
504506

507+
stats.NewCounterFunc(
508+
"VReplicationThrottledCountTotal",
509+
"The total number of times that vreplication has been throttled",
510+
func() int64 {
511+
st.mu.Lock()
512+
defer st.mu.Unlock()
513+
return st.ThrottledCount.Get()
514+
})
505515
stats.NewCountersFuncWithMultiLabels(
506516
"VReplicationThrottledCounts",
507517
"The number of times vreplication was throttled by workflow, id, throttler (trx or tablet), and the sub-component that was throttled",
@@ -511,7 +521,7 @@ func (st *vrStats) register() {
511521
defer st.mu.Unlock()
512522
result := make(map[string]int64)
513523
for _, ct := range st.controllers {
514-
for key, val := range ct.blpStats.ErrorCounts.Counts() {
524+
for key, val := range ct.blpStats.ThrottledCounts.Counts() {
515525
result[fmt.Sprintf("%s.%d.%s", ct.workflow, ct.id, key)] = val
516526
}
517527
}

go/vt/vttablet/tabletmanager/vreplication/stats_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/stretchr/testify/require"
2727

2828
"vitess.io/vitess/go/mysql/replication"
29+
"vitess.io/vitess/go/stats"
2930

3031
"vitess.io/vitess/go/vt/binlog/binlogplayer"
3132
"vitess.io/vitess/go/vt/proto/binlogdata"
@@ -132,7 +133,9 @@ func TestStatusHtml(t *testing.T) {
132133
func TestVReplicationStats(t *testing.T) {
133134
blpStats := binlogplayer.NewStats()
134135
defer blpStats.Stop()
135-
testStats := &vrStats{}
136+
testStats := &vrStats{
137+
ThrottledCount: stats.NewCounter("", ""),
138+
}
136139
testStats.isOpen = true
137140
testStats.controllers = map[int32]*controller{
138141
1: {
@@ -184,6 +187,9 @@ func TestVReplicationStats(t *testing.T) {
184187
require.Equal(t, int64(100), testStats.status().Controllers[0].CopyLoopCount)
185188
require.Equal(t, int64(200), testStats.status().Controllers[0].CopyRowCount)
186189

190+
testStats.ThrottledCount.Add(99)
191+
require.Equal(t, int64(99), testStats.ThrottledCount.Get())
192+
187193
blpStats.ThrottledCounts.Add([]string{"tablet", "vcopier"}, 10)
188194
blpStats.ThrottledCounts.Add([]string{"tablet", "vplayer"}, 80)
189195
require.Equal(t, int64(10), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vcopier"])

go/vt/vttablet/tabletmanager/vreplication/vreplicator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,7 @@ func (vr *vreplicator) throttlerAppName() string {
589589
func (vr *vreplicator) updateTimeThrottled(appThrottled throttlerapp.Name) error {
590590
appName := appThrottled.String()
591591
vr.stats.ThrottledCounts.Add([]string{"tablet", appName}, 1)
592+
globalStats.ThrottledCount.Add(1)
592593
err := vr.throttleUpdatesRateLimiter.Do(func() error {
593594
tm := time.Now().Unix()
594595
update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appName)

0 commit comments

Comments
 (0)