Skip to content

Commit

Permalink
drainer: bugfix, handle "missing column" when a column is getting dro…
Browse files Browse the repository at this point in the history
…pped (pingcap#803)
  • Loading branch information
suzaku authored Nov 21, 2019
1 parent 5aa1669 commit 72a15d4
Show file tree
Hide file tree
Showing 16 changed files with 232 additions and 695 deletions.
6 changes: 5 additions & 1 deletion drainer/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,13 @@ func (c *Collector) syncBinlog(item *binlogItem) error {

log.Info("get ddl job", zap.Stringer("job", job))

if skipJob(job) {
isDelOnlyEvent := model.SchemaState(binlog.DdlSchemaState) == model.StateDeleteOnly
if skipJob(job) && !isDelOnlyEvent {
return nil
}
if isDelOnlyEvent {
job.SchemaState = model.StateDeleteOnly
}
item.SetJob(job)
ddlJobsCounter.Add(float64(1))
}
Expand Down
51 changes: 33 additions & 18 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Schema struct {
tables map[int64]*model.TableInfo

truncateTableID map[int64]struct{}
tblsDroppingCol map[int64]bool

schemaMetaVersion int64

Expand All @@ -56,6 +57,7 @@ func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) {
hasImplicitCol: hasImplicitCol,
version2SchemaTable: make(map[int64]TableName),
truncateTableID: make(map[int64]struct{}),
tblsDroppingCol: make(map[int64]bool),
jobs: jobs,
}

Expand Down Expand Up @@ -231,25 +233,27 @@ func (s *Schema) addJob(job *model.Job) {
func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
var i int
for i = 0; i < len(s.jobs); i++ {
if skipJob(s.jobs[i]) {
log.Debug("skip ddl job", zap.Stringer("job", s.jobs[i]))
job := s.jobs[i]

if job.BinlogInfo.SchemaVersion > version {
break
}

if job.BinlogInfo.SchemaVersion <= s.currentVersion {
log.Warn("ddl job schema version is less than current version, skip this ddl job",
zap.Stringer("job", job),
zap.Int64("currentVersion", s.currentVersion))
continue
}

if s.jobs[i].BinlogInfo.SchemaVersion <= version {
if s.jobs[i].BinlogInfo.SchemaVersion <= s.currentVersion {
log.Warn("ddl job schema version is less than current version, skip this ddl job",
zap.Stringer("job", s.jobs[i]),
zap.Int64("currentVersion", s.currentVersion))
continue
}

_, _, _, err := s.handleDDL(s.jobs[i])
if err != nil {
return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s)
}
} else {
break
if job.SchemaState == model.StateDeleteOnly && job.Type == model.ActionDropColumn {
s.tblsDroppingCol[job.TableID] = true
log.Info("Got DeleteOnly Job", zap.Stringer("job", job))
continue
}
_, _, _, err := s.handleDDL(job)
if err != nil {
return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s)
}
}

Expand All @@ -264,12 +268,13 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
// the third value[string]: the sql that is corresponding to the job
// the fourth value[error]: the handleDDL execution's err
func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) {
log.Debug("handle job: ", zap.Stringer("job", job))

if skipJob(job) {
log.Debug("Skip job", zap.Stringer("job", job))
return "", "", "", nil
}

log.Debug("Handle job", zap.Stringer("job", job))

sql = job.Query
if sql == "" {
return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job)
Expand Down Expand Up @@ -426,11 +431,21 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
s.currentVersion = job.BinlogInfo.SchemaVersion
schemaName = schema.Name.O
tableName = tbInfo.Name.O

if job.Type == model.ActionDropColumn {
log.Info("Finished dropping column", zap.Stringer("job", job))
delete(s.tblsDroppingCol, job.TableID)
}
}

return
}

// IsDroppingColumn returns true if the table is in the middle of dropping a column
func (s *Schema) IsDroppingColumn(id int64) bool {
return s.tblsDroppingCol[id]
}

// IsTruncateTableID returns true if the table id have been truncated by truncate table DDL
func (s *Schema) IsTruncateTableID(id int64) bool {
_, ok := s.truncateTableID[id]
Expand Down
2 changes: 1 addition & 1 deletion drainer/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (t *schemaSuite) TestSchema(c *C) {
jobs = append(jobs, job)

// construct a rollbackdone job
jobs = append(jobs, &model.Job{ID: 5, State: model.JobStateRollbackDone})
jobs = append(jobs, &model.Job{ID: 5, State: model.JobStateRollbackDone, BinlogInfo: &model.HistoryInfo{}})

// reconstruct the local schema
schema, err := NewSchema(jobs, false)
Expand Down
9 changes: 7 additions & 2 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ ForLoop:
lastAddComitTS = binlog.GetCommitTs()
err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: preWrite})
if err != nil {
err = errors.Annotate(err, "add to dsyncer failed")
err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs)
break ForLoop
}
executeHistogram.Observe(time.Since(beginTime).Seconds())
Expand All @@ -383,6 +383,11 @@ ForLoop:
break ForLoop
}

