From 9018fef4643a5dd7b93bf359e0d5bc8fa2104def Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 11 Aug 2024 16:17:04 +0300 Subject: [PATCH] Tablet throttler: remove cached metric associated with removed tablet (#16555) Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletserver/throttle/throttler.go | 20 ++++- .../tabletserver/throttle/throttler_test.go | 81 +++++++++++++++++-- 2 files changed, 93 insertions(+), 8 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 382d0a12a77..5cd56460713 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -743,6 +743,7 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { primaryStimulatorRateLimiter.Stop() throttler.aggregatedMetrics.Flush() throttler.recentApps.Flush() + clear(throttler.inventory.TabletMetrics) }() // we do not flush throttler.throttledApps because this is data submitted by the user; the user expects the data to survive a disable+enable @@ -842,7 +843,7 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { } case probes := <-throttler.clusterProbesChan: // incoming structural update, sparse, as result of refreshInventory() - throttler.updateClusterProbes(ctx, probes) + throttler.updateClusterProbes(probes) case <-metricsAggregateTicker.C: if throttler.IsOpen() { throttler.aggregateMetrics() @@ -1116,10 +1117,25 @@ func (throttler *Throttler) refreshInventory(ctx context.Context) error { } // synchronous update of inventory -func (throttler *Throttler) updateClusterProbes(ctx context.Context, clusterProbes *base.ClusterProbes) error { +func (throttler *Throttler) updateClusterProbes(clusterProbes *base.ClusterProbes) error { throttler.inventory.ClustersProbes = clusterProbes.TabletProbes throttler.inventory.IgnoreHostsCount = clusterProbes.IgnoreHostsCount throttler.inventory.IgnoreHostsThreshold = clusterProbes.IgnoreHostsThreshold + + for alias := range throttler.inventory.TabletMetrics { + if alias == "" { + // *this* tablet uses the empty alias to identify itself. + continue + } + if _, found := clusterProbes.TabletProbes[alias]; !found { + // There seems to be a metric stored for some alias, say zone1-0000000102, + // but there is no alias for this probe in the new clusterProbes. This + // suggests that the corresponding tablet has been removed, or its type was changed + // (e.g. from REPLICA to RDONLY). We should therefore remove this cached metric. + delete(throttler.inventory.TabletMetrics, alias) + } + } + return nil } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler_test.go b/go/vt/vttablet/tabletserver/throttle/throttler_test.go index e095378926c..41036620a60 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler_test.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler_test.go @@ -94,6 +94,7 @@ var ( Value: 5.1, }, } + nonPrimaryTabletType atomic.Int32 ) const ( @@ -151,7 +152,11 @@ type FakeTopoServer struct { func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) { tabletType := topodatapb.TabletType_PRIMARY if alias.Uid != 100 { - tabletType = topodatapb.TabletType_REPLICA + val := topodatapb.TabletType(nonPrimaryTabletType.Load()) + if val == topodatapb.TabletType_UNKNOWN { + val = topodatapb.TabletType_REPLICA + } + tabletType = val } tablet := &topo.TabletInfo{ Tablet: &topodatapb.Tablet{ @@ -1156,9 +1161,9 @@ func TestRefreshInventory(t *testing.T) { // validateProbesCount expects number of probes according to cluster name and throttler's leadership status validateProbesCount := func(t *testing.T, probes base.Probes) { if throttler.isLeader.Load() { - assert.Equal(t, 3, len(probes)) + assert.Len(t, probes, 3) } else { - assert.Equal(t, 1, len(probes)) + assert.Len(t, probes, 1) } } t.Run("waiting for probes", func(t *testing.T) { @@ -1171,7 +1176,7 @@ func TestRefreshInventory(t *testing.T) { // not run, and therefore there is none but us to both populate `clusterProbesChan` as well as // read from it. We do not compete here with any other goroutine. assert.NotNil(t, probes) - throttler.updateClusterProbes(ctx, probes) + throttler.updateClusterProbes(probes) validateProbesCount(t, probes.TabletProbes) // Achieved our goal return @@ -1488,6 +1493,70 @@ func TestProbesWhileOperating(t *testing.T) { }) }) }) + + t.Run("metrics", func(t *testing.T) { + var results base.TabletResultMap + <-runSerialFunction(t, ctx, throttler, func(ctx context.Context) { + results = maps.Clone(throttler.inventory.TabletMetrics) + }) + assert.Len(t, results, 3) // 1 self tablet + 2 shard tablets + assert.Contains(t, results, "", "TabletMetrics: %+v", results) // primary self identifies with empty alias + assert.Contains(t, results, "fakezone1-0000000101", "TabletMetrics: %+v", results) + assert.Contains(t, results, "fakezone2-0000000102", "TabletMetrics: %+v", results) + }) + + t.Run("no REPLICA probes", func(t *testing.T) { + nonPrimaryTabletType.Store(int32(topodatapb.TabletType_RDONLY)) + defer nonPrimaryTabletType.Store(int32(topodatapb.TabletType_REPLICA)) + + t.Run("waiting for inventory metrics", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, waitForProbesTimeout) + defer cancel() + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + var results base.TabletResultMap + <-runSerialFunction(t, ctx, throttler, func(ctx context.Context) { + results = maps.Clone(throttler.inventory.TabletMetrics) + }) + if len(results) == 1 { + // That's what we were waiting for. Good. + assert.Contains(t, results, "", "TabletMetrics: %+v", results) // primary self identifies with empty alias + return + } + + select { + case <-ticker.C: + case <-ctx.Done(): + assert.FailNowf(t, ctx.Err().Error(), "waiting for inventory metrics") + } + } + }) + }) + t.Run("again with probes", func(t *testing.T) { + t.Run("waiting for inventory metrics", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, waitForProbesTimeout) + defer cancel() + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + var results base.TabletResultMap + <-runSerialFunction(t, ctx, throttler, func(ctx context.Context) { + results = maps.Clone(throttler.inventory.TabletMetrics) + }) + if len(results) == 3 { + // That's what we were waiting for. Good. + return + } + + select { + case <-ticker.C: + case <-ctx.Done(): + assert.FailNowf(t, ctx.Err().Error(), "waiting for inventory metrics") + } + } + }) + }) }) } @@ -1603,7 +1672,7 @@ func TestProbesPostDisable(t *testing.T) { }) t.Run("metrics", func(t *testing.T) { - assert.Equal(t, 3, len(throttler.inventory.TabletMetrics)) // 1 self tablet + 2 shard tablets + assert.Empty(t, throttler.inventory.TabletMetrics) // map has been cleared }) t.Run("aggregated", func(t *testing.T) { @@ -2103,7 +2172,7 @@ func TestReplica(t *testing.T) { defer throttler.appCheckedMetrics.Delete(testAppName.String()) checkResult := throttler.Check(ctx, testAppName.String(), nil, flags) require.NotNil(t, checkResult) - assert.Equal(t, 3, len(checkResult.Metrics)) + assert.Len(t, checkResult.Metrics, 3) }) t.Run("client, OK", func(t *testing.T) { client := NewBackgroundClient(throttler, throttlerapp.TestingName, base.UndefinedScope)