diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index 2b28253d2..0b069234b 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -49,7 +49,7 @@ enable-dispatch = true safe-mode = false # downstream storage, equal to --dest-db-type -# valid values are "mysql", "file", "tidb", "flash", "kafka" +# valid values are "mysql", "file", "tidb", "kafka" db-type = "mysql" # ignore syncing the txn with specified commit ts to downstream diff --git a/drainer/checkpoint/checkpoint.go b/drainer/checkpoint/checkpoint.go index 08ddf8881..a9a0a20fa 100644 --- a/drainer/checkpoint/checkpoint.go +++ b/drainer/checkpoint/checkpoint.go @@ -51,8 +51,6 @@ func NewCheckPoint(cfg *Config) (CheckPoint, error) { cp, err = newMysql(cfg) case "file": cp, err = NewFile(cfg) - case "flash": - cp, err = newFlash(cfg) default: err = errors.Errorf("unsupported checkpoint type %s", cfg.CheckpointType) } diff --git a/drainer/checkpoint/flash.go b/drainer/checkpoint/flash.go deleted file mode 100644 index 87189a975..000000000 --- a/drainer/checkpoint/flash.go +++ /dev/null @@ -1,190 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package checkpoint - -import ( - "database/sql" - "encoding/json" - "fmt" - "sync" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - pkgsql "github.com/pingcap/tidb-binlog/pkg/sql" - "go.uber.org/zap" -) - -// FlashCheckPoint is a local savepoint struct for flash -type FlashCheckPoint struct { - sync.RWMutex - closed bool - clusterID uint64 - initialCommitTS int64 - - db *sql.DB - schema string - table string - - CommitTS int64 `toml:"commitTS" json:"commitTS"` -} - -func checkFlashConfig(cfg *Config) { - if cfg == nil { - return - } - if cfg.Db == nil { - cfg.Db = new(DBConfig) - } - if cfg.Db.Host == "" { - cfg.Db.Host = "127.0.0.1" - } - if cfg.Db.Port == 0 { - cfg.Db.Port = 9000 - } - if cfg.Schema == "" { - cfg.Schema = "tidb_binlog" - } - if cfg.Table == "" { - cfg.Table = "checkpoint" - } -} - -var openCH = pkgsql.OpenCH - -func newFlash(cfg *Config) (CheckPoint, error) { - checkFlashConfig(cfg) - - hostAndPorts, err := pkgsql.ParseCHAddr(cfg.Db.Host) - if err != nil { - return nil, errors.Trace(err) - } - - db, err := openCH(hostAndPorts[0].Host, hostAndPorts[0].Port, cfg.Db.User, cfg.Db.Password, "", 0) - if err != nil { - log.Error("open database failed", zap.Error(err)) - return nil, errors.Trace(err) - } - - sp := &FlashCheckPoint{ - db: db, - clusterID: cfg.ClusterID, - initialCommitTS: cfg.InitialCommitTS, - schema: cfg.Schema, - table: cfg.Table, - } - - sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", sp.schema) - if _, err = db.Exec(sql); err != nil { - log.Error("Create database failed", zap.String("sql", sql), zap.Error(err)) - return sp, errors.Trace(err) - } - - sql = fmt.Sprintf("ATTACH TABLE IF NOT EXISTS `%s`.`%s`(`clusterid` UInt64, `checkpoint` String) ENGINE MutableMergeTree((`clusterid`), 8192)", sp.schema, sp.table) - if _, err = db.Exec(sql); err != nil { - log.Error("Create table failed", zap.String("sql", sql), zap.Error(err)) - return nil, errors.Trace(err) - } - - err = sp.Load() - return sp, errors.Trace(err) -} - -// Load implements CheckPoint.Load interface -func (sp *FlashCheckPoint) Load() error { - sp.Lock() - defer sp.Unlock() - - if sp.closed { - return errors.Trace(ErrCheckPointClosed) - } - - sql := fmt.Sprintf("SELECT `checkpoint` from `%s`.`%s` WHERE `clusterid` = %d", sp.schema, sp.table, sp.clusterID) - rows, err := sp.db.Query(sql) - if err != nil { - log.Error("select checkPoint failed", zap.String("sql", sql), zap.Error(err)) - return errors.Trace(err) - } - - var str string - for rows.Next() { - err = rows.Scan(&str) - if err != nil { - log.Error("rows Scan failed", zap.Error(err)) - return errors.Trace(err) - } - } - - if len(str) == 0 { - sp.CommitTS = sp.initialCommitTS - return nil - } - - err = json.Unmarshal([]byte(str), sp) - if err != nil { - return errors.Trace(err) - } - - if sp.CommitTS == 0 { - sp.CommitTS = sp.initialCommitTS - } - return nil -} - -// Save implements checkpoint.Save interface -func (sp *FlashCheckPoint) Save(ts, slaveTS int64) error { - sp.Lock() - defer sp.Unlock() - - if sp.closed { - return errors.Trace(ErrCheckPointClosed) - } - - sp.CommitTS = ts - - b, err := json.Marshal(sp) - if err != nil { - return errors.Trace(err) - } - - sql := fmt.Sprintf("IMPORT INTO `%s`.`%s` (`clusterid`, `checkpoint`) VALUES(?, ?)", sp.schema, sp.table) - sqls := []string{sql} - args := [][]interface{}{{sp.clusterID, b}} - err = pkgsql.ExecuteSQLs(sp.db, sqls, args, false) - - return errors.Trace(err) -} - -// TS implements CheckPoint.TS interface -func (sp *FlashCheckPoint) TS() int64 { - sp.RLock() - defer sp.RUnlock() - - return sp.CommitTS -} - -// Close implements CheckPoint.Close interface. -func (sp *FlashCheckPoint) Close() error { - sp.Lock() - defer sp.Unlock() - - if sp.closed { - return errors.Trace(ErrCheckPointClosed) - } - - err := sp.db.Close() - if err == nil { - sp.closed = true - } - return errors.Trace(err) -} diff --git a/drainer/checkpoint/flash_test.go b/drainer/checkpoint/flash_test.go deleted file mode 100644 index 1434d357e..000000000 --- a/drainer/checkpoint/flash_test.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package checkpoint - -import ( - "database/sql" - "errors" - "regexp" - - "github.com/DATA-DOG/go-sqlmock" - . "github.com/pingcap/check" -) - -type flashSuite struct{} - -var _ = Suite(&flashSuite{}) - -func (s *flashSuite) TestcheckFlashConfig(c *C) { - cfg := Config{} - checkFlashConfig(&cfg) - c.Assert(cfg.Db.Host, Equals, "127.0.0.1") - c.Assert(cfg.Db.Port, Equals, 9000) - c.Assert(cfg.Schema, Equals, "tidb_binlog") - c.Assert(cfg.Table, Equals, "checkpoint") -} - -func (s *flashSuite) TestClose(c *C) { - db, mock, err := sqlmock.New() - c.Assert(err, IsNil) - mock.ExpectClose() - cp := FlashCheckPoint{db: db} - cp.Close() - cp.Close() // Show that closing more than once is OK - c.Assert(cp.closed, IsTrue) -} - -func (s *flashSuite) TestSave(c *C) { - db, mock, err := sqlmock.New() - c.Assert(err, IsNil) - cp := FlashCheckPoint{ - db: db, - } - mock.ExpectBegin() - mock.ExpectExec("IMPORT INTO.*").WillReturnResult(sqlmock.NewResult(0, 0)) - mock.ExpectCommit() - err = cp.Save(1024, 0) - c.Assert(err, IsNil) -} - -func (s *flashSuite) TestLoad(c *C) { - db, mock, err := sqlmock.New() - c.Assert(err, IsNil) - cp := FlashCheckPoint{db: db} - rows := sqlmock.NewRows([]string{"checkPoint"}).AddRow(`{"commitTS": 1003}`) - mock.ExpectQuery("SELECT `checkpoint` from.*").WillReturnRows(rows) - err = cp.Load() - c.Assert(err, IsNil) - c.Assert(cp.CommitTS, Equals, int64(1003)) -} - -type newFlashSuite struct{} - -var _ = Suite(&newFlashSuite{}) - -func (s *newFlashSuite) TestShouldRejectInvalidHost(c *C) { - cfg := Config{Db: &DBConfig{Host: "invalid"}} - _, err := newFlash(&cfg) - c.Assert(err, NotNil) -} - -func (s *newFlashSuite) TestCannotOpenDB(c *C) { - origOpen := openCH - openCH = func(host string, port int, username string, password string, dbName string, blockSize int) (*sql.DB, error) { - return nil, errors.New("OpenErr") - } - defer func() { - openCH = origOpen - }() - cfg := Config{Db: &DBConfig{Host: "127.0.0.1:9000"}} - _, err := newFlash(&cfg) - c.Assert(err, NotNil) - c.Assert(err, ErrorMatches, "OpenErr") -} - -func (s *newFlashSuite) TestDBStatementErrs(c *C) { - db, mock, err := sqlmock.New() - c.Assert(err, IsNil) - - origOpen := openCH - openCH = func(host string, port int, username string, password string, dbName string, blockSize int) (*sql.DB, error) { - return db, nil - } - defer func() { - openCH = origOpen - }() - - sqlDB := "CREATE DATABASE IF NOT EXISTS `test`" - mock.ExpectExec(sqlDB).WillReturnError(errors.New("createdb")) - - cfg := Config{Db: &DBConfig{Host: "127.0.0.1:9000"}, Schema: "test", Table: "tbl"} - _, err = newFlash(&cfg) - c.Assert(err, NotNil) - c.Assert(err, ErrorMatches, "createdb") - - mock.ExpectExec(sqlDB).WillReturnResult(sqlmock.NewResult(0, 0)) - sqlAttach := regexp.QuoteMeta("ATTACH TABLE IF NOT EXISTS `test`.`tbl`(`clusterid` UInt64, `checkpoint` String) ENGINE MutableMergeTree((`clusterid`), 8192)") - mock.ExpectExec(sqlAttach).WillReturnError(errors.New("attachtbl")) - _, err = newFlash(&cfg) - c.Assert(err, NotNil) - c.Assert(err, ErrorMatches, "attachtbl") -} diff --git a/drainer/sync/flash.go b/drainer/sync/flash.go deleted file mode 100644 index da0ad829c..000000000 --- a/drainer/sync/flash.go +++ /dev/null @@ -1,360 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package sync - -import ( - "database/sql" - "fmt" - "strconv" - "strings" - "sync" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tidb-binlog/drainer/translator" - pkgsql "github.com/pingcap/tidb-binlog/pkg/sql" - "github.com/zanmato1984/clickhouse" - "go.uber.org/zap" -) - -var extraRowSize = 1024 - -// flashRowBatch is an in-memory row batch caching rows about to be passed to flash. -// It's not thread-safe, so callers must take care of the synchronizing. -type flashRowBatch struct { - sql string - columnSize int - capacity int - rows [][]interface{} - latestCommitTS int64 -} - -func newFlashRowBatch(sql string, capacity int) *flashRowBatch { - pos := strings.LastIndex(sql, "(") - values := sql[pos:] - columnSize := strings.Count(values, "?") - // Loosing the space to tolerant a little more rows being added. - rows := make([][]interface{}, 0, capacity+extraRowSize) - return &flashRowBatch{ - sql: sql, - columnSize: columnSize, - capacity: capacity, - rows: rows, - latestCommitTS: 0, - } -} - -// AddRow appends single row into this row batch. -func (batch *flashRowBatch) AddRow(args []interface{}, commitTS int64) error { - if len(args) != batch.columnSize { - return errors.Errorf("Row %v column size %d mismatches the row batch column size %d", args, len(args), batch.columnSize) - } - batch.rows = append(batch.rows, args) - - if batch.latestCommitTS < commitTS { - batch.latestCommitTS = commitTS - } - - log.Debug(fmt.Sprintf("[add_row] Added row %v.", args)) - return nil -} - -// Size returns the number of rows stored in this batch. -func (batch *flashRowBatch) Size() int { - return len(batch.rows) -} - -// Flush writes all the rows in this row batch into CH, with retrying when failure. -func (batch *flashRowBatch) Flush(db *sql.DB) (commitTS int64, err error) { - for i := 0; i < pkgsql.MaxDMLRetryCount; i++ { - if i > 0 { - log.Warn("retrying flushing row batch", zap.String("sql", batch.sql), zap.Duration("RetryWaitTime", pkgsql.RetryWaitTime)) - time.Sleep(pkgsql.RetryWaitTime) - } - commitTS, err = batch.flushInternal(db) - if err == nil { - return commitTS, nil - } - log.Warn("flushing row batch failed", zap.Error(err), zap.String("sql", batch.sql)) - } - - return commitTS, errors.Trace(err) -} - -func (batch *flashRowBatch) flushInternal(db *sql.DB) (_ int64, err error) { - log.Debug("flushing row batch", zap.Int("size", batch.Size()), zap.String("sql", batch.sql)) - - if batch.Size() == 0 { - return batch.latestCommitTS, nil - } - - tx, err := db.Begin() - if err != nil { - return batch.latestCommitTS, errors.Trace(err) - } - defer func() { - if err != nil { - if err := tx.Rollback(); err != nil { - log.Error(err.Error()) - } - } - }() - - stmt, err := tx.Prepare(batch.sql) - if err != nil { - return batch.latestCommitTS, errors.Trace(err) - } - defer stmt.Close() - - for _, row := range batch.rows { - _, err = stmt.Exec(row...) - if err != nil { - return batch.latestCommitTS, errors.Trace(err) - } - } - err = tx.Commit() - if err != nil { - if ce, ok := err.(*clickhouse.Exception); ok { - // Stack trace from server side could be very helpful for triaging problems. - log.Error("commit failed", zap.String("stack trace", ce.StackTrace)) - } - return batch.latestCommitTS, errors.Trace(err) - } - - // Clearing all rows. - // Loosing the space to tolerant a little more rows being added. - batch.rows = make([][]interface{}, 0, batch.capacity+extraRowSize) - - return batch.latestCommitTS, nil -} - -// FlashSyncer sync binlog to TiFlash -type FlashSyncer struct { - sync.Mutex - close chan bool - wg sync.WaitGroup - - timeLimit time.Duration - sizeLimit int - - dbs []*sql.DB - // [sql][len(dbs)], group by same sql and shard by hashKey - rowBatches map[string][]*flashRowBatch - - items []*Item - *baseSyncer -} - -// openCH should only be change for unit test mock -var openCH = pkgsql.OpenCH - -var _ Syncer = &FlashSyncer{} - -// NewFlashSyncer returns a instance of FlashSyncer -func NewFlashSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter) (*FlashSyncer, error) { - timeLimit, err := time.ParseDuration(cfg.TimeLimit) - if err != nil { - return nil, errors.Trace(err) - } - // TODO: check time limit validity, and give a default value (and a warning) if invalid. - - sizeLimit, err := strconv.Atoi(cfg.SizeLimit) - if err != nil { - return nil, errors.Trace(err) - } - // TODO: check size limit validity, and give a default value (and a warning) if invalid. - - hostAndPorts, err := pkgsql.ParseCHAddr(cfg.Host) - if err != nil { - return nil, errors.Trace(err) - } - - dbs := make([]*sql.DB, 0, len(hostAndPorts)) - for _, hostAndPort := range hostAndPorts { - db, err := openCH(hostAndPort.Host, hostAndPort.Port, cfg.User, cfg.Password, "", sizeLimit) - if err != nil { - return nil, errors.Trace(err) - } - dbs = append(dbs, db) - } - - e := FlashSyncer{ - close: make(chan bool), - timeLimit: timeLimit, - sizeLimit: sizeLimit, - dbs: dbs, - rowBatches: make(map[string][]*flashRowBatch), - baseSyncer: newBaseSyncer(tableInfoGetter), - } - - e.wg.Add(1) - go e.flushRoutine() - - return &e, nil -} - -// Sync implements Syncer interface -func (e *FlashSyncer) Sync(item *Item) error { - e.Lock() - defer e.Unlock() - - if e.err != nil { - return errors.Annotate(e.err, "executor seeing error from the flush thread") - } - - tiBinlog := item.Binlog - - if tiBinlog.DdlJobId > 0 { - // Flush all row batches. - e.err = e.flushAll() - if e.err != nil { - return errors.Annotate(e.err, "executor seeing error when flushing") - } - - sql, err := translator.GenFlashDDLSQL(string(tiBinlog.GetDdlQuery()), item.Schema) - if err != nil { - return errors.Annotate(err, "gen ddl sql fail") - } - - var args [][]interface{} - args = append(args, nil) - for _, db := range e.dbs { - e.err = pkgsql.ExecuteSQLs(db, []string{sql}, args, true) - if e.err != nil { - return errors.Trace(e.err) - } - - e.success <- item - } - } else { - sqls, args, err := translator.GenFlashSQLs(e.tableInfoGetter, item.PrewriteValue, tiBinlog.GetCommitTs()) - if err != nil { - return errors.Annotate(err, "gen sqls fail") - } - - for i, row := range args { - hashKey := e.partition(row[0].(int64)) - sql := sqls[i] - args := row[1:] - if _, ok := e.rowBatches[sql]; !ok { - e.rowBatches[sql] = make([]*flashRowBatch, len(e.dbs)) - } - if e.rowBatches[sql][hashKey] == nil { - e.rowBatches[sql][hashKey] = newFlashRowBatch(sql, e.sizeLimit) - } - rb := e.rowBatches[sql][hashKey] - e.err = rb.AddRow(args, tiBinlog.GetCommitTs()) - if e.err != nil { - return errors.Trace(e.err) - } - - // Check if size limit exceeded. - if rb.Size() >= e.sizeLimit { - _, e.err = rb.Flush(e.dbs[hashKey]) - if e.err != nil { - return errors.Trace(e.err) - } - } - } - - e.items = append(e.items, item) - } - - return nil -} - -// Close implements Syncer interface -func (e *FlashSyncer) Close() error { - // Could have had error in async flush goroutine, log it. - e.Lock() - if e.err != nil { - log.Error("close see error", zap.Error(e.err)) - } - e.Unlock() - - // Wait for async flush goroutine to exit. - log.Info("Waiting for flush thread to close") - close(e.close) - - hasError := false - for _, db := range e.dbs { - err := db.Close() - if err != nil { - hasError = true - log.Error("close db failed", zap.Error(err)) - } - } - if hasError { - return errors.New("error in closing some flash connector, check log for details") - } - - return nil -} - -func (e *FlashSyncer) flushRoutine() { - defer e.wg.Done() - log.Info("Flush thread started.") - for { - select { - case <-e.close: - log.Info("Flush thread closing.") - return - case <-time.After(e.timeLimit): - e.Lock() - log.Debug("Flush thread reached time limit, flushing.") - if e.err != nil { - e.Unlock() - log.Error("Flush thread seeing error from the executor, exiting", zap.Error(e.err)) - return - } - err := e.flushAll() - if err != nil { - e.Unlock() - log.Error("Flush thread seeing error when flushing, exiting.", zap.Error(e.err)) - e.setErr(err) - return - } - e.Unlock() - } - } -} - -// partition must be a index of dbs -func (e *FlashSyncer) partition(key int64) int { - return int(key) % len(e.dbs) -} - -func (e *FlashSyncer) flushAll() error { - log.Debug("[flush_all] Flushing all row batches.") - - for _, rbs := range e.rowBatches { - for i, rb := range rbs { - if rb == nil { - continue - } - _, err := rb.Flush(e.dbs[i]) - if err != nil { - return errors.Trace(err) - } - } - } - - for _, item := range e.items { - e.success <- item - } - e.items = nil - - return nil -} diff --git a/drainer/sync/flash_test.go b/drainer/sync/flash_test.go deleted file mode 100644 index 14bd01972..000000000 --- a/drainer/sync/flash_test.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. -package sync - -import ( - "database/sql" - "time" - - sqlmock "github.com/DATA-DOG/go-sqlmock" - "github.com/pingcap/check" - "github.com/pingcap/tidb-binlog/drainer/translator" -) - -var _ = check.Suite(&flashSuite{}) - -type flashSuite struct { - flash *FlashSyncer - - flashMock sqlmock.Sqlmock - gen *translator.BinlogGenrator -} - -func (s *flashSuite) setUpTest(c *check.C, timeLimit string, sizeLimit string) { - s.gen = &translator.BinlogGenrator{} - - // create flash syncer and mock - oldOpenCH := openCH - defer func() { - openCH = oldOpenCH - }() - openCH = func(string, int, string, string, string, int) (db *sql.DB, err error) { - db, s.flashMock, err = sqlmock.New() - return - } - - cfg := &DBConfig{ - Host: "localhost:3306", - TimeLimit: timeLimit, - SizeLimit: sizeLimit, - } - - var err error - s.flash, err = NewFlashSyncer(cfg, s.gen) - c.Assert(err, check.IsNil) -} - -func (s *flashSuite) TestFlushBySizeLimit(c *check.C) { - s.setUpTest(c, "1h", "1") - - gen := s.gen - syncer := s.flash - - gen.SetInsert(c) - item := &Item{ - Binlog: gen.TiBinlog, - PrewriteValue: gen.PV, - Schema: gen.Schema, - Table: gen.Table, - } - - // set up mock - s.flashMock.ExpectBegin() - prepare := s.flashMock.ExpectPrepare("IMPORT INTO .*") - prepare.ExpectExec().WillReturnResult(sqlmock.NewResult(0, 1)) - s.flashMock.ExpectCommit() - - err := syncer.Sync(item) - c.Assert(err, check.IsNil) - - err = s.flashMock.ExpectationsWereMet() - c.Assert(err, check.IsNil) -} - -func (s *flashSuite) TestFlushByTimeLimit(c *check.C) { - s.setUpTest(c, "1s", "100000000") - - gen := s.gen - syncer := s.flash - - gen.SetInsert(c) - item := &Item{ - Binlog: gen.TiBinlog, - PrewriteValue: gen.PV, - Schema: gen.Schema, - Table: gen.Table, - } - - // set up mock - s.flashMock.ExpectBegin() - prepare := s.flashMock.ExpectPrepare("IMPORT INTO .*") - prepare.ExpectExec().WillReturnResult(sqlmock.NewResult(0, 1)) - s.flashMock.ExpectCommit() - - err := syncer.Sync(item) - c.Assert(err, check.IsNil) - - select { - case <-syncer.Successes(): - break - case <-time.After(time.Second * 5): - c.Fatal("time out to get from Successes") - } -} - -func (s *flashSuite) TearDownTest(c *check.C) { - s.flashMock.ExpectClose() - - s.flash.Close() -} diff --git a/drainer/sync/syncer_test.go b/drainer/sync/syncer_test.go index a153eb132..a595ed994 100644 --- a/drainer/sync/syncer_test.go +++ b/drainer/sync/syncer_test.go @@ -15,7 +15,6 @@ package sync import ( "database/sql" "reflect" - "strconv" "sync/atomic" "testing" "time" @@ -37,7 +36,6 @@ type syncerSuite struct { syncers []Syncer mysqlMock sqlmock.Sqlmock - flashMock sqlmock.Sqlmock mockProducer *mocks.AsyncProducer } @@ -49,8 +47,6 @@ func (s *syncerSuite) SetUpTest(c *check.C) { Password: "", Port: 3306, KafkaVersion: "0.8.2.0", - TimeLimit: "1s", - SizeLimit: "1024", } // create pb syncer @@ -87,27 +83,10 @@ func (s *syncerSuite) SetUpTest(c *check.C) { c.Assert(err, check.IsNil) s.syncers = append(s.syncers, kafka) - // create flash - oldOpenCH := openCH - defer func() { - openCH = oldOpenCH - }() - openCH = func(string, int, string, string, string, int) (db *sql.DB, err error) { - db, s.flashMock, err = sqlmock.New() - return - } - - // flash does not use `cfg.Port`, and use `cfg.Host` as an `addr` - cfg.Host += ":" + strconv.Itoa(cfg.Port) - flash, err := NewFlashSyncer(cfg, infoGetter) - c.Assert(err, check.IsNil) - s.syncers = append(s.syncers, flash) - c.Logf("set up %d syncer", len(s.syncers)) } func (s *syncerSuite) TearDownTest(c *check.C) { - s.flashMock.ExpectClose() s.mysqlMock.ExpectClose() closeSyncers(c, s.syncers) @@ -130,11 +109,6 @@ func (s *syncerSuite) TestGetFromSuccesses(c *check.C) { // set up kafka producer mock expect s.mockProducer.ExpectInputAndSucceed() - // set up flash db mock expect - s.flashMock.ExpectBegin() - s.flashMock.ExpectExec("CREATE TABLE .*").WillReturnResult(sqlmock.NewResult(0, 0)) - s.flashMock.ExpectCommit() - var successCount = make([]int64, len(s.syncers)) for idx, syncer := range s.syncers { gen.SetDDL() diff --git a/drainer/sync/util.go b/drainer/sync/util.go index cfa922e19..3fd182d2f 100644 --- a/drainer/sync/util.go +++ b/drainer/sync/util.go @@ -26,8 +26,6 @@ type DBConfig struct { Port int `toml:"port" json:"port"` Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"` BinlogFileDir string `toml:"dir" json:"dir"` - TimeLimit string `toml:"time-limit" json:"time-limit"` - SizeLimit string `toml:"size-limit" json:"size-limit"` ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"` KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"` diff --git a/drainer/syncer.go b/drainer/syncer.go index 7bc10b7c5..1d49c3b36 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -98,11 +98,6 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema) (dsyncer dsync.Syncer, err if err != nil { return nil, errors.Annotate(err, "fail to create pb dsyncer") } - case "flash": - dsyncer, err = dsync.NewFlashSyncer(cfg.To, schema) - if err != nil { - return nil, errors.Annotate(err, "fail to create flash dsyncer") - } case "mysql", "tidb": dsyncer, err = dsync.NewMysqlSyncer(cfg.To, schema, cfg.WorkerCount, cfg.TxnBatch, queryHistogramVec, cfg.StrSQLMode, cfg.DestDBType) if err != nil { diff --git a/drainer/translator/flash.go b/drainer/translator/flash.go deleted file mode 100644 index f4406f387..000000000 --- a/drainer/translator/flash.go +++ /dev/null @@ -1,611 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package translator - -import ( - "fmt" - "io" - "strconv" - "strings" - gotime "time" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/parser/ast" - "github.com/pingcap/parser/model" - "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb-binlog/pkg/dml" - "github.com/pingcap/tidb-binlog/pkg/util" - "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" - tipb "github.com/pingcap/tipb/go-binlog" - "go.uber.org/zap" -) - -// GenFlashSQLs generate the SQL need to execute syncing this binlog to Flash -func GenFlashSQLs(infoGetter TableInfoGetter, pv *tipb.PrewriteValue, commitTS int64) (sqls []string, argss [][]interface{}, err error) { - for _, mut := range pv.GetMutations() { - var info *model.TableInfo - var ok bool - info, ok = infoGetter.TableByID(mut.GetTableId()) - if !ok { - return nil, nil, errors.Errorf("TableByID empty table id: %d", mut.GetTableId()) - } - isTblDroppingCol := infoGetter.IsDroppingColumn(mut.GetTableId()) - - var schema string - schema, _, ok = infoGetter.SchemaAndTableName(mut.GetTableId()) - if !ok { - return nil, nil, errors.Errorf("SchemaAndTableName empty table id: %d", mut.GetTableId()) - } - - iter := newSequenceIterator(&mut) - for { - mutType, row, err := iter.next() - if err != nil { - if err == io.EOF { - break - } - return nil, nil, errors.Trace(err) - } - - var sql string - var args []interface{} - switch mutType { - case tipb.MutationType_Insert: - sql, args, err = GenFlashInsertSQL(schema, info, row, commitTS) - if err != nil { - return nil, nil, errors.Annotate(err, "gen insert sql fail") - } - case tipb.MutationType_Update: - sql, args, err = GenFlashUpdateSQL(schema, info, row, commitTS, isTblDroppingCol) - if err != nil { - return nil, nil, errors.Annotate(err, "gen update sql fail") - } - case tipb.MutationType_DeleteRow: - sql, args, err = GenFlashDeleteSQL(schema, info, row, commitTS) - if err != nil { - return nil, nil, errors.Annotate(err, "gen delete sql fail") - } - default: - return nil, nil, errors.Errorf("unknown mutation type: %v", mutType) - } - sqls = append(sqls, sql) - argss = append(argss, args) - } - - } - - return -} - -// GenFlashInsertSQL generate the SQL need to execute syncing this insert row to Flash -func GenFlashInsertSQL(schema string, table *model.TableInfo, row []byte, commitTS int64) (sql string, args []interface{}, err error) { - schema = strings.ToLower(schema) - if pkHandleColumn(table) == nil { - fakeImplicitColumn(table) - } - columns := writableColumns(table) - version := makeInternalVersionValue(uint64(commitTS)) - delFlag := makeInternalDelmarkValue(false) - - columnList := genColumnList(columns) - // addition 2 holder is for del flag and version - columnPlaceholders := dml.GenColumnPlaceholders(len(columns) + 2) - sql = fmt.Sprintf("IMPORT INTO `%s`.`%s` (%s) values (%s);", schema, table.Name.L, columnList, columnPlaceholders) - - //decode the pk value - pk, columnValues, err := insertRowToDatums(table, row) - if err != nil { - return "", nil, errors.Trace(err) - } - - hashKey := pk.GetInt64() - - var vals []interface{} - vals = append(vals, hashKey) - for _, col := range columns { - val, ok := columnValues[col.ID] - if !ok { - vals = append(vals, col.GetDefaultValue()) - } else { - value, err := formatFlashData(&val, &col.FieldType) - if err != nil { - return "", nil, errors.Trace(err) - } - - vals = append(vals, value) - } - } - vals = append(vals, version) - vals = append(vals, delFlag) - - if len(columnValues) == 0 { - panic(errors.New("columnValues is nil")) - } - - args = vals - - return -} - -// GenFlashUpdateSQL generate the SQL need to execute syncing this update row to Flash -func GenFlashUpdateSQL(schema string, table *model.TableInfo, row []byte, commitTS int64, isTblDroppingCol bool) (sql string, args []interface{}, err error) { - schema = strings.ToLower(schema) - pkColumn := pkHandleColumn(table) - if pkColumn == nil { - pkColumn = fakeImplicitColumn(table) - } - pkID := pkColumn.ID - - updtDecoder := newUpdateDecoder(table, isTblDroppingCol) - version := makeInternalVersionValue(uint64(commitTS)) - delFlag := makeInternalDelmarkValue(false) - - var updateColumns []*model.ColumnInfo - var newValues []interface{} - - // TODO: Make updating pk working - _, newColumnValues, err := updtDecoder.decode(row, gotime.Local) - newPkValue := newColumnValues[pkID] - - if err != nil { - return "", nil, errors.Annotatef(err, "table `%s`.`%s`", schema, table.Name.L) - } - - updateColumns, newValues, err = genColumnAndValue(table.Columns, newColumnValues) - if err != nil { - return "", nil, errors.Trace(err) - } - // TODO: confirm column list should be the same across update - columnList := genColumnList(updateColumns) - // addition 2 holder is for del flag and version - columnPlaceholders := dml.GenColumnPlaceholders(len(table.Columns) + 2) - - sql = fmt.Sprintf("IMPORT INTO `%s`.`%s` (%s) values (%s);", schema, table.Name.L, columnList, columnPlaceholders) - - args = makeRow(newPkValue.GetInt64(), newValues, version, delFlag) - return -} - -// GenFlashDeleteSQL generate the SQL need to execute syncing this delete row to Flash -func GenFlashDeleteSQL(schema string, table *model.TableInfo, row []byte, commitTS int64) (sql string, args []interface{}, err error) { - schema = strings.ToLower(schema) - pkColumn := pkHandleColumn(table) - if pkColumn == nil { - pkColumn = fakeImplicitColumn(table) - } - columns := table.Columns - colsTypeMap := util.ToColumnTypeMap(columns) - - columnValues, err := tablecodec.DecodeRow(row, colsTypeMap, gotime.Local) - if err != nil { - return "", nil, errors.Trace(err) - } - - sql, args, _, err = genDeleteSQL(schema, table, pkColumn.ID, columnValues, commitTS) - if err != nil { - return "", nil, errors.Trace(err) - } - - return -} - -// GenFlashDDLSQL generate the SQL need to execute syncing this DDL to Flash -func GenFlashDDLSQL(sql string, schema string) (string, error) { - schema = strings.ToLower(schema) - ddlParser := getParser() - stmt, err := ddlParser.ParseOneStmt(sql, "", "") - if err != nil { - return "", errors.Annotatef(err, "parse sql failed: %s", sql) - } - - switch stmt := stmt.(type) { - case *ast.CreateDatabaseStmt: - return extractCreateDatabase(stmt) - case *ast.DropDatabaseStmt: - return extractDropDatabase(stmt) - case *ast.DropTableStmt: - return extractDropTable(stmt, schema) - case *ast.CreateTableStmt: - return extractCreateTable(stmt, schema) - case *ast.AlterTableStmt: - alterSQL, err := extractAlterTable(stmt, schema) - if err != nil { - return alterSQL, err - } - if len(alterSQL) == 0 { - return genEmptySQL(sql), nil - } - return alterSQL, nil - case *ast.RenameTableStmt: - return extractRenameTable(stmt, schema) - case *ast.TruncateTableStmt: - return extractTruncateTable(stmt, schema), nil - default: - // TODO: hacking around empty sql, should bypass in upper level - return genEmptySQL(sql), nil - } -} - -func genDeleteSQL(schema string, table *model.TableInfo, pkID int64, columnValues map[int64]types.Datum, commitTS int64) (string, []interface{}, []string, error) { - columns := table.Columns - pk := columnValues[pkID] - hashKey := pk.GetInt64() - version := makeInternalVersionValue(uint64(commitTS)) - delFlag := makeInternalDelmarkValue(true) - oldColumns, value, err := genColumnAndValue(columns, columnValues) - var pkValue []interface{} - pkValue = append(pkValue, hashKey) - value = append(pkValue, value...) - if err != nil { - return "", nil, nil, errors.Trace(err) - } - columnList := genColumnList(oldColumns) - columnPlaceholders := dml.GenColumnPlaceholders(len(oldColumns) + 2) - - key, err := genDispatchKey(table, columnValues) - if err != nil { - return "", nil, nil, errors.Trace(err) - } - - sql := fmt.Sprintf("IMPORT INTO `%s`.`%s` (%s) values (%s);", schema, table.Name.L, columnList, columnPlaceholders) - - value = append(value, version) - value = append(value, delFlag) - return sql, value, key, nil -} - -func extractCreateDatabase(stmt *ast.CreateDatabaseStmt) (string, error) { - dbName := strings.ToLower(stmt.Name) - return fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`;", dbName), nil -} - -func extractDropDatabase(stmt *ast.DropDatabaseStmt) (string, error) { - dbName := strings.ToLower(stmt.Name) - // http://clickhouse-docs.readthedocs.io/en/latest/query_language/queries.html#drop - // Drop cascade semantics and should be save to not consider sequence - return fmt.Sprintf("DROP DATABASE `%s`;", dbName), nil -} - -func extractCreateTable(stmt *ast.CreateTableStmt, schema string) (string, error) { - tableName := stmt.Table.Name.L - // create table like - if stmt.ReferTable != nil { - referTableSchema, referTableName := stmt.ReferTable.Schema.L, stmt.ReferTable.Name.L - if len(referTableSchema) == 0 { - referTableSchema = schema - } - return fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`", schema, tableName, referTableSchema, referTableName), nil - } - // extract primary key - pkColumn, explicitHandle := extractRowHandle(stmt) - colStrs := make([]string, len(stmt.Cols)) - for i, colDef := range stmt.Cols { - colStr, _ := analyzeColumnDef(colDef, pkColumn) - colStrs[i] = colStr - } - if !explicitHandle { - colStr := fmt.Sprintf("`%s` %s", pkColumn, "Int64") - colStrs = append([]string{colStr}, colStrs...) - } - return fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s`.`%s` (%s) ENGINE MutableMergeTree((`%s`), 8192);", schema, tableName, strings.Join(colStrs, ","), pkColumn), nil -} - -func extractAlterTable(stmt *ast.AlterTableStmt, schema string) (string, error) { - if stmt.Specs[0].Tp == ast.AlterTableRenameTable { - return makeRenameTableStmt(schema, stmt.Table, stmt.Specs[0].NewTable), nil - } - specStrs := make([]string, 0, len(stmt.Specs)) - for _, spec := range stmt.Specs { - specStr, err := analyzeAlterSpec(spec) - if err != nil { - return "", errors.Trace(err) - } - if len(specStr) != 0 { - specStrs = append(specStrs, specStr) - } - } - - tableName := stmt.Table.Name.L - if len(specStrs) == 0 { - return "", nil - } - return fmt.Sprintf("ALTER TABLE `%s`.`%s` %s;", schema, tableName, strings.Join(specStrs, ", ")), nil -} - -func extractTruncateTable(stmt *ast.TruncateTableStmt, schema string) string { - tableName := stmt.Table.Name.L - return fmt.Sprintf("TRUNCATE TABLE `%s`.`%s`", schema, tableName) -} - -func extractRenameTable(stmt *ast.RenameTableStmt, schema string) (string, error) { - return makeRenameTableStmt(schema, stmt.OldTable, stmt.NewTable), nil -} - -func makeRenameTableStmt(schema string, table *ast.TableName, newTable *ast.TableName) string { - tableName := table.Name.L - var newSchema = schema - if len(newTable.Schema.String()) > 0 { - newSchema = newTable.Schema.L - } - newTableName := newTable.Name.L - return fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`;", schema, tableName, newSchema, newTableName) -} - -func extractDropTable(stmt *ast.DropTableStmt, schema string) (string, error) { - // TODO: Make drop multiple tables works - tableName := stmt.Tables[0].Name.L - return fmt.Sprintf("DROP TABLE `%s`.`%s`;", schema, tableName), nil -} - -// extract single row handle column, if implicit, generate one -func extractRowHandle(stmt *ast.CreateTableStmt) (colName string, explicitHandle bool) { - constrains := stmt.Constraints - columns := stmt.Cols - var primaryCnt = 0 - var primaryColumn = "" - for _, colDef := range columns { - cNameLowercase := colDef.Name.Name.L - if isPrimaryKeyColumn(colDef) { - primaryCnt++ - primaryColumn = cNameLowercase - } else { - for _, constrain := range constrains { - // row handle only applies when single integer key - if len(constrain.Keys) != 1 { - continue - } - if constrain.Tp == ast.ConstraintPrimaryKey && - isHandleTypeColumn(colDef) && - cNameLowercase == constrain.Keys[0].Column.Name.L { - return cNameLowercase, true - } - } - } - } - - if primaryCnt == 1 { - return primaryColumn, true - } - // no explicit handle column, generate one - return implicitColName, false -} - -func analyzeAlterSpec(alterSpec *ast.AlterTableSpec) (string, error) { - switch alterSpec.Tp { - case ast.AlterTableOption: - return "", nil - case ast.AlterTableAddColumns: - var colDefStr string - var colPosStr string - var err error - // TODO: Support add multiple columns. - colDefStr, err = analyzeColumnDef(alterSpec.NewColumns[0], "") - if err != nil { - return "", errors.Trace(err) - } - if alterSpec.Position != nil && alterSpec.Position.Tp != ast.ColumnPositionNone { - colPosStr, err = analyzeColumnPosition(alterSpec.Position) - if err != nil { - return "", errors.Trace(err) - } - colPosStr = " " + colPosStr - } - return fmt.Sprintf("ADD COLUMN %s", colDefStr+colPosStr), nil - case ast.AlterTableAddConstraint: - return "", nil - case ast.AlterTableDropColumn: - col := alterSpec.OldColumnName.Name.L - return fmt.Sprintf("DROP COLUMN `%s`", col), nil - case ast.AlterTableDropPrimaryKey: - return "", nil - case ast.AlterTableDropIndex: - return "", nil - case ast.AlterTableDropForeignKey: - return "", nil - case ast.AlterTableChangeColumn: - oldColName := alterSpec.OldColumnName.Name.L - newColName := alterSpec.NewColumns[0].Name.Name.L - if oldColName != newColName { - return "", errors.NotSupportedf("Rename column: " + alterSpec.Text()) - } - return analyzeModifyColumn(alterSpec) - case ast.AlterTableModifyColumn: - return analyzeModifyColumn(alterSpec) - case ast.AlterTableAlterColumn: - return "", nil - case ast.AlterTableLock: - return "", nil - default: - return "", errors.New("Invalid alter table spec type code: " + strconv.Itoa(int(alterSpec.Tp))) - } -} - -func analyzeModifyColumn(alterSpec *ast.AlterTableSpec) (string, error) { - var colDefStr string - var colPosStr string - var err error - colDefStr, err = analyzeColumnDef(alterSpec.NewColumns[0], "") - if err != nil { - return "", errors.Trace(err) - } - if alterSpec.Position != nil && alterSpec.Position.Tp != ast.ColumnPositionNone { - colPosStr, err = analyzeColumnPosition(alterSpec.Position) - if err != nil { - return "", errors.Trace(err) - } - colPosStr = " " + colPosStr - } - return fmt.Sprintf("MODIFY COLUMN %s", colDefStr+colPosStr), nil -} - -// Refer to https://dev.mysql.com/doc/refman/5.7/en/integer-types.html -// https://clickhouse.yandex/docs/en/data_types/ -func analyzeColumnDef(colDef *ast.ColumnDef, pkColumn string) (string, error) { - cName := colDef.Name.Name.L - - tp := colDef.Tp - var typeStr string - var typeStrFormat = "%s" - unsigned := mysql.HasUnsignedFlag(tp.Flag) - nullable := cName != pkColumn && isNullable(colDef) - if nullable { - typeStrFormat = "Nullable(%s)" - } - switch tp.Tp { - case mysql.TypeBit: // bit - typeStr = fmt.Sprintf(typeStrFormat, "UInt64") - case mysql.TypeTiny: // tinyint - if unsigned { - typeStr = fmt.Sprintf(typeStrFormat, "UInt8") - } else { - typeStr = fmt.Sprintf(typeStrFormat, "Int8") - } - case mysql.TypeShort: // smallint - if unsigned { - typeStr = fmt.Sprintf(typeStrFormat, "UInt16") - } else { - typeStr = fmt.Sprintf(typeStrFormat, "Int16") - } - case mysql.TypeYear: - typeStr = fmt.Sprintf(typeStrFormat, "Int16") - case mysql.TypeLong, mysql.TypeInt24: // int, mediumint - if unsigned { - typeStr = fmt.Sprintf(typeStrFormat, "UInt32") - } else { - typeStr = fmt.Sprintf(typeStrFormat, "Int32") - } - case mysql.TypeFloat: - typeStr = fmt.Sprintf(typeStrFormat, "Float32") - case mysql.TypeDouble: - typeStr = fmt.Sprintf(typeStrFormat, "Float64") - case mysql.TypeNewDecimal, mysql.TypeDecimal: - if tp.Flen == types.UnspecifiedLength { - tp.Flen, _ = mysql.GetDefaultFieldLengthAndDecimal(tp.Tp) - } - if tp.Decimal == types.UnspecifiedLength { - _, tp.Decimal = mysql.GetDefaultFieldLengthAndDecimal(tp.Tp) - } - decimalTypeStr := fmt.Sprintf("Decimal(%d, %d)", tp.Flen, tp.Decimal) - typeStr = fmt.Sprintf(typeStrFormat, decimalTypeStr) - case mysql.TypeTimestamp, mysql.TypeDatetime: // timestamp, datetime - typeStr = fmt.Sprintf(typeStrFormat, "DateTime") - case mysql.TypeDuration: // duration - typeStr = fmt.Sprintf(typeStrFormat, "Int64") - case mysql.TypeLonglong: - if unsigned { - typeStr = fmt.Sprintf(typeStrFormat, "UInt64") - } else { - typeStr = fmt.Sprintf(typeStrFormat, "Int64") - } - case mysql.TypeDate, mysql.TypeNewDate: - typeStr = fmt.Sprintf(typeStrFormat, "Date") - case mysql.TypeString, mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString: - typeStr = fmt.Sprintf(typeStrFormat, "String") - case mysql.TypeEnum: - enumStr := "" - format := "Enum16(''=0,%s)" - for i, elem := range tp.Elems { - if len(elem) == 0 { - // Don't append item empty enum if there is already one specified by user. - format = "Enum16(%s)" - } - if i == 0 { - enumStr = fmt.Sprintf("'%s'=%d", elem, i+1) - } else { - enumStr = fmt.Sprintf("%s,'%s'=%d", enumStr, elem, i+1) - } - } - enumStr = fmt.Sprintf(format, enumStr) - typeStr = fmt.Sprintf(typeStrFormat, enumStr) - case mysql.TypeSet, mysql.TypeJSON: - typeStr = fmt.Sprintf(typeStrFormat, "String") - // case mysql.TypeGeometry: - // TiDB doesn't have Geometry type so we don't really need to handle it. - default: - return "", errors.New("Don't support type : " + tp.String()) - } - - colDefStr := fmt.Sprintf("`%s` %s", cName, typeStr) - - for _, option := range colDef.Options { - if option.Tp == ast.ColumnOptionDefaultValue { - if defaultValue, shouldQuote, err := formatFlashLiteral(option.Expr, colDef.Tp); err != nil { - log.Warn("Cannot compile column", zap.String("name", cName), zap.Error(err)) - } else { - if shouldQuote { - // Do final quote for string types. As we want to quote values like -255, which is hard to quote in lower level. - defaultValue = fmt.Sprintf("'%s'", defaultValue) - } - colDefStr = fmt.Sprintf("%s DEFAULT %s", colDefStr, defaultValue) - } - break - } - } - - return colDefStr, nil -} - -func analyzeColumnPosition(cp *ast.ColumnPosition) (string, error) { - switch cp.Tp { - // case ast.ColumnPositionFirst: - case ast.ColumnPositionAfter: - return fmt.Sprintf("AFTER `%s`", cp.RelativeColumn.Name.L), nil - default: - return "", errors.New("Invalid column position code: " + strconv.Itoa(int(cp.Tp))) - } -} - -func genColumnList(columns []*model.ColumnInfo) string { - var columnList []byte - for _, column := range columns { - colName := column.Name.L - name := fmt.Sprintf("`%s`", colName) - columnList = append(columnList, []byte(name)...) - - columnList = append(columnList, ',') - } - colVersion := fmt.Sprintf("`%s`,", internalVersionColName) - columnList = append(columnList, []byte(colVersion)...) - - colDelFlag := fmt.Sprintf("`%s`", internalDelmarkColName) - columnList = append(columnList, []byte(colDelFlag)...) - - return string(columnList) -} - -func genColumnAndValue(columns []*model.ColumnInfo, columnValues map[int64]types.Datum) ([]*model.ColumnInfo, []interface{}, error) { - var newColumn []*model.ColumnInfo - var newColumnsValues []interface{} - - for _, col := range columns { - val, ok := columnValues[col.ID] - if ok { - newColumn = append(newColumn, col) - value, err := formatFlashData(&val, &col.FieldType) - if err != nil { - return nil, nil, errors.Trace(err) - } - - newColumnsValues = append(newColumnsValues, value) - } - } - - return newColumn, newColumnsValues, nil -} - -func genDispatchKey(table *model.TableInfo, columnValues map[int64]types.Datum) ([]string, error) { - return make([]string, 0), nil -} diff --git a/drainer/translator/flash_util.go b/drainer/translator/flash_util.go deleted file mode 100644 index eca6db773..000000000 --- a/drainer/translator/flash_util.go +++ /dev/null @@ -1,552 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package translator - -import ( - "fmt" - "math" - "math/big" - gotime "time" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/parser/ast" - "github.com/pingcap/parser/model" - "github.com/pingcap/parser/mysql" - "github.com/pingcap/parser/opcode" - "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/types" - "go.uber.org/zap" -) - -const implicitColName = "_tidb_rowid" -const internalVersionColName = "_INTERNAL_VERSION" -const internalDelmarkColName = "_INTERNAL_DELMARK" - -const emptySQL = "select 1" - -func genEmptySQL(reason string) string { - return emptySQL + " -- Reason: " + reason -} - -func fakeImplicitColumn(table *model.TableInfo) *model.ColumnInfo { - for _, col := range table.Columns { - // since we appended a fake key, remove the original keys - if mysql.HasPriKeyFlag(col.Flag) { - col.Flag ^= mysql.PriKeyFlag - } - } - - handleColumn := model.NewExtraHandleColInfo() - // Transform TiDB's default extra handle column name and type into our own. - handleColumn.Name = model.NewCIStr(implicitColName) - handleColumn.Tp = mysql.TypeLonglong - handleColumn.State = model.StatePublic - table.Columns = append(table.Columns, handleColumn) - - table.PKIsHandle = true - return handleColumn -} - -func isPrimaryKeyColumn(colDef *ast.ColumnDef) bool { - for _, option := range colDef.Options { - if option.Tp == ast.ColumnOptionPrimaryKey && - isHandleTypeColumn(colDef) { - return true - } - } - return false -} - -func pkHandleColumn(table *model.TableInfo) *model.ColumnInfo { - for _, col := range table.Columns { - if IsPKHandleColumn(table, col) { - return col - } - } - - return nil -} - -func isNullable(colDef *ast.ColumnDef) bool { - if isPrimaryKeyColumn(colDef) { - return false - } - for _, option := range colDef.Options { - if option.Tp == ast.ColumnOptionNotNull { - return false - } - } - return true -} - -func isHandleTypeColumn(colDef *ast.ColumnDef) bool { - tp := colDef.Tp.Tp - return tp == mysql.TypeTiny || - tp == mysql.TypeShort || - tp == mysql.TypeInt24 || - tp == mysql.TypeLong || - tp == mysql.TypeLonglong -} - -func makeRow(pk int64, values []interface{}, version uint64, delFlag uint8) []interface{} { - var row []interface{} - row = append(row, pk) - row = append(row, values...) - row = append(row, version) - row = append(row, delFlag) - return row -} - -func makeInternalVersionValue(ver uint64) uint64 { - return ver -} - -func makeInternalDelmarkValue(del bool) uint8 { - if del { - return uint8(1) - } - return uint8(0) -} - -// Convert datum to CH raw data, data type must be strictly matching the rules in analyzeColumnDef. -func formatFlashData(data *types.Datum, ft *types.FieldType) (interface{}, error) { - if data.GetValue() == nil { - return nil, nil - } - - switch ft.Tp { - case mysql.TypeBit: // UInt64 - ui, err := data.GetMysqlBit().ToInt(nil) - if err != nil { - return data, errors.Trace(err) - } - return ui, nil - case mysql.TypeTiny: // UInt8/Int8 - if mysql.HasUnsignedFlag(ft.Flag) { - return uint8(data.GetInt64()), nil - } - return int8(data.GetInt64()), nil - case mysql.TypeShort: // UInt16/Int16 - if mysql.HasUnsignedFlag(ft.Flag) { - return uint16(data.GetInt64()), nil - } - return int16(data.GetInt64()), nil - case mysql.TypeYear: // Int16 - return int16(data.GetInt64()), nil - case mysql.TypeLong, mysql.TypeInt24: // UInt32/Int32 - if mysql.HasUnsignedFlag(ft.Flag) { - return uint32(data.GetInt64()), nil - } - return int32(data.GetInt64()), nil - case mysql.TypeFloat: // Float32 - return data.GetFloat32(), nil - case mysql.TypeDouble: // Float64 - return data.GetFloat64(), nil - case mysql.TypeNewDecimal, mysql.TypeDecimal: // Decimal - dec := data.GetMysqlDecimal() - bin, err := mysqlDecimalToCHDecimalBin(ft, dec) - if err != nil { - log.Warn("Corrupted decimal data, will leave it zero.", zap.Reflect("data", data.GetMysqlDecimal())) - bin = make([]byte, 64) - } - return bin, nil - case mysql.TypeDate, mysql.TypeNewDate: // Int64 - mysqlTime := data.GetMysqlTime() - var result = getUnixTimeSafe(mysqlTime, gotime.UTC) - return result, nil - case mysql.TypeDatetime, mysql.TypeTimestamp: // Int64 - mysqlTime := data.GetMysqlTime() - // Need to consider timezone for DateTime and Timestamp, which are mapped to timezone-sensitive DateTime in CH. - var result = getUnixTimeSafe(mysqlTime, gotime.Local) - return result, nil - case mysql.TypeDuration: // Int64 - num, err := data.GetMysqlDuration().ToNumber().ToInt() - if err != nil { - log.Warn("Corrupted Duration data, will leave it zero.", zap.Reflect("data", data.GetMysqlDuration())) - num = 0 - } - return num, nil - case mysql.TypeLonglong: // UInt64/Int64 - if mysql.HasUnsignedFlag(ft.Flag) { - return data.GetUint64(), nil - } - return data.GetInt64(), nil - case mysql.TypeString, mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString: // String - return data.GetString(), nil - case mysql.TypeEnum: // Int16 - return int16(data.GetMysqlEnum().Value), nil - case mysql.TypeSet: // String - return data.GetMysqlSet().String(), nil - case mysql.TypeJSON: // String - return data.GetMysqlJSON().String(), nil - case mysql.TypeGeometry: - // TiDB doesn't have Geometry type, so put it null. - return nil, nil - } - - return nil, nil -} - -// Poor man's expression eval function, that is mostly used for DDL that refers constant expressions, -// such as default value in CREATE/ALTER TABLE. -// Support very limited expression types: ValueExpr/Function(current_timestamp)/UnaryOp(-) -func formatFlashLiteral(expr ast.ExprNode, ft *types.FieldType) (string, bool, error) { - shouldQuote := false - switch ft.Tp { - case mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString, mysql.TypeString, mysql.TypeSet: - shouldQuote = true - } - switch e := expr.(type) { - case ast.ValueExpr: - value := new(types.Datum) - value.SetValue(e.GetValue()) - if value.GetValue() == nil { - return "NULL", false, nil - } - switch ft.Tp { - case mysql.TypeNull: - return "NULL", false, nil - // case mysql.TypeJSON, mysql.TypeGeometry: - // TiDB doesn't allow default value for JSON types, and doesn't have Geometry at all. - default: - // Do conversion. - converted, err := convertValueType(*value, expr.GetType(), ft) - if err != nil { - return "", false, errors.Trace(err) - } - return fmt.Sprintf("%v", converted), shouldQuote, nil - } - case *ast.FuncCallExpr: - // Evaluate current time immediately as CH won't use the instant value for now() and gives different values for each time data is retrieved. - if e.FnName.L == ast.CurrentTimestamp { - t := types.NewTimeDatum(types.CurrentTime(e.GetType().Tp)) - return fmt.Sprintf("'%v'", t.GetMysqlTime().String()), shouldQuote, nil - } - return "", false, fmt.Errorf("function expression %s is not supported", e.FnName) - case *ast.UnaryOperationExpr: - op := "" - switch e.Op { - case opcode.Minus: - if ft.Tp != mysql.TypeYear { - // Year will ignore the heading -. - op = "-" - } - case opcode.Plus: - if ft.Tp != mysql.TypeYear { - // Year will ignore the heading +. - op = "+" - } - default: - return "", false, fmt.Errorf("op %s is not supported", e.Op.String()) - } - child, _, err := formatFlashLiteral(e.V, ft) - if err != nil { - return "", false, errors.Trace(err) - } - return fmt.Sprintf("%s%s", op, child), shouldQuote, nil - default: - return "", false, fmt.Errorf("expression %v is not supported", e) - } -} - -// Poor man's data conversion function. -// Support very limited conversions, such as among numeric/string/date/time. -func convertValueType(data types.Datum, source *types.FieldType, target *types.FieldType) (interface{}, error) { - switch target.Tp { - case mysql.TypeSet: - var set types.Set - var err error - switch source.Tp { - case mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString, mysql.TypeString: - set, err = types.ParseSetName(target.Elems, data.GetString()) - case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24: - set, err = types.ParseSetValue(target.Elems, data.GetUint64()) - } - if err != nil { - return nil, errors.Trace(err) - } - return escapeString(set.Name), nil - case mysql.TypeEnum: - switch source.Tp { - case mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString, mysql.TypeString: - return fmt.Sprintf("'%s'", escapeString(data.GetString())), nil - case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24: - return data.GetInt64(), nil - } - case mysql.TypeDate, mysql.TypeNewDate: - // Towards date types, either 'YYYY-MM-DD hh:mm:ss' or 'YYYY-MM-DD' is OK. - var mysqlTime types.Time - var err error - switch source.Tp { - case mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString, mysql.TypeString: - mysqlTime, err = types.ParseTime(&stmtctx.StatementContext{TimeZone: gotime.UTC}, data.GetString(), target.Tp, 0) - case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24: - mysqlTime, err = types.ParseTimeFromInt64(&stmtctx.StatementContext{TimeZone: gotime.UTC}, data.GetInt64()) - case mysql.TypeFloat, mysql.TypeDouble, mysql.TypeNewDecimal: - mysqlTime, err = types.ParseTimeFromFloatString(&stmtctx.StatementContext{TimeZone: gotime.UTC}, data.GetMysqlDecimal().String(), target.Tp, 0) - } - if err != nil { - return nil, errors.Trace(err) - } - return fmt.Sprintf("'%v'", mysqlTime), nil - case mysql.TypeTimestamp, mysql.TypeDatetime: - // Towards time types, convert to string formatted as 'YYYY-MM-DD hh:mm:ss' - var mysqlTime types.Time - var err error - switch source.Tp { - case mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString, mysql.TypeString: - mysqlTime, err = types.ParseTime(&stmtctx.StatementContext{TimeZone: gotime.Local}, data.GetString(), target.Tp, 0) - case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24: - mysqlTime, err = types.ParseTimeFromInt64(&stmtctx.StatementContext{TimeZone: gotime.Local}, data.GetInt64()) - case mysql.TypeFloat, mysql.TypeDouble, mysql.TypeNewDecimal: - mysqlTime, err = types.ParseTimeFromFloatString(&stmtctx.StatementContext{TimeZone: gotime.Local}, data.GetMysqlDecimal().String(), target.Tp, 0) - } - if err != nil { - return nil, errors.Trace(err) - } - formatted, err := mysqlTime.DateFormat("%Y-%m-%d %H:%i:%S") - if err != nil { - return nil, errors.Trace(err) - } - return fmt.Sprintf("'%s'", formatted), nil - case mysql.TypeDuration: - // Towards duration type, convert to gotime.Duration. - var duration types.Duration - var err error - switch source.Tp { - case mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString, mysql.TypeString: - duration, err = types.ParseDuration(new(stmtctx.StatementContext), data.GetString(), 0) - case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24: - duration, err = types.ParseDuration(new(stmtctx.StatementContext), fmt.Sprintf("%v", data.GetInt64()), 0) - case mysql.TypeFloat, mysql.TypeDouble, mysql.TypeNewDecimal: - duration, err = types.ParseDuration(new(stmtctx.StatementContext), data.GetMysqlDecimal().String(), 0) - } - if err != nil { - return nil, errors.Trace(err) - } - return duration.ToNumber().ToInt() - case mysql.TypeYear: - var year int16 - var err error - switch source.Tp { - case mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString, mysql.TypeString: - // As TiDB allows string literal like '1998.0' to be year value, and ParseYear() will error out for it, we need to cast to integer by ourselves. - var d float64 - _, err = fmt.Sscanf(data.GetString(), "%f", &d) - if err != nil { - return nil, errors.Trace(err) - } - year, err = types.ParseYear(fmt.Sprintf("%d", int64(math.Abs(d)))) - case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24: - year, err = types.ParseYear(fmt.Sprintf("%v", data.GetInt64())) - case mysql.TypeFloat, mysql.TypeDouble, mysql.TypeNewDecimal: - var d float64 - d, err = data.GetMysqlDecimal().ToFloat64() - if err != nil { - return nil, errors.Trace(err) - } - year, err = types.ParseYear(fmt.Sprintf("%d", int64(math.Abs(d)))) - } - if err != nil { - return nil, errors.Trace(err) - } - return year, nil - case mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString, mysql.TypeString: - // Towards string types, escape it. Will single-quote in upper logic. - s := "" - switch source.Tp { - case mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString, mysql.TypeString: - s = data.GetString() - case mysql.TypeDecimal, mysql.TypeFloat, mysql.TypeDouble, mysql.TypeNewDecimal: - s = fmt.Sprintf("%v", data.GetMysqlDecimal()) - case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24: - s = fmt.Sprintf("%v", data.GetValue()) - } - return escapeString(s), nil - case mysql.TypeBit: - // Towards bit, convert to raw bytes and return as uint64. - switch source.Tp { - case mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString, mysql.TypeString: - return data.GetMysqlBit().ToInt(nil) - case mysql.TypeFloat, mysql.TypeDecimal, mysql.TypeDouble, mysql.TypeNewDecimal: - // TiDB rounds float to uint for bit. - f, err := data.GetMysqlDecimal().ToFloat64() - if err != nil { - return nil, errors.Trace(err) - } - f = types.RoundFloat(f) - return uint64(int64(f)), nil - case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24: - return data.GetValue(), nil - } - case mysql.TypeDecimal, mysql.TypeNewDecimal, mysql.TypeFloat, mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeDouble, mysql.TypeLonglong, mysql.TypeInt24: - // Towards numeric types, do really conversion. - switch source.Tp { - case mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString, mysql.TypeString: - return data.GetString(), nil - case mysql.TypeDecimal, mysql.TypeFloat, mysql.TypeDouble, mysql.TypeNewDecimal: - return data.GetMysqlDecimal(), nil - case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24: - return data.GetValue(), nil - } - } - return nil, errors.Errorf("Unable to convert data %v from type %s to type %s.", data, source.String(), target.String()) -} - -func getUnixTimeSafe(mysqlTime types.Time, tz *gotime.Location) int64 { - if mysqlTime.IsZero() { - return 0 - } - time := mysqlTime.Time - goTime := gotime.Date(time.Year(), gotime.Month(time.Month()), time.Day(), time.Hour(), time.Minute(), time.Second(), time.Microsecond()*1000, tz) - return goTime.Unix() -} - -// Escape a string to CH string literal. -// See: http://clickhouse-docs.readthedocs.io/en/latest/query_language/syntax.html -func escapeString(s string) string { - escaped := "" - for _, c := range s { - switch c { - case '\\': - escaped += "\\\\" - case '\'': - escaped += "\\'" - case '\b': - escaped += "\\b" - case '\f': - escaped += "\\f" - case '\r': - escaped += "\\r" - case '\n': - escaped += "\\n" - case '\t': - escaped += "\\t" - case 0: - escaped += "\\0" - default: - escaped += string(c) - } - } - return escaped -} - -// Transform a MyDecimal to CH Decimal binary. -func mysqlDecimalToCHDecimalBin(ft *types.FieldType, d *types.MyDecimal) ([]byte, error) { - const ( - ten0 = 1 - ten1 = 10 - ten2 = 100 - ten3 = 1000 - ten4 = 10000 - ten5 = 100000 - ten6 = 1000000 - ten7 = 10000000 - ten8 = 100000000 - ten9 = 1000000000 - - digitsPerWord = 9 // A word holds 9 digits. - wordSize = 4 // A word is 4 bytes int32. - wordBase = ten9 - ) - - var ( - powers10 = [10]int64{ten0, ten1, ten2, ten3, ten4, ten5, ten6, ten7, ten8, ten9} - dig2bytes = [10]int{0, 1, 1, 2, 2, 3, 3, 4, 4, 4} - ) - - precision, frac, intdigits := ft.Flen, ft.Decimal, ft.Flen-ft.Decimal - myBytes, err := d.ToBin(precision, frac) - if err != nil { - return nil, errors.Trace(err) - } - - // Calculate offsets. - leadingBytes := dig2bytes[intdigits%digitsPerWord] - alignedFrom, alignedTo := leadingBytes, leadingBytes+intdigits/digitsPerWord*wordSize+frac/digitsPerWord*wordSize - trailingDigits := frac % digitsPerWord - trailingBytes := dig2bytes[trailingDigits] - - // Get mask. - mask := int32(-1) - if myBytes[0]&0x80 > 0 { - mask = 0 - } - - // Flip the very first bit. - myBytes[0] ^= 0x80 - - // Accumulate the word value into big.Int. - var digitsGoInt, baseGoInt = big.NewInt(0), big.NewInt(wordBase) - if leadingBytes > 0 { - leadingInt := int64(0) - for i := 0; i < leadingBytes; i++ { - leadingInt = leadingInt<<8 + int64(myBytes[i]^byte(mask)) - } - digitsGoInt.Add(digitsGoInt, big.NewInt(leadingInt)) - } - for i := alignedFrom; i < alignedTo; i += wordSize { - word := int32(myBytes[i])<<24 + int32(myBytes[i+1])<<16 + int32(myBytes[i+2])<<8 + int32(myBytes[i+3]) - word ^= mask - digitsGoInt.Mul(digitsGoInt, baseGoInt) - digitsGoInt.Add(digitsGoInt, big.NewInt(int64(word))) - } - if trailingBytes > 0 { - trailingFrac := int64(0) - for i := 0; i < trailingBytes; i++ { - trailingFrac = trailingFrac<<8 + int64(myBytes[alignedTo+i]^byte(mask)) - } - digitsGoInt.Mul(digitsGoInt, big.NewInt(powers10[trailingDigits])) - digitsGoInt.Add(digitsGoInt, big.NewInt(trailingFrac)) - } - - // Get bytes and swap to little-endian. - bin := digitsGoInt.Bytes() - for i := 0; i < len(bin)/2; i++ { - bin[i], bin[len(bin)-1-i] = bin[len(bin)-1-i], bin[i] - } - - // Pack 32-byte value part for CH Decimal. - if len(bin) > 32 { - return nil, errors.Errorf("Decimal out of range.") - } - chBin := append(bin, make([]byte, 32-len(bin))...) - - // Append limbs. - limbs := int16(math.Ceil(float64(len(bin)) / 8.0)) - chBin = append(chBin, byte(limbs), byte(limbs>>8)) - - // Append sign. - if d.IsNegative() { - chBin = append(chBin, byte(1)) - } else { - chBin = append(chBin, byte(0)) - } - chBin = append(chBin, byte(0)) - - // Padding to 48 bytes. - chBin = append(chBin, make([]byte, 12)...) - - // Append precision. - chBin = append(chBin, byte(precision), byte(precision>>8)) - - // Append scale. - chBin = append(chBin, byte(frac), byte(frac>>8)) - - // Padding to 64 bytes. - chBin = append(chBin, make([]byte, 12)...) - - return chBin, nil -} diff --git a/drainer/util.go b/drainer/util.go index 5ef02aecc..9543cdf3c 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -113,13 +113,7 @@ func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) { case "kafka": checkpointCfg.CheckpointType = "file" case "flash": - checkpointCfg.CheckpointType = "flash" - checkpointCfg.Db = &checkpoint.DBConfig{ - Host: cfg.SyncerCfg.To.Host, - User: cfg.SyncerCfg.To.User, - Password: cfg.SyncerCfg.To.Password, - Port: cfg.SyncerCfg.To.Port, - } + return nil, errors.New("the flash DestDBType is no longer supported") default: return nil, errors.Errorf("unknown DestDBType: %s", cfg.SyncerCfg.DestDBType) } diff --git a/go.mod b/go.mod index ccc860914..73e79f2c4 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,6 @@ require ( github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d - github.com/zanmato1984/clickhouse v1.3.4-0.20181106115746-3e9a6b9beb12 go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738 go.uber.org/zap v1.12.0 golang.org/x/net v0.0.0-20190909003024-a7b16738d86b diff --git a/go.sum b/go.sum index bb8ff05f6..75b9fa666 100644 --- a/go.sum +++ b/go.sum @@ -344,8 +344,6 @@ github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yookoala/realpath v1.0.0/go.mod h1:gJJMA9wuX7AcqLy1+ffPatSCySA1FQ2S8Ya9AIoYBpE= -github.com/zanmato1984/clickhouse v1.3.4-0.20181106115746-3e9a6b9beb12 h1:xY43AJc1/ADHYp7zbGKzvFeH8SnNCNjJ60fYuOsc8LQ= -github.com/zanmato1984/clickhouse v1.3.4-0.20181106115746-3e9a6b9beb12/go.mod h1:OfQR+gm56UQgWasTZbusI1U1yS7i+gCFd1V/Dt0wEyg= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= diff --git a/tests/binlog/drainer.toml b/tests/binlog/drainer.toml index 236b8425c..fa00cc142 100644 --- a/tests/binlog/drainer.toml +++ b/tests/binlog/drainer.toml @@ -37,7 +37,7 @@ worker-count = 20 safe-mode = false # downstream storage, equal to --dest-db-type -# valid values are "mysql", "file", "tidb", "flash", "kafka" +# valid values are "mysql", "file", "tidb", "kafka" db-type = "mysql" ##replicate-do-db priority over replicate-do-table if have same db name @@ -58,9 +58,6 @@ host = "127.0.0.1" user = "root" password = "" port = 3306 -# Time and size limits for flash batch write -# time-limit = "30s" -# size-limit = "100000" [syncer.to.checkpoint] #schema = "tidb_binlog" diff --git a/tests/filter/drainer.toml b/tests/filter/drainer.toml index e510ad2c9..b92feb69c 100644 --- a/tests/filter/drainer.toml +++ b/tests/filter/drainer.toml @@ -26,7 +26,7 @@ worker-count = 1 safe-mode = false # downstream storage, equal to --dest-db-type -# valid values are "mysql", "file", "tidb", "flash", "kafka" +# valid values are "mysql", "file", "tidb", "kafka" db-type = "mysql" # disable sync these schema @@ -59,9 +59,6 @@ host = "127.0.0.1" user = "root" password = "" port = 3306 -# Time and size limits for flash batch write -# time-limit = "30s" -# size-limit = "100000" [syncer.to.checkpoint] #schema = "tidb_binlog" diff --git a/tests/flash/config.toml b/tests/flash/config.toml deleted file mode 100644 index 0e6322746..000000000 --- a/tests/flash/config.toml +++ /dev/null @@ -1,26 +0,0 @@ -# Importer Configuration. - -log-level = "info" - -worker-count = 10 -job-count = 1000 -batch = 10 - -[source-db] -host = "127.0.0.1" -user = "root" -password = "" -name = "test" -port = 4000 - -[target-db] -host = "127.0.0.1:9000" -user = "" -password = "" -name = "test" - -[diff] -equal-index = false -equal-create-table = false -equal-row-count = true -equal-data = false diff --git a/tests/flash/flash.go b/tests/flash/flash.go deleted file mode 100644 index 0171fa211..000000000 --- a/tests/flash/flash.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "flag" - "fmt" - "os" - - _ "github.com/go-sql-driver/mysql" - "github.com/pingcap/errors" - "github.com/pingcap/log" - pkgsql "github.com/pingcap/tidb-binlog/pkg/sql" - "github.com/pingcap/tidb-binlog/tests/dailytest" - "github.com/pingcap/tidb-binlog/tests/util" - _ "github.com/zanmato1984/clickhouse" -) - -func main() { - cfg := util.NewConfig() - err := cfg.Parse(os.Args[1:]) - switch errors.Cause(err) { - case nil: - case flag.ErrHelp: - os.Exit(0) - default: - log.S().Errorf("parse cmd flags err %s\n", err) - os.Exit(2) - } - - sourceDB, err := util.CreateDB(cfg.SourceDBCfg) - if err != nil { - log.S().Fatal(err) - } - defer func() { - if err := util.CloseDB(sourceDB); err != nil { - log.S().Errorf("Failed to close source database: %s\n", err) - } - }() - - targetAddr, err := pkgsql.ParseCHAddr(cfg.TargetDBCfg.Host) - if err != nil { - log.S().Fatal(err) - } - if len(targetAddr) != 1 { - log.S().Fatal("only support 1 flash node so far.") - } - targetDB, err := pkgsql.OpenCH(targetAddr[0].Host, targetAddr[0].Port, cfg.TargetDBCfg.User, cfg.TargetDBCfg.Password, "default", 0) - if err != nil { - log.S().Fatal(err) - } - defer func() { - if err := util.CloseDB(targetDB); err != nil { - log.S().Errorf("Failed to close target database: %s\n", err) - } - }() - _, err = targetDB.Exec(fmt.Sprintf("drop database if exists %s", cfg.TargetDBCfg.Name)) - if err != nil { - log.S().Fatal(err) - } - _, err = targetDB.Exec(fmt.Sprintf("create database %s", cfg.TargetDBCfg.Name)) - if err != nil { - log.S().Fatal(err) - } - _, err = targetDB.Exec(fmt.Sprintf("use %s", cfg.TargetDBCfg.Name)) - if err != nil { - log.S().Fatal(err) - } - - dailytest.Run(sourceDB, targetDB, cfg.TargetDBCfg.Name, cfg.WorkerCount, cfg.JobCount, cfg.Batch) -} diff --git a/tests/kafka/drainer.toml b/tests/kafka/drainer.toml index 386a104da..bf5fba964 100644 --- a/tests/kafka/drainer.toml +++ b/tests/kafka/drainer.toml @@ -20,12 +20,9 @@ worker-count = 1 safe-mode = false # downstream storage, equal to --dest-db-type -# valid values are "mysql", "file", "tidb", "flash", "kafka" +# valid values are "mysql", "file", "tidb", "kafka" db-type = "kafka" -# Time and size limits for flash batch write -# time-limit = "30s" -# size-limit = "100000" [syncer.to.checkpoint] #schema = "tidb_binlog" type = "mysql" diff --git a/tests/reparo/drainer.toml b/tests/reparo/drainer.toml index 8f9de6057..7251b6369 100644 --- a/tests/reparo/drainer.toml +++ b/tests/reparo/drainer.toml @@ -29,7 +29,7 @@ worker-count = 20 safe-mode = false # downstream storage, equal to --dest-db-type -# valid values are "mysql", "file", "tidb", "flash", "kafka" +# valid values are "mysql", "file", "tidb", "kafka" db-type = "file" #[syncer.to] diff --git a/tests/restart/drainer.toml b/tests/restart/drainer.toml index 8e5f028ae..7836a2ad7 100644 --- a/tests/restart/drainer.toml +++ b/tests/restart/drainer.toml @@ -29,7 +29,7 @@ worker-count = 20 safe-mode = false # downstream storage, equal to --dest-db-type -# valid values are "mysql", "file", "tidb", "flash", "kafka" +# valid values are "mysql", "file", "tidb", "kafka" db-type = "mysql" # the downstream mysql protocol database diff --git a/tests/status/drainer.toml b/tests/status/drainer.toml index 2808065d7..a9d9bacb9 100644 --- a/tests/status/drainer.toml +++ b/tests/status/drainer.toml @@ -29,7 +29,7 @@ worker-count = 1 safe-mode = false # downstream storage, equal to --dest-db-type -# valid values are "mysql", "file", "tidb", "flash", "kafka" +# valid values are "mysql", "file", "tidb", "kafka" db-type = "file" #[syncer.to]