VReplication: Add throttler stats #15221

Merged: 11 commits, Feb 19, 2024
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/fk_config_test.go
@@ -67,15 +67,15 @@ insert into t2 values(1, 1, 't21'), (2, 1, 't22'), (3, 2, 't23');
}
]
},
"t1": {
"t1": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"t2": {
"t2": {
"column_vindexes": [
{
"column": "t1id",
8 changes: 4 additions & 4 deletions go/test/endtoend/vreplication/fk_test.go
@@ -283,10 +283,10 @@ func (ls *fkLoadSimulator) exec(query string) *sqltypes.Result {
// constraints, where the parent table is lexicographically sorted before the child table and
// thus may be dropped first, can be successfully cancelled.
func testFKCancel(t *testing.T, vc *VitessCluster) {
var targetKeyspace = "fktarget"
var sourceKeyspace = "fksource"
var workflowName = "wf2"
var ksWorkflow = fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
targetKeyspace := "fktarget"
sourceKeyspace := "fksource"
workflowName := "wf2"
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
mt := newMoveTables(vc, &moveTablesWorkflow{
workflowInfo: &workflowInfo{
vc: vc,
38 changes: 30 additions & 8 deletions go/test/endtoend/vreplication/vreplication_test.go
@@ -22,6 +22,7 @@ import (
"io"
"net/http"
"runtime"
"strconv"
"strings"
"sync"
"testing"
@@ -1155,7 +1156,7 @@ func materialize(t *testing.T, spec string, useVtctldClient bool) {

func materializeProduct(t *testing.T, useVtctldClient bool) {
t.Run("materializeProduct", func(t *testing.T) {
// materializing from "product" keyspace to "customer" keyspace
// Materializing from "product" keyspace to "customer" keyspace.
workflow := "cproduct"
keyspace := "customer"
defaultCell := vc.Cells[vc.CellNames[0]]
@@ -1169,7 +1170,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {

productTablets := vc.getVttabletsInKeyspace(t, defaultCell, "product", "primary")
t.Run("throttle-app-product", func(t *testing.T) {
// Now, throttle the streamer on source tablets, insert some rows
// Now, throttle the source side component (vstreamer), and insert some rows.
for _, tab := range productTablets {
body, err := throttleApp(tab, sourceThrottlerAppName)
assert.NoError(t, err)
@@ -1180,19 +1181,33 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusNotThrottled)
}
insertMoreProductsForSourceThrottler(t)
// To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place,
// we expect the additional rows to **not appear** in the materialized view
// To be fair to the test, we give the target time to apply the new changes. We
// expect it to NOT get them in the first place, we expect the additional rows
// to **not appear** in the materialized view.
for _, tab := range customerTablets {
waitForRowCountInTablet(t, tab, keyspace, workflow, 5)
// Confirm that we updated the stats on the target tablets as expected.
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2}
vstreamerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int()
require.Greater(t, vstreamerThrottledCount, int64(0))
// We only need to do this stat check once.
val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
require.NoError(t, err)
throttledCount, err := strconv.ParseInt(val, 10, 64)
require.NoError(t, err)
require.GreaterOrEqual(t, throttledCount, vstreamerThrottledCount)
}
})
t.Run("unthrottle-app-product", func(t *testing.T) {
// unthrottle on source tablets, and expect the rows to show up
// Unthrottle the vstreamer component, and expect the rows to show up.
for _, tab := range productTablets {
body, err := unthrottleApp(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, sourceThrottlerAppName)
// give time for unthrottling to take effect and for target to fetch data
// Give time for unthrottling to take effect and for targets to fetch data.
waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusNotThrottled)
}
for _, tab := range customerTablets {
@@ -1201,8 +1216,8 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
})

t.Run("throttle-app-customer", func(t *testing.T) {
// Now, throttle vreplication (vcopier/vapplier) on target tablets, and
// insert some more rows.
// Now, throttle vreplication on the target side (vplayer), and insert some
// more rows.
for _, tab := range customerTablets {
body, err := throttleApp(tab, targetThrottlerAppName)
assert.NoError(t, err)
@@ -1217,6 +1232,13 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
// rows to **not appear** in the materialized view.
for _, tab := range customerTablets {
waitForRowCountInTablet(t, tab, keyspace, workflow, 8)
// Confirm that we updated the stats on the target tablets as expected.
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// The JSON value now looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
vplayerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int()
require.Greater(t, vplayerThrottledCount, int64(0))
}
})
t.Run("unthrottle-app-customer", func(t *testing.T) {
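For context, here is a minimal, self-contained sketch of the kind of check the test above performs: fetch /debug/vars from a tablet's HTTP debug port and pull the new throttler counters out with gjson. The stat names and the escaped gjson path come from this PR; the host, port, and workflow name are assumptions made for the example.

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"

	"github.com/tidwall/gjson"
)

func main() {
	// Hypothetical tablet debug endpoint.
	resp, err := http.Get("http://127.0.0.1:15100/debug/vars")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}

	// Total number of throttled events across all workflows.
	total := gjson.GetBytes(body, "VReplicationThrottledCountTotal").Int()

	// Per-workflow counts are keyed "<workflow>.<id>.<throttler>.<component>", so
	// the dots inside the key must be escaped and "*" matches the stream id.
	vstreamer := gjson.GetBytes(body, `VReplicationThrottledCounts.cproduct\.*\.tablet\.vstreamer`).Int()

	fmt.Printf("total throttled: %d, cproduct vstreamer throttled: %d\n", total, vstreamer)
}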
6 changes: 5 additions & 1 deletion go/vt/binlog/binlogplayer/binlog_player.go
@@ -105,6 +105,8 @@

PartialQueryCount *stats.CountersWithMultiLabels
PartialQueryCacheSize *stats.CountersWithMultiLabels

ThrottledCounts *stats.CountersWithMultiLabels // By throttler and component
}

// RecordHeartbeat updates the time the last heartbeat from vstreamer was seen
@@ -174,6 +176,7 @@
bps.TableCopyTimings = stats.NewTimings("", "", "Table")
bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.ThrottledCounts = stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"})
return bps
}

@@ -369,13 +372,14 @@
if backoff == throttler.NotThrottled {
break
}
blp.blplStats.ThrottledCounts.Add([]string{"trx", "binlogplayer"}, 1)

// We don't bother checking for context cancellation here because the
// sleep will block only up to 1 second. (Usually, backoff is 1s / rate
// e.g. a rate of 1000 TPS results into a backoff of 1 ms.)
time.Sleep(backoff)
}

// get the response
// Get the response.
response, err := stream.Recv()
// Check context before checking error, because canceled
// contexts could be wrapped as regular errors.
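As an aside, a minimal sketch of how the new ThrottledCounts stat behaves, based on the vitess.io/vitess/go/stats API used above: each Add supplies one value per label ("throttler" and "component"), and Counts() returns tallies keyed by the label values joined with dots, which is the key format that stats.go and the tests rely on. The counts below are illustrative.

package main

import (
	"fmt"

	"vitess.io/vitess/go/stats"
)

func main() {
	// Same construction as in NewStats above; the empty names mirror how
	// binlogplayer builds its per-stream stats.
	throttledCounts := stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"})

	// One throttled transaction in the binlog player, two throttled vstreamer reads.
	throttledCounts.Add([]string{"trx", "binlogplayer"}, 1)
	throttledCounts.Add([]string{"tablet", "vstreamer"}, 2)

	// Counts() keys are the label values joined by ".".
	fmt.Println(throttledCounts.Counts()["trx.binlogplayer"]) // 1
	fmt.Println(throttledCounts.Counts()["tablet.vstreamer"]) // 2
}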
5 changes: 4 additions & 1 deletion go/vt/vtgate/sandbox_test.go
@@ -281,7 +281,10 @@ func (sct *sandboxTopo) WatchSrvVSchema(ctx context.Context, cell string, callba
}

sct.topoServer.UpdateSrvVSchema(ctx, cell, srvVSchema)
current, updateChan, _ := sct.topoServer.WatchSrvVSchema(ctx, cell)
current, updateChan, err := sct.topoServer.WatchSrvVSchema(ctx, cell)
if err != nil {
panic(fmt.Sprintf("sandboxTopo WatchSrvVSchema returned an error: %v", err))
}
if !callback(current.Value, nil) {
panic("sandboxTopo callback returned false")
}
27 changes: 26 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/stats.go
@@ -59,10 +59,12 @@
mu sync.Mutex
isOpen bool
controllers map[int32]*controller

ThrottledCount *stats.Counter
}

func (st *vrStats) register() {

st.ThrottledCount = stats.NewCounter("", "")
stats.NewGaugeFunc("VReplicationStreamCount", "Number of vreplication streams", st.numControllers)
stats.NewGaugeFunc("VReplicationLagSecondsMax", "Max vreplication seconds behind primary", st.maxReplicationLagSeconds)
stats.NewStringMapFuncWithMultiLabels(
@@ -502,6 +504,29 @@
return result
})

stats.NewCounterFunc(
"VReplicationThrottledCountTotal",
"The total number of times that vreplication has been throttled",
func() int64 {
st.mu.Lock()
defer st.mu.Unlock()
return st.ThrottledCount.Get()
})

stats.NewCountersFuncWithMultiLabels(
"VReplicationThrottledCounts",
"The number of times vreplication was throttled by workflow, id, throttler (trx or tablet), and the sub-component that was throttled",
[]string{"workflow", "id", "throttler", "component"},
func() map[string]int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64)
for _, ct := range st.controllers {
for key, val := range ct.blpStats.ThrottledCounts.Counts() {
result[fmt.Sprintf("%s.%d.%s", ct.workflow, ct.id, key)] = val

}
}
return result

})
}

func (st *vrStats) numControllers() int64 {
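A small illustrative sketch of the flattening done by the VReplicationThrottledCounts registration above: each stream's "throttler.component" count is prefixed with its workflow name and id. The workflow name, id, and counts are made up for the example.

package main

import "fmt"

func main() {
	workflow, id := "cproduct", int32(4)
	perStream := map[string]int64{"tablet.vstreamer": 2, "tablet.vplayer": 4}

	exported := make(map[string]int64)
	for key, val := range perStream {
		// Same format as stats.go: "<workflow>.<id>.<throttler>.<component>".
		exported[fmt.Sprintf("%s.%d.%s", workflow, id, key)] = val
	}
	fmt.Println(exported) // map[cproduct.4.tablet.vplayer:4 cproduct.4.tablet.vstreamer:2]
}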
13 changes: 12 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/stats_test.go
@@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/stats"

"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/proto/binlogdata"
@@ -132,7 +133,9 @@ func TestStatusHtml(t *testing.T) {
func TestVReplicationStats(t *testing.T) {
blpStats := binlogplayer.NewStats()
defer blpStats.Stop()
testStats := &vrStats{}
testStats := &vrStats{
ThrottledCount: stats.NewCounter("", ""),
}
testStats.isOpen = true
testStats.controllers = map[int32]*controller{
1: {
@@ -184,6 +187,14 @@ func TestVReplicationStats(t *testing.T) {
require.Equal(t, int64(100), testStats.status().Controllers[0].CopyLoopCount)
require.Equal(t, int64(200), testStats.status().Controllers[0].CopyRowCount)

testStats.ThrottledCount.Add(99)
require.Equal(t, int64(99), testStats.ThrottledCount.Get())

blpStats.ThrottledCounts.Add([]string{"tablet", "vcopier"}, 10)
blpStats.ThrottledCounts.Add([]string{"tablet", "vplayer"}, 80)
require.Equal(t, int64(10), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vcopier"])
require.Equal(t, int64(80), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vplayer"])

var tm int64 = 1234567890
blpStats.RecordHeartbeat(tm)
require.Equal(t, tm, blpStats.Heartbeat())
13 changes: 12 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
@@ -580,10 +580,21 @@
return throttlerapp.Concatenate(names...)
}

// updateTimeThrottled updates the time_throttled field in the _vt.vreplication record
// with a rate limit so that it's only saved in the database at most once per
// throttleUpdatesRateLimiter.tickerTime.
// It also increments the throttled count in the stats to keep track of how many
// times a VReplication workflow, and the specific sub-component, is throttled by the
// tablet throttler over time. It also increments the global throttled count to keep
// track of how many times in total vreplication has been throttled across all workflows
// (both ones that currently exist and ones that no longer do).
func (vr *vreplicator) updateTimeThrottled(appThrottled throttlerapp.Name) error {
appName := appThrottled.String()
vr.stats.ThrottledCounts.Add([]string{"tablet", appName}, 1)
globalStats.ThrottledCount.Add(1)

err := vr.throttleUpdatesRateLimiter.Do(func() error {
tm := time.Now().Unix()
update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appThrottled.String())
update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appName)

if err != nil {
return err
}
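A minimal, self-contained sketch (not Vitess's actual rate limiter) of the behavior described in the updateTimeThrottled comment above: the throttled counters are incremented on every throttled check, while the time_throttled update to the _vt.vreplication row is persisted at most once per interval.

package main

import (
	"fmt"
	"time"
)

type rateLimiter struct {
	interval time.Duration
	last     time.Time
}

// Do runs f only if at least interval has elapsed since the last successful run.
func (rl *rateLimiter) Do(f func() error) error {
	if time.Since(rl.last) < rl.interval {
		return nil // Skip the database update; it was persisted recently.
	}
	rl.last = time.Now()
	return f()
}

func main() {
	throttledCount := 0
	rl := &rateLimiter{interval: time.Second}
	for i := 0; i < 3; i++ {
		throttledCount++ // Stats are bumped on every throttled check.
		_ = rl.Do(func() error {
			fmt.Println("update time_throttled in _vt.vreplication") // Printed once per interval.
			return nil
		})
	}
	fmt.Println("throttled count:", throttledCount) // 3
}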
@@ -61,7 +61,7 @@ const (
)

func getDefaultCollationID() int64 {
return 45
return 45 // utf8mb4_general_ci
}

var (
12 changes: 12 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Comment from the PR author:
Unrelated fix for flaky tests seen in the code coverage workflow:

--- FAIL: TestMinimalMode (60.00s)
    main_flaky_test.go:79: 
        	Error Trace:	/home/runner/work/vitess/vitess/go/vt/vttablet/tabletserver/vstreamer/main_flaky_test.go:79
        	            				/home/runner/work/vitess/vitess/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go:1855
        	Error:      	Received unexpected error:
        	            	could not launch mysql: signal: killed
        	Test:       	TestMinimalMode

@@ -89,6 +89,12 @@ func TestNoBlob(t *testing.T) {
env = nil
newEngine(t, ctx, "noblob")
defer func() {
if engine != nil {
engine.Close()
}
if env != nil {
env.Close()
}
engine = oldEngine
env = oldEnv
}()
@@ -1854,6 +1860,12 @@ func TestMinimalMode(t *testing.T) {
env = nil
newEngine(t, ctx, "minimal")
defer func() {
if engine != nil {
engine.Close()
}
if env != nil {
env.Close()
}
engine = oldEngine
env = oldEnv
}()