Skip to content

Commit

Permalink
add it
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <yuping@pingcap.com>
  • Loading branch information
pingyu committed Aug 24, 2023
1 parent d2969bc commit 23e0d96
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 6 deletions.
47 changes: 47 additions & 0 deletions cdc/tests/integration_tests/_utils/check_total_kvs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/bin/bash
# parameter 1: work directory
# parameter 2: dst pd
# parameter 3: total kvs
# parameter 4: max check times

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
TLS_DIR=$(cd $CUR/../_certificates && pwd)

workdir=$1
DOWN_PD=$2
TOTAL_KVS=$3
if [ $# -ge 4 ]; then
check_time=$4
else
check_time=50
fi
PWD=$(pwd)

if ! command -v rawkv_data &>/dev/null; then
cd $CUR/../../..
make rawkv_data
cd $PWD
fi
set +e

cd $workdir
i=0
while [ $i -lt $check_time ]; do
rm -rf $workdir/rawkv_data/
rawkv_data totalkvs --dst-pd $DOWN_PD --count $TOTAL_KVS --ca-path=$TLS_DIR/ca.pem --cert-path=$TLS_DIR/client.pem --key-path=$TLS_DIR/client-key.pem
ret=$?
if [ "$ret" == 0 ]; then
echo "check total-kvs successfully"
break
fi
((i++))
echo "check total-kvs failed $i-th time, retry later"
sleep 3
done

if [ $i -ge $check_time ]; then
echo "check total-kvs failed at last"
exit 1
fi

cd $PWD
2 changes: 2 additions & 0 deletions cdc/tests/integration_tests/kv_filter/conf/changefeed.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[filter]
key-pattern = "indexInfo_:_pf01_:_APD0101_:_0{15}[3-9]"
49 changes: 49 additions & 0 deletions cdc/tests/integration_tests/kv_filter/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/bin/bash

set -euo pipefail

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=tikv-cdc.test
SINK_TYPE=$1
UP_PD=http://$UP_PD_HOST_1:$UP_PD_PORT_1
DOWN_PD=http://$DOWN_PD_HOST:$DOWN_PD_PORT

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR
cd $WORK_DIR

start_ts=$(get_start_ts $UP_PD)
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

case $SINK_TYPE in
tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
*) SINK_URI="" ;;
esac

uuid="custom-changefeed-name"
tikv-cdc cli changefeed create \
--start-ts=$start_ts \
--sink-uri="$SINK_URI" \
--changefeed-id="$uuid" \
--config $CUR/conf/changefeed.toml

rawkv_op $UP_PD put 5000

# Filter configured in $CUR/conf/changefeed.toml will filter events with key >= 3000
# So wait for sync finished, pause changefeed, delete keys < 3000 for upstream, then check_sync_diff
sleep 1 && check_total_kvs $WORK_DIR $DOWN_PD 2000
run_cdc_cli changefeed --changefeed-id $uuid pause
rawkv_op $UP_PD delete 3000

check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/run_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ group=$1
# https://github.com/PingCAP-QE/ci/blob/main/pipelines/tikv/migration/latest/pull_integration_test.groovy
declare -A groups
groups=(
["G00"]='autorandom'
["G00"]='autorandom kv_filter'
["G01"]='capture_session_done_during_task cdc_hang_on'
["G02"]='changefeed_auto_stop changefeed_error changefeed_fast_fail'
["G03"]='changefeed_finish changefeed_pause_resume changefeed_reconstruct'
Expand Down
42 changes: 42 additions & 0 deletions cdc/tests/utils/rawkv_data/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,45 @@ func runChecksum(cmd *cobra.Command) error {
fmt.Printf("Upstream checksum %v are same with downstream %v\n", srcChecksum, dstChecksum)
return nil
}

func NewTotalKvsCommand() *cobra.Command {
return &cobra.Command{
Use: "totalkvs",
Short: "Verify that the total number of key-values of downstream is equal to --count argument",
SilenceUsage: true,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return runTotalKvs(cmd)
},
}
}

func runTotalKvs(cmd *cobra.Command) error {
cfg := &Config{}
err := cfg.ParseFromFlags(cmd.Flags(), true)
if err != nil {
return err
}
ctx := context.Background()

dstCli, err := rawkv.NewClientWithOpts(ctx, cfg.DstPD,
rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2),
rawkv.WithSecurity(cfg.DstSec))
if err != nil {
return err
}
defer dstCli.Close()

dstChecksum, err := dstCli.Checksum(ctx, nil, nil)
if err != nil {
return err
}

if dstChecksum.TotalKvs != uint64(cfg.Count) {
msg := fmt.Sprintf("Downstream total kvs %v is not equal to expected %v", dstChecksum, cfg.Count)
log.Info(msg)
return fmt.Errorf(msg)
}
fmt.Printf("Downstream total kvs %v equals to expected %v", dstChecksum, cfg.Count)
return nil
}
10 changes: 5 additions & 5 deletions cdc/tests/utils/rawkv_data/gen_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ func generateTestData(keyIndex int) (key, value0, value1 []byte) {
key = []byte(fmt.Sprintf("indexInfo_:_pf01_:_APD0101_:_%019d", keyIndex))
value0 = []byte{} // Don't assign nil, which means "NotFound" in CompareAndSwap
if keyIndex%100 != 42 { // To generate test data with empty value. See https://github.com/tikv/migration/issues/250
value0 = []byte(fmt.Sprintf("v0_%020d", keyIndex))
value0 = []byte(fmt.Sprintf("v0_%020d_%v", keyIndex, rand.Uint64()))
}
value1 = []byte{}
if keyIndex%100 != 43 {
value1 = []byte(fmt.Sprintf("v1_%020d%020d", keyIndex, keyIndex))
value1 = []byte(fmt.Sprintf("v1_%020d%020d_%v", keyIndex, keyIndex, rand.Uint64()))
}
return key, value0, value1
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func runPutCmd(cmd *cobra.Command) error {
endIdx := startIdx + count1 + count2
kvCntPerBatch := 512
for startIdx1 < endIdx {
batchCnt := min(kvCntPerBatch, endIdx-startIdx)
batchCnt := min(kvCntPerBatch, endIdx-startIdx1)
keys, values0, values1 := batchGenerateData(startIdx1, batchCnt)
err := cli.BatchPut(ctx, keys, values0)
if err != nil {
Expand All @@ -179,7 +179,7 @@ func runPutCmd(cmd *cobra.Command) error {
if err != nil {
return err
}
startIdx1 += kvCntPerBatch
startIdx1 += batchCnt
}
return nil
})
Expand All @@ -199,7 +199,7 @@ func runPutCmd(cmd *cobra.Command) error {
return err
}
if !ret && !bytes.Equal(preValue, value1) {
return errors.Errorf("CAS put data error: preValue: %v, ret: %v, value0: %v, value1: %v", preValue, ret, value0, value1)
return errors.Errorf("CAS put data error: preValue: %v, ret: %v, value0: %v, value1: %v", string(preValue), ret, string(value0), string(value1))
}
}
return nil
Expand Down
1 change: 1 addition & 0 deletions cdc/tests/utils/rawkv_data/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func main() {
rootCmd.AddCommand(NewPutCommand())
rootCmd.AddCommand(NewDeleteCommand())
rootCmd.AddCommand(NewChecksumCommand())
rootCmd.AddCommand(NewTotalKvsCommand())

rootCmd.SetOut(os.Stdout)
rootCmd.SetArgs(os.Args[1:])
Expand Down

0 comments on commit 23e0d96

Please sign in to comment.