Skip to content

Commit

Permalink
Tweaks after self review
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 16, 2024
1 parent 9857538 commit ac6f772
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 32 deletions.
51 changes: 20 additions & 31 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,7 @@ func materialize(t *testing.T, spec string, useVtctldClient bool) {

func materializeProduct(t *testing.T, useVtctldClient bool) {
t.Run("materializeProduct", func(t *testing.T) {
// materializing from "product" keyspace to "customer" keyspace
// Materializing from "product" keyspace to "customer" keyspace.
workflow := "cproduct"
keyspace := "customer"
defaultCell := vc.Cells[vc.CellNames[0]]
Expand All @@ -1170,7 +1170,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {

productTablets := vc.getVttabletsInKeyspace(t, defaultCell, "product", "primary")
t.Run("throttle-app-product", func(t *testing.T) {
// Now, throttle the streamer on source tablets, insert some rows
// Now, throttle the source side component (vstreamer), and insert some rows.
for _, tab := range productTablets {
body, err := throttleApp(tab, sourceThrottlerAppName)
assert.NoError(t, err)
Expand All @@ -1181,45 +1181,33 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusNotThrottled)
}
insertMoreProductsForSourceThrottler(t)
// To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place,
// we expect the additional rows to **not appear** in the materialized view
// To be fair to the test, we give the target time to apply the new changes. We
// expect it to NOT get them in the first place, we expect the additional rows
// to **not appear** in the materialized view
for _, tab := range customerTablets {
waitForRowCountInTablet(t, tab, keyspace, workflow, 5)
// Confirm that we updated the stats on the throttled target tablets as expected.
// Confirm that we updated the stats on the target tablets as expected.
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// Value looks like this: {"cproduct.4.tablet.vstreamer": 2}
require.Greater(t, gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int(), int64(0))
// We only need to do this once.
// The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2}
vstreamerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int()
require.Greater(t, vstreamerThrottledCount, int64(0))
// We only need to do this stat check once.
val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
require.NoError(t, err)
throttledCount, err := strconv.Atoi(val)
throttledCount, err := strconv.ParseInt(val, 10, 64)
require.NoError(t, err)
require.Greater(t, throttledCount, 0)
}
// Now confirm that we updated the stats on the throttled source tablets as expected.
for _, tab := range productTablets {
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// Value looks like this: {"rollup.1.tablet.vstreamer": 2, "rollup.1.tablet.rowstreamer": 1}
require.Greater(t, gjson.Get(jsVal, `*\.*\.tablet\.vstreamer`).Int(), int64(0))
// We only need to do this once.
val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"})
require.NoError(t, err)
throttledCount, err := strconv.Atoi(val)
require.NoError(t, err)
require.Greater(t, throttledCount, 0)
require.GreaterOrEqual(t, throttledCount, vstreamerThrottledCount)
}
})
t.Run("unthrottle-app-product", func(t *testing.T) {
// unthrottle on source tablets, and expect the rows to show up
// Unthrottle the vstreamer component, and expect the rows to show up.
for _, tab := range productTablets {
body, err := unthrottleApp(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, sourceThrottlerAppName)
// give time for unthrottling to take effect and for target to fetch data
// Give time for unthrottling to take effect and for targets to fetch data.
waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusNotThrottled)
}
for _, tab := range customerTablets {
Expand All @@ -1228,8 +1216,8 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
})

t.Run("throttle-app-customer", func(t *testing.T) {
// Now, throttle vreplication (vcopier/vapplier) on target tablets, and
// insert some more rows.
// Now, throttle vreplication on the target side (vplayer), and insert some
// more rows.
for _, tab := range customerTablets {
body, err := throttleApp(tab, targetThrottlerAppName)
assert.NoError(t, err)
Expand All @@ -1244,12 +1232,13 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
// rows to **not appear** in the materialized view.
for _, tab := range customerTablets {
waitForRowCountInTablet(t, tab, keyspace, workflow, 8)
// Confirm that we updated the stats on the throttled target tablets as expected.
// Confirm that we updated the stats on the target tablets as expected.
jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"})
require.NoError(t, err)
require.NotEqual(t, "{}", jsVal)
// Value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
require.Greater(t, gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int(), int64(0))
// The JSON value now looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4}
vplayerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int()
require.Greater(t, vplayerThrottledCount, int64(0))
}
})
t.Run("unthrottle-app-customer", func(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,9 @@ func (vr *vreplicator) throttlerAppName() string {
// throttleUpdatesRateLimiter.tickerTime.
// It also increments the throttled count in the stats to keep track of how many
// times a VReplication workflow, and the specific sub-component, is throttled by the
// tablet throttler over time.
// tablet throttler over time. It also increments the global throttled count to keep
// track of how many times in total vreplication has been throttled across all workflows
// (both ones that currently exist and ones that no longer do).
func (vr *vreplicator) updateTimeThrottled(appThrottled throttlerapp.Name) error {
appName := appThrottled.String()
vr.stats.ThrottledCounts.Add([]string{"tablet", appName}, 1)
Expand Down

0 comments on commit ac6f772

Please sign in to comment.