diff --git a/cdc/cdc/sink/mq.go b/cdc/cdc/sink/mq.go index 289d241d..a305a830 100644 --- a/cdc/cdc/sink/mq.go +++ b/cdc/cdc/sink/mq.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/migration/cdc/cdc/model" "github.com/tikv/migration/cdc/cdc/sink/codec" @@ -243,6 +244,11 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error { return 0, nil } + failpoint.Inject("SinkFlushEventPanic", func() { + time.Sleep(time.Second) + log.Fatal("kafka sink injected error") + }) + for _, msg := range messages { err := k.writeToProducer(ctx, msg, codec.EncoderNeedAsyncWrite, partition) if err != nil { diff --git a/cdc/cmd/kafka-consumer/main.go b/cdc/cmd/kafka-consumer/main.go index a3c09582..b4fb875a 100644 --- a/cdc/cmd/kafka-consumer/main.go +++ b/cdc/cmd/kafka-consumer/main.go @@ -54,6 +54,7 @@ var ( kafkaMaxBatchSize = math.MaxInt64 downstreamURIStr string + waitTopicDur time.Duration logPath string logLevel string @@ -72,6 +73,7 @@ func init() { flag.StringVar(&ca, "ca", "", "CA certificate path for Kafka SSL connection") flag.StringVar(&cert, "cert", "", "Certificate path for Kafka SSL connection") flag.StringVar(&key, "key", "", "Private key path for Kafka SSL connection") + flag.DurationVar(&waitTopicDur, "wait-topic", time.Minute, "Duration waiting for topic created") flag.Parse() err := logutil.InitLogger(&logutil.Config{ @@ -172,7 +174,8 @@ func waitTopicCreated(address []string, topic string, cfg *sarama.Config) error return errors.Trace(err) } defer admin.Close() - for i := 0; i <= 30; i++ { + start := time.Now() + for time.Since(start) < waitTopicDur { topics, err := admin.ListTopics() if err != nil { return errors.Trace(err) diff --git a/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml b/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml index fc082d99..a3b1d087 100644 --- a/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml +++ b/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml @@ -10,6 +10,7 @@ services: image: wurstmeister/kafka:2.12-2.4.1 ports: - "9092:9092" + - "9093:9093" environment: KAFKA_MESSAGE_MAX_BYTES: 11534336 KAFKA_REPLICA_FETCH_MAX_BYTES: 11534336 diff --git a/cdc/tests/integration_tests/_utils/start_tidb_cluster_impl b/cdc/tests/integration_tests/_utils/start_tidb_cluster_impl index 70e5c26b..e2f5f201 100755 --- a/cdc/tests/integration_tests/_utils/start_tidb_cluster_impl +++ b/cdc/tests/integration_tests/_utils/start_tidb_cluster_impl @@ -4,7 +4,7 @@ # --tidb-config: path to tidb config file # --multiple-upstream-pd: whether to deploy multiple pd severs in upstream -set -euxo pipefail +set -euo pipefail OUT_DIR= tidb_config= diff --git a/cdc/tests/integration_tests/_utils/test_prepare b/cdc/tests/integration_tests/_utils/test_prepare index 4541b20d..81e4455c 100644 --- a/cdc/tests/integration_tests/_utils/test_prepare +++ b/cdc/tests/integration_tests/_utils/test_prepare @@ -62,3 +62,7 @@ function get_kafka_sink_uri() { TOPIC_NAME="tikvcdc-$TEST_NAME-test-$RANDOM" echo "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=$KAFKA_VERSION&max-message-bytes=10485760" } + +function stop_kafka_consumer() { + cleanup_process cdc_kafka_consumer +} diff --git a/cdc/tests/integration_tests/autorandom/run.sh b/cdc/tests/integration_tests/autorandom/run.sh index a4c39c3e..421f15f3 100644 --- a/cdc/tests/integration_tests/autorandom/run.sh +++ b/cdc/tests/integration_tests/autorandom/run.sh @@ -35,6 +35,9 @@ function run() { check_sync_diff $WORK_DIR $UP_PD $DOWN_PD cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/availability/run.sh b/cdc/tests/integration_tests/availability/run.sh index 052295bb..e56cc1e7 100644 --- a/cdc/tests/integration_tests/availability/run.sh +++ b/cdc/tests/integration_tests/availability/run.sh @@ -38,10 +38,19 @@ function prepare() { fi } +function cleanup() { + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi +} + trap stop_tidb_cluster EXIT prepare $* + test_owner_ha $* test_capture_ha $* test_processor_ha $* + +cleanup check_logs $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/cdc/tests/integration_tests/capture_session_done_during_task/run.sh b/cdc/tests/integration_tests/capture_session_done_during_task/run.sh index 02a65672..608b77b5 100644 --- a/cdc/tests/integration_tests/capture_session_done_during_task/run.sh +++ b/cdc/tests/integration_tests/capture_session_done_during_task/run.sh @@ -49,6 +49,9 @@ function run() { export GO_FAILPOINTS='' cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/changefeed_auto_stop/run.sh b/cdc/tests/integration_tests/changefeed_auto_stop/run.sh index 4333201c..ea28df95 100755 --- a/cdc/tests/integration_tests/changefeed_auto_stop/run.sh +++ b/cdc/tests/integration_tests/changefeed_auto_stop/run.sh @@ -65,6 +65,9 @@ function run() { check_sync_diff $WORK_DIR $UP_PD $DOWN_PD cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/changefeed_error/run.sh b/cdc/tests/integration_tests/changefeed_error/run.sh index b9851d5f..67d1f98d 100755 --- a/cdc/tests/integration_tests/changefeed_error/run.sh +++ b/cdc/tests/integration_tests/changefeed_error/run.sh @@ -154,6 +154,9 @@ function run() { run_cdc_cli changefeed remove -c $changefeedid_1 export GO_FAILPOINTS='' cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/changefeed_fast_fail/run.sh b/cdc/tests/integration_tests/changefeed_fast_fail/run.sh index e4010d96..41272783 100644 --- a/cdc/tests/integration_tests/changefeed_fast_fail/run.sh +++ b/cdc/tests/integration_tests/changefeed_fast_fail/run.sh @@ -66,6 +66,9 @@ function run() { export GO_FAILPOINTS='' cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/changefeed_finish/run.sh b/cdc/tests/integration_tests/changefeed_finish/run.sh index b44d636e..d4a87224 100755 --- a/cdc/tests/integration_tests/changefeed_finish/run.sh +++ b/cdc/tests/integration_tests/changefeed_finish/run.sh @@ -60,6 +60,9 @@ function run() { ensure $MAX_RETRIES check_changefeed_is_finished $UP_PD $changefeed_id cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/changefeed_pause_resume/run.sh b/cdc/tests/integration_tests/changefeed_pause_resume/run.sh index 4d5329dc..51480047 100755 --- a/cdc/tests/integration_tests/changefeed_pause_resume/run.sh +++ b/cdc/tests/integration_tests/changefeed_pause_resume/run.sh @@ -41,6 +41,9 @@ function run() { done cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/changefeed_reconstruct/run.sh b/cdc/tests/integration_tests/changefeed_reconstruct/run.sh index 807e4c37..0e12aabc 100755 --- a/cdc/tests/integration_tests/changefeed_reconstruct/run.sh +++ b/cdc/tests/integration_tests/changefeed_reconstruct/run.sh @@ -57,6 +57,9 @@ function run() { check_sync_diff $WORK_DIR $UP_PD $DOWN_PD cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/cli/run.sh b/cdc/tests/integration_tests/cli/run.sh index 79969f87..ae019cb0 100644 --- a/cdc/tests/integration_tests/cli/run.sh +++ b/cdc/tests/integration_tests/cli/run.sh @@ -146,6 +146,9 @@ EOF curl http://127.0.0.1:8600/status cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/disk_full/run.sh b/cdc/tests/integration_tests/disk_full/run.sh index b5bc61a9..f09c558c 100644 --- a/cdc/tests/integration_tests/disk_full/run.sh +++ b/cdc/tests/integration_tests/disk_full/run.sh @@ -66,6 +66,9 @@ EOF check_sync_diff $WORK_DIR $UP_PD $DOWN_PD cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/flow_control/run.sh b/cdc/tests/integration_tests/flow_control/run.sh index a46eb041..d0f0fe6e 100644 --- a/cdc/tests/integration_tests/flow_control/run.sh +++ b/cdc/tests/integration_tests/flow_control/run.sh @@ -68,6 +68,9 @@ EOF export GO_FAILPOINTS='' cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/gc_safepoint/run.sh b/cdc/tests/integration_tests/gc_safepoint/run.sh index 596988c5..ba9ad876 100755 --- a/cdc/tests/integration_tests/gc_safepoint/run.sh +++ b/cdc/tests/integration_tests/gc_safepoint/run.sh @@ -137,6 +137,9 @@ function run() { export GO_FAILPOINTS='' cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/http_api/run.sh b/cdc/tests/integration_tests/http_api/run.sh index 112b07cc..d670d3e0 100644 --- a/cdc/tests/integration_tests/http_api/run.sh +++ b/cdc/tests/integration_tests/http_api/run.sh @@ -98,6 +98,9 @@ function run() { done cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/kill_owner/run.sh b/cdc/tests/integration_tests/kill_owner/run.sh index 1955cdd6..1a7bc350 100755 --- a/cdc/tests/integration_tests/kill_owner/run.sh +++ b/cdc/tests/integration_tests/kill_owner/run.sh @@ -65,6 +65,9 @@ function run() { check_sync_diff $WORK_DIR $UP_PD $DOWN_PD cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/kv_client_stream_reconnect/run.sh b/cdc/tests/integration_tests/kv_client_stream_reconnect/run.sh index 63b9901a..ff81a42d 100644 --- a/cdc/tests/integration_tests/kv_client_stream_reconnect/run.sh +++ b/cdc/tests/integration_tests/kv_client_stream_reconnect/run.sh @@ -40,6 +40,9 @@ function run() { export GO_FAILPOINTS='' cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/kv_filter/run.sh b/cdc/tests/integration_tests/kv_filter/run.sh index 9176e6ed..ecd8f8e7 100644 --- a/cdc/tests/integration_tests/kv_filter/run.sh +++ b/cdc/tests/integration_tests/kv_filter/run.sh @@ -45,6 +45,9 @@ function run() { check_sync_diff $WORK_DIR $UP_PD $DOWN_PD cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/multi_capture/run.sh b/cdc/tests/integration_tests/multi_capture/run.sh index eee14c54..443c4ea3 100755 --- a/cdc/tests/integration_tests/multi_capture/run.sh +++ b/cdc/tests/integration_tests/multi_capture/run.sh @@ -51,6 +51,9 @@ function run() { check_sync_diff $WORK_DIR $UP_PD $DOWN_PD cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/processor_err_chan/run.sh b/cdc/tests/integration_tests/processor_err_chan/run.sh index 21ed0b6f..7d6a0407 100644 --- a/cdc/tests/integration_tests/processor_err_chan/run.sh +++ b/cdc/tests/integration_tests/processor_err_chan/run.sh @@ -62,6 +62,9 @@ function run() { export GO_FAILPOINTS='' cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/processor_panic/run.sh b/cdc/tests/integration_tests/processor_panic/run.sh index 0f84bf53..ca06c577 100644 --- a/cdc/tests/integration_tests/processor_panic/run.sh +++ b/cdc/tests/integration_tests/processor_panic/run.sh @@ -40,6 +40,9 @@ function run() { check_sync_diff $WORK_DIR $UP_PD $DOWN_PD cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/processor_resolved_ts_fallback/run.sh b/cdc/tests/integration_tests/processor_resolved_ts_fallback/run.sh index fdbc817c..7713ddd7 100755 --- a/cdc/tests/integration_tests/processor_resolved_ts_fallback/run.sh +++ b/cdc/tests/integration_tests/processor_resolved_ts_fallback/run.sh @@ -45,9 +45,12 @@ function run() { check_sync_diff $WORK_DIR $UP_PD $DOWN_PD cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT run $* -check_logs_contains $WORK_DIR "tikv sink injected error" 1 +check_logs_contains $WORK_DIR "$SINK_TYPE sink injected error" 1 echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/cdc/tests/integration_tests/processor_stop_delay/run.sh b/cdc/tests/integration_tests/processor_stop_delay/run.sh index 85a17cff..d70683bf 100644 --- a/cdc/tests/integration_tests/processor_stop_delay/run.sh +++ b/cdc/tests/integration_tests/processor_stop_delay/run.sh @@ -45,6 +45,9 @@ function run() { export GO_FAILPOINTS='' cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/run_group.sh b/cdc/tests/integration_tests/run_group.sh index 1a28d552..9d0996fa 100755 --- a/cdc/tests/integration_tests/run_group.sh +++ b/cdc/tests/integration_tests/run_group.sh @@ -4,6 +4,7 @@ set -eo pipefail CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +sink_type=tikv group=$1 # Define groups @@ -51,5 +52,5 @@ fi # Run test cases if [[ -n $test_names ]]; then echo "Run cases: ${test_names}" - "${CUR}"/run.sh "${test_names}" + "${CUR}"/run.sh "${sink_type}" "${test_names}" fi diff --git a/cdc/tests/integration_tests/sigstop/run.sh b/cdc/tests/integration_tests/sigstop/run.sh index ee9e237c..6bddcee0 100644 --- a/cdc/tests/integration_tests/sigstop/run.sh +++ b/cdc/tests/integration_tests/sigstop/run.sh @@ -61,6 +61,9 @@ function run_kill_upstream() { check_sync_diff $WORK_DIR $UP_PD $DOWN_PD cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } function run_kill_downstream() { @@ -115,6 +118,9 @@ function run_kill_downstream() { check_sync_diff $WORK_DIR $DOWN_PD $UP_PD cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/sink_hang/run.sh b/cdc/tests/integration_tests/sink_hang/run.sh index 24068b89..a281be22 100644 --- a/cdc/tests/integration_tests/sink_hang/run.sh +++ b/cdc/tests/integration_tests/sink_hang/run.sh @@ -56,6 +56,9 @@ function run() { export GO_FAILPOINTS='' cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/sorter/run.sh b/cdc/tests/integration_tests/sorter/run.sh index 592187e1..a9ab40cb 100755 --- a/cdc/tests/integration_tests/sorter/run.sh +++ b/cdc/tests/integration_tests/sorter/run.sh @@ -40,6 +40,9 @@ function run() { check_sync_diff $WORK_DIR $UP_PD $DOWN_PD cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi run_cdc_cli unsafe reset --no-confirm echo "test memory sorter" @@ -63,7 +66,11 @@ function run() { check_sync_diff $WORK_DIR $UP_PD $DOWN_PD rawkv_op $UP_PD delete 5000 check_sync_diff $WORK_DIR $UP_PD $DOWN_PD + cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT diff --git a/cdc/tests/integration_tests/stop_downstream/run.sh b/cdc/tests/integration_tests/stop_downstream/run.sh index 6670eeea..86247232 100644 --- a/cdc/tests/integration_tests/stop_downstream/run.sh +++ b/cdc/tests/integration_tests/stop_downstream/run.sh @@ -59,6 +59,9 @@ function run() { check_sync_diff $WORK_DIR $UP_PD $DOWN_PD cleanup_process $CDC_BINARY + if [ "$SINK_TYPE" == "kafka" ]; then + stop_kafka_consumer + fi } trap stop_tidb_cluster EXIT