Skip to content

Commit

Permalink
schemastore: refactor and add more unit tests (#749)
Browse files Browse the repository at this point in the history
* add more tests

* refactor 1

* refactor 2

* refactor 3

* refactor

* wip

* refactor

* refactor

* refactor
  • Loading branch information
lidezhu authored Jan 1, 2025
1 parent 93c6e9f commit 05c03b4
Show file tree
Hide file tree
Showing 8 changed files with 1,765 additions and 1,592 deletions.
39 changes: 26 additions & 13 deletions logservice/schemastore/disk_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,18 +276,24 @@ func loadAndApplyDDLHistory(
if shouldSkipDDL(&ddlEvent, databaseMap, tableMap) {
continue
}
if tableTriggerDDLHistory, err = updateDDLHistory(
&ddlEvent,
databaseMap,
tableMap,
partitionMap,
tablesDDLHistory,
tableTriggerDDLHistory); err != nil {
log.Panic("updateDDLHistory error", zap.Error(err))
}
if err := updateDatabaseInfoAndTableInfo(&ddlEvent, databaseMap, tableMap, partitionMap); err != nil {
log.Panic("updateDatabaseInfo error", zap.Error(err))
handler, ok := allDDLHandlers[model.ActionType(ddlEvent.Type)]
if !ok {
log.Panic("unknown ddl type", zap.Any("ddlType", ddlEvent.Type), zap.String("query", ddlEvent.Query))
}
tableTriggerDDLHistory = handler.updateDDLHistoryFunc(updateDDLHistoryFuncArgs{
ddlEvent: &ddlEvent,
databaseMap: databaseMap,
tableMap: tableMap,
partitionMap: partitionMap,
tablesDDLHistory: tablesDDLHistory,
tableTriggerDDLHistory: tableTriggerDDLHistory,
})
handler.updateSchemaMetadataFunc(updateSchemaMetadataFuncArgs{
event: &ddlEvent,
databaseMap: databaseMap,
tableMap: tableMap,
partitionMap: partitionMap,
})
}

return tablesDDLHistory, tableTriggerDDLHistory, nil
Expand Down Expand Up @@ -585,9 +591,16 @@ func loadAllPhysicalTablesAtTs(
defer snapIter.Close()
for snapIter.First(); snapIter.Valid(); snapIter.Next() {
ddlEvent := unmarshalPersistedDDLEvent(snapIter.Value())
if err := updateDatabaseInfoAndTableInfo(&ddlEvent, databaseMap, tableMap, partitionMap); err != nil {
log.Panic("updateDatabaseInfo error", zap.Error(err))
handler, ok := allDDLHandlers[model.ActionType(ddlEvent.Type)]
if !ok {
log.Panic("unknown ddl type", zap.Any("ddlType", ddlEvent.Type), zap.String("query", ddlEvent.Query))
}
handler.updateSchemaMetadataFunc(updateSchemaMetadataFuncArgs{
event: &ddlEvent,
databaseMap: databaseMap,
tableMap: tableMap,
partitionMap: partitionMap,
})
}
log.Info("after load tables from ddl",
zap.Int("tableMapLen", len(tableMap)),
Expand Down
146 changes: 11 additions & 135 deletions logservice/schemastore/multi_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ func (v *versionedTableInfoStore) applyDDL(event *PersistedDDLEvent) {

// lock must be hold by the caller
func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
// TODO: add a unit test
// TODO: whether need add schema version check
if len(v.infos) != 0 && event.FinishedTs <= v.infos[len(v.infos)-1].version {
log.Warn("already applied ddl, ignore it.",
zap.Int64("tableID", v.tableID),
Expand All @@ -219,98 +217,7 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
zap.Int("infosLen", len(v.infos)))
return
}
appendTableInfo := func() {
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.TableInfo)
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
}

switch model.ActionType(event.Type) {
case model.ActionCreateTable:
if len(v.infos) == 1 {
// table info may be in snapshot, can not filter redudant job in this case
log.Warn("ignore create table job",
zap.Int64("tableID", int64(v.tableID)),
zap.String("query", event.Query),
zap.Uint64("finishedTS", event.FinishedTs))
break
}
assertEmpty(v.infos, event)
appendTableInfo()
case model.ActionDropSchema:
// ignore
case model.ActionDropTable:
v.deleteVersion = uint64(event.FinishedTs)
case model.ActionAddColumn,
model.ActionDropColumn:
assertNonEmpty(v.infos, event)
appendTableInfo()
case model.ActionTruncateTable:
if isPartitionTable(event.TableInfo) {
createTable := false
for _, partition := range getAllPartitionIDs(event.TableInfo) {
if v.tableID == partition {
createTable = true
break
}
}
if createTable {
log.Info("create table for truncate table")
appendTableInfo()
} else {
v.deleteVersion = uint64(event.FinishedTs)
}
} else {
if v.tableID == event.CurrentTableID {
appendTableInfo()
} else {
if v.tableID != event.PrevTableID {
log.Panic("should not happen")
}
v.deleteVersion = uint64(event.FinishedTs)
}
}
case model.ActionRenameTable:
assertNonEmpty(v.infos, event)
appendTableInfo()
case model.ActionAddTablePartition:
newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo))
for _, partition := range newCreatedIDs {
if v.tableID == partition {
appendTableInfo()
break
}
}
case model.ActionDropTablePartition:
droppedIDs := getDroppedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo))
for _, partition := range droppedIDs {
if v.tableID == partition {
v.deleteVersion = uint64(event.FinishedTs)
break
}
}
case model.ActionCreateView:
// create view is add to all table's ddl history, so it will be read when build store, just ignore it
case model.ActionTruncateTablePartition:
physicalIDs := getAllPartitionIDs(event.TableInfo)
droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
dropped := false
for _, partition := range droppedIDs {
if v.tableID == partition {
v.deleteVersion = uint64(event.FinishedTs)
dropped = true
break
}
}
if !dropped {
newCreatedIDs := getCreatedIDs(event.PrevPartitions, physicalIDs)
for _, partition := range newCreatedIDs {
if v.tableID == partition {
appendTableInfo()
break
}
}
}
case model.ActionExchangeTablePartition:
if model.ActionType(event.Type) == model.ActionExchangeTablePartition {
assertNonEmpty(v.infos, event)
columnSchema := v.infos[len(v.infos)-1].info.ShadowCopyColumnSchema()
// the previous normal table
Expand All @@ -337,48 +244,17 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
)
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: tableInfo})
}
case model.ActionCreateTables:
assertEmpty(v.infos, event)
for _, tableInfo := range event.MultipleTableInfos {
if isPartitionTable(tableInfo) {
for _, partitionID := range getAllPartitionIDs(tableInfo) {
if v.tableID == partitionID {
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, tableInfo)
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
break
}
}
} else {
if v.tableID == tableInfo.ID {
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, tableInfo)
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
break
}
}
}
case model.ActionReorganizePartition:
physicalIDs := getAllPartitionIDs(event.TableInfo)
droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
dropped := false
for _, partition := range droppedIDs {
if v.tableID == partition {
v.deleteVersion = uint64(event.FinishedTs)
dropped = true
break
}
} else {
// TODO: add func to check invariant for every ddl type
handler, ok := allDDLHandlers[model.ActionType(event.Type)]
if !ok {
log.Panic("unknown ddl type", zap.Any("ddlType", event.Type), zap.String("query", event.Query))
}
if !dropped {
newCreatedIDs := getCreatedIDs(event.PrevPartitions, physicalIDs)
for _, partition := range newCreatedIDs {
if v.tableID == partition {
appendTableInfo()
break
}
}
tableInfo, deleted := handler.extractTableInfoFunc(event, v.tableID)
if tableInfo != nil {
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: tableInfo})
} else if deleted {
v.deleteVersion = uint64(event.FinishedTs)
}
default:
log.Panic("not supported ddl type",
zap.Any("ddlType", event.Type),
zap.String("DDL", event.Query))
}
}
Loading

0 comments on commit 05c03b4

Please sign in to comment.