From ac6f7720a8cea1c92e8fe0107ef52c132885a53c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 16 Feb 2024 00:57:53 -0500 Subject: [PATCH] Tweaks after self review Signed-off-by: Matt Lord --- .../vreplication/vreplication_test.go | 51 ++++++++----------- .../tabletmanager/vreplication/vreplicator.go | 4 +- 2 files changed, 23 insertions(+), 32 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 595a07f6051..3c23ac19069 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -1156,7 +1156,7 @@ func materialize(t *testing.T, spec string, useVtctldClient bool) { func materializeProduct(t *testing.T, useVtctldClient bool) { t.Run("materializeProduct", func(t *testing.T) { - // materializing from "product" keyspace to "customer" keyspace + // Materializing from "product" keyspace to "customer" keyspace. workflow := "cproduct" keyspace := "customer" defaultCell := vc.Cells[vc.CellNames[0]] @@ -1170,7 +1170,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) { productTablets := vc.getVttabletsInKeyspace(t, defaultCell, "product", "primary") t.Run("throttle-app-product", func(t *testing.T) { - // Now, throttle the streamer on source tablets, insert some rows + // Now, throttle the source side component (vstreamer), and insert some rows. for _, tab := range productTablets { body, err := throttleApp(tab, sourceThrottlerAppName) assert.NoError(t, err) @@ -1181,45 +1181,33 @@ func materializeProduct(t *testing.T, useVtctldClient bool) { waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusNotThrottled) } insertMoreProductsForSourceThrottler(t) - // To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place, - // we expect the additional rows to **not appear** in the materialized view + // To be fair to the test, we give the target time to apply the new changes. We + // expect it to NOT get them in the first place, we expect the additional rows + // to **not appear** in the materialized view for _, tab := range customerTablets { waitForRowCountInTablet(t, tab, keyspace, workflow, 5) - // Confirm that we updated the stats on the throttled target tablets as expected. + // Confirm that we updated the stats on the target tablets as expected. jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) require.NoError(t, err) require.NotEqual(t, "{}", jsVal) - // Value looks like this: {"cproduct.4.tablet.vstreamer": 2} - require.Greater(t, gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int(), int64(0)) - // We only need to do this once. + // The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2} + vstreamerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int() + require.Greater(t, vstreamerThrottledCount, int64(0)) + // We only need to do this stat check once. val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"}) require.NoError(t, err) - throttledCount, err := strconv.Atoi(val) + throttledCount, err := strconv.ParseInt(val, 10, 64) require.NoError(t, err) - require.Greater(t, throttledCount, 0) - } - // Now confirm that we updated the stats on the throttled source tablets as expected. - for _, tab := range productTablets { - jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) - require.NoError(t, err) - require.NotEqual(t, "{}", jsVal) - // Value looks like this: {"rollup.1.tablet.vstreamer": 2, "rollup.1.tablet.rowstreamer": 1} - require.Greater(t, gjson.Get(jsVal, `*\.*\.tablet\.vstreamer`).Int(), int64(0)) - // We only need to do this once. - val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"}) - require.NoError(t, err) - throttledCount, err := strconv.Atoi(val) - require.NoError(t, err) - require.Greater(t, throttledCount, 0) + require.GreaterOrEqual(t, throttledCount, vstreamerThrottledCount) } }) t.Run("unthrottle-app-product", func(t *testing.T) { - // unthrottle on source tablets, and expect the rows to show up + // Unthrottle the vstreamer component, and expect the rows to show up. for _, tab := range productTablets { body, err := unthrottleApp(tab, sourceThrottlerAppName) assert.NoError(t, err) assert.Contains(t, body, sourceThrottlerAppName) - // give time for unthrottling to take effect and for target to fetch data + // Give time for unthrottling to take effect and for targets to fetch data. waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusNotThrottled) } for _, tab := range customerTablets { @@ -1228,8 +1216,8 @@ func materializeProduct(t *testing.T, useVtctldClient bool) { }) t.Run("throttle-app-customer", func(t *testing.T) { - // Now, throttle vreplication (vcopier/vapplier) on target tablets, and - // insert some more rows. + // Now, throttle vreplication on the target side (vplayer), and insert some + // more rows. for _, tab := range customerTablets { body, err := throttleApp(tab, targetThrottlerAppName) assert.NoError(t, err) @@ -1244,12 +1232,13 @@ func materializeProduct(t *testing.T, useVtctldClient bool) { // rows to **not appear** in the materialized view. for _, tab := range customerTablets { waitForRowCountInTablet(t, tab, keyspace, workflow, 8) - // Confirm that we updated the stats on the throttled target tablets as expected. + // Confirm that we updated the stats on the target tablets as expected. jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) require.NoError(t, err) require.NotEqual(t, "{}", jsVal) - // Value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4} - require.Greater(t, gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int(), int64(0)) + // The JSON value now looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4} + vplayerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int() + require.Greater(t, vplayerThrottledCount, int64(0)) } }) t.Run("unthrottle-app-customer", func(t *testing.T) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index ee4c14dd86f..daa642c53af 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -585,7 +585,9 @@ func (vr *vreplicator) throttlerAppName() string { // throttleUpdatesRateLimiter.tickerTime. // It also increments the throttled count in the stats to keep track of how many // times a VReplication workflow, and the specific sub-component, is throttled by the -// tablet throttler over time. +// tablet throttler over time. It also increments the global throttled count to keep +// track of how many times in total vreplication has been throttled across all workflows +// (both ones that currently exist and ones that no longer do). func (vr *vreplicator) updateTimeThrottled(appThrottled throttlerapp.Name) error { appName := appThrottled.String() vr.stats.ThrottledCounts.Add([]string{"tablet", appName}, 1)