add more test
Signed-off-by: 童剑 <1045931706@qq.com>
bufferflies committed Feb 13, 2025
1 parent d0f2cae commit f3b7ab9
Showing 8 changed files with 97 additions and 105 deletions.
1 change: 1 addition & 0 deletions pkg/core/constant/kind.go
@@ -66,6 +66,7 @@ const (
RegionKind
// WitnessKind indicates the witness kind resource
WitnessKind

// ResourceKindLen represents the ResourceKind count
ResourceKindLen
)
88 changes: 63 additions & 25 deletions pkg/schedule/schedulers/balance_range.go
@@ -1,4 +1,4 @@
// Copyright 2025 TiKV Project Authors.
// Copyright 2025 tiKV Project Authors.

Check failure on line 1 in pkg/schedule/schedulers/balance_range.go (GitHub Actions / statics): Pattern Copyright \d{4} TiKV Project Authors. doesn't match. (goheader)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -21,11 +21,13 @@ import (
"time"

"github.com/gorilla/mux"

"github.com/unrolled/render"

"go.uber.org/zap"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
@@ -77,7 +79,7 @@ type balanceRangeSchedulerConfig struct {
type balanceRangeSchedulerJob struct {
JobID uint64 `json:"job-id"`
Role Role `json:"role"`
Engine Engine `json:"engine"`
Engine engine `json:"engine"`
Timeout time.Duration `json:"timeout"`
Ranges []core.KeyRange `json:"ranges"`
Alias string `json:"alias"`
@@ -96,6 +98,10 @@ func (conf *balanceRangeSchedulerConfig) begin(job *balanceRangeSchedulerJob) bo
now := time.Now()
job.Start = &now
job.Status = running
if err := conf.save(); err != nil {
log.Warn("failed to persist config", zap.Error(err), zap.Uint64("job-id", job.JobID))
return false
}
return true
}

@@ -107,10 +113,14 @@ func (conf *balanceRangeSchedulerConfig) finish(job *balanceRangeSchedulerJob) b
}
now := time.Now()
job.Finish = &now
if err := conf.save(); err != nil {
log.Warn("failed to persist config", zap.Error(err), zap.Uint64("job-id", job.JobID))
return false
}
return true
}

func (conf *balanceRangeSchedulerConfig) pop() *balanceRangeSchedulerJob {
func (conf *balanceRangeSchedulerConfig) peek() *balanceRangeSchedulerJob {
conf.RLock()
defer conf.RUnlock()
for _, job := range conf.jobs {
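
The two hunks above make job state changes durable and rename the accessor: begin and finish now call conf.save() before reporting success, and pop becomes peek because it only reads the first unfinished job without removing it. A minimal standalone sketch of the persist-on-transition pattern, assuming an injectable save hook (all names below are illustrative, not the scheduler's real types):

package main

import (
	"errors"
	"fmt"
	"time"
)

type jobStatus int

const (
	pending jobStatus = iota
	running
)

type job struct {
	Status jobStatus
	Start  *time.Time
}

type config struct {
	// save stands in for conf.save(); it may fail.
	save func() error
}

// begin moves a pending job to running, but only reports success
// once the updated config has been persisted.
func (c *config) begin(j *job) bool {
	now := time.Now()
	j.Start = &now
	j.Status = running
	if err := c.save(); err != nil {
		fmt.Println("failed to persist config:", err)
		return false
	}
	return true
}

func main() {
	c := &config{save: func() error { return nil }}
	fmt.Println(c.begin(&job{Status: pending})) // true: change persisted

	c.save = func() error { return errors.New("storage unavailable") }
	fmt.Println(c.begin(&job{Status: pending})) // false: transition not durable
}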
@@ -182,12 +192,12 @@ func (s *balanceRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster)
if !allowed {
operator.IncOperatorLimitCounter(s.GetType(), operator.OpRange)
}
job := s.conf.pop()
job := s.conf.peek()
if job != nil {
if job.Status == pending {
s.conf.begin(job)
}
if time.Now().Sub(*job.Start) > job.Timeout {
if time.Since(*job.Start) > job.Timeout {
s.conf.finish(job)
balanceRangeExpiredCounter.Inc()
}
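
IsScheduleAllowed also swaps time.Now().Sub(*job.Start) for the equivalent but idiomatic time.Since(*job.Start) when expiring a job. A tiny runnable illustration of that timeout check, with made-up durations:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Pretend the job started two hours ago with a one-hour timeout.
	start := time.Now().Add(-2 * time.Hour)
	timeout := time.Hour
	// time.Since(start) is the idiomatic equivalent of time.Now().Sub(start).
	if time.Since(start) > timeout {
		fmt.Println("job expired, mark it finished")
	}
}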
@@ -220,9 +230,9 @@ func newBalanceRangeScheduler(opController *operator.Controller, conf *balanceRa
}

// Schedule schedules the balance key range operator.
func (s *balanceRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
func (s *balanceRangeScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
balanceRangeCounter.Inc()
job := s.conf.pop()
job := s.conf.peek()
if job == nil {
return nil, nil
}
@@ -336,6 +346,7 @@ func (s *balanceRangeScheduler) transferPeer(plan *balanceRangeSchedulerPlan, ds
)
op.SetAdditionalInfo("sourceScore", strconv.FormatInt(plan.sourceScore, 10))
op.SetAdditionalInfo("targetScore", strconv.FormatInt(plan.targetScore, 10))
op.SetAdditionalInfo("tolerate", strconv.FormatInt(plan.tolerate, 10))
return op
}
balanceRangeNoReplacementCounter.Inc()
@@ -357,6 +368,8 @@ type balanceRangeSchedulerPlan struct {
fit *placement.RegionFit
averageScore int64
job *balanceRangeSchedulerJob
opInfluence operator.OpInfluence
tolerate int64
}

type storeInfo struct {
@@ -371,7 +384,7 @@ func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster, opInfluen
return nil, err
}
filters := s.filters
if job.Engine == TiFlash {
if job.Engine == tiflash {
filters = append(filters, filter.NewEngineFilter(balanceRangeName, filter.TiFlashEngineConstraint))
}
sources := filter.SelectSourceStores(cluster.GetStores(), filters, cluster.GetSchedulerConfig(), nil, nil)
Expand All @@ -389,6 +402,10 @@ func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster, opInfluen
totalScore += 1
}
}
tolerate := int64(float64(len(scanRegions)) * adjustRatio)
if tolerate < 1 {
tolerate = 1
}

storeList := make([]*storeInfo, 0, len(storeInfos))
for storeID, store := range storeInfos {
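
prepare now derives a per-job tolerance from the scanned region count and clamps it to at least 1; the test added later in this commit expects it to be 1 for a single-region job. A small sketch of that clamp, where the 0.05 ratio is only an assumed placeholder for adjustRatio:

package main

import "fmt"

// tolerance mirrors the clamp in prepare: a fixed fraction of the number
// of scanned regions, but never less than 1.
func tolerance(scannedRegions int, adjustRatio float64) int64 {
	t := int64(float64(scannedRegions) * adjustRatio)
	if t < 1 {
		t = 1
	}
	return t
}

func main() {
	// 0.05 is an assumed placeholder ratio, not the scheduler's constant.
	fmt.Println(tolerance(1, 0.05))   // 1 (clamped up, as the new test expects)
	fmt.Println(tolerance(100, 0.05)) // 5
}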
@@ -425,6 +442,8 @@ func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster, opInfluen
region: nil,
averageScore: averageScore,
job: job,
opInfluence: opInfluence,
tolerate: tolerate,
}, nil
}

Expand All @@ -441,16 +460,33 @@ func (p *balanceRangeSchedulerPlan) score(storeID uint64) int64 {
}

func (p *balanceRangeSchedulerPlan) shouldBalance(scheduler string) bool {
shouldBalance := p.sourceScore > p.targetScore
sourceInfluence := p.opInfluence.GetStoreInfluence(p.sourceStoreID())
sourceInf := p.job.Role.getStoreInfluence(sourceInfluence)
if sourceInf < 0 {
sourceInf = -sourceInf
}
sourceScore := p.sourceScore - sourceInf - p.tolerate

targetInfluence := p.opInfluence.GetStoreInfluence(p.targetStoreID())
targetInf := p.job.Role.getStoreInfluence(targetInfluence)
if targetInf < 0 {
targetInf = -targetInf
}
targetScore := p.targetScore + targetInf + p.tolerate

shouldBalance := sourceScore >= targetScore
if !shouldBalance && log.GetLevel() <= zap.DebugLevel {
log.Debug("skip balance ",
zap.String("scheduler", scheduler),
zap.Uint64("region-id", p.region.GetID()),
zap.Uint64("source-store", p.sourceStoreID()),
zap.Uint64("target-store", p.targetStoreID()),
zap.Int64("source-score", p.sourceScore),
zap.Int64("target-score", p.targetScore),
zap.Int64("origin-source-score", p.sourceScore),
zap.Int64("origin-target-score", p.targetScore),
zap.Int64("influence-source-score", sourceScore),
zap.Int64("influence-target-score", targetScore),
zap.Int64("average-region-size", p.averageScore),
zap.Int64("tolerate", p.tolerate),
)
}
return shouldBalance
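
shouldBalance no longer compares raw scores: the source score is discounted by the pending operator influence plus the tolerance, while the target score is inflated by the same amounts, so nearly balanced stores stop generating operators. A worked standalone sketch of the comparison (plain int64 values, not the scheduler's plan type):

package main

import "fmt"

func abs(v int64) int64 {
	if v < 0 {
		return -v
	}
	return v
}

// shouldBalance mirrors the adjusted comparison: the source gives up its
// pending influence plus the tolerance, the target gains both, and the
// move only proceeds if the source still scores at least as high.
func shouldBalance(sourceScore, targetScore, sourceInf, targetInf, tolerate int64) bool {
	adjustedSource := sourceScore - abs(sourceInf) - tolerate
	adjustedTarget := targetScore + abs(targetInf) + tolerate
	return adjustedSource >= adjustedTarget
}

func main() {
	// 100 vs 10, no in-flight operators, tolerance 1: 99 >= 11, so balance.
	fmt.Println(shouldBalance(100, 10, 0, 0, 1)) // true
	// 11 vs 10 with tolerance 1: 10 >= 11 fails, so no churn between near-equal stores.
	fmt.Println(shouldBalance(11, 10, 0, 0, 1)) // false
}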
@@ -480,38 +516,40 @@ func (r Role) String() string {
}
}

type Engine int
// engine is the engine of the store.
type engine int

const (
TiKV Engine = iota
TiFlash
Unknown
tiKV engine = iota
tiflash
notSupported
)

func (e Engine) String() string {
func (e engine) String() string {
switch e {
case TiKV:
case tiKV:
return "tikv"
case TiFlash:
case tiflash:
return "tiflash"
default:
return "unknown"
return "not-supported"
}
}

func (e Engine) MarshalJSON() ([]byte, error) {
// MarshalJSON marshals to json.
func (e engine) MarshalJSON() ([]byte, error) {
return []byte(`"` + e.String() + `"`), nil
}

// NewEngine creates a new engine.
func NewEngine(role string) Engine {
func NewEngine(role string) engine {
switch role {
case "tikv":
return TiKV
return tiKV
case "tiflash":
return TiFlash
return tiflash
default:
return Unknown
return notSupported
}
}

71 changes: 31 additions & 40 deletions pkg/schedule/schedulers/balance_range_test.go
@@ -1,4 +1,4 @@
// Copyright 2025 TiKV Project Authors.
// Copyright 2025 tiKV Project Authors.

Check failure on line 1 in pkg/schedule/schedulers/balance_range_test.go (GitHub Actions / statics): Pattern Copyright \d{4} TiKV Project Authors. doesn't match. (goheader)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -16,14 +16,15 @@ package schedulers

import (
"fmt"
"github.com/tikv/pd/pkg/schedule/operator"

"testing"

"github.com/pingcap/kvproto/pkg/metapb"

"github.com/stretchr/testify/require"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/storage"
@@ -61,8 +62,12 @@ func TestGetPeers(t *testing.T) {
}

func TestJobStatus(t *testing.T) {
s := storage.NewStorageWithMemoryBackend()
re := require.New(t)
conf := balanceRangeSchedulerConfig{}
conf := &balanceRangeSchedulerConfig{
schedulerConfig: &baseSchedulerConfig{},
}
conf.init(balanceRangeName, s, conf)
for _, v := range []struct {
jobStatus JobStatus
begin bool
Expand Down Expand Up @@ -93,32 +98,6 @@ func TestJobStatus(t *testing.T) {
}
}

func TestBalanceRangeShouldBalance(t *testing.T) {
re := require.New(t)
for _, v := range []struct {
sourceScore int64
targetScore int64
shouldBalance bool
}{
{
100,
10,
true,
},
{
10,
10,
false,
},
} {
plan := balanceRangeSchedulerPlan{
sourceScore: v.sourceScore,
targetScore: v.targetScore,
}
re.Equal(plan.shouldBalance(balanceRangeName), v.shouldBalance)
}
}

func TestBalanceRangePlan(t *testing.T) {
re := require.New(t)
cancel, _, tc, oc := prepareSchedulersTest()
Expand All @@ -129,7 +108,7 @@ func TestBalanceRangePlan(t *testing.T) {
}
tc.AddLeaderRegionWithRange(1, "100", "110", 1, 2, 3)
job := &balanceRangeSchedulerJob{
Engine: TiKV,
Engine: tiKV,
Role: leader,
Ranges: []core.KeyRange{core.NewKeyRange("100", "110")},
}
@@ -138,15 +117,16 @@ func TestBalanceRangePlan(t *testing.T) {
re.NotNil(plan)
re.Len(plan.stores, 3)
re.Len(plan.scoreMap, 3)
re.Equal(plan.scoreMap[1], int64(1))
re.Equal(int64(1), plan.scoreMap[1])
re.Equal(int64(1), plan.tolerate)
}

func TestTIKVEngine(t *testing.T) {
re := require.New(t)
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
scheduler, err := CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(types.BalanceRangeScheduler, []string{"leader", "tikv", "1h", "test", "100", "200"}))
re.Nil(err)
re.NoError(err)
ops, _ := scheduler.Schedule(tc, true)
re.Empty(ops)
for i := 1; i <= 3; i++ {
@@ -166,17 +146,17 @@ func TestTIKVEngine(t *testing.T) {
ops, _ = scheduler.Schedule(tc, true)
re.NotEmpty(ops)
op := ops[0]
re.Equal(op.GetAdditionalInfo("sourceScore"), "3")
re.Equal(op.GetAdditionalInfo("targetScore"), "1")
re.Equal("3", op.GetAdditionalInfo("sourceScore"))
re.Equal("1", op.GetAdditionalInfo("targetScore"))
re.Contains(op.Brief(), "transfer leader: store 1 to 3")
tc.AddLeaderStore(4, 0)

// case2: move peer from store 1 to store 4
ops, _ = scheduler.Schedule(tc, true)
re.NotEmpty(ops)
op = ops[0]
re.Equal(op.GetAdditionalInfo("sourceScore"), "3")
re.Equal(op.GetAdditionalInfo("targetScore"), "0")
re.Equal("3", op.GetAdditionalInfo("sourceScore"))
re.Equal("0", op.GetAdditionalInfo("targetScore"))
re.Contains(op.Brief(), "mv peer: store [1] to [4]")
}

Expand All @@ -185,15 +165,15 @@ func TestTIFLASHEngine(t *testing.T) {
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
tikvCount := 3
// 3 tikv and 3 tiflash
for i := 1; i <= tikvCount; i++ {
tc.AddLeaderStore(uint64(i), 0)
}
for i := tikvCount + 1; i <= tikvCount+3; i++ {
tc.AddLabelsStore(uint64(i), 0, map[string]string{"engine": "tiflash"})
}
for i := 1; i <= 3; i++ {
tc.AddRegionWithLearner(uint64(i), 1, []uint64{2, 3}, []uint64{4})
}
tc.AddRegionWithLearner(uint64(1), 1, []uint64{2, 3}, []uint64{4})

startKey := fmt.Sprintf("%20d0", 1)
endKey := fmt.Sprintf("%20d0", 10)
tc.RuleManager.SetRule(&placement.Rule{
@@ -208,11 +188,22 @@ func TestTIFLASHEngine(t *testing.T) {
},
})

// generate a balance range scheduler with tiflash engine
scheduler, err := CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(types.BalanceRangeScheduler, []string{"learner", "tiflash", "1h", "test", startKey, endKey}))
re.NoError(err)
// tiflash-4 only has 1 region, so it doesn't need to balance
ops, _ := scheduler.Schedule(tc, false)
re.Empty(ops)

// add 2 learner on tiflash-4
for i := 2; i <= 3; i++ {
tc.AddRegionWithLearner(uint64(i), 1, []uint64{2, 3}, []uint64{4})
}
ops, _ = scheduler.Schedule(tc, false)
re.NotEmpty(ops)
op := ops[0]
re.Equal(op.GetAdditionalInfo("sourceScore"), "3")
re.Equal("3", op.GetAdditionalInfo("sourceScore"))
re.Equal("0", op.GetAdditionalInfo("targetScore"))
re.Equal("1", op.GetAdditionalInfo("tolerate"))
re.Contains(op.Brief(), "mv peer: store [4] to")
}