From f3b7ab91ae322f15c08c6137b4201424fc1d0e0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Thu, 13 Feb 2025 16:03:32 +0800 Subject: [PATCH] add more test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/core/constant/kind.go | 1 + pkg/schedule/schedulers/balance_range.go | 88 +++++++++++++------ pkg/schedule/schedulers/balance_range_test.go | 71 +++++++-------- pkg/schedule/schedulers/init.go | 4 +- server/api/scheduler.go | 4 - server/cluster/cluster_test.go | 12 --- tools/pd-ctl/pdctl/command/scheduler.go | 12 --- .../pd-ctl/tests/scheduler/scheduler_test.go | 10 --- 8 files changed, 97 insertions(+), 105 deletions(-) diff --git a/pkg/core/constant/kind.go b/pkg/core/constant/kind.go index 7e9d173c689..39c256c4f5d 100644 --- a/pkg/core/constant/kind.go +++ b/pkg/core/constant/kind.go @@ -66,6 +66,7 @@ const ( RegionKind // WitnessKind indicates the witness kind resource WitnessKind + // ResourceKindLen represents the ResourceKind count ResourceKindLen ) diff --git a/pkg/schedule/schedulers/balance_range.go b/pkg/schedule/schedulers/balance_range.go index 25546c8f07d..d51287aadd4 100644 --- a/pkg/schedule/schedulers/balance_range.go +++ b/pkg/schedule/schedulers/balance_range.go @@ -1,4 +1,4 @@ -// Copyright 2025 TiKV Project Authors. +// Copyright 2025 tiKV Project Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,11 +21,13 @@ import ( "time" "github.com/gorilla/mux" + "github.com/unrolled/render" + "go.uber.org/zap" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" - "go.uber.org/zap" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" @@ -77,7 +79,7 @@ type balanceRangeSchedulerConfig struct { type balanceRangeSchedulerJob struct { JobID uint64 `json:"job-id"` Role Role `json:"role"` - Engine Engine `json:"engine"` + Engine engine `json:"engine"` Timeout time.Duration `json:"timeout"` Ranges []core.KeyRange `json:"ranges"` Alias string `json:"alias"` @@ -96,6 +98,10 @@ func (conf *balanceRangeSchedulerConfig) begin(job *balanceRangeSchedulerJob) bo now := time.Now() job.Start = &now job.Status = running + if err := conf.save(); err != nil { + log.Warn("failed to persist config", zap.Error(err), zap.Uint64("job-id", job.JobID)) + return false + } return true } @@ -107,10 +113,14 @@ func (conf *balanceRangeSchedulerConfig) finish(job *balanceRangeSchedulerJob) b } now := time.Now() job.Finish = &now + if err := conf.save(); err != nil { + log.Warn("failed to persist config", zap.Error(err), zap.Uint64("job-id", job.JobID)) + return false + } return true } -func (conf *balanceRangeSchedulerConfig) pop() *balanceRangeSchedulerJob { +func (conf *balanceRangeSchedulerConfig) peek() *balanceRangeSchedulerJob { conf.RLock() defer conf.RLock() for _, job := range conf.jobs { @@ -182,12 +192,12 @@ func (s *balanceRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) if !allowed { operator.IncOperatorLimitCounter(s.GetType(), operator.OpRange) } - job := s.conf.pop() + job := s.conf.peek() if job != nil { if job.Status == pending { s.conf.begin(job) } - if time.Now().Sub(*job.Start) > job.Timeout { + if time.Since(*job.Start) > job.Timeout { s.conf.finish(job) balanceRangeExpiredCounter.Inc() } @@ -220,9 +230,9 @@ func newBalanceRangeScheduler(opController *operator.Controller, conf *balanceRa } // Schedule schedules the 
balance key range operator. -func (s *balanceRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (s *balanceRangeScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { balanceRangeCounter.Inc() - job := s.conf.pop() + job := s.conf.peek() if job == nil { return nil, nil } @@ -336,6 +346,7 @@ func (s *balanceRangeScheduler) transferPeer(plan *balanceRangeSchedulerPlan, ds ) op.SetAdditionalInfo("sourceScore", strconv.FormatInt(plan.sourceScore, 10)) op.SetAdditionalInfo("targetScore", strconv.FormatInt(plan.targetScore, 10)) + op.SetAdditionalInfo("tolerate", strconv.FormatInt(plan.tolerate, 10)) return op } balanceRangeNoReplacementCounter.Inc() @@ -357,6 +368,8 @@ type balanceRangeSchedulerPlan struct { fit *placement.RegionFit averageScore int64 job *balanceRangeSchedulerJob + opInfluence operator.OpInfluence + tolerate int64 } type storeInfo struct { @@ -371,7 +384,7 @@ func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster, opInfluen return nil, err } filters := s.filters - if job.Engine == TiFlash { + if job.Engine == tiflash { filters = append(filters, filter.NewEngineFilter(balanceRangeName, filter.TiFlashEngineConstraint)) } sources := filter.SelectSourceStores(cluster.GetStores(), filters, cluster.GetSchedulerConfig(), nil, nil) @@ -389,6 +402,10 @@ func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster, opInfluen totalScore += 1 } } + tolerate := int64(float64(len(scanRegions)) * adjustRatio) + if tolerate < 1 { + tolerate = 1 + } storeList := make([]*storeInfo, 0, len(storeInfos)) for storeID, store := range storeInfos { @@ -425,6 +442,8 @@ func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster, opInfluen region: nil, averageScore: averageScore, job: job, + opInfluence: opInfluence, + tolerate: tolerate, }, nil } @@ -441,16 +460,33 @@ func (p *balanceRangeSchedulerPlan) score(storeID uint64) int64 { } func (p *balanceRangeSchedulerPlan) shouldBalance(scheduler string) bool { - shouldBalance := p.sourceScore > p.targetScore + sourceInfluence := p.opInfluence.GetStoreInfluence(p.sourceStoreID()) + sourceInf := p.job.Role.getStoreInfluence(sourceInfluence) + if sourceInf < 0 { + sourceInf = -sourceInf + } + sourceScore := p.sourceScore - sourceInf - p.tolerate + + targetInfluence := p.opInfluence.GetStoreInfluence(p.targetStoreID()) + targetInf := p.job.Role.getStoreInfluence(targetInfluence) + if targetInf < 0 { + targetInf = -targetInf + } + targetScore := p.targetScore + targetInf + p.tolerate + + shouldBalance := sourceScore >= targetScore if !shouldBalance && log.GetLevel() <= zap.DebugLevel { log.Debug("skip balance ", zap.String("scheduler", scheduler), zap.Uint64("region-id", p.region.GetID()), zap.Uint64("source-store", p.sourceStoreID()), zap.Uint64("target-store", p.targetStoreID()), - zap.Int64("source-score", p.sourceScore), - zap.Int64("target-score", p.targetScore), + zap.Int64("origin-source-score", p.sourceScore), + zap.Int64("origin-target-score", p.targetScore), + zap.Int64("influence-source-score", sourceScore), + zap.Int64("influence-target-score", targetScore), zap.Int64("average-region-size", p.averageScore), + zap.Int64("tolerate", p.tolerate), ) } return shouldBalance @@ -480,38 +516,40 @@ func (r Role) String() string { } } -type Engine int +// engine is the engine of the store. 
+type engine int const ( - TiKV Engine = iota - TiFlash - Unknown + tiKV engine = iota + tiflash + notSupported ) -func (e Engine) String() string { +func (e engine) String() string { switch e { - case TiKV: + case tiKV: return "tikv" - case TiFlash: + case tiflash: return "tiflash" default: - return "unknown" + return "not-supported" } } -func (e Engine) MarshalJSON() ([]byte, error) { +// MarshalJSON marshals to json. +func (e engine) MarshalJSON() ([]byte, error) { return []byte(`"` + e.String() + `"`), nil } // NewEngine creates a new engine. -func NewEngine(role string) Engine { +func NewEngine(role string) engine { switch role { case "tikv": - return TiKV + return tiKV case "tiflash": - return TiFlash + return tiflash default: - return Unknown + return notSupported } } diff --git a/pkg/schedule/schedulers/balance_range_test.go b/pkg/schedule/schedulers/balance_range_test.go index 49bc348e15f..0f5624fe8dd 100644 --- a/pkg/schedule/schedulers/balance_range_test.go +++ b/pkg/schedule/schedulers/balance_range_test.go @@ -1,4 +1,4 @@ -// Copyright 2025 TiKV Project Authors. +// Copyright 2025 tiKV Project Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,14 +16,15 @@ package schedulers import ( "fmt" - "github.com/tikv/pd/pkg/schedule/operator" "testing" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/schedule/types" "github.com/tikv/pd/pkg/storage" @@ -61,8 +62,12 @@ func TestGetPeers(t *testing.T) { } func TestJobStatus(t *testing.T) { + s := storage.NewStorageWithMemoryBackend() re := require.New(t) - conf := balanceRangeSchedulerConfig{} + conf := &balanceRangeSchedulerConfig{ + schedulerConfig: &baseSchedulerConfig{}, + } + conf.init(balanceRangeName, s, conf) for _, v := range []struct { jobStatus JobStatus begin bool @@ -93,32 +98,6 @@ func TestJobStatus(t *testing.T) { } } -func TestBalanceRangeShouldBalance(t *testing.T) { - re := require.New(t) - for _, v := range []struct { - sourceScore int64 - targetScore int64 - shouldBalance bool - }{ - { - 100, - 10, - true, - }, - { - 10, - 10, - false, - }, - } { - plan := balanceRangeSchedulerPlan{ - sourceScore: v.sourceScore, - targetScore: v.targetScore, - } - re.Equal(plan.shouldBalance(balanceRangeName), v.shouldBalance) - } -} - func TestBalanceRangePlan(t *testing.T) { re := require.New(t) cancel, _, tc, oc := prepareSchedulersTest() @@ -129,7 +108,7 @@ func TestBalanceRangePlan(t *testing.T) { } tc.AddLeaderRegionWithRange(1, "100", "110", 1, 2, 3) job := &balanceRangeSchedulerJob{ - Engine: TiKV, + Engine: tiKV, Role: leader, Ranges: []core.KeyRange{core.NewKeyRange("100", "110")}, } @@ -138,7 +117,8 @@ func TestBalanceRangePlan(t *testing.T) { re.NotNil(plan) re.Len(plan.stores, 3) re.Len(plan.scoreMap, 3) - re.Equal(plan.scoreMap[1], int64(1)) + re.Equal(int64(1), plan.scoreMap[1]) + re.Equal(int64(1), plan.tolerate) } func TestTIKVEngine(t *testing.T) { @@ -146,7 +126,7 @@ func TestTIKVEngine(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() scheduler, err := CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(types.BalanceRangeScheduler, []string{"leader", "tikv", "1h", "test", "100", "200"})) - re.Nil(err) + re.NoError(err) ops, _ := scheduler.Schedule(tc, true) 
re.Empty(ops) for i := 1; i <= 3; i++ { @@ -166,8 +146,8 @@ func TestTIKVEngine(t *testing.T) { ops, _ = scheduler.Schedule(tc, true) re.NotEmpty(ops) op := ops[0] - re.Equal(op.GetAdditionalInfo("sourceScore"), "3") - re.Equal(op.GetAdditionalInfo("targetScore"), "1") + re.Equal("3", op.GetAdditionalInfo("sourceScore")) + re.Equal("1", op.GetAdditionalInfo("targetScore")) re.Contains(op.Brief(), "transfer leader: store 1 to 3") tc.AddLeaderStore(4, 0) @@ -175,8 +155,8 @@ func TestTIKVEngine(t *testing.T) { ops, _ = scheduler.Schedule(tc, true) re.NotEmpty(ops) op = ops[0] - re.Equal(op.GetAdditionalInfo("sourceScore"), "3") - re.Equal(op.GetAdditionalInfo("targetScore"), "0") + re.Equal("3", op.GetAdditionalInfo("sourceScore")) + re.Equal("0", op.GetAdditionalInfo("targetScore")) re.Contains(op.Brief(), "mv peer: store [1] to [4]") } @@ -185,15 +165,15 @@ func TestTIFLASHEngine(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() tikvCount := 3 + // 3 tikv and 3 tiflash for i := 1; i <= tikvCount; i++ { tc.AddLeaderStore(uint64(i), 0) } for i := tikvCount + 1; i <= tikvCount+3; i++ { tc.AddLabelsStore(uint64(i), 0, map[string]string{"engine": "tiflash"}) } - for i := 1; i <= 3; i++ { - tc.AddRegionWithLearner(uint64(i), 1, []uint64{2, 3}, []uint64{4}) - } + tc.AddRegionWithLearner(uint64(1), 1, []uint64{2, 3}, []uint64{4}) + startKey := fmt.Sprintf("%20d0", 1) endKey := fmt.Sprintf("%20d0", 10) tc.RuleManager.SetRule(&placement.Rule{ @@ -208,11 +188,22 @@ func TestTIFLASHEngine(t *testing.T) { }, }) + // generate a balance range scheduler with tiflash engine scheduler, err := CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(types.BalanceRangeScheduler, []string{"learner", "tiflash", "1h", "test", startKey, endKey})) re.NoError(err) + // tiflash-4 only has 1 region, so it doesn't need to balance ops, _ := scheduler.Schedule(tc, false) + re.Empty(ops) + + // add 2 learner on tiflash-4 + for i := 2; i <= 3; i++ { + tc.AddRegionWithLearner(uint64(i), 1, []uint64{2, 3}, []uint64{4}) + } + ops, _ = scheduler.Schedule(tc, false) re.NotEmpty(ops) op := ops[0] - re.Equal(op.GetAdditionalInfo("sourceScore"), "3") + re.Equal("3", op.GetAdditionalInfo("sourceScore")) + re.Equal("0", op.GetAdditionalInfo("targetScore")) + re.Equal("1", op.GetAdditionalInfo("tolerate")) re.Contains(op.Brief(), "mv peer: store [4] to") } diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index b09b20d9659..463bf87a363 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -572,7 +572,7 @@ func schedulersRegister() { return errs.ErrQueryUnescape.Wrap(err) } engine := NewEngine(engineString) - if engine == Unknown { + if engine == notSupported { return errs.ErrQueryUnescape.FastGenByArgs("engine") } timeout, err := url.QueryUnescape(args[2]) @@ -596,7 +596,7 @@ func schedulersRegister() { id = conf.jobs[len(conf.jobs)-1].JobID + 1 } - if engine == TiFlash && role != learner { + if engine == tiflash && role != learner { return errs.ErrURLParse.FastGenByArgs("TiFlash only support learner role") } diff --git a/server/api/scheduler.go b/server/api/scheduler.go index 4390bb94d94..52e65053b13 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -124,11 +124,7 @@ func (h *schedulerHandler) CreateScheduler(w http.ResponseWriter, r *http.Reques } } -<<<<<<< HEAD - if err := apiutil.CollectStringOption("table-name", input, collector); err != nil { -======= if err := 
apiutil.CollectStringOption("alias", input, collector); err != nil { ->>>>>>> 4eb7235c629747692b6d336bf626cb86d31ec023 h.r.JSON(w, http.StatusInternalServerError, err.Error()) return } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index c1776a34116..1a588c24fb7 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3213,22 +3213,11 @@ func TestAddScheduler(t *testing.T) { _, err = schedulers.CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceRangeScheduler, []string{}), controller.RemoveScheduler) re.Error(err) -<<<<<<< HEAD - gls, err = schedulers.CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceRangeScheduler, []string{"learner", "tiflash", "1h", "100", "200"}), controller.RemoveScheduler) -======= gls, err = schedulers.CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceRangeScheduler, []string{"learner", "tiflash", "1h", "test", "100", "200"}), controller.RemoveScheduler) ->>>>>>> 4eb7235c629747692b6d336bf626cb86d31ec023 re.NoError(err) re.NoError(controller.AddScheduler(gls)) conf, err = gls.EncodeConfig() re.NoError(err) -<<<<<<< HEAD - data = make(map[string]any) - re.NoError(json.Unmarshal(conf, &data)) - re.Equal("learner", data["role"]) - re.Equal("tiflash", data["engine"]) - re.Equal(float64(time.Hour.Nanoseconds()), data["timeout"]) -======= var cfg []map[string]any re.NoError(json.Unmarshal(conf, &cfg)) @@ -3236,7 +3225,6 @@ func TestAddScheduler(t *testing.T) { re.Equal("tiflash", cfg[0]["engine"]) re.Equal("test", cfg[0]["alias"]) re.Equal(float64(time.Hour.Nanoseconds()), cfg[0]["timeout"]) ->>>>>>> 4eb7235c629747692b6d336bf626cb86d31ec023 hb, err := schedulers.CreateScheduler(types.BalanceHotRegionScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) re.NoError(err) diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 8f00052a8ac..7db455b2119 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -377,19 +377,11 @@ func NewBalanceWitnessSchedulerCommand() *cobra.Command { // NewBalanceRangeSchedulerCommand returns a command to add a balance-range-scheduler. 
func NewBalanceRangeSchedulerCommand() *cobra.Command { -<<<<<<< HEAD - c := &cobra.Command{ - Use: "balance-range-scheduler [--format=raw|encode|hex] ", - Short: "add a scheduler to balance region for given range", - Run: addSchedulerForBalanceRangeCommandFunc, - Deprecated: "balance-range will be deprecated in the future, please use sql instead", -======= // todo: add deprecated warning if sql support c := &cobra.Command{ Use: "balance-range-scheduler [--format=raw|encode|hex] ", Short: "add a scheduler to balance region for given range", Run: addSchedulerForBalanceRangeCommandFunc, ->>>>>>> 4eb7235c629747692b6d336bf626cb86d31ec023 } c.Flags().String("format", "hex", "the key format") return c @@ -453,11 +445,7 @@ func addSchedulerForBalanceRangeCommandFunc(cmd *cobra.Command, args []string) { input["name"] = cmd.Name() input["engine"] = args[0] input["role"] = args[1] -<<<<<<< HEAD - input["table-name"] = args[2] -======= input["alias"] = args[2] ->>>>>>> 4eb7235c629747692b6d336bf626cb86d31ec023 input["start-key"] = url.QueryEscape(startKey) input["end-key"] = url.QueryEscape(endKey) diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index f6c1ba47dc2..d060fc0e3eb 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -546,15 +546,6 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust re.NotContains(echo, "Success!") echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-range-scheduler", "--format=raw", "tiflash", "learner", "test", "a", "b"}, nil) re.Contains(echo, "Success!") -<<<<<<< HEAD - conf = make(map[string]any) - testutil.Eventually(re, func() bool { - mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-range-scheduler"}, &conf) - return conf["role"] == "learner" && conf["engine"] == "tiflash" && conf["table-name"] == "test" - }) - re.Equal(float64(time.Hour.Nanoseconds()), conf["timeout"]) - ranges := conf["ranges"].([]any)[0].(map[string]any) -======= var rangeConf []map[string]any var jobConf map[string]any testutil.Eventually(re, func() bool { @@ -565,7 +556,6 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust re.Equal(float64(time.Hour.Nanoseconds()), jobConf["timeout"]) re.Equal("pending", jobConf["status"]) ranges := jobConf["ranges"].([]any)[0].(map[string]any) ->>>>>>> 4eb7235c629747692b6d336bf626cb86d31ec023 re.Equal(core.HexRegionKeyStr([]byte("a")), ranges["start-key"]) re.Equal(core.HexRegionKeyStr([]byte("b")), ranges["end-key"])
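For reference when reviewing the new shouldBalance hunk above, here is a minimal standalone Go sketch (not part of the patch; the function and parameter names are simplified stand-ins for the fields on balanceRangeSchedulerPlan) of the tolerance-adjusted comparison the patch introduces: the source score is discounted by pending operator influence and the tolerance, the target score is inflated by them, and a move is produced only if the source still wins.

package main

import "fmt"

// abs returns the absolute value of an int64, matching how the patch
// normalizes per-store operator influence before applying it.
func abs(v int64) int64 {
	if v < 0 {
		return -v
	}
	return v
}

// shouldBalance mirrors the patched decision: adjust both scores by pending
// influence and the tolerate margin, then require the source to stay ahead.
func shouldBalance(sourceScore, targetScore, sourceInfluence, targetInfluence, tolerate int64) bool {
	adjustedSource := sourceScore - abs(sourceInfluence) - tolerate
	adjustedTarget := targetScore + abs(targetInfluence) + tolerate
	return adjustedSource >= adjustedTarget
}

func main() {
	// Numbers from TestTIFLASHEngine: sourceScore=3, targetScore=0, tolerate=1,
	// no pending influence, so 3-0-1 >= 0+0+1 holds and the peer is moved.
	fmt.Println(shouldBalance(3, 0, 0, 0, 1)) // true
	// A one-region difference is absorbed by the tolerance: 2-0-1 >= 1+0+1 fails.
	fmt.Println(shouldBalance(2, 1, 0, 0, 1)) // false
}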