Skip to content

Commit

Permalink
remove schema id
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Feb 18, 2025
1 parent c43626e commit d955184
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 44 deletions.
4 changes: 2 additions & 2 deletions logservice/schemastore/ddl_job_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func initDDLTableInfo(kvStorage kv.Storage) error {
}

ddlTableInfo = &event.DDLTableInfo{}
ddlTableInfo.DDLJobTable = common.WrapTableInfo(db.ID, db.Name.L, tableInfo)
ddlTableInfo.DDLJobTable = common.WrapTableInfo(db.Name.L, tableInfo)
ddlTableInfo.JobMetaColumnIDinJobTable = col.ID

// for tidb_ddl_history
Expand All @@ -192,7 +192,7 @@ func initDDLTableInfo(kvStorage kv.Storage) error {
return errors.Trace(err)
}

ddlTableInfo.DDLHistoryTable = common.WrapTableInfo(db.ID, db.Name.L, historyTableInfo)
ddlTableInfo.DDLHistoryTable = common.WrapTableInfo(db.Name.L, historyTableInfo)
ddlTableInfo.JobMetaColumnIDinHistoryTable = historyTableCol.ID

return nil
Expand Down
2 changes: 1 addition & 1 deletion logservice/schemastore/disk_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func readTableInfoInKVSnap(snap *pebble.Snapshot, tableID int64, version uint64)
if err != nil {
log.Fatal("unmarshal table info failed", zap.Error(err))
}
return common.WrapTableInfo(table_info_entry.SchemaID, table_info_entry.SchemaName, tableInfo)
return common.WrapTableInfo(table_info_entry.SchemaName, tableInfo)
}

