Skip to content

Commit a1c3ea0

Browse files
july2993ericsyh
authored andcommitted
drainer/*: get latest timestamp from pd when initial-commit-ts is -1 (pingcap#787)
* drainer/*: get latest timestamp from pd when initial-commit-ts is -1 Co-Authored-By: Eric Shen <ericshenyuhao@outlook.com>
1 parent 654a172 commit a1c3ea0

File tree

7 files changed

+14
-17
lines changed

7 files changed

+14
-17
lines changed

drainer/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func NewConfig() *Config {
123123
fs.StringVar(&cfg.MetricsAddr, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push")
124124
fs.IntVar(&cfg.MetricsInterval, "metrics-interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push")
125125
fs.StringVar(&cfg.LogFile, "log-file", "", "log file path")
126-
fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", 0, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint")
126+
fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", -1, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint, will get a latest timestamp from pd if setting to be -1")
127127
fs.StringVar(&cfg.Compressor, "compressor", "", "use the specified compressor to compress payload between pump and drainer, only 'gzip' is supported now (default \"\", ie. compression disabled.)")
128128
fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
129129
fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas")

drainer/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ func NewServer(cfg *Config) (*Server, error) {
116116
}
117117
latestTime := time.Now()
118118

119+
if cfg.InitialCommitTS == -1 {
120+
log.Info("set InitialCommitTS", zap.Int64("ts", latestTS))
121+
cfg.InitialCommitTS = latestTS
122+
}
123+
119124
cfg.SyncerCfg.To.ClusterID = clusterID
120125
pdCli.Close()
121126

tests/dailytest/case.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,8 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) {
211211
// run casePKAddDuplicateUK
212212
tr.run(func(src *sql.DB) {
213213
err := execSQLs(src, casePKAddDuplicateUK)
214-
if err != nil && !strings.Contains(err.Error(), "Duplicate for key") {
214+
// the add unique index will failed by duplicate entry
215+
if err != nil && !strings.Contains(err.Error(), "Duplicate") {
215216
log.S().Fatal(err)
216217
}
217218
})

tests/kafka/run.sh

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,9 @@
22

33
set -e
44

5-
# use latest ts as initial-commit-ts, so we can skip binlog by previous test case
6-
ms=$(date +'%s')
7-
ts=$(($ms*1000<<18))
8-
95
cd "$(dirname "$0")"
106

11-
args="-initial-commit-ts=$ts"
7+
args="-initial-commit-ts=-1"
128

139
kafka_addr=${KAFKA_ADDRS-127.0.0.1:9092}
1410

tests/reparo/run.sh

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,17 @@ set -e
55
cd "$(dirname "$0")"
66

77
# use latest ts as initial-commit-ts, so we can skip binlog by previous test case
8-
ms=$(date +'%s')
9-
ts=$(($ms*1000<<18))
10-
args="-initial-commit-ts=$ts"
8+
args="-initial-commit-ts=-1"
119
down_run_sql "DROP DATABASE IF EXISTS tidb_binlog"
12-
run_sql "CREATE DATABASE IF NOT EXISTS \`reparo_test\`"
1310

1411
rm -rf /tmp/tidb_binlog_test/data.drainer
1512

1613
run_drainer "$args" &
1714

1815
GO111MODULE=on go build -o out
1916

17+
run_sql "CREATE DATABASE IF NOT EXISTS \`reparo_test\`"
18+
2019
./out -config ./config.toml > ${OUT_DIR-/tmp}/$TEST_NAME.out 2>&1
2120

2221
sleep 5

tests/restart/run.sh

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ STATUS_LOG="${OUT_DIR}/status.log"
99

1010
# run drainer, and drainer's status should be online
1111
# use latest ts as initial-commit-ts, so we can skip binlog by previous test case
12-
ms=$(date +'%s')
13-
ts=$(($ms*1000<<18))
14-
args="-initial-commit-ts=$ts"
12+
args="-initial-commit-ts=-1"
1513
down_run_sql "DROP DATABASE IF EXISTS tidb_binlog"
1614
rm -rf /tmp/tidb_binlog_test/data.drainer
1715

tests/status/run.sh

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ OUT_DIR=/tmp/tidb_binlog_test
99
STATUS_LOG="${OUT_DIR}/status.log"
1010

1111
# use latest ts as initial-commit-ts, so we can skip binlog by previous test case
12-
ms=$(date +'%s')
13-
ts=$(($ms*1000<<18))
14-
args="-initial-commit-ts=$ts"
12+
args="-initial-commit-ts=-1"
1513
down_run_sql "DROP DATABASE IF EXISTS tidb_binlog"
1614
rm -rf /tmp/tidb_binlog_test/data.drainer
1715

0 commit comments

Comments
 (0)