Skip to content

Commit

Permalink
Add vreplication throttler stats
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 13, 2024
1 parent a404807 commit 2b89132
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 3 deletions.
6 changes: 5 additions & 1 deletion go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ type Stats struct {

PartialQueryCount *stats.CountersWithMultiLabels
PartialQueryCacheSize *stats.CountersWithMultiLabels

ThrottledCounts *stats.CountersWithMultiLabels
}

// RecordHeartbeat updates the time the last heartbeat from vstreamer was seen
Expand Down Expand Up @@ -174,6 +176,7 @@ func NewStats() *Stats {
bps.TableCopyTimings = stats.NewTimings("", "", "Table")
bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.ThrottledCounts = stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"})
return bps
}

Expand Down Expand Up @@ -369,13 +372,14 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
if backoff == throttler.NotThrottled {
break
}
blp.blplStats.ThrottledCounts.Add([]string{"trx", "binlogplayer"}, 1)

Check warning on line 375 in go/vt/binlog/binlogplayer/binlog_player.go

View check run for this annotation

Codecov / codecov/patch

go/vt/binlog/binlogplayer/binlog_player.go#L375

Added line #L375 was not covered by tests
// We don't bother checking for context cancellation here because the
// sleep will block only up to 1 second. (Usually, backoff is 1s / rate
// e.g. a rate of 1000 TPS results in a backoff of 1 ms.)
time.Sleep(backoff)
}

// get the response
// Get the response.
response, err := stream.Recv()
// Check context before checking error, because canceled
// contexts could be wrapped as regular errors.
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtgate/sandbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,10 @@ func (sct *sandboxTopo) WatchSrvVSchema(ctx context.Context, cell string, callba
}

sct.topoServer.UpdateSrvVSchema(ctx, cell, srvVSchema)
current, updateChan, _ := sct.topoServer.WatchSrvVSchema(ctx, cell)
current, updateChan, err := sct.topoServer.WatchSrvVSchema(ctx, cell)
if err != nil {
panic(fmt.Sprintf("sandboxTopo WatchSrvVSchema returned an error: %v", err))
}
if !callback(current.Value, nil) {
panic("sandboxTopo callback returned false")
}
Expand Down
15 changes: 15 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,21 @@ func (st *vrStats) register() {
return result
})

stats.NewCountersFuncWithMultiLabels(
"VReplicationThrottledCounts",
"The number of times vreplication was throttled by workflow, id, throttler (trx or tablet), and the sub-component that was throttled",
[]string{"workflow", "id", "throttler", "component"},
func() map[string]int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64)
for _, ct := range st.controllers {
for key, val := range ct.blpStats.ErrorCounts.Counts() {
result[fmt.Sprintf("%s.%d.%s", ct.workflow, ct.id, key)] = val

Check warning on line 515 in go/vt/vttablet/tabletmanager/vreplication/stats.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vreplication/stats.go#L510-L515

Added lines #L510 - L515 were not covered by tests
}
}
return result

Check warning on line 518 in go/vt/vttablet/tabletmanager/vreplication/stats.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vreplication/stats.go#L518

Added line #L518 was not covered by tests
})
}

func (st *vrStats) numControllers() int64 {
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ func TestVReplicationStats(t *testing.T) {
require.Equal(t, int64(100), testStats.status().Controllers[0].CopyLoopCount)
require.Equal(t, int64(200), testStats.status().Controllers[0].CopyRowCount)

blpStats.ThrottledCounts.Add([]string{"tablet", "vcopier"}, 10)
blpStats.ThrottledCounts.Add([]string{"tablet", "vplayer"}, 80)
require.Equal(t, int64(10), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vcopier"])
require.Equal(t, int64(80), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vplayer"])

var tm int64 = 1234567890
blpStats.RecordHeartbeat(tm)
require.Equal(t, tm, blpStats.Heartbeat())
Expand Down
10 changes: 9 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,10 +580,18 @@ func (vr *vreplicator) throttlerAppName() string {
return throttlerapp.Concatenate(names...)
}

// updateTimeThrottled updates the time_throttled field in the _vt.vreplication record
// with a rate limit so that it's only saved in the database at most once per
// throttleUpdatesRateLimiter.tickerTime.
// It also increments the throttled count in the stats to keep track of how many
// times a VReplication workflow, and the specific sub-component, is throttled by the
// tablet throttler over time.
func (vr *vreplicator) updateTimeThrottled(appThrottled throttlerapp.Name) error {
at := appThrottled.String()
vr.stats.ThrottledCounts.Add([]string{"tablet", at}, 1)

Check warning on line 591 in go/vt/vttablet/tabletmanager/vreplication/vreplicator.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vreplication/vreplicator.go#L590-L591

Added lines #L590 - L591 were not covered by tests
err := vr.throttleUpdatesRateLimiter.Do(func() error {
tm := time.Now().Unix()
update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appThrottled.String())
update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, at)

Check warning on line 594 in go/vt/vttablet/tabletmanager/vreplication/vreplicator.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vreplication/vreplicator.go#L594

Added line #L594 was not covered by tests
if err != nil {
return err
}
Expand Down

0 comments on commit 2b89132

Please sign in to comment.