fix failpoint
Signed-off-by: Ping Yu <yuping@pingcap.com>
pingyu committed Jan 3, 2024
1 parent 32ce78f commit 255adf1
Showing 32 changed files with 110 additions and 4 deletions.
6 changes: 6 additions & 0 deletions cdc/cdc/sink/mq.go
@@ -21,6 +21,7 @@ import (
    "time"

    "github.com/pingcap/errors"
+   "github.com/pingcap/failpoint"
    "github.com/pingcap/log"
    "github.com/tikv/migration/cdc/cdc/model"
    "github.com/tikv/migration/cdc/cdc/sink/codec"
@@ -243,6 +244,11 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
        return 0, nil
    }

+   failpoint.Inject("SinkFlushEventPanic", func() {
+       time.Sleep(time.Second)
+       log.Fatal("kafka sink injected error")
+   })

    for _, msg := range messages {
        err := k.writeToProducer(ctx, msg, codec.EncoderNeedAsyncWrite, partition)
        if err != nil {
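
For context: failpoints from github.com/pingcap/failpoint are compiled in with failpoint-ctl (or a make failpoint-enable target, if the repo provides one) and switched on at run time through the GO_FAILPOINTS environment variable, which is why several test scripts below reset it with export GO_FAILPOINTS=''. A minimal sketch of driving the new sink failpoint from a test; the full failpoint path and the run_cdc_server helper are assumptions, not taken from this commit:

# Sketch only: a failpoint name is <package import path>/<name passed to failpoint.Inject>;
# the exact path below is assumed.
export GO_FAILPOINTS='github.com/tikv/migration/cdc/cdc/sink/SinkFlushEventPanic=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY  # assumed test helper
# ... create the changefeed, write data, then assert on "kafka sink injected error" in the logs ...
export GO_FAILPOINTS=''
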
5 changes: 4 additions & 1 deletion cdc/cmd/kafka-consumer/main.go
@@ -54,6 +54,7 @@ var (
    kafkaMaxBatchSize = math.MaxInt64

    downstreamURIStr string
+   waitTopicDur time.Duration

    logPath string
    logLevel string
@@ -72,6 +73,7 @@ func init() {
    flag.StringVar(&ca, "ca", "", "CA certificate path for Kafka SSL connection")
    flag.StringVar(&cert, "cert", "", "Certificate path for Kafka SSL connection")
    flag.StringVar(&key, "key", "", "Private key path for Kafka SSL connection")
+   flag.DurationVar(&waitTopicDur, "wait-topic", time.Minute, "Duration waiting for topic created")
    flag.Parse()

    err := logutil.InitLogger(&logutil.Config{
@@ -172,7 +174,8 @@ func waitTopicCreated(address []string, topic string, cfg *sarama.Config) error
        return errors.Trace(err)
    }
    defer admin.Close()
-   for i := 0; i <= 30; i++ {
+   start := time.Now()
+   for time.Since(start) < waitTopicDur {
        topics, err := admin.ListTopics()
        if err != nil {
            return errors.Trace(err)
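
With this change the consumer polls ListTopics until the topic appears or a configurable wall-clock budget (one minute by default) runs out, instead of looping a fixed 30 times. A hedged invocation sketch; the binary name matches the process killed by stop_kafka_consumer, but apart from -wait-topic the flag names are assumptions about how the integration tests start the consumer:

# Sketch, assumed flags apart from -wait-topic.
cdc_kafka_consumer \
    --upstream-uri "kafka://127.0.0.1:9092/${TOPIC_NAME}?protocol=open-protocol&partition-num=4" \
    --downstream-uri "tikv://${DOWN_PD}" \
    --wait-topic 2m \
    >>"$WORK_DIR/cdc_kafka_consumer.log" 2>&1 &
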
@@ -10,6 +10,7 @@ services:
    image: wurstmeister/kafka:2.12-2.4.1
    ports:
      - "9092:9092"
+     - "9093:9093"
    environment:
      KAFKA_MESSAGE_MAX_BYTES: 11534336
      KAFKA_REPLICA_FETCH_MAX_BYTES: 11534336
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/_utils/start_tidb_cluster_impl
@@ -4,7 +4,7 @@
# --tidb-config: path to tidb config file
# --multiple-upstream-pd: whether to deploy multiple pd severs in upstream

-set -euxo pipefail
+set -euo pipefail

OUT_DIR=
tidb_config=
4 changes: 4 additions & 0 deletions cdc/tests/integration_tests/_utils/test_prepare
@@ -62,3 +62,7 @@ function get_kafka_sink_uri() {
    TOPIC_NAME="tikvcdc-$TEST_NAME-test-$RANDOM"
    echo "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=$KAFKA_VERSION&max-message-bytes=10485760"
}

+function stop_kafka_consumer() {
+    cleanup_process cdc_kafka_consumer
+}
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/autorandom/run.sh
@@ -35,6 +35,9 @@ function run() {
    check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
9 changes: 9 additions & 0 deletions cdc/tests/integration_tests/availability/run.sh
@@ -38,10 +38,19 @@ function prepare() {
    fi
}

+function cleanup() {
+    if [ "$SINK_TYPE" == "kafka" ]; then
+        stop_kafka_consumer
+    fi
+}

trap stop_tidb_cluster EXIT
prepare $*

test_owner_ha $*
test_capture_ha $*
test_processor_ha $*

+cleanup
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
@@ -49,6 +49,9 @@ function run() {

    export GO_FAILPOINTS=''
    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/changefeed_auto_stop/run.sh
@@ -65,6 +65,9 @@ function run() {
    check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/changefeed_error/run.sh
@@ -154,6 +154,9 @@ function run() {
    run_cdc_cli changefeed remove -c $changefeedid_1
    export GO_FAILPOINTS=''
    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/changefeed_fast_fail/run.sh
@@ -66,6 +66,9 @@ function run() {

    export GO_FAILPOINTS=''
    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/changefeed_finish/run.sh
@@ -60,6 +60,9 @@ function run() {
    ensure $MAX_RETRIES check_changefeed_is_finished $UP_PD $changefeed_id

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/changefeed_pause_resume/run.sh
@@ -41,6 +41,9 @@ function run() {
    done

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/changefeed_reconstruct/run.sh
@@ -57,6 +57,9 @@ function run() {
    check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/cli/run.sh
@@ -146,6 +146,9 @@ EOF
    curl http://127.0.0.1:8600/status

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/disk_full/run.sh
@@ -66,6 +66,9 @@ EOF
    check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/flow_control/run.sh
@@ -68,6 +68,9 @@ EOF

    export GO_FAILPOINTS=''
    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/gc_safepoint/run.sh
@@ -137,6 +137,9 @@ function run() {

    export GO_FAILPOINTS=''
    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/http_api/run.sh
@@ -98,6 +98,9 @@ function run() {
    done

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/kill_owner/run.sh
@@ -65,6 +65,9 @@ function run() {
    check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/kv_client_stream_reconnect/run.sh
@@ -40,6 +40,9 @@ function run() {

    export GO_FAILPOINTS=''
    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/kv_filter/run.sh
@@ -45,6 +45,9 @@ function run() {
    check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/multi_capture/run.sh
@@ -51,6 +51,9 @@ function run() {
    check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/processor_err_chan/run.sh
@@ -62,6 +62,9 @@ function run() {

    export GO_FAILPOINTS=''
    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/processor_panic/run.sh
@@ -40,6 +40,9 @@ function run() {
    check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
@@ -45,9 +45,12 @@ function run() {
    check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
run $*
-check_logs_contains $WORK_DIR "tikv sink injected error" 1
+check_logs_contains $WORK_DIR "$SINK_TYPE sink injected error" 1
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/processor_stop_delay/run.sh
@@ -45,6 +45,9 @@ function run() {

    export GO_FAILPOINTS=''
    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 2 additions & 1 deletion cdc/tests/integration_tests/run_group.sh
@@ -4,6 +4,7 @@ set -eo pipefail

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)

+sink_type=tikv
group=$1

# Define groups
@@ -51,5 +52,5 @@ fi
# Run test cases
if [[ -n $test_names ]]; then
    echo "Run cases: ${test_names}"
-   "${CUR}"/run.sh "${test_names}"
+   "${CUR}"/run.sh "${sink_type}" "${test_names}"
fi
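
run_group.sh now forwards the sink type as the first positional argument, ahead of the test list, so the entry script it calls has to peel it off before the case names; presumably that is what surfaces as $SINK_TYPE inside the per-test run.sh scripts above. A sketch of the calling convention only; the variable names inside run.sh are assumptions:

# cdc/tests/integration_tests/run.sh (assumed internals)
sink_type=${1}    # "tikv" here; a kafka job would pass "kafka"
test_names=${2}   # space-separated cases, e.g. "autorandom availability cli"
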
6 changes: 6 additions & 0 deletions cdc/tests/integration_tests/sigstop/run.sh
@@ -61,6 +61,9 @@ function run_kill_upstream() {
    check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

function run_kill_downstream() {
@@ -115,6 +118,9 @@ function run_kill_downstream() {
    check_sync_diff $WORK_DIR $DOWN_PD $UP_PD

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/sink_hang/run.sh
@@ -56,6 +56,9 @@ function run() {

    export GO_FAILPOINTS=''
    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
7 changes: 7 additions & 0 deletions cdc/tests/integration_tests/sorter/run.sh
@@ -40,6 +40,9 @@ function run() {
    check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
    run_cdc_cli unsafe reset --no-confirm

    echo "test memory sorter"
@@ -63,7 +66,11 @@ function run() {
    check_sync_diff $WORK_DIR $UP_PD $DOWN_PD
    rawkv_op $UP_PD delete 5000
    check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
3 changes: 3 additions & 0 deletions cdc/tests/integration_tests/stop_downstream/run.sh
@@ -59,6 +59,9 @@ function run() {
    check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

    cleanup_process $CDC_BINARY
+   if [ "$SINK_TYPE" == "kafka" ]; then
+       stop_kafka_consumer
+   fi
}

trap stop_tidb_cluster EXIT
