diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index 88c4093f451..a108ca0d4ad 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -105,6 +105,8 @@ type Stats struct { PartialQueryCount *stats.CountersWithMultiLabels PartialQueryCacheSize *stats.CountersWithMultiLabels + + ThrottledCounts *stats.CountersWithMultiLabels } // RecordHeartbeat updates the time the last heartbeat from vstreamer was seen @@ -174,6 +176,7 @@ func NewStats() *Stats { bps.TableCopyTimings = stats.NewTimings("", "", "Table") bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"}) bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"}) + bps.ThrottledCounts = stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"}) return bps } @@ -369,13 +372,14 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error { if backoff == throttler.NotThrottled { break } + blp.blplStats.ThrottledCounts.Add([]string{"trx", "binlogplayer"}, 1) // We don't bother checking for context cancellation here because the // sleep will block only up to 1 second. (Usually, backoff is 1s / rate // e.g. a rate of 1000 TPS results into a backoff of 1 ms.) time.Sleep(backoff) } - // get the response + // Get the response. response, err := stream.Recv() // Check context before checking error, because canceled // contexts could be wrapped as regular errors. diff --git a/go/vt/vtgate/sandbox_test.go b/go/vt/vtgate/sandbox_test.go index 3ceee09d5f7..27be6442cfe 100644 --- a/go/vt/vtgate/sandbox_test.go +++ b/go/vt/vtgate/sandbox_test.go @@ -281,7 +281,10 @@ func (sct *sandboxTopo) WatchSrvVSchema(ctx context.Context, cell string, callba } sct.topoServer.UpdateSrvVSchema(ctx, cell, srvVSchema) - current, updateChan, _ := sct.topoServer.WatchSrvVSchema(ctx, cell) + current, updateChan, err := sct.topoServer.WatchSrvVSchema(ctx, cell) + if err != nil { + panic(fmt.Sprintf("sandboxTopo WatchSrvVSchema returned an error: %v", err)) + } if !callback(current.Value, nil) { panic("sandboxTopo callback returned false") } diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index 892247efee0..84520e19729 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -502,6 +502,21 @@ func (st *vrStats) register() { return result }) + stats.NewCountersFuncWithMultiLabels( + "VReplicationThrottledCounts", + "The number of times vreplication was throttled by workflow, id, throttler (trx or tablet), and the sub-component that was throttled", + []string{"workflow", "id", "throttler", "component"}, + func() map[string]int64 { + st.mu.Lock() + defer st.mu.Unlock() + result := make(map[string]int64) + for _, ct := range st.controllers { + for key, val := range ct.blpStats.ErrorCounts.Counts() { + result[fmt.Sprintf("%s.%d.%s", ct.workflow, ct.id, key)] = val + } + } + return result + }) } func (st *vrStats) numControllers() int64 { diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index 79149d34d6d..3b96219d14a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -184,6 +184,11 @@ func TestVReplicationStats(t *testing.T) { require.Equal(t, int64(100), testStats.status().Controllers[0].CopyLoopCount) require.Equal(t, int64(200), testStats.status().Controllers[0].CopyRowCount) + blpStats.ThrottledCounts.Add([]string{"tablet", "vcopier"}, 10) + blpStats.ThrottledCounts.Add([]string{"tablet", "vplayer"}, 80) + require.Equal(t, int64(10), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vcopier"]) + require.Equal(t, int64(80), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vplayer"]) + var tm int64 = 1234567890 blpStats.RecordHeartbeat(tm) require.Equal(t, tm, blpStats.Heartbeat()) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 11633d95f33..9cd0977ecad 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -580,10 +580,18 @@ func (vr *vreplicator) throttlerAppName() string { return throttlerapp.Concatenate(names...) } +// updateTimeThrottled updates the time_throttled field in the _vt.vreplication record +// with a rate limit so that it's only saved in the database at most once per +// throttleUpdatesRateLimiter.tickerTime. +// It also increments the throttled count in the stats to keep track of how many +// times a VReplication workflow, and the specific sub-component, is throttled by the +// tablet throttler over time. func (vr *vreplicator) updateTimeThrottled(appThrottled throttlerapp.Name) error { + at := appThrottled.String() + vr.stats.ThrottledCounts.Add([]string{"tablet", at}, 1) err := vr.throttleUpdatesRateLimiter.Do(func() error { tm := time.Now().Unix() - update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appThrottled.String()) + update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, at) if err != nil { return err }