if b.job.SchemaState == model.StateDeleteOnly && b.job.Type == model.ActionDropColumn {
log.Info("Syncer skips DeleteOnly DDL", zap.Stringer("job", b.job), zap.Int64("ts", b.GetCommitTs()))
continue
}

sql := b.job.Query
var schema, table string
schema, table, err = s.schema.getSchemaTableAndDelete(b.job.BinlogInfo.SchemaVersion)
Expand All @@ -404,7 +409,7 @@ ForLoop:

err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table})
if err != nil {
err = errors.Annotate(err, "add to dsyncer failed")
err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs)
break ForLoop
}
executeHistogram.Observe(time.Since(beginTime).Seconds())
Expand Down
7 changes: 4 additions & 3 deletions drainer/translator/flash.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func GenFlashSQLs(infoGetter TableInfoGetter, pv *tipb.PrewriteValue, commitTS i
if !ok {
return nil, nil, errors.Errorf("TableByID empty table id: %d", mut.GetTableId())
}
isTblDroppingCol := infoGetter.IsDroppingColumn(mut.GetTableId())

var schema string
schema, _, ok = infoGetter.SchemaAndTableName(mut.GetTableId())
Expand All @@ -68,7 +69,7 @@ func GenFlashSQLs(infoGetter TableInfoGetter, pv *tipb.PrewriteValue, commitTS i
return nil, nil, errors.Annotate(err, "gen insert sql fail")
}
case tipb.MutationType_Update:
sql, args, err = GenFlashUpdateSQL(schema, info, row, commitTS)
sql, args, err = GenFlashUpdateSQL(schema, info, row, commitTS, isTblDroppingCol)
if err != nil {
return nil, nil, errors.Annotate(err, "gen update sql fail")
}
Expand Down Expand Up @@ -140,15 +141,15 @@ func GenFlashInsertSQL(schema string, table *model.TableInfo, row []byte, commit
}

// GenFlashUpdateSQL generate the SQL need to execute syncing this update row to Flash
func GenFlashUpdateSQL(schema string, table *model.TableInfo, row []byte, commitTS int64) (sql string, args []interface{}, err error) {
func GenFlashUpdateSQL(schema string, table *model.TableInfo, row []byte, commitTS int64, isTblDroppingCol bool) (sql string, args []interface{}, err error) {
schema = strings.ToLower(schema)
pkColumn := pkHandleColumn(table)
if pkColumn == nil {
pkColumn = fakeImplicitColumn(table)
}
pkID := pkColumn.ID

updtDecoder := newUpdateDecoder(table)
updtDecoder := newUpdateDecoder(table, isTblDroppingCol)
version := makeInternalVersionValue(uint64(commitTS))
delFlag := makeInternalDelmarkValue(false)

Expand Down
Loading

0 comments on commit 72a15d4

Please sign in to comment.