Skip to content

Commit

Permalink
api: add a new scheduler to balance the regions of the given key range (
Browse files Browse the repository at this point in the history
#8988)

close #8987

Signed-off-by: 童剑 <1045931706@qq.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
bufferflies and ti-chi-bot[bot] authored Feb 10, 2025
1 parent c92da83 commit 517afe0
Show file tree
Hide file tree
Showing 10 changed files with 472 additions and 8 deletions.
10 changes: 10 additions & 0 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core

import (
"bytes"
"encoding/json"

"github.com/tikv/pd/pkg/core/constant"
)
Expand Down Expand Up @@ -156,6 +157,15 @@ type KeyRange struct {
EndKey []byte `json:"end-key"`
}

// MarshalJSON marshals to json.
func (kr KeyRange) MarshalJSON() ([]byte, error) {
m := map[string]string{
"start-key": HexRegionKeyStr(kr.StartKey),
"end-key": HexRegionKeyStr(kr.EndKey),
}
return json.Marshal(m)
}

// NewKeyRange create a KeyRange with the given start key and end key.
func NewKeyRange(startKey, endKey string) KeyRange {
return KeyRange{
Expand Down
6 changes: 5 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,11 @@ func (c *Cluster) updateScheduler() {
)
// Create the newly added schedulers.
for _, scheduler := range latestSchedulersConfig {
schedulerType := types.ConvertOldStrToType[scheduler.Type]
schedulerType, ok := types.ConvertOldStrToType[scheduler.Type]
if !ok {
log.Error("scheduler not found", zap.String("type", scheduler.Type))
continue
}
s, err := schedulers.CreateScheduler(
schedulerType,
c.coordinator.GetOperatorController(),
Expand Down
6 changes: 0 additions & 6 deletions pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ func RegisterScheduler(typ types.CheckerSchedulerType) {
schedulerMap.Store(typ, struct{}{})
}

// IsSchedulerRegistered checks if the named scheduler type is registered.
func IsSchedulerRegistered(typ types.CheckerSchedulerType) bool {
_, ok := schedulerMap.Load(typ)
return ok
}

// SchedulerConfigProvider is the interface for scheduler configurations.
type SchedulerConfigProvider interface {
SharedConfigProvider
Expand Down
237 changes: 237 additions & 0 deletions pkg/schedule/schedulers/balance_range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
// Copyright 2025 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
"net/http"
"time"

"github.com/gorilla/mux"
"github.com/unrolled/render"

"github.com/pingcap/log"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/syncutil"
)

type balanceRangeSchedulerHandler struct {
rd *render.Render
config *balanceRangeSchedulerConfig
}

func newBalanceRangeHandler(conf *balanceRangeSchedulerConfig) http.Handler {
handler := &balanceRangeSchedulerHandler{
config: conf,
rd: render.New(render.Options{IndentJSON: true}),
}
router := mux.NewRouter()
router.HandleFunc("/config", handler.updateConfig).Methods(http.MethodPost)
router.HandleFunc("/list", handler.listConfig).Methods(http.MethodGet)
return router
}

func (handler *balanceRangeSchedulerHandler) updateConfig(w http.ResponseWriter, _ *http.Request) {
handler.rd.JSON(w, http.StatusBadRequest, "update config is not supported")
}

func (handler *balanceRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.clone()
if err := handler.rd.JSON(w, http.StatusOK, conf); err != nil {
log.Error("failed to marshal balance key range scheduler config", errs.ZapError(err))
}
}

type balanceRangeSchedulerConfig struct {
syncutil.RWMutex
schedulerConfig
jobs []*balanceRangeSchedulerJob
}

type balanceRangeSchedulerJob struct {
JobID uint64 `json:"job-id"`
Role Role `json:"role"`
Engine string `json:"engine"`
Timeout time.Duration `json:"timeout"`
Ranges []core.KeyRange `json:"ranges"`
Alias string `json:"alias"`
Status JobStatus `json:"status"`
}

func (conf *balanceRangeSchedulerConfig) clone() []*balanceRangeSchedulerJob {
conf.RLock()
defer conf.RUnlock()
jobs := make([]*balanceRangeSchedulerJob, 0, len(conf.jobs))
for _, job := range conf.jobs {
ranges := make([]core.KeyRange, len(job.Ranges))
copy(ranges, job.Ranges)
jobs = append(jobs, &balanceRangeSchedulerJob{
Ranges: ranges,
Role: job.Role,
Engine: job.Engine,
Timeout: job.Timeout,
Alias: job.Alias,
JobID: job.JobID,
})
}

return jobs
}

// EncodeConfig serializes the config.
func (s *balanceRangeScheduler) EncodeConfig() ([]byte, error) {
s.conf.RLock()
defer s.conf.RUnlock()
return EncodeConfig(s.conf.jobs)
}

// ReloadConfig reloads the config.
func (s *balanceRangeScheduler) ReloadConfig() error {
s.conf.Lock()
defer s.conf.Unlock()

jobs := make([]*balanceRangeSchedulerJob, 0, len(s.conf.jobs))
if err := s.conf.load(jobs); err != nil {
return err
}
s.conf.jobs = jobs
return nil
}

type balanceRangeScheduler struct {
*BaseScheduler
conf *balanceRangeSchedulerConfig
handler http.Handler
filters []filter.Filter
filterCounter *filter.Counter
}

// ServeHTTP implements the http.Handler interface.
func (s *balanceRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handler.ServeHTTP(w, r)
}

// Schedule schedules the balance key range operator.
func (*balanceRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) {
log.Debug("balance key range scheduler is scheduling, need to implement")
return nil, nil
}

// IsScheduleAllowed checks if the scheduler is allowed to schedule new operators.
func (s *balanceRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
allowed := s.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit()
if !allowed {
operator.IncOperatorLimitCounter(s.GetType(), operator.OpRange)
}
return allowed
}

// BalanceRangeCreateOption is used to create a scheduler with an option.
type BalanceRangeCreateOption func(s *balanceRangeScheduler)

// newBalanceRangeScheduler creates a scheduler that tends to keep given peer Role on
// special store balanced.
func newBalanceRangeScheduler(opController *operator.Controller, conf *balanceRangeSchedulerConfig, options ...BalanceRangeCreateOption) Scheduler {
s := &balanceRangeScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceRangeScheduler, conf),
conf: conf,
handler: newBalanceRangeHandler(conf),
}
for _, option := range options {
option(s)
}
s.filters = []filter.Filter{
&filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.Medium},
filter.NewSpecialUseFilter(s.GetName()),
}
s.filterCounter = filter.NewCounter(s.GetName())
return s
}

// JobStatus is the status of the job.
type JobStatus int

const (
pending JobStatus = iota
running
finished
)

func (s JobStatus) String() string {
switch s {
case pending:
return "pending"
case running:
return "running"
case finished:
return "finished"
}
return "unknown"
}

// MarshalJSON marshals to json.
func (s JobStatus) MarshalJSON() ([]byte, error) {
return []byte(`"` + s.String() + `"`), nil
}

// Role is the role of the region.
type Role int

const (
leader Role = iota
// include leader + voter
follower
learner
unknown
)

// NewRole creates a new role.
func NewRole(role string) Role {
switch role {
case "leader":
return leader
case "follower":
return follower
case "learner":
return learner
default:
return unknown
}
}

func (r Role) String() string {
switch r {
case leader:
return "leader"
case follower:
return "follower"
case learner:
return "learner"
default:
return "unknown"
}
}

// MarshalJSON marshals to json.
func (r Role) MarshalJSON() ([]byte, error) {
return []byte(`"` + r.String() + `"`), nil
}
74 changes: 74 additions & 0 deletions pkg/schedule/schedulers/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package schedulers

import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -545,4 +547,76 @@ func schedulersRegister() {
conf.init(sche.GetName(), storage, conf)
return sche, nil
})

// balance key range scheduler
// args: [role, engine, timeout, alias, range1, range2, ...]
RegisterSliceDecoderBuilder(types.BalanceRangeScheduler, func(args []string) ConfigDecoder {
return func(v any) error {
conf, ok := v.(*balanceRangeSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}
if len(args) < 5 {
return errs.ErrSchedulerConfig.FastGenByArgs("args length must be greater than 4")
}
role, err := url.QueryUnescape(args[0])
if err != nil {
return errs.ErrQueryUnescape.Wrap(err)
}
jobRole := NewRole(role)
if jobRole == unknown {
return errs.ErrQueryUnescape.FastGenByArgs("role")
}
engine, err := url.QueryUnescape(args[1])
if err != nil {
return errs.ErrQueryUnescape.Wrap(err)
}
timeout, err := url.QueryUnescape(args[2])
if err != nil {
return errs.ErrQueryUnescape.Wrap(err)
}
duration, err := time.ParseDuration(timeout)
if err != nil {
return errs.ErrURLParse.Wrap(err)
}
alias, err := url.QueryUnescape(args[3])
if err != nil {
return errs.ErrURLParse.Wrap(err)
}
ranges, err := getKeyRanges(args[4:])
if err != nil {
return err
}

id := uint64(0)
if len(conf.jobs) > 0 {
id = conf.jobs[len(conf.jobs)-1].JobID + 1
}

job := &balanceRangeSchedulerJob{
Role: jobRole,
Engine: engine,
Timeout: duration,
Alias: alias,
Ranges: ranges,
Status: pending,
JobID: id,
}
conf.jobs = append(conf.jobs, job)
return nil
}
})

RegisterScheduler(types.BalanceRangeScheduler, func(opController *operator.Controller,
storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) {
conf := &balanceRangeSchedulerConfig{
schedulerConfig: newBaseDefaultSchedulerConfig(),
}
if err := decoder(conf); err != nil {
return nil, err
}
sche := newBalanceRangeScheduler(opController, conf)
conf.init(sche.GetName(), storage, conf)
return sche, nil
})
}
Loading

0 comments on commit 517afe0

Please sign in to comment.