diff --git a/drainer/collector.go b/drainer/collector.go index 23fb7d189..bca6c161d 100644 --- a/drainer/collector.go +++ b/drainer/collector.go @@ -283,9 +283,13 @@ func (c *Collector) syncBinlog(item *binlogItem) error { log.Info("get ddl job", zap.Stringer("job", job)) - if skipJob(job) { + isDelOnlyEvent := model.SchemaState(binlog.DdlSchemaState) == model.StateDeleteOnly + if skipJob(job) && !isDelOnlyEvent { return nil } + if isDelOnlyEvent { + job.SchemaState = model.StateDeleteOnly + } item.SetJob(job) ddlJobsCounter.Add(float64(1)) } diff --git a/drainer/schema.go b/drainer/schema.go index a17f62206..ac14d31c2 100644 --- a/drainer/schema.go +++ b/drainer/schema.go @@ -37,6 +37,7 @@ type Schema struct { tables map[int64]*model.TableInfo truncateTableID map[int64]struct{} + tblsDroppingCol map[int64]bool schemaMetaVersion int64 @@ -56,6 +57,7 @@ func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) { hasImplicitCol: hasImplicitCol, version2SchemaTable: make(map[int64]TableName), truncateTableID: make(map[int64]struct{}), + tblsDroppingCol: make(map[int64]bool), jobs: jobs, } @@ -231,25 +233,27 @@ func (s *Schema) addJob(job *model.Job) { func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error { var i int for i = 0; i < len(s.jobs); i++ { - if skipJob(s.jobs[i]) { - log.Debug("skip ddl job", zap.Stringer("job", s.jobs[i])) + job := s.jobs[i] + + if job.BinlogInfo.SchemaVersion > version { + break + } + + if job.BinlogInfo.SchemaVersion <= s.currentVersion { + log.Warn("ddl job schema version is less than current version, skip this ddl job", + zap.Stringer("job", job), + zap.Int64("currentVersion", s.currentVersion)) continue } - if s.jobs[i].BinlogInfo.SchemaVersion <= version { - if s.jobs[i].BinlogInfo.SchemaVersion <= s.currentVersion { - log.Warn("ddl job schema version is less than current version, skip this ddl job", - zap.Stringer("job", s.jobs[i]), - zap.Int64("currentVersion", s.currentVersion)) - continue - } - - _, _, _, err := s.handleDDL(s.jobs[i]) - if err != nil { - return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s) - } - } else { - break + if job.SchemaState == model.StateDeleteOnly && job.Type == model.ActionDropColumn { + s.tblsDroppingCol[job.TableID] = true + log.Info("Got DeleteOnly Job", zap.Stringer("job", job)) + continue + } + _, _, _, err := s.handleDDL(job) + if err != nil { + return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s) } } @@ -264,12 +268,13 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error { // the third value[string]: the sql that is corresponding to the job // the fourth value[error]: the handleDDL execution's err func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) { - log.Debug("handle job: ", zap.Stringer("job", job)) - if skipJob(job) { + log.Debug("Skip job", zap.Stringer("job", job)) return "", "", "", nil } + log.Debug("Handle job", zap.Stringer("job", job)) + sql = job.Query if sql == "" { return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job) @@ -426,11 +431,21 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, s.currentVersion = job.BinlogInfo.SchemaVersion schemaName = schema.Name.O tableName = tbInfo.Name.O + + if job.Type == model.ActionDropColumn { + log.Info("Finished dropping column", zap.Stringer("job", job)) + delete(s.tblsDroppingCol, job.TableID) + } } return } +// IsDroppingColumn returns true if the table is in the middle of dropping a column +func (s *Schema) IsDroppingColumn(id int64) bool { + return s.tblsDroppingCol[id] +} + // IsTruncateTableID returns true if the table id have been truncated by truncate table DDL func (s *Schema) IsTruncateTableID(id int64) bool { _, ok := s.truncateTableID[id] diff --git a/drainer/schema_test.go b/drainer/schema_test.go index 876d4e2e9..f5a070f53 100644 --- a/drainer/schema_test.go +++ b/drainer/schema_test.go @@ -56,7 +56,7 @@ func (t *schemaSuite) TestSchema(c *C) { jobs = append(jobs, job) // construct a rollbackdone job - jobs = append(jobs, &model.Job{ID: 5, State: model.JobStateRollbackDone}) + jobs = append(jobs, &model.Job{ID: 5, State: model.JobStateRollbackDone, BinlogInfo: &model.HistoryInfo{}}) // reconstruct the local schema schema, err := NewSchema(jobs, false) diff --git a/drainer/syncer.go b/drainer/syncer.go index 4c258d2b5..7bc10b7c5 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -362,7 +362,7 @@ ForLoop: lastAddComitTS = binlog.GetCommitTs() err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: preWrite}) if err != nil { - err = errors.Annotate(err, "add to dsyncer failed") + err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs) break ForLoop } executeHistogram.Observe(time.Since(beginTime).Seconds()) @@ -383,6 +383,11 @@ ForLoop: break ForLoop } + if b.job.SchemaState == model.StateDeleteOnly && b.job.Type == model.ActionDropColumn { + log.Info("Syncer skips DeleteOnly DDL", zap.Stringer("job", b.job), zap.Int64("ts", b.GetCommitTs())) + continue + } + sql := b.job.Query var schema, table string schema, table, err = s.schema.getSchemaTableAndDelete(b.job.BinlogInfo.SchemaVersion) @@ -404,7 +409,7 @@ ForLoop: err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table}) if err != nil { - err = errors.Annotate(err, "add to dsyncer failed") + err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs) break ForLoop } executeHistogram.Observe(time.Since(beginTime).Seconds()) diff --git a/drainer/translator/flash.go b/drainer/translator/flash.go index 85e6e70c8..f4406f387 100644 --- a/drainer/translator/flash.go +++ b/drainer/translator/flash.go @@ -42,6 +42,7 @@ func GenFlashSQLs(infoGetter TableInfoGetter, pv *tipb.PrewriteValue, commitTS i if !ok { return nil, nil, errors.Errorf("TableByID empty table id: %d", mut.GetTableId()) } + isTblDroppingCol := infoGetter.IsDroppingColumn(mut.GetTableId()) var schema string schema, _, ok = infoGetter.SchemaAndTableName(mut.GetTableId()) @@ -68,7 +69,7 @@ func GenFlashSQLs(infoGetter TableInfoGetter, pv *tipb.PrewriteValue, commitTS i return nil, nil, errors.Annotate(err, "gen insert sql fail") } case tipb.MutationType_Update: - sql, args, err = GenFlashUpdateSQL(schema, info, row, commitTS) + sql, args, err = GenFlashUpdateSQL(schema, info, row, commitTS, isTblDroppingCol) if err != nil { return nil, nil, errors.Annotate(err, "gen update sql fail") } @@ -140,7 +141,7 @@ func GenFlashInsertSQL(schema string, table *model.TableInfo, row []byte, commit } // GenFlashUpdateSQL generate the SQL need to execute syncing this update row to Flash -func GenFlashUpdateSQL(schema string, table *model.TableInfo, row []byte, commitTS int64) (sql string, args []interface{}, err error) { +func GenFlashUpdateSQL(schema string, table *model.TableInfo, row []byte, commitTS int64, isTblDroppingCol bool) (sql string, args []interface{}, err error) { schema = strings.ToLower(schema) pkColumn := pkHandleColumn(table) if pkColumn == nil { @@ -148,7 +149,7 @@ func GenFlashUpdateSQL(schema string, table *model.TableInfo, row []byte, commit } pkID := pkColumn.ID - updtDecoder := newUpdateDecoder(table) + updtDecoder := newUpdateDecoder(table, isTblDroppingCol) version := makeInternalVersionValue(uint64(commitTS)) delFlag := makeInternalDelmarkValue(false) diff --git a/drainer/translator/flash_test.go b/drainer/translator/flash_test.go deleted file mode 100644 index 7e50793fa..000000000 --- a/drainer/translator/flash_test.go +++ /dev/null @@ -1,643 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package translator - -import ( - "time" - - . "github.com/pingcap/check" - "github.com/pingcap/parser/model" - "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/types/json" - _ "github.com/pingcap/tidb/types/parser_driver" - "github.com/pingcap/tidb/util/codec" -) - -var tidbRowID = int64(11) - -func genCommitTS(i int) int64 { - return int64(100 + i) -} - -func (t *testTranslatorSuite) TestFlashGenInsertSQLs(c *C) { - schema := "T" - tables := []*model.TableInfo{testGenTable("normal"), testGenTable("hasPK"), testGenTable("hasID")} - expectedValCounts := []int{7, 7, 6} - expectedSQLs := []string{ - "IMPORT INTO `t`.`account` (`id`,`name`,`sex`,`" + implicitColName + "`,`" + internalVersionColName + "`,`" + internalDelmarkColName + "`) values (?,?,?,?,?,?);", - "IMPORT INTO `t`.`account` (`id`,`name`,`sex`,`" + implicitColName + "`,`" + internalVersionColName + "`,`" + internalDelmarkColName + "`) values (?,?,?,?,?,?);", - "IMPORT INTO `t`.`account` (`id`,`name`,`sex`,`" + internalVersionColName + "`,`" + internalDelmarkColName + "`) values (?,?,?,?,?);", - } - for i, table := range tables { - c.Logf("id: %d, table: %+v", i, table) - rowDatas, expected := testFlashGenRowData(c, table, i, false) - binlog := testFlashGenInsertBinlog(c, table, rowDatas) - sql, vals, err := GenFlashInsertSQL(schema, table, binlog, genCommitTS(i)) - c.Assert(err, IsNil) - c.Assert(vals, HasLen, expectedValCounts[i]) - c.Assert(sql, Equals, expectedSQLs[i]) - c.Assert(vals, DeepEquals, expected) - } - - table := testGenTable("normal") - rowDatas, _ := testFlashGenRowData(c, table, 1, false) - binlog := testFlashGenInsertBinlog(c, table, rowDatas) - _, _, err := GenFlashInsertSQL(schema, tables[0], binlog[6:], 0) - c.Assert(err, NotNil) -} - -func (t *testTranslatorSuite) TestGenUpdateFlashSQLs(c *C) { - schema := "T" - tables := []*model.TableInfo{ - testGenTable("normal"), - testGenTable("hasPK"), - // TODO: Will add this table back when update supports changing primary key. - // testGenTable("hasID"), - } - expectedValCounts := []int{7, 7, 6} - expectedSQLs := []string{ - "IMPORT INTO `t`.`account` (`id`,`name`,`sex`,`" + implicitColName + "`,`" + internalVersionColName + "`,`" + internalDelmarkColName + "`) values (?,?,?,?,?,?);", - "IMPORT INTO `t`.`account` (`id`,`name`,`sex`,`" + implicitColName + "`,`" + internalVersionColName + "`,`" + internalDelmarkColName + "`) values (?,?,?,?,?,?);", - "IMPORT INTO `t`.`account` (`id`,`name`,`sex`,`" + internalVersionColName + "`,`" + internalDelmarkColName + "`) values (?,?,?,?,?);", - } - for i, table := range tables { - oldRowDatas, _ := testFlashGenRowData(c, table, 1, false) - newRowDatas, newExpected := testFlashGenRowData(c, table, i, false) - binlog := testFlashGenUpdateBinlog(c, table, oldRowDatas, newRowDatas) - sql, vals, err := GenFlashUpdateSQL(schema, table, binlog, genCommitTS(i)) - c.Assert(err, IsNil) - c.Assert(len(vals), Equals, expectedValCounts[i]) - c.Assert(sql, Equals, expectedSQLs[i]) - c.Assert(vals, DeepEquals, newExpected) - } - - table := testGenTable("normal") - rowDatas, _ := testFlashGenRowData(c, table, 1, false) - binlog := testFlashGenUpdateBinlog(c, table, rowDatas, rowDatas) - _, _, err := GenFlashUpdateSQL(schema, table, binlog[6:], 0) - c.Assert(err, NotNil) -} - -func (t *testTranslatorSuite) TestFlashGenDeleteSQLs(c *C) { - schema := "T" - tables := []*model.TableInfo{testGenTable("normal"), testGenTable("hasPK"), testGenTable("hasID")} - expectedValCounts := []int{7, 7, 6} - expectedSQLs := []string{ - "IMPORT INTO `t`.`account` (`id`,`name`,`sex`,`" + implicitColName + "`,`" + internalVersionColName + "`,`" + internalDelmarkColName + "`) values (?,?,?,?,?,?);", - "IMPORT INTO `t`.`account` (`id`,`name`,`sex`,`" + implicitColName + "`,`" + internalVersionColName + "`,`" + internalDelmarkColName + "`) values (?,?,?,?,?,?);", - "IMPORT INTO `t`.`account` (`id`,`name`,`sex`,`" + internalVersionColName + "`,`" + internalDelmarkColName + "`) values (?,?,?,?,?);", - } - for i, t := range tables { - rowDatas, expected := testFlashGenRowData(c, t, i, true) - binlog := testFlashGenDeleteBinlog(c, t, rowDatas) - sql, vals, err := GenFlashDeleteSQL(schema, t, binlog, genCommitTS(i)) - c.Assert(err, IsNil) - c.Assert(len(vals), Equals, expectedValCounts[i]) - c.Assert(sql, Equals, expectedSQLs[i]) - c.Assert(vals, DeepEquals, expected) - } - - table := testGenTable("normal") - rowDatas, _ := testFlashGenRowData(c, table, 1, true) - binlog := testFlashGenDeleteBinlog(c, table, rowDatas) - _, _, err := GenFlashDeleteSQL(schema, table, binlog[6:], 0) - c.Assert(err, NotNil) -} - -func (t *testTranslatorSuite) TestFlashGenDDLSQL(c *C) { - schema := "Test_Schema" - dtRegex := "[0-9]{4}-[0-1][0-9]-[0-3][0-9] [0-2][0-9]:[0-5][0-9]:[0-5][0-9]" - check := func(ddl string, checker Checker, expected string) { - gen, err := GenFlashDDLSQL(ddl, schema) - c.Assert(err, IsNil) - c.Assert(gen, checker, expected) - } - - check("create database "+schema, - Equals, - "CREATE DATABASE IF NOT EXISTS `test_schema`;") - - check("drop database "+schema, - Equals, - "DROP DATABASE `test_schema`;") - - // Primary keys. - check("create table Test(I int, f float)", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` (`"+implicitColName+"` Int64,`i` Nullable(Int32),`f` Nullable(Float32)) ENGINE MutableMergeTree((`"+implicitColName+"`), 8192);") - check("create table Test(I int, f float, primary key(i))", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` (`i` Int32,`f` Nullable(Float32)) ENGINE MutableMergeTree((`i`), 8192);") - check("create table Test(I int primary key, f float not null)", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` (`i` Int32,`f` Float32) ENGINE MutableMergeTree((`i`), 8192);") - check("create table Test(I int, f float, primary key(i, f))", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` (`"+implicitColName+"` Int64,`i` Nullable(Int32),`f` Nullable(Float32)) ENGINE MutableMergeTree((`"+implicitColName+"`), 8192);") - // Numeric types, with unsigned, nullable and default value attributes. - check("create table Test(bT bit, I int, T tinyint, M mediumint, B bigint, F float, D double, DE decimal(38, 10))", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` (`"+implicitColName+"` Int64,`bt` Nullable(UInt64),`i` Nullable(Int32),`t` Nullable(Int8),`m` Nullable(Int32),`b` Nullable(Int64),`f` Nullable(Float32),`d` Nullable(Float64),`de` Nullable(Decimal(38, 10))) ENGINE MutableMergeTree((`"+implicitColName+"`), 8192);") - check("create table Test(I int unsigned, T tinyint unsigned, M mediumint unsigned, B bigint unsigned, F float unsigned, D double unsigned, DE decimal unsigned)", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` (`"+implicitColName+"` Int64,`i` Nullable(UInt32),`t` Nullable(UInt8),`m` Nullable(UInt32),`b` Nullable(UInt64),`f` Nullable(Float32),`d` Nullable(Float64),`de` Nullable(Decimal(11, 0))) ENGINE MutableMergeTree((`"+implicitColName+"`), 8192);") - check("create table Test(BT bit not null, I int not null, T tinyint not null, M mediumint not null, B bigint not null, F float not null, D double not null, DE decimal not null)", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` (`"+implicitColName+"` Int64,`bt` UInt64,`i` Int32,`t` Int8,`m` Int32,`b` Int64,`f` Float32,`d` Float64,`de` Decimal(11, 0)) ENGINE MutableMergeTree((`"+implicitColName+"`), 8192);") - check("create table Test(Bt bit default 255, I int default null, T tinyint unsigned default 1, M mediumint not null default -2.0, B bigint unsigned not null default 100, F float not null default 1234.56, D double not null default 8765.4321, DE decimal not null default 0)", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` (`"+implicitColName+"` Int64,`bt` Nullable(UInt64) DEFAULT 255,`i` Nullable(Int32) DEFAULT NULL,`t` Nullable(UInt8) DEFAULT 1,`m` Int32 DEFAULT -2.0,`b` UInt64 DEFAULT 100,`f` Float32 DEFAULT 1234.56,`d` Float64 DEFAULT 8765.4321,`de` Decimal(11, 0) DEFAULT 0) ENGINE MutableMergeTree((`"+implicitColName+"`), 8192);") - check("create table Test(F float not null default 1234, D double not null default '8765.4321', DE decimal not null default 42)", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` (`"+implicitColName+"` Int64,`f` Float32 DEFAULT 1234,`d` Float64 DEFAULT 8765.4321,`de` Decimal(11, 0) DEFAULT 42) ENGINE MutableMergeTree((`"+implicitColName+"`), 8192);") - // String types, with default value attribute. - check("create table Test(C Char(10) not null, vC Varchar(255) not null, B BLOB not null, t tinyblob not null, m MediumBlob not null, L longblob not null)", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` (`"+implicitColName+"` Int64,`c` String,`vc` String,`b` String,`t` String,`m` String,`l` String) ENGINE MutableMergeTree((`"+implicitColName+"`), 8192);") - check("create table Test(C Char(10) default NULL, vC Varchar(255) not null default 'abc', B BLOB not null default \"\", t tinyblob not null default '', m MediumBlob not null default \"def\", L longblob not null default 1234.5)", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` (`"+implicitColName+"` Int64,`c` Nullable(String) DEFAULT NULL,`vc` String DEFAULT 'abc',`b` String DEFAULT '',`t` String DEFAULT '',`m` String DEFAULT 'def',`l` String DEFAULT '1234.5') ENGINE MutableMergeTree((`"+implicitColName+"`), 8192);") - // Date/time types, with default value attribute. - check("create table Test(DT Date not null, tM Time not null, dTTm DateTime not null, ts timestamp not null, y year not null)", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` (`"+implicitColName+"` Int64,`dt` Date,`tm` Int64,`dttm` DateTime,`ts` DateTime,`y` Int16) ENGINE MutableMergeTree((`"+implicitColName+"`), 8192);") - check("create table Test(DT Date not null default '0000-00-00', tM Time default -1, dTTm DateTime not null default \"2018-01-01 13:13:13\", ts timestamp not null default current_timestamp(), y year not null default +1984)", - Matches, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` \\(`"+implicitColName+"` Int64,`dt` Date DEFAULT '0000-00-00',`tm` Nullable\\(Int64\\) DEFAULT -1,`dttm` DateTime DEFAULT '2018-01-01 13:13:13',`ts` DateTime DEFAULT '"+dtRegex+"',`y` Int16 DEFAULT 1984\\) ENGINE MutableMergeTree\\(\\(`"+implicitColName+"`\\), 8192\\);") - // Enum type, with default value attribute. - check("create table Test(E Enum('a', 'b') not null)", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` (`"+implicitColName+"` Int64,`e` Enum16(''=0,'a'=1,'b'=2)) ENGINE MutableMergeTree((`"+implicitColName+"`), 8192);") - check("create table Test(E Enum('a', '', 'b') not null)", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`test` (`"+implicitColName+"` Int64,`e` Enum16('a'=1,''=2,'b'=3)) ENGINE MutableMergeTree((`"+implicitColName+"`), 8192);") - - // Default value conversions using alter table statement, as the result is relatively simpler. - // Bit. - check("alter table Test add column bT bit default null", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `bt` Nullable(UInt64) DEFAULT NULL;") - check("alter table Test add column bT bit not null default 255", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `bt` UInt64 DEFAULT 255;") - check("alter table Test add column bT bit not null default -255", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `bt` UInt64 DEFAULT -255;") - check("alter table Test add column bT bit not null default '255'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `bt` UInt64 DEFAULT 3290421;") - check("alter table Test add column bT bit not null default 255.5", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `bt` UInt64 DEFAULT 256;") - // Decimal. - check("alter table Test add column d decimal default null", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `d` Nullable(Decimal(11, 0)) DEFAULT NULL;") - check("alter table Test add column d decimal not null default 255.5", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `d` Decimal(11, 0) DEFAULT 255.5;") - check("alter table Test add column d decimal not null default -255", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `d` Decimal(11, 0) DEFAULT -255;") - check("alter table Test add column d decimal not null default '255'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `d` Decimal(11, 0) DEFAULT 255;") - check("alter table Test add column d decimal not null default '-255'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `d` Decimal(11, 0) DEFAULT -255;") - check("alter table Test add column d decimal(10) default null", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `d` Nullable(Decimal(10, 0)) DEFAULT NULL;") - check("alter table Test add column d decimal(10, 0) not null default 255.5", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `d` Decimal(10, 0) DEFAULT 255.5;") - check("alter table Test add column d decimal(10, 5) not null default -255", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `d` Decimal(10, 5) DEFAULT -255;") - check("alter table Test add column d decimal(10, 5) not null default '255'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `d` Decimal(10, 5) DEFAULT 255;") - check("alter table Test add column d decimal(10, 5) not null default '-255'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `d` Decimal(10, 5) DEFAULT -255;") - // Numeric. - check("alter table Test add column i int default null", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `i` Nullable(Int32) DEFAULT NULL;") - check("alter table Test add column i int not null default 255.5", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `i` Int32 DEFAULT 255.5;") - check("alter table Test add column i int not null default -255", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `i` Int32 DEFAULT -255;") - check("alter table Test add column i int not null default '255'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `i` Int32 DEFAULT 255;") - check("alter table Test add column i int not null default '-255'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `i` Int32 DEFAULT -255;") - // String. - check("alter table Test add column vc varchar(10) default null", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `vc` Nullable(String) DEFAULT NULL;") - check("alter table Test add column vc varchar(10) not null default 255", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `vc` String DEFAULT '255';") - check("alter table Test add column vc varchar(10) not null default -255", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `vc` String DEFAULT '-255';") - check("alter table Test add column vc varchar(10) not null default '\\''", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `vc` String DEFAULT '\\'';") - // Date. - check("alter table Test add column dt date default NULL", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` Nullable(Date) DEFAULT NULL;") - check("alter table Test add column dt date not null default 19980808", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` Date DEFAULT '1998-08-08';") - check("alter table Test add column dt date not null default 19980808232323", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` Date DEFAULT '1998-08-08 23:23:23';") - check("alter table Test add column dt date not null default 19980808235959.99", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` Date DEFAULT '1998-08-09';") - check("alter table Test add column dt date not null default 0", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` Date DEFAULT '0000-00-00';") - check("alter table Test add column dt date not null default '1998-08-08'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` Date DEFAULT '1998-08-08';") - check("alter table Test add column dt date not null default '1998-08-08 13:13:13'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` Date DEFAULT '1998-08-08';") - check("alter table Test add column dt date not null default '1998-08-08 13:13'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` Date DEFAULT '1998-08-08';") - check("alter table Test add column dt date not null default '1998-08-08 155:13:13'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` Date DEFAULT '1998-08-08';") - check("alter table Test add column dt date not null default '19980808235959.99'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` Date DEFAULT '1998-08-09';") - // Datetime and timestamp. - check("alter table Test add column dt datetime not null default 19980101", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` DateTime DEFAULT '1998-01-01 00:00:00';") - check("alter table Test add column dt datetime not null default 19980101232323", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` DateTime DEFAULT '1998-01-01 23:23:23';") - check("alter table Test add column dt datetime not null default 19980101235959.999", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` DateTime DEFAULT '1998-01-02 00:00:00';") - check("alter table Test add column dt datetime not null default 0", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` DateTime DEFAULT '0000-00-00 00:00:00';") - check("alter table Test add column dt datetime not null default '0000-00-00'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` DateTime DEFAULT '0000-00-00 00:00:00';") - check("alter table Test add column dt datetime not null default '0000-00-00 00:00:00'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` DateTime DEFAULT '0000-00-00 00:00:00';") - check("alter table Test add column dt datetime not null default '1998-08-08'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` DateTime DEFAULT '1998-08-08 00:00:00';") - check("alter table Test add column dt datetime not null default '1998-08-08 13:13'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` DateTime DEFAULT '1998-08-08 13:13:00';") - check("alter table Test add column dt datetime not null default '1998-08-08 13:13:13'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `dt` DateTime DEFAULT '1998-08-08 13:13:13';") - // Current_timestamp for Timestamp. - check("alter table Test add column ts timestamp not null default current_timestamp", - Matches, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `ts` DateTime DEFAULT '"+dtRegex+"';") - check("alter table Test add column ts timestamp not null default current_timestamp()", - Matches, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `ts` DateTime DEFAULT '"+dtRegex+"';") - // Time(duration). - check("alter table Test add column t time not null default 0", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `t` Int64 DEFAULT 0;") - check("alter table Test add column t time not null default '0'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `t` Int64 DEFAULT 0;") - check("alter table Test add column t time not null default -0", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `t` Int64 DEFAULT -0;") - check("alter table Test add column t time not null default -10", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `t` Int64 DEFAULT -10;") - check("alter table Test add column t time not null default '-10'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `t` Int64 DEFAULT -10;") - check("alter table Test add column t time not null default 151515", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `t` Int64 DEFAULT 151515;") - check("alter table Test add column t time not null default -151515", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `t` Int64 DEFAULT -151515;") - check("alter table Test add column t time not null default 151515.99", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `t` Int64 DEFAULT 151516;") - check("alter table Test add column t time not null default -151515.99", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `t` Int64 DEFAULT -151516;") - check("alter table Test add column t time not null default '-151515.99'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `t` Int64 DEFAULT -151516;") - // Year. - check("alter table Test add column y year not null default 0", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `y` Int16 DEFAULT 2000;") - check("alter table Test add column y year not null default '0'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `y` Int16 DEFAULT 2000;") - check("alter table Test add column y year not null default -0", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `y` Int16 DEFAULT 2000;") - check("alter table Test add column y year not null default '-0'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `y` Int16 DEFAULT 2000;") - check("alter table Test add column y year not null default 10", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `y` Int16 DEFAULT 2010;") - check("alter table Test add column y year not null default 1998", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `y` Int16 DEFAULT 1998;") - check("alter table Test add column y year not null default 1998.0", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `y` Int16 DEFAULT 1998;") - check("alter table Test add column y year not null default 1998.9", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `y` Int16 DEFAULT 1998;") - check("alter table Test add column y year not null default '1998'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `y` Int16 DEFAULT 1998;") - check("alter table Test add column y year not null default '1998.9'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `y` Int16 DEFAULT 1998;") - check("alter table Test add column y year not null default '-1998.9'", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `y` Int16 DEFAULT 1998;") - // Enum. - check("alter table Test add column e enum('abc', 'def') not null default 'abc';", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `e` Enum16(''=0,'abc'=1,'def'=2) DEFAULT 'abc';") - check("alter table Test add column e enum('abc', 'def') not null default 1;", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `e` Enum16(''=0,'abc'=1,'def'=2) DEFAULT 1;") - // Set. - check("alter table Test add column s set('abc', 'def') not null default 'def,abc,abc';", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `s` String DEFAULT 'abc,def';") - check("alter table Test add column s set('abc', 'def') not null default 3;", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `s` String DEFAULT 'abc,def';") - // JSON. - check("alter table Test add column j json;", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `j` Nullable(String);") - - check("drop table Test", - Equals, - "DROP TABLE `test_schema`.`test`;") - - check("rename table Test to Test1", - Equals, - "RENAME TABLE `test_schema`.`test` TO `test_schema`.`test1`;") - check("rename table Test to test_Schema1.Test1", - Equals, - "RENAME TABLE `test_schema`.`test` TO `test_schema1`.`test1`;") - - check("alter table Test rename Test1", - Equals, - "RENAME TABLE `test_schema`.`test` TO `test_schema`.`test1`;") - check("alter table Test rename test_Schema1.Test1", - Equals, - "RENAME TABLE `test_schema`.`test` TO `test_schema1`.`test1`;") - check("alter table Test add column C Int default 100", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `c` Nullable(Int32) DEFAULT 100;") - check("alter table Test add column C varchar(255) default 100 after I", - Equals, - "ALTER TABLE `test_schema`.`test` ADD COLUMN `c` Nullable(String) DEFAULT '100' AFTER `i`;") - - check("truncate table Test", - Equals, - "TRUNCATE TABLE `test_schema`.`test`") - - check("create table T like T2", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`t` AS `test_schema`.`t2`") - - check("create table T like D.T2", - Equals, - "CREATE TABLE IF NOT EXISTS `test_schema`.`t` AS `d`.`t2`") -} - -func (t *testTranslatorSuite) TestFlashFormatData(c *C) { - checkWithFT := func(ft types.FieldType, data types.Datum, expected interface{}) { - value, err := formatFlashData(&data, &ft) - c.Assert(err, IsNil) - c.Assert(value, DeepEquals, expected) - } - check := func(tp byte, data types.Datum, expected interface{}) { - checkWithFT(types.FieldType{Tp: tp}, data, expected) - } - var datum types.Datum - // Int types. - check(mysql.TypeTiny, types.NewIntDatum(101), int8(101)) - check(mysql.TypeShort, types.NewIntDatum(101), int16(101)) - check(mysql.TypeInt24, types.NewIntDatum(101), int32(101)) - check(mysql.TypeLong, types.NewIntDatum(101), int32(101)) - check(mysql.TypeLonglong, types.NewIntDatum(101), int64(101)) - check(mysql.TypeFloat, types.NewFloat32Datum(101.101), float32(101.101)) - check(mysql.TypeDouble, types.NewFloat64Datum(101.101), float64(101.101)) - // Bit. - bl, err := types.ParseBitStr("b101") - c.Assert(err, IsNil) - check(mysql.TypeBit, types.NewMysqlBitDatum(bl), uint64(5)) - // Duration. - d, err := types.ParseDuration(new(stmtctx.StatementContext), "101:10:11", 1) - c.Assert(err, IsNil) - check(mysql.TypeDuration, types.NewDurationDatum(d), int64(1011011)) - // Time types. - sc := &stmtctx.StatementContext{TimeZone: time.Local} - dt, err := types.ParseDate(sc, "0000-00-00") - c.Assert(err, IsNil) - check(mysql.TypeDate, types.NewTimeDatum(dt), int64(0)) - check(mysql.TypeDatetime, types.NewTimeDatum(dt), int64(0)) - check(mysql.TypeNewDate, types.NewTimeDatum(dt), int64(0)) - check(mysql.TypeTimestamp, types.NewTimeDatum(dt), int64(0)) - now := time.Now() - utc := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second(), now.Nanosecond(), time.UTC) - check(mysql.TypeDate, types.NewTimeDatum(types.Time{Time: types.FromGoTime(now)}), utc.Unix()) - check(mysql.TypeDatetime, types.NewTimeDatum(types.Time{Time: types.FromGoTime(now)}), now.Unix()) - check(mysql.TypeNewDate, types.NewTimeDatum(types.Time{Time: types.FromGoTime(now)}), utc.Unix()) - check(mysql.TypeTimestamp, types.NewTimeDatum(types.Time{Time: types.FromGoTime(now)}), now.Unix()) - // Decimal. - checkWithFT(types.FieldType{Tp: mysql.TypeDecimal, Flen: 1, Decimal: 0}, types.NewDecimalDatum(types.NewDecFromFloatForTest(0)), - testPackCHDecimal(c, []byte{}, 0, 0, 1, 0)) - checkWithFT(types.FieldType{Tp: mysql.TypeDecimal, Flen: 1, Decimal: 1}, types.NewDecimalDatum(types.NewDecFromFloatForTest(0)), - testPackCHDecimal(c, []byte{}, 0, 0, 1, 1)) - checkWithFT(types.FieldType{Tp: mysql.TypeDecimal, Flen: 1, Decimal: 0}, types.NewDecimalDatum(types.NewDecFromFloatForTest(1)), - testPackCHDecimal(c, []byte{1}, 1, 0, 1, 0)) - checkWithFT(types.FieldType{Tp: mysql.TypeDecimal, Flen: 1, Decimal: 1}, types.NewDecimalDatum(types.NewDecFromFloatForTest(0.1)), - testPackCHDecimal(c, []byte{1}, 1, 0, 1, 1)) - checkWithFT(types.FieldType{Tp: mysql.TypeDecimal, Flen: 1, Decimal: 0}, types.NewDecimalDatum(types.NewDecFromFloatForTest(-1)), - testPackCHDecimal(c, []byte{1}, 1, 1, 1, 0)) - checkWithFT(types.FieldType{Tp: mysql.TypeDecimal, Flen: 1, Decimal: 1}, types.NewDecimalDatum(types.NewDecFromFloatForTest(-0.1)), - testPackCHDecimal(c, []byte{1}, 1, 1, 1, 1)) - checkWithFT(types.FieldType{Tp: mysql.TypeDecimal, Flen: 65, Decimal: 0}, types.NewDecimalDatum(types.NewDecFromFloatForTest(1000000001)), - testPackCHDecimal(c, []byte{byte(1000000001 & 0xFF), byte(1000000001 >> 8 & 0xFF), byte(1000000001 >> 16 & 0xFF), byte(1000000001 >> 24 & 0xFF)}, 1, 0, 65, 0)) - checkWithFT(types.FieldType{Tp: mysql.TypeDecimal, Flen: 65, Decimal: 10}, types.NewDecimalDatum(types.NewDecFromFloatForTest(0.1000000001)), - testPackCHDecimal(c, []byte{byte(1000000001 & 0xFF), byte(1000000001 >> 8 & 0xFF), byte(1000000001 >> 16 & 0xFF), byte(1000000001 >> 24 & 0xFF)}, 1, 0, 65, 10)) - checkWithFT(types.FieldType{Tp: mysql.TypeDecimal, Flen: 65, Decimal: 0}, types.NewDecimalDatum(types.NewDecFromFloatForTest(-1000000001)), - testPackCHDecimal(c, []byte{byte(1000000001 & 0xFF), byte(1000000001 >> 8 & 0xFF), byte(1000000001 >> 16 & 0xFF), byte(1000000001 >> 24 & 0xFF)}, 1, 1, 65, 0)) - checkWithFT(types.FieldType{Tp: mysql.TypeDecimal, Flen: 65, Decimal: 10}, types.NewDecimalDatum(types.NewDecFromFloatForTest(-0.1000000001)), - testPackCHDecimal(c, []byte{byte(1000000001 & 0xFF), byte(1000000001 >> 8 & 0xFF), byte(1000000001 >> 16 & 0xFF), byte(1000000001 >> 24 & 0xFF)}, 1, 1, 65, 10)) - // Enum. - en, err := types.ParseEnumValue([]string{"a", "b"}, 1) - c.Assert(err, IsNil) - datum.SetMysqlEnum(en) - check(mysql.TypeEnum, datum, int16(1)) - // Set. - s, err := types.ParseSetName([]string{"a", "b"}, "a") - c.Assert(err, IsNil) - datum.SetMysqlSet(s) - check(mysql.TypeSet, datum, "a") - // JSON. - datum.SetMysqlJSON(json.CreateBinary(uint64(101))) - check(mysql.TypeJSON, datum, "101") -} - -func testPackCHDecimal(c *C, value []byte, limb, sign, precision, scale int) []byte { - chBin := append(value, make([]byte, 32-len(value))...) - chBin = append(chBin, byte(limb), byte(limb>>8), byte(sign), byte(sign>>8)) - chBin = append(chBin, make([]byte, 12)...) - chBin = append(chBin, byte(precision), byte(precision>>8), byte(scale), byte(scale>>8)) - chBin = append(chBin, make([]byte, 12)...) - return chBin -} - -func testFlashGenRowData(c *C, table *model.TableInfo, base int, delFlag bool) ([]types.Datum, []interface{}) { - datas := make([]types.Datum, 3) - expected := make([]interface{}, 3) - var pk interface{} - for index, col := range table.Columns { - d, e := testFlashGenDatum(c, col, base%2+1) - datas[index] = d - expected[index] = e - // Only obtain the first PK column, to WAR the hasID table bug. - if pk == nil && testIsPKHandleColumn(table, col) { - pk = d.GetInt64() - } - } - if pk == nil { - pk = tidbRowID - expected = append(expected, tidbRowID) - } - var pks []interface{} - pks = append(pks, pk) - expected = append(append(pks, expected...), makeInternalVersionValue(uint64(genCommitTS(base))), makeInternalDelmarkValue(delFlag)) - return datas, expected -} - -func testFlashGenDatum(c *C, col *model.ColumnInfo, base int) (types.Datum, interface{}) { - d, _ := testGenDatum(c, col, base) - e, err := formatFlashData(&d, &col.FieldType) - c.Assert(err, IsNil) - return d, e -} - -func testFlashGenInsertBinlog(c *C, table *model.TableInfo, r []types.Datum) []byte { - colIDs := make([]int64, 0, len(r)) - row := make([]types.Datum, 0, len(r)) - var pk interface{} - for _, col := range table.Columns { - if pk == nil && testIsPKHandleColumn(table, col) { - pk = r[col.Offset] - continue - } - colIDs = append(colIDs, col.ID) - row = append(row, r[col.Offset]) - } - sc := &stmtctx.StatementContext{TimeZone: time.Local} - value, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil) - c.Assert(err, IsNil) - - if pk == nil { - pk = types.NewIntDatum(tidbRowID) - } - handleVal, _ := codec.EncodeValue(sc, nil, pk.(types.Datum)) - bin := append(handleVal, value...) - return bin -} - -func testFlashGenUpdateBinlog(c *C, table *model.TableInfo, oldData []types.Datum, newData []types.Datum) []byte { - colIDs := make([]int64, 0, len(table.Columns)) - oldRow := make([]types.Datum, 0, len(oldData)) - newRow := make([]types.Datum, 0, len(newData)) - hasPK := false - for _, col := range table.Columns { - if testIsPKHandleColumn(table, col) { - hasPK = true - } - colIDs = append(colIDs, col.ID) - oldRow = append(oldRow, oldData[col.Offset]) - newRow = append(newRow, newData[col.Offset]) - } - - if !hasPK { - colIDs = append(colIDs, implicitColID) - oldRow = append(oldRow, types.NewIntDatum(tidbRowID)) - newRow = append(newRow, types.NewIntDatum(tidbRowID)) - } - - var bin []byte - sc := &stmtctx.StatementContext{TimeZone: time.Local} - newValue, err := tablecodec.EncodeRow(sc, newRow, colIDs, nil, nil) - c.Assert(err, IsNil) - oldValue, err := tablecodec.EncodeRow(sc, oldRow, colIDs, nil, nil) - c.Assert(err, IsNil) - bin = append(oldValue, newValue...) - return bin -} - -func testFlashGenDeleteBinlog(c *C, table *model.TableInfo, r []types.Datum) []byte { - colIDs := make([]int64, 0, len(r)) - row := make([]types.Datum, 0, len(r)) - hasPK := false - for _, col := range table.Columns { - if testIsPKHandleColumn(table, col) { - hasPK = true - } - colIDs = append(colIDs, col.ID) - row = append(row, r[col.Offset]) - } - - if !hasPK { - colIDs = append(colIDs, implicitColID) - row = append(row, types.NewIntDatum(tidbRowID)) - } - - sc := &stmtctx.StatementContext{TimeZone: time.Local} - bin, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil) - c.Assert(err, IsNil) - return bin -} diff --git a/drainer/translator/kafka.go b/drainer/translator/kafka.go index b8b5f3620..b14e429ff 100644 --- a/drainer/translator/kafka.go +++ b/drainer/translator/kafka.go @@ -63,6 +63,7 @@ func TiBinlogToSlaveBinlog( if !ok { return nil, errors.Errorf("TableByID empty table id: %d", mut.GetTableId()) } + isTblDroppingCol := infoGetter.IsDroppingColumn(mut.GetTableId()) schema, _, ok = infoGetter.SchemaAndTableName(mut.GetTableId()) if !ok { @@ -71,7 +72,7 @@ func TiBinlogToSlaveBinlog( iter := newSequenceIterator(&mut) for { - table, err := nextRow(schema, info, iter) + table, err := nextRow(schema, info, isTblDroppingCol, iter) if err != nil { if errors.Cause(err) == io.EOF { break @@ -147,8 +148,8 @@ func deleteRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, e return } -func updateRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, changedRow *obinlog.Row, err error) { - updtDecoder := newUpdateDecoder(tableInfo) +func updateRowToRow(tableInfo *model.TableInfo, raw []byte, isTblDroppingCol bool) (row *obinlog.Row, changedRow *obinlog.Row, err error) { + updtDecoder := newUpdateDecoder(tableInfo, isTblDroppingCol) oldDatums, newDatums, err := updtDecoder.decode(raw, time.Local) if err != nil { return @@ -245,7 +246,7 @@ func DatumToColumn(colInfo *model.ColumnInfo, datum types.Datum) (col *obinlog.C return } -func createTableMutation(tp pb.MutationType, info *model.TableInfo, row []byte) (*obinlog.TableMutation, error) { +func createTableMutation(tp pb.MutationType, info *model.TableInfo, isTblDroppingCol bool, row []byte) (*obinlog.TableMutation, error) { var err error mut := new(obinlog.TableMutation) switch tp { @@ -257,7 +258,7 @@ func createTableMutation(tp pb.MutationType, info *model.TableInfo, row []byte) } case pb.MutationType_Update: mut.Type = obinlog.MutationType_Update.Enum() - mut.Row, mut.ChangeRow, err = updateRowToRow(info, row) + mut.Row, mut.ChangeRow, err = updateRowToRow(info, row, isTblDroppingCol) if err != nil { return nil, err } @@ -273,13 +274,13 @@ func createTableMutation(tp pb.MutationType, info *model.TableInfo, row []byte) return mut, nil } -func nextRow(schema string, info *model.TableInfo, iter *sequenceIterator) (*obinlog.Table, error) { +func nextRow(schema string, info *model.TableInfo, isTblDroppingCol bool, iter *sequenceIterator) (*obinlog.Table, error) { mutType, row, err := iter.next() if err != nil { return nil, errors.Trace(err) } - tableMutation, err := createTableMutation(mutType, info, row) + tableMutation, err := createTableMutation(mutType, info, isTblDroppingCol, row) if err != nil { return nil, errors.Trace(err) } diff --git a/drainer/translator/mysql.go b/drainer/translator/mysql.go index 9ce79890d..18bcef025 100644 --- a/drainer/translator/mysql.go +++ b/drainer/translator/mysql.go @@ -56,9 +56,9 @@ func genMysqlInsert(schema string, table *model.TableInfo, row []byte) (names [] return names, args, nil } -func genMysqlUpdate(schema string, table *model.TableInfo, row []byte) (names []string, values []interface{}, oldValues []interface{}, err error) { +func genMysqlUpdate(schema string, table *model.TableInfo, row []byte, isTblDroppingCol bool) (names []string, values []interface{}, oldValues []interface{}, err error) { columns := writableColumns(table) - updtDecoder := newUpdateDecoder(table) + updtDecoder := newUpdateDecoder(table, isTblDroppingCol) var updateColumns []*model.ColumnInfo @@ -120,6 +120,8 @@ func TiBinlogToTxn(infoGetter TableInfoGetter, schema string, table string, tiBi return nil, errors.Errorf("TableByID empty table id: %d", mut.GetTableId()) } + isTblDroppingCol := infoGetter.IsDroppingColumn(mut.GetTableId()) + schema, table, ok = infoGetter.SchemaAndTableName(mut.GetTableId()) if !ok { return nil, errors.Errorf("SchemaAndTableName empty table id: %d", mut.GetTableId()) @@ -153,9 +155,9 @@ func TiBinlogToTxn(infoGetter TableInfoGetter, schema string, table string, tiBi dml.Values[name] = args[i] } case tipb.MutationType_Update: - names, args, oldArgs, err := genMysqlUpdate(schema, info, row) + names, args, oldArgs, err := genMysqlUpdate(schema, info, row, isTblDroppingCol) if err != nil { - return nil, errors.Annotate(err, "gen delete fail") + return nil, errors.Annotate(err, "gen update fail") } dml := &loader.DML{ diff --git a/drainer/translator/pb.go b/drainer/translator/pb.go index 7ab9b8e72..83921a38f 100644 --- a/drainer/translator/pb.go +++ b/drainer/translator/pb.go @@ -64,6 +64,8 @@ func TiBinlogToPbBinlog(infoGetter TableInfoGetter, schema string, table string, return nil, errors.Errorf("TableByID empty table id: %d", mut.GetTableId()) } + isTblDroppingCol := infoGetter.IsDroppingColumn(mut.GetTableId()) + schema, _, ok = infoGetter.SchemaAndTableName(mut.GetTableId()) if !ok { return nil, errors.Errorf("SchemaAndTableName empty table id: %d", mut.GetTableId()) @@ -87,7 +89,7 @@ func TiBinlogToPbBinlog(infoGetter TableInfoGetter, schema string, table string, } pbBinlog.DmlData.Events = append(pbBinlog.DmlData.Events, *event) case tipb.MutationType_Update: - event, err := genUpdate(schema, info, row) + event, err := genUpdate(schema, info, row, isTblDroppingCol) if err != nil { return nil, errors.Annotatef(err, "genUpdate failed") } @@ -151,11 +153,11 @@ func genInsert(schema string, table *model.TableInfo, row []byte) (event *pb.Eve return } -func genUpdate(schema string, table *model.TableInfo, row []byte) (event *pb.Event, err error) { +func genUpdate(schema string, table *model.TableInfo, row []byte, isTblDroppingCol bool) (event *pb.Event, err error) { columns := writableColumns(table) - colsTypeMap := util.ToColumnTypeMap(columns) + colsMap := util.ToColumnMap(columns) - oldColumnValues, newColumnValues, err := DecodeOldAndNewRow(row, colsTypeMap, time.Local) + oldColumnValues, newColumnValues, err := DecodeOldAndNewRow(row, colsMap, time.Local, isTblDroppingCol) if err != nil { return nil, errors.Annotatef(err, "table `%s`.`%s`", schema, table.Name) } diff --git a/drainer/translator/table_info.go b/drainer/translator/table_info.go index 130d22b9b..c53465c4e 100644 --- a/drainer/translator/table_info.go +++ b/drainer/translator/table_info.go @@ -19,4 +19,5 @@ import "github.com/pingcap/parser/model" type TableInfoGetter interface { TableByID(id int64) (info *model.TableInfo, ok bool) SchemaAndTableName(id int64) (string, string, bool) + IsDroppingColumn(id int64) bool } diff --git a/drainer/translator/testing.go b/drainer/translator/testing.go index 481e3f1ce..a88061129 100644 --- a/drainer/translator/testing.go +++ b/drainer/translator/testing.go @@ -95,6 +95,11 @@ func (g *BinlogGenrator) SchemaAndTableName(id int64) (schema string, table stri return } +// IsDroppingColumn implements TableInfoGetter interface +func (g *BinlogGenrator) IsDroppingColumn(id int64) bool { + return false +} + // SetDDL set up a ddl binlog. func (g *BinlogGenrator) SetDDL() { g.reset() diff --git a/drainer/translator/translator.go b/drainer/translator/translator.go index 9d9659462..6d35746bc 100644 --- a/drainer/translator/translator.go +++ b/drainer/translator/translator.go @@ -103,7 +103,7 @@ func getDefaultOrZeroValue(col *model.ColumnInfo) types.Datum { // DecodeOldAndNewRow decodes a byte slice into datums with a existing row map. // Row layout: colID1, value1, colID2, value2, ..... -func DecodeOldAndNewRow(b []byte, cols map[int64]*types.FieldType, loc *time.Location) (map[int64]types.Datum, map[int64]types.Datum, error) { +func DecodeOldAndNewRow(b []byte, cols map[int64]*model.ColumnInfo, loc *time.Location, isTblDroppingCol bool) (map[int64]types.Datum, map[int64]types.Datum, error) { if b == nil { return nil, nil, nil } @@ -111,8 +111,8 @@ func DecodeOldAndNewRow(b []byte, cols map[int64]*types.FieldType, loc *time.Loc return nil, nil, nil } - cnt := 0 var ( + cnt int data []byte err error oldRow = make(map[int64]types.Datum, len(cols)) @@ -134,9 +134,9 @@ func DecodeOldAndNewRow(b []byte, cols map[int64]*types.FieldType, loc *time.Loc return nil, nil, errors.Trace(err) } id := cid.GetInt64() - ft, ok := cols[id] + col, ok := cols[id] if ok { - v, err := tablecodec.DecodeColumnValue(data, ft, loc) + v, err := tablecodec.DecodeColumnValue(data, &col.FieldType, loc) if err != nil { return nil, nil, errors.Trace(err) } @@ -155,28 +155,58 @@ func DecodeOldAndNewRow(b []byte, cols map[int64]*types.FieldType, loc *time.Loc } } - if cnt != len(cols)*2 || len(newRow) != len(oldRow) { + parsedCols := cnt / 2 + isInvalid := len(newRow) != len(oldRow) || (len(cols) != parsedCols && len(cols)-1 != parsedCols) + if isInvalid { return nil, nil, errors.Errorf("row data is corrupted %v", b) } + if parsedCols == len(cols)-1 { + if !isTblDroppingCol { + return nil, nil, errors.Errorf("row data is corrupted %v", b) + } + var missingCol *model.ColumnInfo + for colID, col := range cols { + _, inOld := oldRow[colID] + _, inNew := newRow[colID] + if !inOld && !inNew { + missingCol = col + break + } + } + // We can't find a column that's missing in both old and new + if missingCol == nil { + return nil, nil, errors.Errorf("row data is corrupted %v", b) + } + log.Info( + "Fill missing col with default val", + zap.String("name", missingCol.Name.O), + zap.Int64("id", missingCol.ID), + zap.Int("Tp", int(missingCol.FieldType.Tp)), + ) + oldRow[missingCol.ID] = getDefaultOrZeroValue(missingCol) + newRow[missingCol.ID] = getDefaultOrZeroValue(missingCol) + } return oldRow, newRow, nil } type updateDecoder struct { - colsTypes map[int64]*types.FieldType + columns map[int64]*model.ColumnInfo + isTblDroppingCol bool } -func newUpdateDecoder(table *model.TableInfo) updateDecoder { +func newUpdateDecoder(table *model.TableInfo, isTblDroppingCol bool) updateDecoder { columns := writableColumns(table) return updateDecoder{ - colsTypes: util.ToColumnTypeMap(columns), + columns: util.ToColumnMap(columns), + isTblDroppingCol: isTblDroppingCol, } } // decode decodes a byte slice into datums with a existing row map. // Row layout: colID1, value1, colID2, value2, ..... func (ud updateDecoder) decode(b []byte, loc *time.Location) (map[int64]types.Datum, map[int64]types.Datum, error) { - return DecodeOldAndNewRow(b, ud.colsTypes, loc) + return DecodeOldAndNewRow(b, ud.columns, loc, ud.isTblDroppingCol) } func fixType(data types.Datum, col *model.ColumnInfo) types.Datum { diff --git a/go.mod b/go.mod index 6fc8e277f..ccc860914 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/pingcap/pd v1.1.0-beta.0.20191119124645-4c0c1027f412 github.com/pingcap/tidb v1.1.0-beta.0.20191119111543-77faf6466821 github.com/pingcap/tidb-tools v3.0.6-0.20191120102444-63de173cc344+incompatible - github.com/pingcap/tipb v0.0.0-20191112054303-0b0ad0d4a92e + github.com/pingcap/tipb v0.0.0-20191120020146-6161b015e21e github.com/prometheus/client_golang v1.0.0 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a diff --git a/go.sum b/go.sum index ee9fede43..bb8ff05f6 100644 --- a/go.sum +++ b/go.sum @@ -241,8 +241,11 @@ github.com/pingcap/tidb-tools v3.0.6-0.20191120102444-63de173cc344+incompatible github.com/pingcap/tidb-tools v3.0.6-0.20191120102444-63de173cc344+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330 h1:rRMLMjIMFulCX9sGKZ1hoov/iROMsKyC8Snc02nSukw= github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pingcap/tipb v0.0.0-20191112054303-0b0ad0d4a92e h1:TWwzCfLrj9GH5uaT0VcvdSnrHuwEntUfoHDTYdOzNNI= github.com/pingcap/tipb v0.0.0-20191112054303-0b0ad0d4a92e/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +github.com/pingcap/tipb v0.0.0-20191120020146-6161b015e21e h1:OWgXsJ2Zwa+q+sqi87fGuNda+ChJrclVd7wiGP5Epps= +github.com/pingcap/tipb v0.0.0-20191120020146-6161b015e21e/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/pkg/util/util.go b/pkg/util/util.go index 377ec3b5d..1a1603e3c 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -143,6 +143,15 @@ func ToColumnTypeMap(columns []*model.ColumnInfo) map[int64]*types.FieldType { return colTypeMap } +// ToColumnMap return a map index by column id +func ToColumnMap(columns []*model.ColumnInfo) map[int64]*model.ColumnInfo { + colMap := make(map[int64]*model.ColumnInfo, len(columns)) + for _, col := range columns { + colMap[col.ID] = col + } + return colMap +} + // RetryOnError defines a action with retry when fn returns error func RetryOnError(retryCount int, sleepTime time.Duration, errStr string, fn func() error) error { var err error diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go index 9d961172f..b77005f62 100644 --- a/tests/dailytest/case.go +++ b/tests/dailytest/case.go @@ -19,6 +19,7 @@ import ( "math" "math/rand" "strings" + "sync" "time" "github.com/pingcap/errors" @@ -199,6 +200,9 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) { runPKcases(tr) + tr.run(caseUpdateWhileAddingCol) + tr.execSQLs([]string{"DROP TABLE growing_cols;"}) + tr.execSQLs(caseMultiDataType) tr.execSQLs(caseMultiDataTypeClean) @@ -218,11 +222,12 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) { }) tr.execSQLs(casePKAddDuplicateUKClean) - // run caseInsertBit + tr.run(caseUpdateWhileDroppingCol) + tr.execSQLs([]string{"DROP TABLE many_cols;"}) + tr.execSQLs(caseInsertBit) tr.execSQLs(caseInsertBitClean) - // run caseRecoverAndInsert tr.execSQLs(caseRecoverAndInsert) tr.execSQLs(caseRecoverAndInsertClean) @@ -304,6 +309,103 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) { tr.execSQLs([]string{"DROP TABLE binlog_big;"}) } +func caseUpdateWhileAddingCol(db *sql.DB) { + mustExec(db, ` +CREATE TABLE growing_cols ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0 +);`) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + insertSQL := `INSERT INTO growing_cols(id, val) VALUES (?, ?);` + mustExec(db, insertSQL, 1, 0) + + // Keep updating to generate DMLs while the other goroutine's adding columns + updateSQL := `UPDATE growing_cols SET val = ? WHERE id = ?;` + for i := 0; i < 300; i++ { + mustExec(db, updateSQL, i, 1) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 300; i++ { + updateSQL := fmt.Sprintf(`ALTER TABLE growing_cols ADD COLUMN col%d VARCHAR(50);`, i) + mustExec(db, updateSQL) + } + }() + + wg.Wait() +} + +func caseUpdateWhileDroppingCol(db *sql.DB) { + const nCols = 50 + var builder strings.Builder + for i := 0; i < nCols; i++ { + if i != 0 { + builder.WriteRune(',') + } + builder.WriteString(fmt.Sprintf("col%d VARCHAR(50) NOT NULL", i)) + } + createSQL := fmt.Sprintf(` +CREATE TABLE many_cols ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + %s +);`, builder.String()) + mustExec(db, createSQL) + + builder.Reset() + for i := 0; i < nCols; i++ { + if i != 0 { + builder.WriteRune(',') + } + builder.WriteString(fmt.Sprintf("col%d", i)) + } + cols := builder.String() + + builder.Reset() + for i := 0; i < nCols; i++ { + if i != 0 { + builder.WriteRune(',') + } + builder.WriteString(`""`) + } + placeholders := builder.String() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + // Insert a row with all columns set to empty string + insertSQL := fmt.Sprintf(`INSERT INTO many_cols(id, %s) VALUES (?, %s);`, cols, placeholders) + mustExec(db, insertSQL, 1) + + // Keep updating to generate DMLs while the other goroutine's dropping columns + updateSQL := `UPDATE many_cols SET val = ? WHERE id = ?;` + for i := 0; i < 100; i++ { + mustExec(db, updateSQL, i, 1) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + for i := 0; i < nCols; i++ { + mustExec(db, fmt.Sprintf("ALTER TABLE many_cols DROP COLUMN col%d;", i)) + } + }() + + wg.Wait() +} + // caseTblWithGeneratedCol creates a table with generated column, // and insert values into the table func caseTblWithGeneratedCol(db *sql.DB) {