Skip to content

Commit

Permalink
puller: add tikv version check
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Feb 20, 2025
1 parent 7d684be commit 3610090
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 18 deletions.
4 changes: 4 additions & 0 deletions logservice/logpuller/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (e *rpcCtxUnavailableErr) Error() string {
e.verID.GetID(), e.verID.GetVer(), e.verID.GetConfVer())
}

type getStoreErr struct{}

func (e *getStoreErr) Error() string { return "get store error" }

type sendRequestToStoreErr struct{}

func (e *sendRequestToStoreErr) Error() string { return "send request to store error" }
40 changes: 22 additions & 18 deletions logservice/logpuller/region_request_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/util"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/version"
"go.uber.org/zap"
Expand Down Expand Up @@ -101,12 +102,29 @@ func newRegionRequestWorker(
if err := waitForPreFetching(); err != nil {
return err
}
if canceled := worker.run(ctx, credential); canceled {
return nil
var regionErr error
if err := version.CheckStoreVersion(ctx, worker.client.pd, worker.store.storeID); err != nil {
log.Info("event feed check store version fails",
zap.Uint64("workerID", worker.workerID),
zap.Uint64("storeID", worker.store.storeID),
zap.String("addr", worker.store.storeAddr),
zap.Error(err))
if errors.Cause(err) == context.Canceled {
return nil
} else if cerror.Is(err, cerror.ErrGetAllStoresFailed) {
regionErr = &getStoreErr{}
} else {
regionErr = &sendRequestToStoreErr{}
}
} else {
if canceled := worker.run(ctx, credential); canceled {
return nil
}
regionErr = &sendRequestToStoreErr{}
}
for subID, m := range worker.clearRegionStates() {
for _, state := range m {
state.markStopped(&sendRequestToStoreErr{})
state.markStopped(regionErr)
regionEvent := regionEvent{
state: state,
worker: worker,
Expand All @@ -120,7 +138,7 @@ func newRegionRequestWorker(
// It means it's a special task for stopping the table.
continue
}
client.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{}))
client.onRegionFail(newRegionErrorInfo(region, regionErr))
}
if err := util.Hang(ctx, time.Second); err != nil {
return err
Expand All @@ -141,8 +159,6 @@ func (s *regionRequestWorker) run(ctx context.Context, credential *security.Cred
}
}

// FIXME: check tikv store version

log.Info("region request worker going to create grpc stream",
zap.Uint64("workerID", s.workerID),
zap.Uint64("storeID", s.store.storeID),
Expand Down Expand Up @@ -446,15 +462,3 @@ func (s *regionRequestWorker) clearPendingRegions() []regionInfo {
}
return regions
}

func (s *regionRequestWorker) getAllRegionStates() regionFeedStates {
s.requestedRegions.RLock()
defer s.requestedRegions.RUnlock()
states := make(regionFeedStates)
for _, statesMap := range s.requestedRegions.subscriptions {
for regionID, state := range statesMap {
states[regionID] = state
}
}
return states
}
95 changes: 95 additions & 0 deletions pkg/version/check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package version

import (
"context"
"fmt"
"regexp"
"strings"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/tidb/pkg/util/engine"
cerror "github.com/pingcap/tiflow/pkg/errors"
pd "github.com/tikv/pd/client"
)

var (
// MinTiKVVersion is the version of the minimal compatible TiKV.
// The min version should be 7.5 for new arch.
MinTiKVVersion = semver.New("7.5.0-alpha")
// maxTiKVVersion is the version of the maximum compatible TiKV.
// Compatible versions are in [MinTiKVVersion, maxTiKVVersion)
maxTiKVVersion = semver.New("15.0.0")
)

var versionHash = regexp.MustCompile("-[0-9]+-g[0-9a-f]{7,}(-dev)?")

// SanitizeVersion remove the prefix "v" and suffix git hash.
func SanitizeVersion(v string) string {
if v == "" {
return v
}
v = versionHash.ReplaceAllLiteralString(v, "")
v = strings.TrimSuffix(v, "-fips")
v = strings.TrimSuffix(v, "-dirty")
return strings.TrimPrefix(v, "v")
}

// CheckStoreVersion checks whether the given TiKV is compatible with this CDC.
// If storeID is 0, it checks all TiKV.
func CheckStoreVersion(ctx context.Context, client pd.Client, storeID uint64) error {
failpoint.Inject("GetStoreFailed", func() {
failpoint.Return(cerror.WrapError(cerror.ErrGetAllStoresFailed, fmt.Errorf("unknown store %d", storeID)))
})
var stores []*metapb.Store
var err error
if storeID == 0 {
stores, err = client.GetAllStores(ctx, pd.WithExcludeTombstone())
} else {
stores = make([]*metapb.Store, 1)
stores[0], err = client.GetStore(ctx, storeID)
}
if err != nil {
return cerror.WrapError(cerror.ErrGetAllStoresFailed, err)
}

for _, s := range stores {
if engine.IsTiFlash(s) {
continue
}

ver, err := semver.NewVersion(SanitizeVersion(s.Version))
if err != nil {
err = errors.Annotate(err, "invalid TiKV version")
return cerror.WrapError(cerror.ErrNewSemVersion, err)
}
minOrd := ver.Compare(*MinTiKVVersion)
if minOrd < 0 {
arg := fmt.Sprintf("TiKV %s is not supported, the minimal compatible version is %s",
SanitizeVersion(s.Version), MinTiKVVersion)
return cerror.ErrVersionIncompatible.GenWithStackByArgs(arg)
}
maxOrd := ver.Compare(*maxTiKVVersion)
if maxOrd >= 0 {
arg := fmt.Sprintf("TiKV %s is not supported, only support version less than %s",
SanitizeVersion(s.Version), maxTiKVVersion)
return cerror.ErrVersionIncompatible.GenWithStackByArgs(arg)
}
}
return nil
}

0 comments on commit 3610090

Please sign in to comment.