schemastore: fix panic after recover table (#997)
close #985
lidezhu authored Feb 17, 2025
1 parent 7af8164 commit 796d1bd
Showing 4 changed files with 76 additions and 13 deletions.
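In outline: the assertion at the top of applyDDL is removed; in doApplyDDL, a recover-table event resets deleteVersion to math.MaxUint64, while any other DDL that yields table info asserts the table has not been deleted; and handleDDLJob now logs and ignores unknown DDL types instead of panicking. Below is a minimal sketch of the DDL sequence covered by the new test case, built from the test helpers this commit adds (setTableInfoInitialized is an assumed setup call mirroring the existing test, whose apply loop is collapsed in this diff):

// Hypothetical sketch, not part of the commit: the recover-table sequence from the
// new test case, using the builders added in multi_version_test_utils.go.
func sketchRecoverTableSequence(t *testing.T) {
	store := newEmptyVersionedTableInfoStore(200)
	store.setTableInfoInitialized() // assumed setup call, mirroring the existing test
	for _, e := range []*PersistedDDLEvent{
		buildCreateTableEventForTest(10, 200, "test", "normal_table", 1010),  // create table 200
		buildDropTableEventForTest(10, 200, "test", "normal_table", 1020),    // drop table 200: deleteVersion = 1020
		buildRecoverTableEventForTest(10, 200, "test", "normal_table", 1030), // recover table 200: deleteVersion back to math.MaxUint64
		buildDropTableEventForTest(10, 200, "test", "normal_table", 1040),    // drop table 200 again: deleteVersion = 1040
	} {
		store.applyDDL(e) // ActionRecoverTable clears the delete marker instead of asserting non-deleted
	}

	// Queries at 1010 and 1030 see the table; at 1040 it is reported as deleted.
	info, err := store.getTableInfo(1030)
	require.Nil(t, err)
	require.Equal(t, "normal_table", info.TableName.Table)

	_, err = store.getTableInfo(1040)
	_, isDeleted := err.(*TableDeletedError)
	require.True(t, isDeleted)
}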
8 changes: 3 additions & 5 deletions logservice/schemastore/multi_version.go
@@ -178,10 +178,6 @@ func (v *versionedTableInfoStore) applyDDLFromPersistStorage(event *PersistedDDLEvent) {
func (v *versionedTableInfoStore) applyDDL(event *PersistedDDLEvent) {
v.mu.Lock()
defer v.mu.Unlock()
// delete table should not receive more ddl except recover table
if model.ActionType(event.Type) != model.ActionRecoverTable {
assertNonDeleted(v)
}

if !v.initialized {
// The usage of the parameter `event` may outlive the function call, so we copy it.
@@ -208,10 +204,12 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
}
tableInfo, deleted := handler.extractTableInfoFunc(event, v.tableID)
if tableInfo != nil {
v.infos = append(v.infos, &tableInfoItem{Version: event.FinishedTs, Info: tableInfo})
if ddlType == model.ActionRecoverTable {
v.deleteVersion = math.MaxUint64
} else {
assertNonDeleted(v)
}
v.infos = append(v.infos, &tableInfoItem{Version: event.FinishedTs, Info: tableInfo})
} else if deleted {
v.deleteVersion = event.FinishedTs
}
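For reference, the new branch in doApplyDDL with editorial comments (the comments are not part of the commit; the reading of assertNonDeleted and deleteVersion is inferred from the surrounding code and the new test):

	tableInfo, deleted := handler.extractTableInfoFunc(event, v.tableID)
	if tableInfo != nil {
		if ddlType == model.ActionRecoverTable {
			// A recover-table event may legitimately arrive for a dropped table,
			// so clear the delete marker instead of asserting.
			v.deleteVersion = math.MaxUint64
		} else {
			// Any other DDL that yields table info must not target a dropped table;
			// assertNonDeleted presumably rejects that case.
			assertNonDeleted(v)
		}
		v.infos = append(v.infos, &tableInfoItem{Version: event.FinishedTs, Info: tableInfo})
	} else if deleted {
		// The event dropped the table; record the version at which it became invisible.
		v.deleteVersion = event.FinishedTs
	}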
48 changes: 43 additions & 5 deletions logservice/schemastore/multi_version_test.go
@@ -23,6 +23,7 @@ import (
func TestBuildVersionedTableInfoStore(t *testing.T) {
type QueryTableInfoTestCase struct {
snapTs uint64
deleted bool
schemaName string
tableName string
}
@@ -114,6 +115,36 @@ func TestBuildVersionedTableInfoStore(t *testing.T) {
},
},
},
// test recover table
{
tableID: 200,
ddlEvents: func() []*PersistedDDLEvent {
return []*PersistedDDLEvent{
buildCreateTableEventForTest(10, 200, "test", "normal_table", 1010), // create table 200
buildDropTableEventForTest(10, 200, "test", "normal_table", 1020), // drop table 200
buildRecoverTableEventForTest(10, 200, "test", "normal_table", 1030), // recover table 200
buildDropTableEventForTest(10, 200, "test", "normal_table", 1040), // drop table 200
}
}(),
queryCases: []QueryTableInfoTestCase{
{
snapTs: 1010,
schemaName: "test",
tableName: "normal_table",
},
// Note: the table is dropped at ts 1020, but that deletion is overridden by the subsequent recover table event.
// Since keeping this transient deleted state would be of little use, we retain the current behavior.
{
snapTs: 1030,
schemaName: "test",
tableName: "normal_table",
},
{
snapTs: 1040,
deleted: true,
},
},
},
}
for _, tt := range testCases {
store := newEmptyVersionedTableInfoStore(tt.tableID)
@@ -123,11 +154,18 @@ func TestBuildVersionedTableInfoStore(t *testing.T) {
}
for _, c := range tt.queryCases {
tableInfo, err := store.getTableInfo(c.snapTs)
require.Nil(t, err)
require.Equal(t, c.schemaName, tableInfo.TableName.Schema)
require.Equal(t, c.tableName, tableInfo.TableName.Table)
if !tableInfo.TableName.IsPartition {
require.Equal(t, tt.tableID, tableInfo.TableName.TableID)
if !c.deleted {
require.Nil(t, err)
require.Equal(t, c.schemaName, tableInfo.TableName.Schema)
require.Equal(t, c.tableName, tableInfo.TableName.Table)
if !tableInfo.TableName.IsPartition {
require.Equal(t, tt.tableID, tableInfo.TableName.TableID)
}
} else {
require.Nil(t, tableInfo)
if _, ok := err.(*TableDeletedError); !ok {
t.Error("expect TableDeletedError, but got", err)
}
}
}
if tt.deleteVersion != 0 {
26 changes: 26 additions & 0 deletions logservice/schemastore/multi_version_test_utils.go
@@ -34,6 +34,32 @@ func buildCreateTableEventForTest(schemaID, tableID int64, schemaName, tableName string, finishedTs uint64) *PersistedDDLEvent {
}
}

func buildDropTableEventForTest(schemaID, tableID int64, schemaName, tableName string, finishedTs uint64) *PersistedDDLEvent {
return &PersistedDDLEvent{
Type: byte(model.ActionDropTable),
SchemaID: schemaID,
TableID: tableID,
SchemaName: schemaName,
TableName: tableName,
FinishedTs: finishedTs,
}
}

func buildRecoverTableEventForTest(schemaID, tableID int64, schemaName, tableName string, finishedTs uint64) *PersistedDDLEvent {
return &PersistedDDLEvent{
Type: byte(model.ActionRecoverTable),
SchemaID: schemaID,
TableID: tableID,
SchemaName: schemaName,
TableName: tableName,
TableInfo: &model.TableInfo{
ID: tableID,
Name: pmodel.NewCIStr(tableName),
},
FinishedTs: finishedTs,
}
}

func buildCreatePartitionTableEventForTest(schemaID, tableID int64, schemaName, tableName string, partitionIDs []int64, finishedTs uint64) *PersistedDDLEvent {
partitionDefinitions := make([]model.PartitionDefinition, 0, len(partitionIDs))
for _, partitionID := range partitionIDs {
7 changes: 4 additions & 3 deletions logservice/schemastore/persist_storage.go
@@ -649,7 +649,8 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error {

handler, ok := allDDLHandlers[job.Type]
if !ok {
log.Panic("unknown ddl type", zap.Any("ddlType", job.Type), zap.String("query", job.Query))
log.Error("unknown ddl type, ignore it", zap.Any("ddlType", job.Type), zap.String("query", job.Query))
return nil
}
ddlEvent := handler.buildPersistedDDLEventFunc(buildPersistedDDLEventFuncArgs{
job: job,
@@ -721,7 +722,7 @@ func shouldSkipDDL(job *model.Job, tableMap map[int64]*BasicTableInfo) bool {
case model.ActionCreateTable:
// Note: partition table's logical table id is also in tableMap
if _, ok := tableMap[job.BinlogInfo.TableInfo.ID]; ok {
log.Warn("table already exists. ignore DDL",
log.Info("table already exists. ignore DDL",
zap.String("DDL", job.Query),
zap.Int64("jobID", job.ID),
zap.Int64("schemaID", job.SchemaID),
@@ -733,7 +734,7 @@ func shouldSkipDDL(job *model.Job, tableMap map[int64]*BasicTableInfo) bool {
case model.ActionCreateTables:
// For a duplicate create tables DDL job, the tables in the job should be the same, so checking the first table is enough
if _, ok := tableMap[job.BinlogInfo.MultipleTableInfos[0].ID]; ok {
log.Warn("table already exists. ignore DDL",
log.Info("table already exists. ignore DDL",
zap.String("DDL", job.Query),
zap.Int64("jobID", job.ID),
zap.Int64("schemaID", job.SchemaID),
