Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Add throttler stats #15221

Merged
merged 11 commits into from
Feb 19, 2024
6 changes: 5 additions & 1 deletion go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@

PartialQueryCount *stats.CountersWithMultiLabels
PartialQueryCacheSize *stats.CountersWithMultiLabels

ThrottledCounts *stats.CountersWithMultiLabels
}

// RecordHeartbeat updates the time the last heartbeat from vstreamer was seen
Expand Down Expand Up @@ -174,6 +176,7 @@
bps.TableCopyTimings = stats.NewTimings("", "", "Table")
bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"})
bps.ThrottledCounts = stats.NewCountersWithMultiLabels("", "", []string{"throttler", "component"})
return bps
}

Expand Down Expand Up @@ -369,13 +372,14 @@
if backoff == throttler.NotThrottled {
break
}
blp.blplStats.ThrottledCounts.Add([]string{"trx", "binlogplayer"}, 1)

Check warning on line 375 in go/vt/binlog/binlogplayer/binlog_player.go

View check run for this annotation

Codecov / codecov/patch

go/vt/binlog/binlogplayer/binlog_player.go#L375

Added line #L375 was not covered by tests
// We don't bother checking for context cancellation here because the
// sleep will block only up to 1 second. (Usually, backoff is 1s / rate
// e.g. a rate of 1000 TPS results in a backoff of 1 ms.)
time.Sleep(backoff)
}

// get the response
// Get the response.
response, err := stream.Recv()
// Check context before checking error, because canceled
// contexts could be wrapped as regular errors.
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtgate/sandbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,10 @@ func (sct *sandboxTopo) WatchSrvVSchema(ctx context.Context, cell string, callba
}

sct.topoServer.UpdateSrvVSchema(ctx, cell, srvVSchema)
current, updateChan, _ := sct.topoServer.WatchSrvVSchema(ctx, cell)
current, updateChan, err := sct.topoServer.WatchSrvVSchema(ctx, cell)
if err != nil {
panic(fmt.Sprintf("sandboxTopo WatchSrvVSchema returned an error: %v", err))
}
if !callback(current.Value, nil) {
panic("sandboxTopo callback returned false")
}
Expand Down
15 changes: 15 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,21 @@
return result
})

stats.NewCountersFuncWithMultiLabels(
"VReplicationThrottledCounts",
"The number of times vreplication was throttled by workflow, id, throttler (trx or tablet), and the sub-component that was throttled",
[]string{"workflow", "id", "throttler", "component"},
func() map[string]int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64)
for _, ct := range st.controllers {
for key, val := range ct.blpStats.ErrorCounts.Counts() {
result[fmt.Sprintf("%s.%d.%s", ct.workflow, ct.id, key)] = val

Check warning on line 515 in go/vt/vttablet/tabletmanager/vreplication/stats.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vreplication/stats.go#L510-L515

Added lines #L510 - L515 were not covered by tests
}
}
return result

Check warning on line 518 in go/vt/vttablet/tabletmanager/vreplication/stats.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vreplication/stats.go#L518

Added line #L518 was not covered by tests
})
}

func (st *vrStats) numControllers() int64 {
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ func TestVReplicationStats(t *testing.T) {
require.Equal(t, int64(100), testStats.status().Controllers[0].CopyLoopCount)
require.Equal(t, int64(200), testStats.status().Controllers[0].CopyRowCount)

blpStats.ThrottledCounts.Add([]string{"tablet", "vcopier"}, 10)
blpStats.ThrottledCounts.Add([]string{"tablet", "vplayer"}, 80)
require.Equal(t, int64(10), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vcopier"])
require.Equal(t, int64(80), testStats.controllers[1].blpStats.ThrottledCounts.Counts()["tablet.vplayer"])

var tm int64 = 1234567890
blpStats.RecordHeartbeat(tm)
require.Equal(t, tm, blpStats.Heartbeat())
Expand Down
10 changes: 9 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,10 +580,18 @@
return throttlerapp.Concatenate(names...)
}

// updateTimeThrottled updates the time_throttled field in the _vt.vreplication record
// with a rate limit so that it's only saved in the database at most once per
// throttleUpdatesRateLimiter.tickerTime.
// It also increments the throttled count in the stats to keep track of how many
// times a VReplication workflow, and the specific sub-component, is throttled by the
// tablet throttler over time.
func (vr *vreplicator) updateTimeThrottled(appThrottled throttlerapp.Name) error {
at := appThrottled.String()
mattlord marked this conversation as resolved.
Show resolved Hide resolved
vr.stats.ThrottledCounts.Add([]string{"tablet", at}, 1)

Check warning on line 591 in go/vt/vttablet/tabletmanager/vreplication/vreplicator.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vreplication/vreplicator.go#L590-L591

Added lines #L590 - L591 were not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we want to always update stats in a goroutine so as not to make the metrics themselves affect the code's flow (the metric introduces an atomic write).

Copy link
Contributor Author

@mattlord mattlord Feb 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't do that in vreplication today. I would expect this to be faster/lighter than a log message, which we don't typically do in a goroutine (which has its own overhead). Your usage of the function that creates the underlying stat resources if needed (in the tablet throttler) I think makes more sense to do in a goroutine like you are.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm about to get rid of that usage 😛

err := vr.throttleUpdatesRateLimiter.Do(func() error {
tm := time.Now().Unix()
update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appThrottled.String())
update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, at)

Check warning on line 594 in go/vt/vttablet/tabletmanager/vreplication/vreplicator.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vreplication/vreplicator.go#L594

Added line #L594 was not covered by tests
if err != nil {
return err
}
Expand Down
Loading