split-table: Support split-table in mysql sink and ddl test for split table (#968)
hongyunyan authored Feb 13, 2025
1 parent 02f02ae commit 78a3f9b
Showing 12 changed files with 193 additions and 14 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/integration_test_mysql.yaml
@@ -509,13 +509,18 @@ jobs:
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=overwrite_resume_with_syncpoint
- name: Test ddl_for_split_tables
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=ddl_for_split_tables
- name: Test multi_source
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=multi_source
# The 20th case in this group
# The 21st case in this group
- name: Test multi_tables_ddl
if: ${{ success() }}
run: |
11 changes: 3 additions & 8 deletions downstreamadapter/sink/mysql_sink.go
@@ -34,10 +34,6 @@ import (
"golang.org/x/sync/errgroup"
)

const (
prime = 31
)

// MysqlSink is responsible for writing data to mysql downstream.
// Including DDL and DML.
type MysqlSink struct {
@@ -133,10 +129,9 @@ func (s *MysqlSink) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore)
}

func (s *MysqlSink) AddDMLEvent(event *commonEvent.DMLEvent) {
// Considering that the parity of tableID is not necessarily even,
// directly dividing by the number of buckets may cause unevenness between buckets.
// Therefore, we first take the modulus of the prime number and then take the modulus of the bucket.
index := int64(event.PhysicalTableID) % prime % int64(s.workerCount)
// We use the low 64 bits of the dispatcherID to spread different tables across workers,
// and to ensure events from the same table always go to the same worker.
index := event.GetDispatcherID().GetLow() % uint64(s.workerCount)
s.dmlWorker[index].AddDMLEvent(event)
}

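To make the routing change above concrete: instead of hashing the physical table ID through a prime, the sink now derives a stable worker index from the low 64 bits of the dispatcher ID. Below is a minimal standalone sketch of that idea; the dispatcherID struct and pickWorker helper are illustrative stand-ins, not the actual pkg/common API.

package main

import "fmt"

// dispatcherID is an illustrative stand-in for the 128-bit ID in pkg/common;
// only its low 64 bits matter for worker routing.
type dispatcherID struct {
	Low  uint64
	High uint64
}

// pickWorker maps a dispatcher to a DML worker index. Because the dispatcher
// ID is fixed for the lifetime of a table span, the same table always lands
// on the same worker, while different tables spread across workers.
func pickWorker(id dispatcherID, workerCount int) int {
	return int(id.Low % uint64(workerCount))
}

func main() {
	a := dispatcherID{Low: 0x9e3779b97f4a7c15}
	b := dispatcherID{Low: 0x2545f4914f6cdd1d}
	fmt.Println(pickWorker(a, 8), pickWorker(b, 8)) // stable indexes in [0, 8)
}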
1 change: 1 addition & 0 deletions downstreamadapter/worker/kafka_ddl_worker.go
@@ -132,6 +132,7 @@ func (w *KafkaDDLWorker) WriteBlockEvent(ctx context.Context, event *event.DDLEv
return errors.Trace(err)
}
}
log.Info("kafka ddl worker send block event", zap.Any("event", event))
// after flush all the ddl event, we call the callback function.
event.PostFlush()
return nil
4 changes: 4 additions & 0 deletions pkg/common/types.go
@@ -69,6 +69,10 @@ func (d *DispatcherID) GetSize() int {
return 16
}

func (d DispatcherID) GetLow() uint64 {
return d.Low
}

func (d *DispatcherID) Unmarshal(b []byte) error {
gid := GID{}
gid.Unmarshal(b)
4 changes: 0 additions & 4 deletions pkg/config/replica_config.go
@@ -288,10 +288,6 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin
return err
}
}
// TODO: Remove the hack once span replication is compatible with all sinks.
if !isSinkCompatibleWithSpanReplication(sinkURI) {
c.Scheduler.EnableTableAcrossNodes = false
}

if c.Integrity != nil {
switch strings.ToLower(sinkURI.Scheme) {
3 changes: 3 additions & 0 deletions tests/integration_tests/ddl_for_split_tables/conf/changefeed.toml
@@ -0,0 +1,3 @@
[scheduler]
enable-table-across-nodes = true
region-threshold = 2
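As configured here, enable-table-across-nodes = true combined with the deliberately low region-threshold of 2 is intended to make tables that span more than two regions eligible to be split into multiple dispatchers and scheduled across the two CDC servers started in run.sh, which is the scenario the DDL cases below exercise.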
28 changes: 28 additions & 0 deletions tests/integration_tests/ddl_for_split_tables/conf/diff_config.toml
@@ -0,0 +1,28 @@
# diff Configuration.
check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/ddl_for_split_tables/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["split_region.*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
19 changes: 19 additions & 0 deletions tests/integration_tests/ddl_for_split_tables/data/ddls.sql
@@ -0,0 +1,19 @@
use split_region;

alter table test1 add column c1 int;

alter table test2 drop column val;

alter table test1 add index idx_test (id);

alter table test1 modify column c1 bigint;

alter table test1 drop index idx_test;

truncate table test2;

rename table test1 to test3;

drop table test3;

recover table test3;
23 changes: 23 additions & 0 deletions tests/integration_tests/ddl_for_split_tables/data/dmls.sql
@@ -0,0 +1,23 @@
use split_region;

insert into test3(id) values (11110),(11111),(12111),(12221),(14111),(14221),(16111),(16221),(18111),(18221);
insert into test3(id) values (21110),(21111),(22111),(22221),(24111),(24221),(26111),(26221),(28111),(28221);
insert into test3(id) values (31110),(31111),(32111),(32221),(34111),(34221),(36111),(36221),(38111),(38221);
insert into test3(id) values (41110),(41111),(42111),(42221),(44111),(44221),(46111),(46221),(48111),(48221);
insert into test3(id) values (51110),(51111),(52111),(52221),(54111),(54221),(56111),(56221),(58111),(58221);
insert into test3(id) values (61110),(61111),(62111),(62221),(64111),(64221),(66111),(66221),(68111),(68221);
insert into test3(id) values (71110),(71111),(72111),(72221),(74111),(74221),(76111),(76221),(78111),(78221);
insert into test3(id) values (81110),(81111),(82111),(82221),(84111),(84221),(86111),(86221),(88111),(88221);
insert into test3(id) values (91110),(91111),(92111),(92221),(94111),(94221),(96111),(96221),(98111),(98221);
insert into test3(id) values (111122),(111101),(102111),(102221),(104111),(104221),(106111),(106221),(108111),(108221);

insert into test2(id) values (11110),(11111),(12111),(12221),(14111),(14221),(16111),(16221),(18111),(18221);
insert into test2(id) values (21110),(21111),(22111),(22221),(24111),(24221),(26111),(26221),(28111),(28221);
insert into test2(id) values (31110),(31111),(32111),(32221),(34111),(34221),(36111),(36221),(38111),(38221);
insert into test2(id) values (41110),(41111),(42111),(42221),(44111),(44221),(46111),(46221),(48111),(48221);
insert into test2(id) values (51110),(51111),(52111),(52221),(54111),(54221),(56111),(56221),(58111),(58221);
insert into test2(id) values (61110),(61111),(62111),(62221),(64111),(64221),(66111),(66221),(68111),(68221);
insert into test2(id) values (71110),(71111),(72111),(72221),(74111),(74221),(76111),(76221),(78111),(78221);
insert into test2(id) values (81110),(81111),(82111),(82221),(84111),(84221),(86111),(86221),(88111),(88221);
insert into test2(id) values (91110),(91111),(92111),(92221),(94111),(94221),(96111),(96221),(98111),(98221);
insert into test2(id) values (111122),(111101),(102111),(102221),(104111),(104221),(106111),(106221),(108111),(108221);
36 changes: 36 additions & 0 deletions tests/integration_tests/ddl_for_split_tables/data/pre.sql
@@ -0,0 +1,36 @@
drop database if exists `split_region`;
create database `split_region`;
use `split_region`;
create table test1 (id int primary key, val int);
create table test2 (id int primary key, val int);
insert into test1(id) values (16),(32),(64),(128),(256),(512),(1024),(2048),(4096),(8192),(16384);
insert into test1(id) values (-2147483648),(-1),(0),(2147483647);

split table split_region.test1 between (1) and (100000) regions 50;
split table split_region.test2 between (1) and (100000) regions 50;

insert into test1(id) values (-100),(-99),(-98),(-97),(-96),(-95),(-94),(-93),(-92),(-91);
insert into test1(id) values (1),(2),(2000),(2001),(4000),(4001),(6000),(6001),(8000),(8001);
insert into test1(id) values (10000),(10001),(12000),(12001),(14000),(14001),(16000),(16001),(18000),(18001);
insert into test1(id) values (20000),(20001),(22000),(22001),(24000),(24001),(26000),(26001),(28000),(28001);
insert into test1(id) values (30000),(30001),(32000),(32001),(34000),(34001),(36000),(36001),(38000),(38001);
insert into test1(id) values (40000),(40001),(42000),(42001),(44000),(44001),(46000),(46001),(48000),(48001);
insert into test1(id) values (50000),(50001),(52000),(52001),(54000),(54001),(56000),(56001),(58000),(58001);
insert into test1(id) values (60000),(60001),(62000),(62001),(64000),(64001),(66000),(66001),(68000),(68001);
insert into test1(id) values (70000),(70001),(72000),(72001),(74000),(74001),(76000),(76001),(78000),(78001);
insert into test1(id) values (80000),(80001),(82000),(82001),(84000),(84001),(86000),(86001),(88000),(88001);
insert into test1(id) values (90000),(90001),(92000),(92001),(94000),(94001),(96000),(96001),(98000),(98001);
insert into test1(id) values (100000),(100001),(102000),(102001),(104000),(104001),(106000),(106001),(108000),(108001);

insert into test2(id) values (-100),(-99),(-98),(-97),(-96),(-95),(-94),(-93),(-92),(-91);
insert into test2(id) values (1),(2),(2000),(2001),(4000),(4001),(6000),(6001),(8000),(8001);
insert into test2(id) values (10000),(10001),(12000),(12001),(14000),(14001),(16000),(16001),(18000),(18001);
insert into test2(id) values (20000),(20001),(22000),(22001),(24000),(24001),(26000),(26001),(28000),(28001);
insert into test2(id) values (30000),(30001),(32000),(32001),(34000),(34001),(36000),(36001),(38000),(38001);
insert into test2(id) values (40000),(40001),(42000),(42001),(44000),(44001),(46000),(46001),(48000),(48001);
insert into test2(id) values (50000),(50001),(52000),(52001),(54000),(54001),(56000),(56001),(58000),(58001);
insert into test2(id) values (60000),(60001),(62000),(62001),(64000),(64001),(66000),(66001),(68000),(68001);
insert into test2(id) values (70000),(70001),(72000),(72001),(74000),(74001),(76000),(76001),(78000),(78001);
insert into test2(id) values (80000),(80001),(82000),(82001),(84000),(84001),(86000),(86001),(88000),(88001);
insert into test2(id) values (90000),(90001),(92000),(92001),(94000),(94001),(96000),(96001),(98000),(98001);
insert into test2(id) values (100000),(100001),(102000),(102001),(104000),(104001),(106000),(106001),(108000),(108001);
69 changes: 69 additions & 0 deletions tests/integration_tests/ddl_for_split_tables/run.sh
@@ -0,0 +1,69 @@
# This script tests all the basic DDLs when a table is split into
# multiple dispatchers across multiple cdc servers.
# TODO: this script needs to add a kafka-class sink

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1
check_time=60

function prepare() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300"
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8301"

TOPIC_NAME="ticdc-ddl_split_table-$RANDOM"

# split the tables into multiple regions to help create multiple dispatchers for each table
run_sql_file $CUR/data/pre.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/pre.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 500

SINK_URI="mysql://root:@127.0.0.1:3306/"
#SINK_URI="kafka://127.0.0.1:9094/$TOPIC_NAME?protocol=open-protocol&partition-num=1&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760"
# case $SINK_TYPE in
# kafka) SINK_URI="kafka://127.0.0.1:9094/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
# storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
# pulsar)
# run_pulsar_cluster $WORK_DIR normal
# SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
# ;;
# *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
# esac

sleep 10

run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c "test" --config="$CUR/conf/changefeed.toml"

#run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9094/$TOPIC_NAME?protocol=open-protocol&partition-num=1&version=${KAFKA_VERSION}&max-message-bytes=10485760"

# case $SINK_TYPE in
# kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
# storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
# pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
# esac
}

trap stop_tidb_cluster EXIT

prepare $*
## execute ddls
run_sql_file $CUR/data/ddls.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
## insert some data
run_sql_file $CUR/data/dmls.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 30
cleanup_process $CDC_BINARY

check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
@@ -14,7 +14,7 @@ function check_ts_forward() {
changefeedid=$1
rts1=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | grep -v "Command to ticdc" | jq '.resolved_ts')
checkpoint1=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | grep -v "Command to ticdc" | jq '.checkpoint_tso')
sleep 1
sleep 5
rts2=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | grep -v "Command to ticdc" | jq '.resolved_ts')
checkpoint2=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | grep -v "Command to ticdc" | jq '.checkpoint_tso')
if [[ "$rts1" != "null" ]] && [[ "$rts1" != "0" ]]; then
