Skip to content

Commit

Permalink
remove more tidb
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <yuping@pingcap.com>
  • Loading branch information
pingyu committed Nov 19, 2023
1 parent 73c8391 commit e640161
Show file tree
Hide file tree
Showing 14 changed files with 36 additions and 635 deletions.
4 changes: 2 additions & 2 deletions cdc/cdc/capture/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (

"github.com/gin-gonic/gin"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/migration/cdc/cdc/model"
"github.com/tikv/migration/cdc/cdc/owner"
"github.com/tikv/migration/cdc/pkg/config"
cerror "github.com/tikv/migration/cdc/pkg/errors"
"github.com/tikv/migration/cdc/pkg/httputil"
"github.com/tikv/migration/cdc/pkg/logutil"
"github.com/tikv/migration/cdc/pkg/retry"
"github.com/tikv/migration/cdc/pkg/version"
Expand Down Expand Up @@ -782,7 +782,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {
}

// forward to owner
cli := httputil.NewClient(tslConfig)
cli := httputil.NewClientByTLSConfig(tslConfig)

Check warning on line 785 in cdc/cdc/capture/http_handler.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/capture/http_handler.go#L785

Added line #L785 was not covered by tests
resp, err := cli.Do(req)
if err != nil {
_ = c.Error(err)
Expand Down
6 changes: 3 additions & 3 deletions cdc/cdc/http_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (

"github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/tikv/migration/cdc/cdc/capture"
"github.com/tikv/migration/cdc/cdc/model"
"github.com/tikv/migration/cdc/pkg/config"
cerrors "github.com/tikv/migration/cdc/pkg/errors"
"github.com/tikv/migration/cdc/pkg/httputil"
"github.com/tikv/migration/cdc/pkg/retry"
security2 "github.com/tikv/migration/cdc/pkg/security"
"github.com/tikv/migration/cdc/pkg/util/testleak"
Expand Down Expand Up @@ -246,7 +246,7 @@ func (s *httpStatusSuite) TestServerTLSWithoutCommonName(c *check.C) {
if err != nil {
c.Assert(err, check.IsNil)
}
cli := httputil.NewClient(tlsConfig)
cli := httputil.NewClientByTLSConfig(tlsConfig)
resp, err := cli.Get(statusURL)
if err != nil {
return err
Expand Down Expand Up @@ -324,7 +324,7 @@ func (s *httpStatusSuite) TestServerTLSWithCommonName(c *check.C) {
if err != nil {
c.Assert(err, check.IsNil)
}
cli := httputil.NewClient(tlsConfig)
cli := httputil.NewClientByTLSConfig(tlsConfig)
resp, err := cli.Get(statusURL)
if err != nil {
return err
Expand Down
6 changes: 2 additions & 4 deletions cdc/cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/mockcopr"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -399,9 +398,8 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
cluster.ChangeLeader(3, 5)

ts, err := kvStorage.CurrentTimestamp(oracle.GlobalTxnScope)
ver := kv.NewVersion(ts)
c.Assert(err, check.IsNil)
ch2 <- makeEvent(ver.Ver)
ch2 <- makeEvent(ts)
var event model.RegionFeedEvent
// consume the first resolved ts event, which is sent before region starts
<-eventCh
Expand All @@ -417,7 +415,7 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
case <-time.After(time.Second):
c.Fatalf("reconnection not succeed in 1 second")
}
checkEvent(event, GetSafeResolvedTs(ver.Ver))
checkEvent(event, GetSafeResolvedTs(ts))

// check gRPC connection active counter is updated correctly
bucket, ok := grpcPool.bucketConns[invalidStore]
Expand Down
283 changes: 0 additions & 283 deletions cdc/cdc/kv/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,291 +13,8 @@

package kv

import (
"bytes"
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/driver"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/migration/cdc/cdc/model"
"github.com/tikv/migration/cdc/pkg/regionspan"
"github.com/tikv/migration/cdc/pkg/security"
"github.com/tikv/migration/cdc/pkg/txnutil"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

var genValueID int

func genValue() []byte {
genValueID++
return []byte("value_" + strconv.Itoa(genValueID))
}

type eventChecker struct {
t require.TestingT
eventCh chan model.RegionFeedEvent
closeCh chan struct{}

vals []*model.RawKVEntry
checkpoints []*model.ResolvedSpan
}

func valInSlice(val *model.RawKVEntry, vals []*model.RawKVEntry) bool {
for _, v := range vals {
if val.CRTs == v.CRTs && bytes.Equal(val.Key, v.Key) {
return true
}
}
return false
}

func newEventChecker(t require.TestingT) *eventChecker {
ec := &eventChecker{
t: t,
eventCh: make(chan model.RegionFeedEvent),
closeCh: make(chan struct{}),
}

go func() {
for {
select {
case e := <-ec.eventCh:
log.Debug("get event", zap.Reflect("event", e))
if e.Val != nil {
// check if the value event break the checkpoint guarantee
for _, cp := range ec.checkpoints {
if !regionspan.KeyInSpan(e.Val.Key, cp.Span) ||
e.Val.CRTs > cp.ResolvedTs {
continue
}

if !valInSlice(e.Val, ec.vals) {
require.FailNowf(t, "unexpected value event", "value: %+v checkpoint: %+v", e.Val, cp)
}
}

ec.vals = append(ec.vals, e.Val)
} else {
ec.checkpoints = append(ec.checkpoints, e.Resolved)
}
case <-ec.closeCh:
return
}
}
}()

return ec
}

// stop the checker
func (ec *eventChecker) stop() {
close(ec.closeCh)
}

// CreateStorage creates a tikv Storage instance.
func CreateStorage(pdAddr string) (storage kv.Storage, err error) {
tiPath := fmt.Sprintf("tikv://%s?disableGC=true", pdAddr)
err = store.Register("tikv", driver.TiKVDriver{})
if err != nil && !strings.Contains(err.Error(), "already registered") {
return
}
storage, err = store.New(tiPath)
return
}

func mustGetTimestamp(t require.TestingT, storage tikv.Storage) uint64 {
ts, err := storage.GetOracle().GetTimestamp(context.Background(), nil)
require.NoError(t, err)

return ts
}

func mustGetValue(t require.TestingT, eventCh <-chan model.RegionFeedEvent, value []byte) {
timeout := time.After(time.Second * 20)

for {
select {
case e := <-eventCh:
if e.Val != nil && bytes.Equal(e.Val.Value, value) {
return
}
case <-timeout:
require.FailNowf(t, "timeout to get value", "value: %v", value)
}
}
}

type mockPullerInit struct{}

func (*mockPullerInit) IsInitialized() bool {
return true
}

// TestSplit try split on every region, and test can get value event from
// every region after split.
func TestSplit(t require.TestingT, pdCli pd.Client, storage tikv.Storage, kvStore kv.Storage) {
eventCh := make(chan model.RegionFeedEvent, 1<<20)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
regionCache := tikv.NewRegionCache(pdCli)

cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool, regionCache)

startTS := mustGetTimestamp(t, storage)

lockresolver := txnutil.NewLockerResolver(storage)
isPullInit := &mockPullerInit{}
go func() {
err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, lockresolver, isPullInit, eventCh)
require.Equal(t, err, context.Canceled)
}()

mockTableID := int64(999)
preRegions, err := pdCli.ScanRegions(context.Background(), nil, nil, 10000)
require.NoError(t, err)

for i := 0; i < 2; i++ {
regions := preRegions
// In second loop try split every region.
if i == 1 {
splitStore, ok := storage.(kv.SplittableStore)
require.True(t, ok)
for _, r := range preRegions {
splitKey := r.Meta.GetStartKey()
if len(splitKey) == 0 {
splitKey = []byte{0}
} else {
splitKey = append(splitKey, 0)
}
splitKeys := [][]byte{splitKey}
_, err := splitStore.SplitRegions(context.Background(), splitKeys, false, &mockTableID)
require.NoError(t, err)
}

time.Sleep(time.Second * 3)

var afterRegions []*pd.Region
afterRegions, err = pdCli.ScanRegions(context.Background(), nil, nil, 10000)
require.NoError(t, err)
require.Greater(t, len(afterRegions), len(preRegions))

regions = afterRegions
}

// Put a key on every region and check we can get the event.
for _, r := range regions {
key := r.Meta.GetStartKey()
if len(key) == 0 {
key = []byte{0}
}
value := genValue()

var tx kv.Transaction
tx, err = kvStore.Begin()
require.NoError(t, err)
err = tx.Set(key, value)
require.NoError(t, err)
err = tx.Commit(ctx)
require.NoError(t, err)

mustGetValue(t, eventCh, value)
}
}
}

func mustSetKey(t require.TestingT, storage kv.Storage, key []byte, value []byte) {
tx, err := storage.Begin()
require.NoError(t, err)
err = tx.Set(key, value)
require.NoError(t, err)
err = tx.Commit(context.Background())
require.NoError(t, err)
}

func mustDeleteKey(t require.TestingT, storage kv.Storage, key []byte) {
tx, err := storage.Begin()
require.NoError(t, err)
err = tx.Delete(key)
require.NoError(t, err)
err = tx.Commit(context.Background())
require.NoError(t, err)
}

// TestGetKVSimple test simple KV operations
func TestGetKVSimple(t require.TestingT, pdCli pd.Client, storage tikv.Storage, kvStore kv.Storage) {
checker := newEventChecker(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
regionCache := tikv.NewRegionCache(pdCli)
cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool, regionCache)

startTS := mustGetTimestamp(t, storage)
lockresolver := txnutil.NewLockerResolver(storage)
isPullInit := &mockPullerInit{}
go func() {
err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, lockresolver, isPullInit, checker.eventCh)
require.Equal(t, err, context.Canceled)
}()

key := []byte("s1")
value := []byte("s1v")

// set
mustSetKey(t, kvStore, key, value)

// delete
mustDeleteKey(t, kvStore, key)

// set again
mustSetKey(t, kvStore, key, value)

for i := 0; i < 2; i++ {
// start a new EventFeed with the startTS before the kv operations should also get the same events.
// This can test the initialize case.
if i == 1 {
checker = newEventChecker(t)
go func() {
err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, lockresolver, isPullInit, checker.eventCh)
require.Equal(t, err, context.Canceled)
}()
}

time.Sleep(5 * time.Second)
checker.stop()

// filter the unrelated keys event.
var vals []*model.RawKVEntry
for _, v := range checker.vals {
if bytes.Equal(v.Key, key) {
vals = append(vals, v)
}
}
checker.vals = vals

// check we can get the events.
require.Len(t, checker.vals, 3)
require.Equal(t, checker.vals[0].OpType, model.OpTypePut)
require.Equal(t, checker.vals[0].Key, key)
require.Equal(t, checker.vals[0].Value, value)

require.Equal(t, checker.vals[1].OpType, model.OpTypeDelete)
require.Equal(t, checker.vals[1].Key, key)

require.Equal(t, checker.vals[2].OpType, model.OpTypePut)
require.Equal(t, checker.vals[2].Key, key)
require.Equal(t, checker.vals[2].Value, value)
}
}
1 change: 0 additions & 1 deletion cdc/cmd/cdc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package main

import (
_ "github.com/pingcap/tidb/types/parser_driver"
"github.com/tikv/migration/cdc/pkg/cmd"
)

Expand Down
Loading

0 comments on commit e640161

Please sign in to comment.