diff --git a/logservice/logpuller/errors.go b/logservice/logpuller/errors.go index 835ae551..6fddf0d6 100644 --- a/logservice/logpuller/errors.go +++ b/logservice/logpuller/errors.go @@ -41,6 +41,10 @@ func (e *rpcCtxUnavailableErr) Error() string { e.verID.GetID(), e.verID.GetVer(), e.verID.GetConfVer()) } +type getStoreErr struct{} + +func (e *getStoreErr) Error() string { return "get store error" } + type sendRequestToStoreErr struct{} func (e *sendRequestToStoreErr) Error() string { return "send request to store error" } diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index e0b0974e..d019fc50 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/util" + cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/version" "go.uber.org/zap" @@ -101,12 +102,29 @@ func newRegionRequestWorker( if err := waitForPreFetching(); err != nil { return err } - if canceled := worker.run(ctx, credential); canceled { - return nil + var regionErr error + if err := version.CheckStoreVersion(ctx, worker.client.pd, worker.store.storeID); err != nil { + log.Info("event feed check store version fails", + zap.Uint64("workerID", worker.workerID), + zap.Uint64("storeID", worker.store.storeID), + zap.String("addr", worker.store.storeAddr), + zap.Error(err)) + if errors.Cause(err) == context.Canceled { + return nil + } else if cerror.Is(err, cerror.ErrGetAllStoresFailed) { + regionErr = &getStoreErr{} + } else { + regionErr = &sendRequestToStoreErr{} + } + } else { + if canceled := worker.run(ctx, credential); canceled { + return nil + } + regionErr = &sendRequestToStoreErr{} } for subID, m := range worker.clearRegionStates() { for _, state := range m { - state.markStopped(&sendRequestToStoreErr{}) + state.markStopped(regionErr) regionEvent := regionEvent{ state: state, worker: worker, @@ -120,7 +138,7 @@ func newRegionRequestWorker( // It means it's a special task for stopping the table. continue } - client.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{})) + client.onRegionFail(newRegionErrorInfo(region, regionErr)) } if err := util.Hang(ctx, time.Second); err != nil { return err @@ -141,8 +159,6 @@ func (s *regionRequestWorker) run(ctx context.Context, credential *security.Cred } } - // FIXME: check tikv store version - log.Info("region request worker going to create grpc stream", zap.Uint64("workerID", s.workerID), zap.Uint64("storeID", s.store.storeID), @@ -446,15 +462,3 @@ func (s *regionRequestWorker) clearPendingRegions() []regionInfo { } return regions } - -func (s *regionRequestWorker) getAllRegionStates() regionFeedStates { - s.requestedRegions.RLock() - defer s.requestedRegions.RUnlock() - states := make(regionFeedStates) - for _, statesMap := range s.requestedRegions.subscriptions { - for regionID, state := range statesMap { - states[regionID] = state - } - } - return states -} diff --git a/pkg/version/check.go b/pkg/version/check.go new file mode 100644 index 00000000..fde38bd9 --- /dev/null +++ b/pkg/version/check.go @@ -0,0 +1,95 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import ( + "context" + "fmt" + "regexp" + "strings" + + "github.com/coreos/go-semver/semver" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/tidb/pkg/util/engine" + cerror "github.com/pingcap/tiflow/pkg/errors" + pd "github.com/tikv/pd/client" +) + +var ( + // MinTiKVVersion is the version of the minimal compatible TiKV. + // The min version should be 7.5 for new arch. + MinTiKVVersion = semver.New("7.5.0-alpha") + // maxTiKVVersion is the version of the maximum compatible TiKV. + // Compatible versions are in [MinTiKVVersion, maxTiKVVersion) + maxTiKVVersion = semver.New("15.0.0") +) + +var versionHash = regexp.MustCompile("-[0-9]+-g[0-9a-f]{7,}(-dev)?") + +// SanitizeVersion remove the prefix "v" and suffix git hash. +func SanitizeVersion(v string) string { + if v == "" { + return v + } + v = versionHash.ReplaceAllLiteralString(v, "") + v = strings.TrimSuffix(v, "-fips") + v = strings.TrimSuffix(v, "-dirty") + return strings.TrimPrefix(v, "v") +} + +// CheckStoreVersion checks whether the given TiKV is compatible with this CDC. +// If storeID is 0, it checks all TiKV. +func CheckStoreVersion(ctx context.Context, client pd.Client, storeID uint64) error { + failpoint.Inject("GetStoreFailed", func() { + failpoint.Return(cerror.WrapError(cerror.ErrGetAllStoresFailed, fmt.Errorf("unknown store %d", storeID))) + }) + var stores []*metapb.Store + var err error + if storeID == 0 { + stores, err = client.GetAllStores(ctx, pd.WithExcludeTombstone()) + } else { + stores = make([]*metapb.Store, 1) + stores[0], err = client.GetStore(ctx, storeID) + } + if err != nil { + return cerror.WrapError(cerror.ErrGetAllStoresFailed, err) + } + + for _, s := range stores { + if engine.IsTiFlash(s) { + continue + } + + ver, err := semver.NewVersion(SanitizeVersion(s.Version)) + if err != nil { + err = errors.Annotate(err, "invalid TiKV version") + return cerror.WrapError(cerror.ErrNewSemVersion, err) + } + minOrd := ver.Compare(*MinTiKVVersion) + if minOrd < 0 { + arg := fmt.Sprintf("TiKV %s is not supported, the minimal compatible version is %s", + SanitizeVersion(s.Version), MinTiKVVersion) + return cerror.ErrVersionIncompatible.GenWithStackByArgs(arg) + } + maxOrd := ver.Compare(*maxTiKVVersion) + if maxOrd >= 0 { + arg := fmt.Sprintf("TiKV %s is not supported, only support version less than %s", + SanitizeVersion(s.Version), maxTiKVVersion) + return cerror.ErrVersionIncompatible.GenWithStackByArgs(arg) + } + } + return nil +}