func unmarshalPersistedDDLEvent(value []byte) PersistedDDLEvent {
Expand Down
2 changes: 1 addition & 1 deletion logservice/schemastore/multi_version_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func buildExchangePartitionTableEventForTest(
Enable: true,
},
},
ExtraTableInfo: common.WrapTableInfo(normalSchemaID, normalSchemaName, &model.TableInfo{
ExtraTableInfo: common.WrapTableInfo(normalSchemaName, &model.TableInfo{
ID: normalTableID,
Name: pmodel.NewCIStr(normalTableName),
}),
Expand Down
40 changes: 18 additions & 22 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,12 +1299,12 @@ func extractTableInfoFuncForSingleTableDDL(event *PersistedDDLEvent, tableID int
if isPartitionTable(event.TableInfo) {
for _, partitionID := range getAllPartitionIDs(event.TableInfo) {
if tableID == partitionID {
return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false
return common.WrapTableInfo(event.SchemaName, event.TableInfo), false
}
}
} else {
if tableID == event.TableID {
return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false
return common.WrapTableInfo(event.SchemaName, event.TableInfo), false
}
}
log.Panic("should not reach here", zap.Any("event", event), zap.Int64("tableID", tableID))
Expand All @@ -1314,7 +1314,7 @@ func extractTableInfoFuncForSingleTableDDL(event *PersistedDDLEvent, tableID int
func extractTableInfoFuncForExchangeTablePartition(event *PersistedDDLEvent, tableID int64) (*common.TableInfo, bool) {
if tableID == event.TableID {
// old normal table id, return the table info of the partition table
return common.WrapTableInfo(event.ExtraSchemaID, event.ExtraSchemaName, event.TableInfo), false
return common.WrapTableInfo(event.ExtraSchemaName, event.TableInfo), false
} else {
physicalIDs := getAllPartitionIDs(event.TableInfo)
droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
Expand All @@ -1331,7 +1331,6 @@ func extractTableInfoFuncForExchangeTablePartition(event *PersistedDDLEvent, tab
// old partition id, return the table info of the normal table
columnSchema := event.ExtraTableInfo.ShadowCopyColumnSchema()
tableInfo := common.NewTableInfo(
event.SchemaID,
event.SchemaName,
pmodel.NewCIStr(event.TableName).O,
tableID,
Expand All @@ -1357,13 +1356,13 @@ func extractTableInfoFuncForTruncateTable(event *PersistedDDLEvent, tableID int6
if isPartitionTable(event.TableInfo) {
for _, partitionID := range getAllPartitionIDs(event.TableInfo) {
if tableID == partitionID {
return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false
return common.WrapTableInfo(event.SchemaName, event.TableInfo), false
}
}
return nil, true
} else {
if tableID == event.ExtraTableID {
return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false
return common.WrapTableInfo(event.SchemaName, event.TableInfo), false
} else if tableID == event.TableID {
return nil, true
}
Expand All @@ -1376,7 +1375,7 @@ func extractTableInfoFuncForAddPartition(event *PersistedDDLEvent, tableID int64
newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo))
for _, partition := range newCreatedIDs {
if tableID == partition {
return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false
return common.WrapTableInfo(event.SchemaName, event.TableInfo), false
}
}
return nil, false
Expand All @@ -1403,7 +1402,7 @@ func extractTableInfoFuncForTruncateAndReorganizePartition(event *PersistedDDLEv
newCreatedIDs := getCreatedIDs(event.PrevPartitions, physicalIDs)
for _, partition := range newCreatedIDs {
if tableID == partition {
return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false
return common.WrapTableInfo(event.SchemaName, event.TableInfo), false
}
}
return nil, false
Expand All @@ -1414,12 +1413,12 @@ func extractTableInfoFuncForRenameTables(event *PersistedDDLEvent, tableID int64
if isPartitionTable(tableInfo) {
for _, partitionID := range getAllPartitionIDs(tableInfo) {
if tableID == partitionID {
return common.WrapTableInfo(event.SchemaIDs[i], event.SchemaNames[i], tableInfo), false
return common.WrapTableInfo(event.SchemaNames[i], tableInfo), false
}
}
} else {
if tableID == tableInfo.ID {
return common.WrapTableInfo(event.SchemaIDs[i], event.SchemaNames[i], tableInfo), false
return common.WrapTableInfo(event.SchemaNames[i], tableInfo), false
}
}
}
Expand All @@ -1432,12 +1431,12 @@ func extractTableInfoFuncForCreateTables(event *PersistedDDLEvent, tableID int64
if isPartitionTable(tableInfo) {
for _, partitionID := range getAllPartitionIDs(tableInfo) {
if tableID == partitionID {
return common.WrapTableInfo(event.SchemaID, event.SchemaName, tableInfo), false
return common.WrapTableInfo(event.SchemaName, tableInfo), false
}
}
} else {
if tableID == tableInfo.ID {
return common.WrapTableInfo(event.SchemaID, event.SchemaName, tableInfo), false
return common.WrapTableInfo(event.SchemaName, tableInfo), false
}
}
}
Expand All @@ -1459,7 +1458,7 @@ func extractTableInfoFuncForAlterTablePartitioning(event *PersistedDDLEvent, tab
}
for _, partitionID := range getAllPartitionIDs(event.TableInfo) {
if tableID == partitionID {
return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false
return common.WrapTableInfo(event.SchemaName, event.TableInfo), false
}
}
log.Panic("should not reach here", zap.Int64("tableID", tableID))
Expand All @@ -1468,7 +1467,7 @@ func extractTableInfoFuncForAlterTablePartitioning(event *PersistedDDLEvent, tab

func extractTableInfoFuncForRemovePartitioning(event *PersistedDDLEvent, tableID int64) (*common.TableInfo, bool) {
if event.TableID == tableID {
return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false
return common.WrapTableInfo(event.SchemaName, event.TableInfo), false
} else {
for _, partitionID := range event.PrevPartitions {
if tableID == partitionID {
Expand Down Expand Up @@ -1497,10 +1496,7 @@ func buildDDLEventCommon(rawEvent *PersistedDDLEvent, tableFilter filter.Filter,
}
}
if rawEvent.TableInfo != nil {
wrapTableInfo = common.WrapTableInfo(
rawEvent.SchemaID,
rawEvent.SchemaName,
rawEvent.TableInfo)
wrapTableInfo = common.WrapTableInfo(rawEvent.SchemaName, rawEvent.TableInfo)
}

return commonEvent.DDLEvent{
Expand Down Expand Up @@ -2005,7 +2001,7 @@ func buildDDLEventForExchangeTablePartition(rawEvent *PersistedDDLEvent, tableFi
log.Fatal("should not happen")
}
ddlEvent.MultipleTableInfos = []*common.TableInfo{
common.WrapTableInfo(rawEvent.SchemaID, rawEvent.SchemaName, rawEvent.TableInfo),
common.WrapTableInfo(rawEvent.SchemaName, rawEvent.TableInfo),
rawEvent.ExtraTableInfo,
}
return ddlEvent, true
Expand Down Expand Up @@ -2039,7 +2035,7 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte
allPhysicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
if !ignorePrevTable {
resultQuerys = append(resultQuerys, querys[i])
tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.SchemaID, rawEvent.SchemaNames[i], tableInfo))
tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.SchemaNames[i], tableInfo))
ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, allPhysicalIDs...)
if !ignoreCurrentTable {
// check whether schema change
Expand Down Expand Up @@ -2082,7 +2078,7 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte
} else {
if !ignorePrevTable {
resultQuerys = append(resultQuerys, querys[i])
tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.SchemaID, rawEvent.SchemaNames[i], tableInfo))
tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.SchemaNames[i], tableInfo))
ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, tableInfo.ID)
if !ignoreCurrentTable {
if rawEvent.ExtraSchemaIDs[i] != rawEvent.SchemaIDs[i] {
Expand Down Expand Up @@ -2198,7 +2194,7 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte
TableName: info.Name.O,
})
resultQuerys = append(resultQuerys, querys[i])
tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.SchemaID, rawEvent.SchemaName, info))
tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.SchemaName, info))
}
ddlEvent.TableNameChange = &commonEvent.TableNameChange{
AddName: addName,
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/event/ddl_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestDDLEvent(t *testing.T) {
SchemaName: ddlJob.SchemaName,
TableName: ddlJob.TableName,
Query: ddlJob.Query,
TableInfo: common.WrapTableInfo(ddlJob.SchemaID, ddlJob.SchemaName, ddlJob.BinlogInfo.TableInfo),
TableInfo: common.WrapTableInfo(ddlJob.SchemaName, ddlJob.BinlogInfo.TableInfo),
FinishedTs: ddlJob.BinlogInfo.FinishedTS,
Err: apperror.ErrDDLEventError.GenWithStackByArgs("test"),
}
Expand Down
5 changes: 1 addition & 4 deletions pkg/common/event/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ func NewEventTestHelper(t testing.TB) *EventTestHelper {
func (s *EventTestHelper) ApplyJob(job *timodel.Job) {
key := toTableInfosKey(job.SchemaName, job.TableName)
log.Info("apply job", zap.String("jobKey", key), zap.Any("job", job))
info := common.WrapTableInfo(
job.SchemaID,
job.SchemaName,
job.BinlogInfo.TableInfo)
info := common.WrapTableInfo(job.SchemaName, job.BinlogInfo.TableInfo)
info.InitPrivateFields()
s.tableInfos[key] = info
}
Expand Down
17 changes: 7 additions & 10 deletions pkg/common/table_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/meta/model"
datumTypes "github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"github.com/pingcap/tiflow/pkg/util"
"github.com/tinylib/msgp/msgp"
Expand Down Expand Up @@ -287,7 +287,6 @@ const (

// TableInfo provides meta data describing a DB table.
type TableInfo struct {
SchemaID int64 `json:"schema-id"`
// NOTICE: We probably store the logical ID inside TableName,
// not the physical ID.
// For normal table, there is only one ID, which is the physical ID.
Expand Down Expand Up @@ -496,12 +495,12 @@ func (ti *TableInfo) IsPartitionTable() bool {
}

// GetRowColInfos returns all column infos for rowcodec
func (ti *TableInfo) GetRowColInfos() ([]int64, map[int64]*datumTypes.FieldType, []rowcodec.ColInfo) {
func (ti *TableInfo) GetRowColInfos() ([]int64, map[int64]*types.FieldType, []rowcodec.ColInfo) {
return ti.columnSchema.HandleColID, ti.columnSchema.RowColFieldTps, ti.columnSchema.RowColInfos
}

// GetFieldSlice returns the field types of all columns
func (ti *TableInfo) GetFieldSlice() []*datumTypes.FieldType {
func (ti *TableInfo) GetFieldSlice() []*types.FieldType {
return ti.columnSchema.RowColFieldTpsSlice
}

Expand Down Expand Up @@ -602,9 +601,8 @@ func (ti *TableInfo) GetPrimaryKeyColumnNames() []string {
return result
}

func NewTableInfo(schemaID int64, schemaName string, tableName string, tableID int64, isPartition bool, columnSchema *columnSchema) *TableInfo {
func NewTableInfo(schemaName string, tableName string, tableID int64, isPartition bool, columnSchema *columnSchema) *TableInfo {
ti := &TableInfo{
SchemaID: schemaID,
TableName: TableName{
Schema: schemaName,
Table: tableName,
Expand All @@ -625,12 +623,12 @@ func NewTableInfo(schemaID int64, schemaName string, tableName string, tableID i
}

// WrapTableInfo creates a TableInfo from a model.TableInfo
func WrapTableInfo(schemaID int64, schemaName string, info *model.TableInfo) *TableInfo {
func WrapTableInfo(schemaName string, info *model.TableInfo) *TableInfo {
// search column schema object
sharedColumnSchemaStorage := GetSharedColumnSchemaStorage()
columnSchema := sharedColumnSchemaStorage.GetOrSetColumnSchema(info)

return NewTableInfo(schemaID, schemaName, info.Name.O, info.ID, info.GetPartitionInfo() != nil, columnSchema)
return NewTableInfo(schemaName, info.Name.O, info.ID, info.GetPartitionInfo() != nil, columnSchema)
}

// GetColumnDefaultValue returns the default definition of a column.
Expand All @@ -639,15 +637,14 @@ func GetColumnDefaultValue(col *model.ColumnInfo) interface{} {
if defaultValue == nil {
defaultValue = col.GetOriginDefaultValue()
}
defaultDatum := datumTypes.NewDatum(defaultValue)
defaultDatum := types.NewDatum(defaultValue)
return defaultDatum.GetValue()
}

// BuildTiDBTableInfoWithoutVirtualColumns build a TableInfo without virual columns from the source table info
func BuildTiDBTableInfoWithoutVirtualColumns(source *TableInfo) *TableInfo {
newColumnSchema := source.columnSchema.getColumnSchemaWithoutVirtualColumns()
tableInfo := &TableInfo{
SchemaID: source.SchemaID,
TableName: source.TableName,
columnSchema: newColumnSchema,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/eventservice/event_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func genEvents(helper *pevent.EventTestHelper, t *testing.T, ddl string, dmls ..
SchemaName: job.SchemaName,
TableName: job.TableName,
Query: ddl,
TableInfo: common.WrapTableInfo(job.SchemaID, job.SchemaName, job.BinlogInfo.TableInfo),
TableInfo: common.WrapTableInfo(job.SchemaName, job.BinlogInfo.TableInfo),
}, kvEvents
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/cloudstorage/path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func TestCheckOrWriteSchema(t *testing.T) {
DefaultValue: 10,
}
columns = append(columns, col)
tableInfo := commonType.WrapTableInfo(101, "test", &timodel.TableInfo{Columns: columns})
tableInfo := commonType.WrapTableInfo("test", &timodel.TableInfo{Columns: columns})

table := VersionedTableName{
TableNameWithPhysicTableID: tableInfo.TableName,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/cloudstorage/table_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (t *TableDefinition) ToTableInfo() (*common.TableInfo, error) {
tidbTableInfo.Columns = append(tidbTableInfo.Columns, tiCol)
nextMockID += 1
}
info := common.WrapTableInfo(100, t.Schema, tidbTableInfo)
info := common.WrapTableInfo(t.Schema, tidbTableInfo)

return info, nil
}
Expand Down

0 comments on commit d955184

Please sign in to comment.