diff --git a/client/errs/errno.go b/client/errs/errno.go index 0dbcb4fe147..95c6bffdfa4 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -100,5 +100,5 @@ type ErrClientGetResourceGroup struct { } func (e *ErrClientGetResourceGroup) Error() string { - return fmt.Sprintf("get resource group %v failed, %v", e.ResourceGroupName, e.Cause) + return fmt.Sprintf("get resource group %s failed, %s", e.ResourceGroupName, e.Cause) } diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index d25eaee88f0..8d57c46e855 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -38,6 +38,7 @@ import ( ) const ( + defaultResourceGroupName = "default" controllerConfigPath = "resource_group/controller" maxNotificationChanLen = 200 needTokensAmplification = 1.1 @@ -356,22 +357,32 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { if err = proto.Unmarshal(item.Kv.Value, group); err != nil { continue } - if item, ok := c.groupsController.Load(group.Name); ok { - gc := item.(*groupCostController) + if gc, ok := c.loadGroupController(group.Name); ok { gc.modifyMeta(group) + // If the resource group is marked as tombstone before, set it as active again. + if swapped := gc.tombstone.CompareAndSwap(true, false); swapped { + resourceGroupStatusGauge.WithLabelValues(group.Name, gc.name).Set(1) + log.Info("[resource group controller] mark resource group as active", zap.String("name", group.Name)) + } } case meta_storagepb.Event_DELETE: if item.PrevKv != nil { if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil { continue } - if _, ok := c.groupsController.LoadAndDelete(group.Name); ok { + // Do not delete the resource group immediately, just mark it as tombstone. + // For the requests that are still in progress, fallback to the default resource group. + if gc, ok := c.loadGroupController(group.Name); ok { + gc.tombstone.Store(true) resourceGroupStatusGauge.DeleteLabelValues(group.Name, group.Name) + resourceGroupStatusGauge.WithLabelValues(group.Name, defaultResourceGroupName).Set(1) + log.Info("[resource group controller] mark resource group as tombstone", zap.String("name", group.Name)) } } else { // Prev-kv is compacted means there must have been a delete event before this event, // which means that this is just a duplicated event, so we can just ignore it. - log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value))) + log.Info("[resource group controller] previous key-value pair has been compacted", + zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value))) } } } @@ -420,12 +431,32 @@ func (c *ResourceGroupsController) Stop() error { return nil } +// loadGroupController just wraps the `Load` method of `sync.Map`. +func (c *ResourceGroupsController) loadGroupController(name string) (*groupCostController, bool) { + tmp, ok := c.groupsController.Load(name) + if !ok { + return nil, false + } + return tmp.(*groupCostController), true +} + +// loadOrStoreGroupController just wraps the `LoadOrStore` method of `sync.Map`. +func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *groupCostController) (*groupCostController, bool) { + tmp, loaded := c.groupsController.LoadOrStore(name, gc) + return tmp.(*groupCostController), loaded +} + // tryGetResourceGroup will try to get the resource group controller from local cache first, // if the local cache misses, it will then call gRPC to fetch the resource group info from server. func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name string) (*groupCostController, error) { // Get from the local cache first. - if tmp, ok := c.groupsController.Load(name); ok { - return tmp.(*groupCostController), nil + gc, ok := c.loadGroupController(name) + if ok { + // If the resource group is marked as tombstone, fallback to the default resource group. + if gc.tombstone.Load() && name != defaultResourceGroupName { + return c.tryGetResourceGroup(ctx, defaultResourceGroupName) + } + return gc, nil } // Call gRPC to fetch the resource group info. group, err := c.provider.GetResourceGroup(ctx, name) @@ -436,24 +467,21 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name return nil, errors.Errorf("%s does not exists", name) } // Check again to prevent initializing the same resource group concurrently. - if tmp, ok := c.groupsController.Load(name); ok { - gc := tmp.(*groupCostController) + if gc, ok = c.loadGroupController(name); ok { return gc, nil } // Initialize the resource group controller. - gc, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) + gc, err = newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) if err != nil { return nil, err } - // TODO: re-init the state if user change mode from RU to RAW mode. - gc.initRunState() // Check again to prevent initializing the same resource group concurrently. - tmp, loaded := c.groupsController.LoadOrStore(group.GetName(), gc) + gc, loaded := c.loadOrStoreGroupController(group.Name, gc) if !loaded { resourceGroupStatusGauge.WithLabelValues(name, group.Name).Set(1) log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName())) } - return tmp.(*groupCostController), nil + return gc, nil } func (c *ResourceGroupsController) cleanUpResourceGroup() { @@ -465,14 +493,15 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() { latestConsumption := *gc.mu.consumption gc.mu.Unlock() if equalRU(latestConsumption, *gc.run.consumption) { - if gc.tombstone { + if gc.inactive || gc.tombstone.Load() { c.groupsController.Delete(resourceGroupName) resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName) + resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, defaultResourceGroupName) return true } - gc.tombstone = true + gc.inactive = true } else { - gc.tombstone = false + gc.inactive = false } return true }) @@ -498,12 +527,11 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB c.run.inDegradedMode = false for _, res := range resp { name := res.GetResourceGroupName() - v, ok := c.groupsController.Load(name) + gc, ok := c.loadGroupController(name) if !ok { log.Warn("[resource group controller] a non-existent resource group was found when handle token response", zap.String("name", name)) continue } - gc := v.(*groupCostController) gc.handleTokenBucketResponse(res) } } @@ -572,12 +600,16 @@ func (c *ResourceGroupsController) OnRequestWait( func (c *ResourceGroupsController) OnResponse( resourceGroupName string, req RequestInfo, resp ResponseInfo, ) (*rmpb.Consumption, error) { - tmp, ok := c.groupsController.Load(resourceGroupName) + gc, ok := c.loadGroupController(resourceGroupName) if !ok { log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName)) return &rmpb.Consumption{}, nil } - return tmp.(*groupCostController).onResponse(req, resp) + // If the resource group is marked as tombstone, fallback to the default resource group. + if gc.tombstone.Load() && resourceGroupName != defaultResourceGroupName { + return c.OnResponse(defaultResourceGroupName, req, resp) + } + return gc.onResponse(req, resp) } // IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it. @@ -594,8 +626,7 @@ func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context, func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, bg *rmpb.BackgroundSettings, requestResource string) bool { // fallback to default resource group. if bg == nil { - resourceGroupName := "default" - gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) + gc, err := c.tryGetResourceGroup(ctx, defaultResourceGroupName) if err != nil { return false } @@ -681,7 +712,10 @@ type groupCostController struct { requestUnitTokens map[rmpb.RequestUnitType]*tokenCounter } - tombstone bool + // tombstone is set to true when the resource group is deleted. + tombstone atomic.Bool + // inactive is set to true when the resource group has not been updated for a long time. + inactive bool } type groupMetricsCollection struct { @@ -774,6 +808,8 @@ func newGroupCostController( gc.mu.consumption = &rmpb.Consumption{} gc.mu.storeCounter = make(map[uint64]*rmpb.Consumption) gc.mu.globalCounter = &rmpb.Consumption{} + // TODO: re-init the state if user change mode from RU to RAW mode. + gc.initRunState() return gc, nil } @@ -1359,14 +1395,14 @@ func (gc *groupCostController) onResponse( return delta, nil } -// GetActiveResourceGroup is used to get action resource group. +// GetActiveResourceGroup is used to get active resource group. // This is used for test only. func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup { - tmp, ok := c.groupsController.Load(resourceGroupName) - if !ok { + gc, ok := c.loadGroupController(resourceGroupName) + if !ok || gc.tombstone.Load() { return nil } - return tmp.(*groupCostController).getMeta() + return gc.getMeta() } // This is used for test only. diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 3300edf700f..e198effb2d8 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -59,7 +59,6 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController func TestGroupControlBurstable(t *testing.T) { re := require.New(t) gc := createTestGroupCostController(re) - gc.initRunState() args := tokenBucketReconfigureArgs{ NewRate: 1000, NewBurst: -1, @@ -74,7 +73,6 @@ func TestGroupControlBurstable(t *testing.T) { func TestRequestAndResponseConsumption(t *testing.T) { re := require.New(t) gc := createTestGroupCostController(re) - gc.initRunState() testCases := []struct { req *TestRequestInfo resp *TestResponseInfo @@ -126,7 +124,6 @@ func TestRequestAndResponseConsumption(t *testing.T) { func TestResourceGroupThrottledError(t *testing.T) { re := require.New(t) gc := createTestGroupCostController(re) - gc.initRunState() req := &TestRequestInfo{ isWrite: true, writeBytes: 10000000, @@ -142,6 +139,14 @@ type MockResourceGroupProvider struct { mock.Mock } +func newMockResourceGroupProvider() *MockResourceGroupProvider { + mockProvider := &MockResourceGroupProvider{} + mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil) + mockProvider.On("LoadResourceGroups", mock.Anything).Return([]*rmpb.ResourceGroup{}, int64(0), nil) + mockProvider.On("Watch", mock.Anything, mock.Anything, mock.Anything).Return(make(chan []*meta_storagepb.Event), nil) + return mockProvider +} + func (m *MockResourceGroupProvider) GetResourceGroup(ctx context.Context, resourceGroupName string, opts ...pd.GetResourceGroupOption) (*rmpb.ResourceGroup, error) { args := m.Called(ctx, resourceGroupName, opts) return args.Get(0).(*rmpb.ResourceGroup), args.Error(1) @@ -191,28 +196,22 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mockProvider := new(MockResourceGroupProvider) - - mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil) - // LoadResourceGroups - mockProvider.On("LoadResourceGroups", mock.Anything).Return([]*rmpb.ResourceGroup{}, int64(0), nil) - // Watch - mockProvider.On("Watch", mock.Anything, mock.Anything, mock.Anything).Return(make(chan []*meta_storagepb.Event), nil) - re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", "default"))) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", defaultResourceGroupName))) defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport") re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport", fmt.Sprintf("return(\"%s\")", "test-group"))) defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport") + mockProvider := newMockResourceGroupProvider() controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil) controller.Start(ctx) - defaultResourceGroup := &rmpb.ResourceGroup{Name: "default", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} + defaultResourceGroup := &rmpb.ResourceGroup{Name: defaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} testResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} - mockProvider.On("GetResourceGroup", mock.Anything, "default", mock.Anything).Return(defaultResourceGroup, nil) + mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultResourceGroup, nil) mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil) - c1, err := controller.tryGetResourceGroup(ctx, "default") + c1, err := controller.tryGetResourceGroup(ctx, defaultResourceGroupName) re.NoError(err) re.Equal(defaultResourceGroup, c1.meta) @@ -226,11 +225,11 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { request := args.Get(1).(*rmpb.TokenBucketsRequest) var responses []*rmpb.TokenBucketResponse for _, req := range request.Requests { - if req.ResourceGroupName == "default" { + if req.ResourceGroupName == defaultResourceGroupName { // no response the default group request, that's mean `len(c.run.currentRequests) != 0` always. time.Sleep(100 * time.Second) responses = append(responses, &rmpb.TokenBucketResponse{ - ResourceGroupName: "default", + ResourceGroupName: defaultResourceGroupName, GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{ { GrantedTokens: &rmpb.TokenBucket{ @@ -271,3 +270,55 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { re.Fail("timeout") } } + +func TestGetController(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil) + controller.Start(ctx) + + defaultResourceGroup := &rmpb.ResourceGroup{Name: defaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} + testResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} + mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultResourceGroup, nil) + mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil) + mockProvider.On("GetResourceGroup", mock.Anything, "test-group-non-existent", mock.Anything).Return((*rmpb.ResourceGroup)(nil), nil) + + c, err := controller.GetResourceGroup("test-group-non-existent") + re.Error(err) + re.Nil(c) + c, err = controller.GetResourceGroup(defaultResourceGroupName) + re.NoError(err) + re.Equal(defaultResourceGroup, c) + c, err = controller.GetResourceGroup("test-group") + re.NoError(err) + re.Equal(testResourceGroup, c) + _, _, _, _, err = controller.OnRequestWait(ctx, "test-group", &TestRequestInfo{}) + re.NoError(err) + _, err = controller.OnResponse("test-group", &TestRequestInfo{}, &TestResponseInfo{}) + re.NoError(err) + // Mark the tombstone manually to test the fallback case. + gc, err := controller.tryGetResourceGroup(ctx, "test-group") + re.NoError(err) + gc.tombstone.Store(true) + c, err = controller.GetResourceGroup("test-group") + re.NoError(err) + re.Equal(defaultResourceGroup, c) + _, _, _, _, err = controller.OnRequestWait(ctx, "test-group", &TestRequestInfo{}) + re.NoError(err) + _, err = controller.OnResponse("test-group", &TestRequestInfo{}, &TestResponseInfo{}) + re.NoError(err) + // Mark the default group tombstone manually to test the fallback case. + gc, err = controller.tryGetResourceGroup(ctx, defaultResourceGroupName) + re.NoError(err) + gc.tombstone.Store(true) + c, err = controller.GetResourceGroup(defaultResourceGroupName) + re.NoError(err) + re.Equal(defaultResourceGroup, c) + _, _, _, _, err = controller.OnRequestWait(ctx, defaultResourceGroupName, &TestRequestInfo{}) + re.NoError(err) + _, err = controller.OnResponse(defaultResourceGroupName, &TestRequestInfo{}, &TestResponseInfo{}) + re.NoError(err) +} diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index f46f75d6f18..feea19cca39 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -102,7 +102,7 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator { if opController.OperatorCount(operator.OpReplica) < c.conf.GetReplicaScheduleLimit() { return []*operator.Operator{op} } - operator.OperatorLimitCounter.WithLabelValues(c.ruleChecker.GetType(), operator.OpReplica.String()).Inc() + operator.OperatorLimitCounter.WithLabelValues(c.ruleChecker.Name(), operator.OpReplica.String()).Inc() c.pendingProcessedRegions.Put(region.GetID(), nil) } } @@ -114,7 +114,7 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator { if opController.OperatorCount(operator.OpReplica) < c.conf.GetReplicaScheduleLimit() { return []*operator.Operator{op} } - operator.OperatorLimitCounter.WithLabelValues(c.replicaChecker.GetType(), operator.OpReplica.String()).Inc() + operator.OperatorLimitCounter.WithLabelValues(c.replicaChecker.Name(), operator.OpReplica.String()).Inc() c.pendingProcessedRegions.Put(region.GetID(), nil) } } diff --git a/pkg/schedule/checker/joint_state_checker.go b/pkg/schedule/checker/joint_state_checker.go index 872c957f0a7..2122044e64a 100644 --- a/pkg/schedule/checker/joint_state_checker.go +++ b/pkg/schedule/checker/joint_state_checker.go @@ -29,17 +29,6 @@ type JointStateChecker struct { cluster sche.CheckerCluster } -const jointStateCheckerName = "joint_state_checker" - -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. - jointCheckCounter = checkerCounter.WithLabelValues(jointStateCheckerName, "check") - jointCheckerPausedCounter = checkerCounter.WithLabelValues(jointStateCheckerName, "paused") - jointCheckerFailedCounter = checkerCounter.WithLabelValues(jointStateCheckerName, "create-operator-fail") - jointCheckerNewOpCounter = checkerCounter.WithLabelValues(jointStateCheckerName, "new-operator") - jointCheckerTransferLeaderCounter = checkerCounter.WithLabelValues(jointStateCheckerName, "transfer-leader") -) - // NewJointStateChecker creates a joint state checker. func NewJointStateChecker(cluster sche.CheckerCluster) *JointStateChecker { return &JointStateChecker{ diff --git a/pkg/schedule/checker/learner_checker.go b/pkg/schedule/checker/learner_checker.go index c6f7d671ac3..f9c4f7efb2b 100644 --- a/pkg/schedule/checker/learner_checker.go +++ b/pkg/schedule/checker/learner_checker.go @@ -28,11 +28,6 @@ type LearnerChecker struct { cluster sche.CheckerCluster } -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. - learnerCheckerPausedCounter = checkerCounter.WithLabelValues("learner_checker", "paused") -) - // NewLearnerChecker creates a learner checker. func NewLearnerChecker(cluster sche.CheckerCluster) *LearnerChecker { return &LearnerChecker{ diff --git a/pkg/schedule/checker/merge_checker.go b/pkg/schedule/checker/merge_checker.go index 821c21cc119..d7a28ad0ff8 100644 --- a/pkg/schedule/checker/merge_checker.go +++ b/pkg/schedule/checker/merge_checker.go @@ -42,37 +42,10 @@ const ( // When a region has label `merge_option=deny`, skip merging the region. // If label value is `allow` or other value, it will be treated as `allow`. const ( - mergeCheckerName = "merge_checker" mergeOptionLabel = "merge_option" mergeOptionValueDeny = "deny" ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. - mergeCheckerCounter = checkerCounter.WithLabelValues(mergeCheckerName, "check") - mergeCheckerPausedCounter = checkerCounter.WithLabelValues(mergeCheckerName, "paused") - mergeCheckerRecentlySplitCounter = checkerCounter.WithLabelValues(mergeCheckerName, "recently-split") - mergeCheckerRecentlyStartCounter = checkerCounter.WithLabelValues(mergeCheckerName, "recently-start") - mergeCheckerNoLeaderCounter = checkerCounter.WithLabelValues(mergeCheckerName, "no-leader") - mergeCheckerNoNeedCounter = checkerCounter.WithLabelValues(mergeCheckerName, "no-need") - mergeCheckerUnhealthyRegionCounter = checkerCounter.WithLabelValues(mergeCheckerName, "unhealthy-region") - mergeCheckerAbnormalReplicaCounter = checkerCounter.WithLabelValues(mergeCheckerName, "abnormal-replica") - mergeCheckerHotRegionCounter = checkerCounter.WithLabelValues(mergeCheckerName, "hot-region") - mergeCheckerNoTargetCounter = checkerCounter.WithLabelValues(mergeCheckerName, "no-target") - mergeCheckerTargetTooLargeCounter = checkerCounter.WithLabelValues(mergeCheckerName, "target-too-large") - mergeCheckerSplitSizeAfterMergeCounter = checkerCounter.WithLabelValues(mergeCheckerName, "split-size-after-merge") - mergeCheckerSplitKeysAfterMergeCounter = checkerCounter.WithLabelValues(mergeCheckerName, "split-keys-after-merge") - mergeCheckerNewOpCounter = checkerCounter.WithLabelValues(mergeCheckerName, "new-operator") - mergeCheckerLargerSourceCounter = checkerCounter.WithLabelValues(mergeCheckerName, "larger-source") - mergeCheckerAdjNotExistCounter = checkerCounter.WithLabelValues(mergeCheckerName, "adj-not-exist") - mergeCheckerAdjRecentlySplitCounter = checkerCounter.WithLabelValues(mergeCheckerName, "adj-recently-split") - mergeCheckerAdjRegionHotCounter = checkerCounter.WithLabelValues(mergeCheckerName, "adj-region-hot") - mergeCheckerAdjDisallowMergeCounter = checkerCounter.WithLabelValues(mergeCheckerName, "adj-disallow-merge") - mergeCheckerAdjAbnormalPeerStoreCounter = checkerCounter.WithLabelValues(mergeCheckerName, "adj-abnormal-peerstore") - mergeCheckerAdjSpecialPeerCounter = checkerCounter.WithLabelValues(mergeCheckerName, "adj-special-peer") - mergeCheckerAdjAbnormalReplicaCounter = checkerCounter.WithLabelValues(mergeCheckerName, "adj-abnormal-replica") -) - // MergeChecker ensures region to merge with adjacent region when size is small type MergeChecker struct { PauseController diff --git a/pkg/schedule/checker/metrics.go b/pkg/schedule/checker/metrics.go index 9d0c83f94e6..85a0ca570a5 100644 --- a/pkg/schedule/checker/metrics.go +++ b/pkg/schedule/checker/metrics.go @@ -29,3 +29,113 @@ var ( func init() { prometheus.MustRegister(checkerCounter) } + +const ( + // NOTE: these types are different from pkg/schedule/config/type.go, + // they are only used for prometheus metrics to keep the compatibility. + ruleChecker = "rule_checker" + jointStateChecker = "joint_state_checker" + learnerChecker = "learner_checker" + mergeChecker = "merge_checker" + replicaChecker = "replica_checker" + splitChecker = "split_checker" +) + +func ruleCheckerCounterWithEvent(event string) prometheus.Counter { + return checkerCounter.WithLabelValues(ruleChecker, event) +} + +func jointStateCheckerCounterWithEvent(event string) prometheus.Counter { + return checkerCounter.WithLabelValues(jointStateChecker, event) +} + +func mergeCheckerCounterWithEvent(event string) prometheus.Counter { + return checkerCounter.WithLabelValues(mergeChecker, event) +} + +func replicaCheckerCounterWithEvent(event string) prometheus.Counter { + return checkerCounter.WithLabelValues(replicaChecker, event) +} + +// WithLabelValues is a heavy operation, define variable to avoid call it every time. +var ( + ruleCheckerCounter = ruleCheckerCounterWithEvent("check") + ruleCheckerPausedCounter = ruleCheckerCounterWithEvent("paused") + ruleCheckerRegionNoLeaderCounter = ruleCheckerCounterWithEvent("region-no-leader") + ruleCheckerGetCacheCounter = ruleCheckerCounterWithEvent("get-cache") + ruleCheckerNeedSplitCounter = ruleCheckerCounterWithEvent("need-split") + ruleCheckerSetCacheCounter = ruleCheckerCounterWithEvent("set-cache") + ruleCheckerReplaceDownCounter = ruleCheckerCounterWithEvent("replace-down") + ruleCheckerPromoteWitnessCounter = ruleCheckerCounterWithEvent("promote-witness") + ruleCheckerReplaceOfflineCounter = ruleCheckerCounterWithEvent("replace-offline") + ruleCheckerAddRulePeerCounter = ruleCheckerCounterWithEvent("add-rule-peer") + ruleCheckerNoStoreAddCounter = ruleCheckerCounterWithEvent("no-store-add") + ruleCheckerNoStoreThenTryReplace = ruleCheckerCounterWithEvent("no-store-then-try-replace") + ruleCheckerNoStoreReplaceCounter = ruleCheckerCounterWithEvent("no-store-replace") + ruleCheckerFixPeerRoleCounter = ruleCheckerCounterWithEvent("fix-peer-role") + ruleCheckerFixLeaderRoleCounter = ruleCheckerCounterWithEvent("fix-leader-role") + ruleCheckerNotAllowLeaderCounter = ruleCheckerCounterWithEvent("not-allow-leader") + ruleCheckerFixFollowerRoleCounter = ruleCheckerCounterWithEvent("fix-follower-role") + ruleCheckerNoNewLeaderCounter = ruleCheckerCounterWithEvent("no-new-leader") + ruleCheckerDemoteVoterRoleCounter = ruleCheckerCounterWithEvent("demote-voter-role") + ruleCheckerRecentlyPromoteToNonWitnessCounter = ruleCheckerCounterWithEvent("recently-promote-to-non-witness") + ruleCheckerCancelSwitchToWitnessCounter = ruleCheckerCounterWithEvent("cancel-switch-to-witness") + ruleCheckerSetVoterWitnessCounter = ruleCheckerCounterWithEvent("set-voter-witness") + ruleCheckerSetLearnerWitnessCounter = ruleCheckerCounterWithEvent("set-learner-witness") + ruleCheckerSetVoterNonWitnessCounter = ruleCheckerCounterWithEvent("set-voter-non-witness") + ruleCheckerSetLearnerNonWitnessCounter = ruleCheckerCounterWithEvent("set-learner-non-witness") + ruleCheckerMoveToBetterLocationCounter = ruleCheckerCounterWithEvent("move-to-better-location") + ruleCheckerSkipRemoveOrphanPeerCounter = ruleCheckerCounterWithEvent("skip-remove-orphan-peer") + ruleCheckerRemoveOrphanPeerCounter = ruleCheckerCounterWithEvent("remove-orphan-peer") + ruleCheckerReplaceOrphanPeerCounter = ruleCheckerCounterWithEvent("replace-orphan-peer") + ruleCheckerReplaceOrphanPeerNoFitCounter = ruleCheckerCounterWithEvent("replace-orphan-peer-no-fit") + + jointCheckCounter = jointStateCheckerCounterWithEvent("check") + jointCheckerPausedCounter = jointStateCheckerCounterWithEvent("paused") + jointCheckerFailedCounter = jointStateCheckerCounterWithEvent("create-operator-fail") + jointCheckerNewOpCounter = jointStateCheckerCounterWithEvent("new-operator") + jointCheckerTransferLeaderCounter = jointStateCheckerCounterWithEvent("transfer-leader") + + learnerCheckerPausedCounter = checkerCounter.WithLabelValues(learnerChecker, "paused") + + mergeCheckerCounter = mergeCheckerCounterWithEvent("check") + mergeCheckerPausedCounter = mergeCheckerCounterWithEvent("paused") + mergeCheckerRecentlySplitCounter = mergeCheckerCounterWithEvent("recently-split") + mergeCheckerRecentlyStartCounter = mergeCheckerCounterWithEvent("recently-start") + mergeCheckerNoLeaderCounter = mergeCheckerCounterWithEvent("no-leader") + mergeCheckerNoNeedCounter = mergeCheckerCounterWithEvent("no-need") + mergeCheckerUnhealthyRegionCounter = mergeCheckerCounterWithEvent("unhealthy-region") + mergeCheckerAbnormalReplicaCounter = mergeCheckerCounterWithEvent("abnormal-replica") + mergeCheckerHotRegionCounter = mergeCheckerCounterWithEvent("hot-region") + mergeCheckerNoTargetCounter = mergeCheckerCounterWithEvent("no-target") + mergeCheckerTargetTooLargeCounter = mergeCheckerCounterWithEvent("target-too-large") + mergeCheckerSplitSizeAfterMergeCounter = mergeCheckerCounterWithEvent("split-size-after-merge") + mergeCheckerSplitKeysAfterMergeCounter = mergeCheckerCounterWithEvent("split-keys-after-merge") + mergeCheckerNewOpCounter = mergeCheckerCounterWithEvent("new-operator") + mergeCheckerLargerSourceCounter = mergeCheckerCounterWithEvent("larger-source") + mergeCheckerAdjNotExistCounter = mergeCheckerCounterWithEvent("adj-not-exist") + mergeCheckerAdjRecentlySplitCounter = mergeCheckerCounterWithEvent("adj-recently-split") + mergeCheckerAdjRegionHotCounter = mergeCheckerCounterWithEvent("adj-region-hot") + mergeCheckerAdjDisallowMergeCounter = mergeCheckerCounterWithEvent("adj-disallow-merge") + mergeCheckerAdjAbnormalPeerStoreCounter = mergeCheckerCounterWithEvent("adj-abnormal-peerstore") + mergeCheckerAdjSpecialPeerCounter = mergeCheckerCounterWithEvent("adj-special-peer") + mergeCheckerAdjAbnormalReplicaCounter = mergeCheckerCounterWithEvent("adj-abnormal-replica") + + replicaCheckerCounter = replicaCheckerCounterWithEvent("check") + replicaCheckerPausedCounter = replicaCheckerCounterWithEvent("paused") + replicaCheckerNewOpCounter = replicaCheckerCounterWithEvent("new-operator") + replicaCheckerNoTargetStoreCounter = replicaCheckerCounterWithEvent("no-target-store") + replicaCheckerNoWorstPeerCounter = replicaCheckerCounterWithEvent("no-worst-peer") + replicaCheckerCreateOpFailedCounter = replicaCheckerCounterWithEvent("create-operator-failed") + replicaCheckerAllRightCounter = replicaCheckerCounterWithEvent("all-right") + replicaCheckerNotBetterCounter = replicaCheckerCounterWithEvent("not-better") + replicaCheckerRemoveExtraOfflineFailedCounter = replicaCheckerCounterWithEvent("remove-extra-offline-replica-failed") + replicaCheckerRemoveExtraDownFailedCounter = replicaCheckerCounterWithEvent("remove-extra-down-replica-failed") + replicaCheckerNoStoreOfflineCounter = replicaCheckerCounterWithEvent("no-store-offline") + replicaCheckerNoStoreDownCounter = replicaCheckerCounterWithEvent("no-store-down") + replicaCheckerReplaceOfflineFailedCounter = replicaCheckerCounterWithEvent("replace-offline-replica-failed") + replicaCheckerReplaceDownFailedCounter = replicaCheckerCounterWithEvent("replace-down-replica-failed") + + splitCheckerCounter = checkerCounter.WithLabelValues(splitChecker, "check") + splitCheckerPausedCounter = checkerCounter.WithLabelValues(splitChecker, "paused") +) diff --git a/pkg/schedule/checker/replica_checker.go b/pkg/schedule/checker/replica_checker.go index 4eeefa6d545..f75ffe7e882 100644 --- a/pkg/schedule/checker/replica_checker.go +++ b/pkg/schedule/checker/replica_checker.go @@ -26,32 +26,13 @@ import ( "github.com/tikv/pd/pkg/schedule/config" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/operator" + types "github.com/tikv/pd/pkg/schedule/type" "go.uber.org/zap" ) const ( - replicaCheckerName = "replica-checker" - replicaChecker = "replica_checker" - offlineStatus = "offline" - downStatus = "down" -) - -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. - replicaCheckerCounter = checkerCounter.WithLabelValues(replicaChecker, "check") - replicaCheckerPausedCounter = checkerCounter.WithLabelValues(replicaChecker, "paused") - replicaCheckerNewOpCounter = checkerCounter.WithLabelValues(replicaChecker, "new-operator") - replicaCheckerNoTargetStoreCounter = checkerCounter.WithLabelValues(replicaChecker, "no-target-store") - replicaCheckerNoWorstPeerCounter = checkerCounter.WithLabelValues(replicaChecker, "no-worst-peer") - replicaCheckerCreateOpFailedCounter = checkerCounter.WithLabelValues(replicaChecker, "create-operator-failed") - replicaCheckerAllRightCounter = checkerCounter.WithLabelValues(replicaChecker, "all-right") - replicaCheckerNotBetterCounter = checkerCounter.WithLabelValues(replicaChecker, "not-better") - replicaCheckerRemoveExtraOfflineFailedCounter = checkerCounter.WithLabelValues(replicaChecker, "remove-extra-offline-replica-failed") - replicaCheckerRemoveExtraDownFailedCounter = checkerCounter.WithLabelValues(replicaChecker, "remove-extra-down-replica-failed") - replicaCheckerNoStoreOfflineCounter = checkerCounter.WithLabelValues(replicaChecker, "no-store-offline") - replicaCheckerNoStoreDownCounter = checkerCounter.WithLabelValues(replicaChecker, "no-store-down") - replicaCheckerReplaceOfflineFailedCounter = checkerCounter.WithLabelValues(replicaChecker, "replace-offline-replica-failed") - replicaCheckerReplaceDownFailedCounter = checkerCounter.WithLabelValues(replicaChecker, "replace-down-replica-failed") + offlineStatus = "offline" + downStatus = "down" ) // ReplicaChecker ensures region has the best replicas. @@ -75,9 +56,9 @@ func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigPro } } -// GetType return ReplicaChecker's type -func (*ReplicaChecker) GetType() string { - return replicaCheckerName +// Name return ReplicaChecker's name. +func (*ReplicaChecker) Name() string { + return types.ReplicaChecker.String() } // Check verifies a region's replicas, creating an operator.Operator if need. @@ -291,7 +272,7 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status func (r *ReplicaChecker) strategy(region *core.RegionInfo) *ReplicaStrategy { return &ReplicaStrategy{ - checkerName: replicaCheckerName, + checkerName: r.Name(), cluster: r.cluster, locationLabels: r.conf.GetLocationLabels(), isolationLevel: r.conf.GetIsolationLevel(), diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index 66b958911b1..bb4a60ead7c 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -31,15 +31,12 @@ import ( "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" + types "github.com/tikv/pd/pkg/schedule/type" "github.com/tikv/pd/pkg/versioninfo" "go.uber.org/zap" ) -const ( - maxPendingListLen = 100000 - ruleChecker = "rule_checker" - ruleCheckerName = "rule-checker" -) +const maxPendingListLen = 100000 var ( errNoStoreToAdd = errors.New("no store to add peer") @@ -48,37 +45,6 @@ var ( errPeerCannotBeWitness = errors.New("peer cannot be witness") errNoNewLeader = errors.New("no new leader") errRegionNoLeader = errors.New("region no leader") - // WithLabelValues is a heavy operation, define variable to avoid call it every time. - ruleCheckerCounter = checkerCounter.WithLabelValues(ruleChecker, "check") - ruleCheckerPausedCounter = checkerCounter.WithLabelValues(ruleChecker, "paused") - ruleCheckerRegionNoLeaderCounter = checkerCounter.WithLabelValues(ruleChecker, "region-no-leader") - ruleCheckerGetCacheCounter = checkerCounter.WithLabelValues(ruleChecker, "get-cache") - ruleCheckerNeedSplitCounter = checkerCounter.WithLabelValues(ruleChecker, "need-split") - ruleCheckerSetCacheCounter = checkerCounter.WithLabelValues(ruleChecker, "set-cache") - ruleCheckerReplaceDownCounter = checkerCounter.WithLabelValues(ruleChecker, "replace-down") - ruleCheckerPromoteWitnessCounter = checkerCounter.WithLabelValues(ruleChecker, "promote-witness") - ruleCheckerReplaceOfflineCounter = checkerCounter.WithLabelValues(ruleChecker, "replace-offline") - ruleCheckerAddRulePeerCounter = checkerCounter.WithLabelValues(ruleChecker, "add-rule-peer") - ruleCheckerNoStoreAddCounter = checkerCounter.WithLabelValues(ruleChecker, "no-store-add") - ruleCheckerNoStoreThenTryReplace = checkerCounter.WithLabelValues(ruleChecker, "no-store-then-try-replace") - ruleCheckerNoStoreReplaceCounter = checkerCounter.WithLabelValues(ruleChecker, "no-store-replace") - ruleCheckerFixPeerRoleCounter = checkerCounter.WithLabelValues(ruleChecker, "fix-peer-role") - ruleCheckerFixLeaderRoleCounter = checkerCounter.WithLabelValues(ruleChecker, "fix-leader-role") - ruleCheckerNotAllowLeaderCounter = checkerCounter.WithLabelValues(ruleChecker, "not-allow-leader") - ruleCheckerFixFollowerRoleCounter = checkerCounter.WithLabelValues(ruleChecker, "fix-follower-role") - ruleCheckerNoNewLeaderCounter = checkerCounter.WithLabelValues(ruleChecker, "no-new-leader") - ruleCheckerDemoteVoterRoleCounter = checkerCounter.WithLabelValues(ruleChecker, "demote-voter-role") - ruleCheckerRecentlyPromoteToNonWitnessCounter = checkerCounter.WithLabelValues(ruleChecker, "recently-promote-to-non-witness") - ruleCheckerCancelSwitchToWitnessCounter = checkerCounter.WithLabelValues(ruleChecker, "cancel-switch-to-witness") - ruleCheckerSetVoterWitnessCounter = checkerCounter.WithLabelValues(ruleChecker, "set-voter-witness") - ruleCheckerSetLearnerWitnessCounter = checkerCounter.WithLabelValues(ruleChecker, "set-learner-witness") - ruleCheckerSetVoterNonWitnessCounter = checkerCounter.WithLabelValues(ruleChecker, "set-voter-non-witness") - ruleCheckerSetLearnerNonWitnessCounter = checkerCounter.WithLabelValues(ruleChecker, "set-learner-non-witness") - ruleCheckerMoveToBetterLocationCounter = checkerCounter.WithLabelValues(ruleChecker, "move-to-better-location") - ruleCheckerSkipRemoveOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "skip-remove-orphan-peer") - ruleCheckerRemoveOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "remove-orphan-peer") - ruleCheckerReplaceOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "replace-orphan-peer") - ruleCheckerReplaceOrphanPeerNoFitCounter = checkerCounter.WithLabelValues(ruleChecker, "replace-orphan-peer-no-fit") ) // RuleChecker fix/improve region by placement rules. @@ -86,7 +52,6 @@ type RuleChecker struct { PauseController cluster sche.CheckerCluster ruleManager *placement.RuleManager - name string regionWaitingList cache.Cache pendingList cache.Cache switchWitnessCache *cache.TTLUint64 @@ -98,7 +63,6 @@ func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManage return &RuleChecker{ cluster: cluster, ruleManager: ruleManager, - name: ruleCheckerName, regionWaitingList: regionWaitingList, pendingList: cache.NewDefaultCache(maxPendingListLen), switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()), @@ -106,9 +70,9 @@ func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManage } } -// GetType returns RuleChecker's Type -func (*RuleChecker) GetType() string { - return ruleCheckerName +// Name returns RuleChecker's name. +func (*RuleChecker) Name() string { + return types.RuleChecker.String() } // Check checks if the region matches placement rules and returns Operator to @@ -651,12 +615,12 @@ func (c *RuleChecker) hasAvailableWitness(region *core.RegionInfo, peer *metapb. func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy { return &ReplicaStrategy{ - checkerName: c.name, + checkerName: c.Name(), cluster: c.cluster, isolationLevel: rule.IsolationLevel, locationLabels: rule.LocationLabels, region: region, - extraFilters: []filter.Filter{filter.NewLabelConstraintFilter(c.name, rule.LabelConstraints)}, + extraFilters: []filter.Filter{filter.NewLabelConstraintFilter(c.Name(), rule.LabelConstraints)}, fastFailover: fastFailover, } } diff --git a/pkg/schedule/checker/split_checker.go b/pkg/schedule/checker/split_checker.go index 3a34eee8c90..3cc1664b6cc 100644 --- a/pkg/schedule/checker/split_checker.go +++ b/pkg/schedule/checker/split_checker.go @@ -33,14 +33,6 @@ type SplitChecker struct { labeler *labeler.RegionLabeler } -const splitCheckerName = "split_checker" - -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. - splitCheckerCounter = checkerCounter.WithLabelValues(splitCheckerName, "check") - splitCheckerPausedCounter = checkerCounter.WithLabelValues(splitCheckerName, "paused") -) - // NewSplitChecker creates a new SplitChecker. func NewSplitChecker(cluster sche.CheckerCluster, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler) *SplitChecker { return &SplitChecker{ diff --git a/pkg/schedule/type/type.go b/pkg/schedule/type/type.go new file mode 100644 index 00000000000..d872bf0408c --- /dev/null +++ b/pkg/schedule/type/type.go @@ -0,0 +1,36 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +type CheckerSchedulerType string + +func (n CheckerSchedulerType) String() string { + return string(n) +} + +const ( + // JointStateChecker is the name for joint state checker. + JointStateChecker CheckerSchedulerType = "joint-state-checker" + // LearnerChecker is the name for learner checker. + LearnerChecker CheckerSchedulerType = "learner-checker" + // MergeChecker is the name for split checker. + MergeChecker CheckerSchedulerType = "merge-checker" + // ReplicaChecker is the name for replica checker. + ReplicaChecker CheckerSchedulerType = "replica-checker" + // RuleChecker is the name for rule checker. + RuleChecker CheckerSchedulerType = "rule-checker" + // SplitChecker is the name for split checker. + SplitChecker CheckerSchedulerType = "split-checker" +) diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 66150ef34e0..635cb17b822 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -270,7 +270,8 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { testutil.Eventually(re, func() bool { name := groupNamePrefix + strconv.Itoa(i) meta = controller.GetActiveResourceGroup(name) - return meta == nil + // The deleted resource group may not be immediately removed from the controller. + return meta == nil || meta.Name == "default" }, testutil.WithTickInterval(50*time.Millisecond)) } } @@ -402,8 +403,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { CPUMsCost: 1, } - controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace()) + controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg) controller.Start(suite.ctx) + defer controller.Stop() testCases := []struct { resourceGroupName string @@ -464,9 +466,31 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { wreq := tcs.makeWriteRequest() _, _, _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq) re.Error(err) - time.Sleep(time.Millisecond * 200) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate")) - controller.Stop() + + group, err := controller.GetResourceGroup(rg.Name) + re.NoError(err) + re.Equal(rg, group) + // Delete the resource group and make sure it is tombstone. + resp, err = cli.DeleteResourceGroup(suite.ctx, rg.Name) + re.NoError(err) + re.Contains(resp, "Success!") + // Make sure the resource group is watched by the controller and marked as tombstone. + testutil.Eventually(re, func() bool { + gc, err := controller.GetResourceGroup(rg.Name) + re.NoError(err) + return gc.GetName() == "default" + }, testutil.WithTickInterval(50*time.Millisecond)) + // Add the resource group again. + resp, err = cli.AddResourceGroup(suite.ctx, rg) + re.NoError(err) + re.Contains(resp, "Success!") + // Make sure the resource group can be set to active again. + testutil.Eventually(re, func() bool { + gc, err := controller.GetResourceGroup(rg.Name) + re.NoError(err) + return gc.GetName() == rg.Name + }, testutil.WithTickInterval(50*time.Millisecond)) } // TestSwitchBurst is used to test https://github.com/tikv/pd/issues/6209 @@ -1277,6 +1301,11 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") + group2 := *group + group2.Name = "tombstone_test" + resp, err = cli.AddResourceGroup(suite.ctx, &group2) + re.NoError(err) + re.Contains(resp, "Success!") re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/fastCleanup", `return(true)`)) controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) @@ -1299,9 +1328,19 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { controller.OnResponse(group.Name, rreq, rres) time.Sleep(100 * time.Microsecond) } - time.Sleep(1 * time.Second) + testutil.Eventually(re, func() bool { + meta := controller.GetActiveResourceGroup(group.Name) + return meta == nil + }, testutil.WithTickInterval(50*time.Millisecond)) - re.Nil(controller.GetActiveResourceGroup(group.Name)) + // Mock server deleted the resource group + resp, err = cli.DeleteResourceGroup(suite.ctx, group2.Name) + re.NoError(err) + re.Contains(resp, "Success!") + testutil.Eventually(re, func() bool { + meta := controller.GetActiveResourceGroup(group2.Name) + return meta == nil + }, testutil.WithTickInterval(50*time.Millisecond)) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/fastCleanup")) controller.Stop()