ddl(ticdc): ignore ddl with schemaversion 0 (#11856) #12034

Open · wants to merge 1 commit into base: release-6.5
7 changes: 7 additions & 0 deletions cdc/entry/schema_storage.go
@@ -186,18 +186,25 @@
    var snap *schema.Snapshot
    if len(s.snaps) > 0 {
        lastSnap := s.snaps[len(s.snaps)-1]
<<<<<<< HEAD
        // We use schemaVersion to check if an already-executed DDL job is processed for a second time.
        // Unexecuted DDL jobs should have the largest schemaVersions.
        if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() || job.BinlogInfo.SchemaVersion <= s.schemaVersion {
            log.Info("ignore foregone DDL",
=======
        // already-executed DDLs can be filtered by finishedTs.
        if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() {
            log.Info("schemaStorage: ignore foregone DDL",
>>>>>>> 7f57e1f548 (ddl(ticdc): ignore ddl with schemaversion 0 (#11856))
                zap.String("namespace", s.id.Namespace),
                zap.String("changefeed", s.id.ID),
                zap.String("DDL", job.Query),
                zap.String("state", job.State.String()),
                zap.Int64("jobID", job.ID),
                zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
                zap.Int64("schemaVersion", s.schemaVersion),
                zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion),
                zap.String("role", s.role.String()))
            return nil
        }
        snap = lastSnap.Copy()
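Both GitHub Actions jobs (Mac OS Build and Arm Build (ARM64)) fail this file with Go syntax errors ("unexpected <<, expecting }", "unexpected ==, expecting expression", "invalid character U+0023 '#'"): the merge conflict above was committed unresolved, so the file does not compile. As a sketch only — the PR itself has not resolved the conflict — taking the cherry-picked side would leave the hunk reading as below, with the schemaVersion comparison dropped here because jobs ignored upstream are filtered earlier, in the DDL puller (next file); log fields abridged:

```go
        lastSnap := s.snaps[len(s.snaps)-1]
        // An already-executed DDL can be filtered by finishedTs alone; DDLs that
        // upstream TiDB ignored (SchemaVersion == 0) would no longer reach this
        // point, because the DDL puller discards them first.
        if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() {
            log.Info("schemaStorage: ignore foregone DDL",
                zap.String("namespace", s.id.Namespace),
                zap.String("changefeed", s.id.ID),
                zap.String("DDL", job.Query),
                zap.Uint64("finishTs", job.BinlogInfo.FinishedTS)) // ...remaining fields as in the diff
            return nil
        }
        snap = lastSnap.Copy()
```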
18 changes: 18 additions & 0 deletions cdc/puller/ddl_puller.go
@@ -337,6 +337,24 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
        return false, nil
    }

<<<<<<< HEAD
=======
    if job.BinlogInfo.FinishedTS <= p.getResolvedTs() ||
        job.BinlogInfo.SchemaVersion == 0 /* means the ddl was ignored upstream */ {
        log.Info("ddl job finishedTs less than puller resolvedTs, "+
            "discard the ddl job",
            zap.String("namespace", p.changefeedID.Namespace),
            zap.String("changefeed", p.changefeedID.ID),
            zap.String("schema", job.SchemaName),
            zap.String("table", job.TableName),
            zap.Uint64("startTs", job.StartTS),
            zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
            zap.String("query", job.Query),
            zap.Uint64("pullerResolvedTs", p.getResolvedTs()))
        return true, nil
    }

>>>>>>> 7f57e1f548 (ddl(ticdc): ignore ddl with schemaversion 0 (#11856))
    defer func() {
        if skip && err == nil {
            log.Info("ddl job schema or table does not match, discard it",
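The added check is the heart of this cherry-pick: a DDL job is discarded either when the puller has already advanced past its commit point, or when upstream TiDB ignored the statement and recorded the job with SchemaVersion == 0. A minimal, self-contained sketch of that rule (the struct shapes here are stand-ins for the fields of timodel.Job that handleJob uses, not the real TiDB types):

```go
package main

import "fmt"

// Stand-ins for the timodel.Job fields used by handleJob (assumed shapes,
// just enough to illustrate the rule; the real types live in TiDB).
type binlogInfo struct {
    FinishedTS    uint64
    SchemaVersion int64
}

type ddlJob struct {
    Query      string
    BinlogInfo binlogInfo
}

// shouldDiscard mirrors the new condition in ddlJobPullerImpl.handleJob:
// skip jobs at or below the puller's resolvedTs, and skip jobs that upstream
// TiDB ignored, which it signals with SchemaVersion == 0.
func shouldDiscard(job ddlJob, pullerResolvedTs uint64) bool {
    return job.BinlogInfo.FinishedTS <= pullerResolvedTs ||
        job.BinlogInfo.SchemaVersion == 0
}

func main() {
    jobs := []ddlJob{
        // Ignored upstream (e.g. ADD COLUMN IF NOT EXISTS on an existing column).
        {"alter table t add column if not exists age int", binlogInfo{FinishedTS: 105, SchemaVersion: 0}},
        // Already replicated: finishedTs is behind the puller's resolvedTs.
        {"create table t2 (id int)", binlogInfo{FinishedTS: 90, SchemaVersion: 41}},
        // A genuinely new DDL: kept.
        {"create table t3 (id int)", binlogInfo{FinishedTS: 120, SchemaVersion: 42}},
    }
    for _, j := range jobs {
        fmt.Printf("discard=%-5v %s\n", shouldDiscard(j, 100), j.Query)
    }
}
```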
28 changes: 28 additions & 0 deletions tests/integration_tests/ddl_with_exists/conf/diff_config.toml
@@ -0,0 +1,28 @@
# diff Configuration.
check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/ddl_with_exists/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["ddl_with_exists.*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
57 changes: 57 additions & 0 deletions tests/integration_tests/ddl_with_exists/run.sh
@@ -0,0 +1,57 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
    rm -rf $WORK_DIR && mkdir -p $WORK_DIR

    start_tidb_cluster --workdir $WORK_DIR

    cd $WORK_DIR

    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
    owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')

    TOPIC_NAME="ticdc-ddl-with-exists-test-$RANDOM"
    case $SINK_TYPE in
    kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
    storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
    pulsar)
        run_pulsar_cluster $WORK_DIR normal
        SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
        ;;
    *) SINK_URI="mysql://root@127.0.0.1:3306/" ;;
    esac
    changefeed_id="ddl-with-exists"
    run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id}

    case $SINK_TYPE in
    kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
    storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
    pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
    esac

    run_sql "CREATE DATABASE ddl_with_exists"

    cd $CUR
    GO111MODULE=on go run test.go

    run_sql "CREATE TABLE ddl_with_exists.finish_mark (a int primary key);"
    check_table_exists ddl_with_exists.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
    # make sure all tables are equal in upstream and downstream
    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180
    cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
102 changes: 102 additions & 0 deletions tests/integration_tests/ddl_with_exists/test.go
@@ -0,0 +1,102 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
    "database/sql"
    "fmt"
    "log"
    "math/rand"
    "os"
    "sync"
    "time"

    _ "github.com/go-sql-driver/mysql"
)

func main() {
    upHost := GetEnvDefault("UP_TIDB_HOST", "127.0.0.1")
    upPort := GetEnvDefault("UP_TIDB_PORT", "4000")
    dsn := fmt.Sprintf("root@tcp(%s:%s)/", upHost, upPort)
    db, err := sql.Open("mysql", dsn)
    if err != nil {
        log.Fatal("open db failed: ", dsn, ", err: ", err)
    }
    defer db.Close()

    if err = db.Ping(); err != nil {
        log.Fatal("ping db failed: ", dsn, ", err: ", err)
    }
    log.Println("connect to tidb success, dsn:", dsn)

    createTable := `create table if not exists ddl_with_exists.t%d (
        id int primary key auto_increment,
        name varchar(255)
    );`
    addColumn := "alter table ddl_with_exists.t%d add column if not exists age int;"
    dropColumn := "alter table ddl_with_exists.t%d drop column if exists age;"
    addIndex := "alter table ddl_with_exists.t%d add index if not exists idx1(id);"
    dropIndex := "alter table ddl_with_exists.t%d drop index if exists idx1;"

    concurrency := 16
    maxTableCnt := 20
    db.SetMaxOpenConns(concurrency)

    start := time.Now()
    for i := 0; i < maxTableCnt; i++ {
        _, err := db.Exec(fmt.Sprintf(createTable, i))
        if err != nil {
            log.Fatal("create table failed: ", i, ", err: ", err)
        }
    }
    log.Println("create table cost:", time.Since(start).Seconds(), "s")

    var wg sync.WaitGroup
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        // Pass i explicitly: before Go 1.22 the loop variable is shared across
        // iterations, so capturing it directly would log the wrong worker IDs.
        go func(i int) {
            defer wg.Done()
            log.Println("worker start:", i)
            for j := 0; j < 20; j++ {
                idx := rand.Intn(maxTableCnt)
                ddl := fmt.Sprintf(createTable, idx)
                switch rand.Intn(5) {
                case 0:
                    ddl = fmt.Sprintf(addColumn, idx)
                case 1:
                    ddl = fmt.Sprintf(dropColumn, idx)
                case 2:
                    ddl = fmt.Sprintf(addIndex, idx)
                case 3:
                    ddl = fmt.Sprintf(dropIndex, idx)
                default:
                }
                _, err := db.Exec(ddl)
                if err != nil {
                    log.Println(err)
                }
            }
            log.Println("worker exit:", i)
        }(i)
    }
    wg.Wait()
}

func GetEnvDefault(key, defaultV string) string {
    val, ok := os.LookupEnv(key)
    if !ok {
        return defaultV
    }
    return val
}
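The workload above races IF [NOT] EXISTS DDLs across 16 workers and 20 tables so that many statements become no-ops that upstream TiDB ignores; on the affected versions such jobs can reach TiCDC recorded with SchemaVersion == 0, which is exactly what the puller change in this PR discards. A minimal standalone reproduction of the upstream side (a sketch, assuming a TiDB listening on 127.0.0.1:4000 as in test.go; it only issues the DDLs and does not inspect TiDB's job history):

```go
package main

import (
    "database/sql"
    "log"

    _ "github.com/go-sql-driver/mysql"
)

func main() {
    db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    stmts := []string{
        "create database if not exists ddl_with_exists",
        "create table if not exists ddl_with_exists.demo (id int primary key)",
        // The first execution really adds the column.
        "alter table ddl_with_exists.demo add column if not exists age int",
        // The second execution is a no-op: TiDB ignores it, and on the affected
        // versions the resulting DDL job is recorded with SchemaVersion == 0.
        "alter table ddl_with_exists.demo add column if not exists age int",
    }
    for _, s := range stmts {
        if _, err := db.Exec(s); err != nil {
            log.Fatal(s, ": ", err)
        }
        log.Println("ok:", s)
    }
}
```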
39 changes: 39 additions & 0 deletions tests/integration_tests/run_group.sh
@@ -31,6 +31,7 @@ storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_t
declare -A groups
groups=(
    # Note: only the tests in the first three groups are running in storage sink pipeline.
<<<<<<< HEAD
    ["G00"]="$mysql_only $kafka_only $storage_only"
    ["G01"]="$mysql_only_http $kafka_only_protocol $storage_only_canal_json multi_tables_ddl"
    ["G02"]="$mysql_only_consistent_replicate $kafka_only_v2 $storage_only_csv"
@@ -50,6 +51,44 @@ groups=(
    # currently G16 is not running in kafka pipeline
    ["G16"]='owner_resign processor_etcd_worker_delay sink_hang'
    ["G17"]='clustered_index processor_resolved_ts_fallback'
=======
    ["G00"]="$mysql_only $kafka_only $storage_only"
    ["G01"]="$mysql_only_http $kafka_only_protocol $storage_only_canal_json multi_tables_ddl"
    ["G02"]="$mysql_only_consistent_replicate $kafka_only_v2 $storage_only_csv"
    ["G03"]='row_format drop_many_tables processor_stop_delay partition_table ddl_with_exists'
    ["G04"]='foreign_key ddl_puller_lag ddl_only_block_related_table changefeed_auto_stop'
    ["G05"]='charset_gbk ddl_manager multi_source vector'
    ["G06"]='sink_retry changefeed_error ddl_sequence resourcecontrol'
    # G07 pulsar oauth2 authentication enabled
    ["G07"]='kv_client_stream_reconnect cdc split_region'
    ["G08"]='processor_err_chan changefeed_reconstruct multi_capture synced_status_with_redo'
    ["G09"]='gc_safepoint changefeed_pause_resume cli_with_auth savepoint synced_status'
    ["G10"]='default_value simple cdc_server_tips event_filter sql_mode'
    ["G11"]='resolve_lock move_table autorandom generate_column'
    ["G12"]='many_pk_or_uk capture_session_done_during_task ddl_attributes'
    # G13 pulsar mtls authentication enabled
    ["G13"]='tiflash region_merge common_1'
    ["G14"]='changefeed_finish force_replicate_table'
    ["G15"]='new_ci_collation batch_add_table multi_rocks ci_collation_compatibility'
    # currently G16 is not running in kafka pipeline
    ["G16"]='owner_resign processor_etcd_worker_delay sink_hang'
    ["G17"]='clustered_index processor_resolved_ts_fallback'
>>>>>>> 7f57e1f548 (ddl(ticdc): ignore ddl with schemaversion 0 (#11856))
    # only run the following tests in mysql pipeline
    ["G18"]='availability http_proxies sequence'
    ["G19"]='changefeed_fast_fail batch_update_to_no_batch changefeed_resume_with_checkpoint_ts'