Skip to content

Commit

Permalink
try to set the table info
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Feb 18, 2025
1 parent d955184 commit 3fc434d
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 74 deletions.
156 changes: 84 additions & 72 deletions pkg/sink/codec/canal/canal_json_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func NewBatchDecoder(
storage: externalStorage,
upstreamTiDB: db,
bytesDecoder: charmap.ISO8859_1.NewDecoder(),
tableInfoCache: make(map[tableKey]*model.TableInfo),
tableInfoCache: make(map[tableKey]*commonType.TableInfo),
partitionInfoCache: make(map[tableKey]*timodel.PartitionInfo),
tableIDAllocator: common.NewFakeTableIDAllocator(),
}, nil
Expand Down Expand Up @@ -431,6 +431,7 @@ func (b *canalJSONDecoder) NextResolvedEvent() (uint64, error) {
func canalJSONMessage2DDLEvent(msg canalJSONMessageInterface) *commonEvent.DDLEvent {
result := new(commonEvent.DDLEvent)
result.Query = msg.getQuery()
result.BlockedTables = nil // todo: set this
// todo: getDDLActionType can handle ActionExchangeTablePartition,
// it's used by the mysql sink.
result.Type = byte(getDDLActionType(result.Query))
Expand All @@ -443,7 +444,7 @@ func canalJSONMessage2DDLEvent(msg canalJSONMessageInterface) *commonEvent.DDLEv
func canalJSONColumnMap2RowChangeColumns(
cols map[string]interface{},
mysqlType map[string]string,
tableInfo *model.TableInfo,
tableInfo *commonType.TableInfo,
) ([]*model.ColumnData, error) {
result := make([]*model.ColumnData, 0, len(cols))
for _, columnInfo := range tableInfo.Columns {

Check failure on line 450 in pkg/sink/codec/canal/canal_json_decoder.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

tableInfo.Columns undefined (type *"github.com/pingcap/ticdc/pkg/common".TableInfo has no field or method Columns)

Check failure on line 450 in pkg/sink/codec/canal/canal_json_decoder.go

View workflow job for this annotation

GitHub Actions / E2E Test Group 1 on 7.5

tableInfo.Columns undefined (type *"github.com/pingcap/ticdc/pkg/common".TableInfo has no field or method Columns)

Check failure on line 450 in pkg/sink/codec/canal/canal_json_decoder.go

View workflow job for this annotation

GitHub Actions / E2E Test Group 2

tableInfo.Columns undefined (type *"github.com/pingcap/ticdc/pkg/common".TableInfo has no field or method Columns)

Check failure on line 450 in pkg/sink/codec/canal/canal_json_decoder.go

View workflow job for this annotation

GitHub Actions / E2E Test Group 3

tableInfo.Columns undefined (type *"github.com/pingcap/ticdc/pkg/common".TableInfo has no field or method Columns)

Check failure on line 450 in pkg/sink/codec/canal/canal_json_decoder.go

View workflow job for this annotation

GitHub Actions / Failover E2E Test

tableInfo.Columns undefined (type *"github.com/pingcap/ticdc/pkg/common".TableInfo has no field or method Columns)

Check failure on line 450 in pkg/sink/codec/canal/canal_json_decoder.go

View workflow job for this annotation

GitHub Actions / E2E Test Group 1

tableInfo.Columns undefined (type *"github.com/pingcap/ticdc/pkg/common".TableInfo has no field or method Columns)
Expand Down Expand Up @@ -605,20 +606,20 @@ func (b *canalJSONDecoder) queryTableInfo(msg canalJSONMessageInterface) *common
tableInfo, ok := b.tableInfoCache[cacheKey]
if !ok {
partitionInfo := b.partitionInfoCache[cacheKey]
tableInfo = newTableInfo(msg, partitionInfo)
tableInfo.ID = b.tableIDAllocator.AllocateTableID(schema, table)
if tableInfo.Partition != nil {
for idx, partition := range tableInfo.Partition.Definitions {
partitionID := b.tableIDAllocator.AllocatePartitionID(schema, table, partition.Name.O)
tableInfo.Partition.Definitions[idx].ID = partitionID
}
}
tableID := b.tableIDAllocator.AllocateTableID(schema, table)
tableInfo = newTableInfo(msg, tableID, partitionInfo)
//if tableInfo.Partition != nil {
// for idx, partition := range tableInfo.Partition.Definitions {
// partitionID := b.tableIDAllocator.AllocatePartitionID(schema, table, partition.Name.O)
// tableInfo.Partition.Definitions[idx].ID = partitionID
// }
//}
b.tableInfoCache[cacheKey] = tableInfo
}
return tableInfo
}

func newTableInfo(msg canalJSONMessageInterface, partitionInfo *timodel.PartitionInfo) *commonType.TableInfo {
func newTableInfo(msg canalJSONMessageInterface, tableID int64, partitionInfo *timodel.PartitionInfo) *commonType.TableInfo {
//schema := *msg.getSchema()
//table := *msg.getTable()
//tidbTableInfo := &timodel.TableInfo{}
Expand All @@ -631,69 +632,80 @@ func newTableInfo(msg canalJSONMessageInterface, partitionInfo *timodel.Partitio
//setIndexes(tidbTableInfo, pkNames)
//tidbTableInfo.Partition = partitionInfo
//return model.WrapTableInfo(100, schema, 1000, tidbTableInfo)
result := new(commonType.TableInfo)
return result
//tableInfo := common.NewTableInfo(
// event.SchemaName,
// pmodel.NewCIStr(event.TableName).O,
// tableID,
// false,
// columnSchema)
//var (
// columnSchema *commonType.columnSchema
//)
isPartition := partitionInfo != nil
return commonType.NewTableInfo(*msg.getSchema(), pmodel.NewCIStr(*msg.getTable()).O, tableID, isPartition, columnSchema)

Check failure on line 645 in pkg/sink/codec/canal/canal_json_decoder.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

undefined: columnSchema

Check failure on line 645 in pkg/sink/codec/canal/canal_json_decoder.go

View workflow job for this annotation

GitHub Actions / E2E Test Group 1 on 7.5

undefined: columnSchema

Check failure on line 645 in pkg/sink/codec/canal/canal_json_decoder.go

View workflow job for this annotation

GitHub Actions / E2E Test Group 2

undefined: columnSchema

Check failure on line 645 in pkg/sink/codec/canal/canal_json_decoder.go

View workflow job for this annotation

GitHub Actions / E2E Test Group 3

undefined: columnSchema

Check failure on line 645 in pkg/sink/codec/canal/canal_json_decoder.go

View workflow job for this annotation

GitHub Actions / Failover E2E Test

undefined: columnSchema

Check failure on line 645 in pkg/sink/codec/canal/canal_json_decoder.go

View workflow job for this annotation

GitHub Actions / E2E Test Group 1

undefined: columnSchema
}

func (b *canalJSONDecoder) setPhysicalTableID(event *model.RowChangedEvent) error {
if event.TableInfo.Partition == nil {
event.PhysicalTableID = event.TableInfo.ID
return nil
}
switch event.TableInfo.Partition.Type {
case pmodel.PartitionTypeRange:
targetColumnID := event.TableInfo.ForceGetColumnIDByName(strings.ReplaceAll(event.TableInfo.Partition.Expr, "`", ""))
columns := event.Columns
if columns == nil {
columns = event.PreColumns
}
var columnValue string
for _, col := range columns {
if col.ColumnID == targetColumnID {
columnValue = model.ColumnValueString(col.Value)
break
}
}
for _, partition := range event.TableInfo.Partition.Definitions {
lessThan := partition.LessThan[0]
if lessThan == "MAXVALUE" {
event.PhysicalTableID = partition.ID
return nil
}
if len(columnValue) < len(lessThan) {
event.PhysicalTableID = partition.ID
return nil
}
if strings.Compare(columnValue, lessThan) == -1 {
event.PhysicalTableID = partition.ID
return nil
}
}
return fmt.Errorf("cannot found partition for column value %s", columnValue)
// todo: support following rule if meet the corresponding workload
case pmodel.PartitionTypeHash:
targetColumnID := event.TableInfo.ForceGetColumnIDByName(strings.ReplaceAll(event.TableInfo.Partition.Expr, "`", ""))
columns := event.Columns
if columns == nil {
columns = event.PreColumns
}
var columnValue int64
for _, col := range columns {
if col.ColumnID == targetColumnID {
columnValue = col.Value.(int64)
break
}
}
result := columnValue % int64(len(event.TableInfo.Partition.Definitions))
partitionID := event.TableInfo.GetPartitionInfo().Definitions[result].ID
event.PhysicalTableID = partitionID
return nil
case pmodel.PartitionTypeKey:
case pmodel.PartitionTypeList:
case pmodel.PartitionTypeNone:
default:
}
return fmt.Errorf("manually set partition id for partition type %s not supported yet", event.TableInfo.Partition.Type)
func (b *canalJSONDecoder) setPhysicalTableID(event *commonEvent.DMLEvent) error {
event.PhysicalTableID = event.TableInfo.TableName.TableID
return nil
//if event.TableInfo.Partition == nil {
// event.PhysicalTableID = event.TableInfo.ID
// return nil
//}
//switch event.TableInfo.Partition.Type {
//case pmodel.PartitionTypeRange:
// targetColumnID := event.TableInfo.ForceGetColumnIDByName(strings.ReplaceAll(event.TableInfo.Partition.Expr, "`", ""))
// columns := event.Columns
// if columns == nil {
// columns = event.PreColumns
// }
// var columnValue string
// for _, col := range columns {
// if col.ColumnID == targetColumnID {
// columnValue = model.ColumnValueString(col.Value)
// break
// }
// }
// for _, partition := range event.TableInfo.Partition.Definitions {
// lessThan := partition.LessThan[0]
// if lessThan == "MAXVALUE" {
// event.PhysicalTableID = partition.ID
// return nil
// }
// if len(columnValue) < len(lessThan) {
// event.PhysicalTableID = partition.ID
// return nil
// }
// if strings.Compare(columnValue, lessThan) == -1 {
// event.PhysicalTableID = partition.ID
// return nil
// }
// }
// return fmt.Errorf("cannot found partition for column value %s", columnValue)
//// todo: support following rule if meet the corresponding workload
//case pmodel.PartitionTypeHash:
// targetColumnID := event.TableInfo.ForceGetColumnIDByName(strings.ReplaceAll(event.TableInfo.Partition.Expr, "`", ""))
// columns := event.Columns
// if columns == nil {
// columns = event.PreColumns
// }
// var columnValue int64
// for _, col := range columns {
// if col.ColumnID == targetColumnID {
// columnValue = col.Value.(int64)
// break
// }
// }
// result := columnValue % int64(len(event.TableInfo.Partition.Definitions))
// partitionID := event.TableInfo.GetPartitionInfo().Definitions[result].ID
// event.PhysicalTableID = partitionID
// return nil
//case pmodel.PartitionTypeKey:
//case pmodel.PartitionTypeList:
//case pmodel.PartitionTypeNone:
//default:
//}
//return fmt.Errorf("manually set partition id for partition type %s not supported yet", event.TableInfo.Partition.Type)
}

// return DDL ActionType by the prefix
Expand Down
4 changes: 2 additions & 2 deletions pkg/sink/codec/canal/canal_json_txn_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ func (d *canalJSONTxnEventDecoder) NextRowChangedEvent() (*commonEvent.DMLEvent,
return result, nil
}

func (d *canalJSONTxnEventDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, error) {
func (d *canalJSONTxnEventDecoder) canalJSONMessage2RowChange() (*commonEvent.DMLEvent, error) {
msg := d.msg
result := new(model.RowChangedEvent)
result := new(commonEvent.DMLEvent)
result.TableInfo = newTableInfo(msg, nil)

Check failure on line 117 in pkg/sink/codec/canal/canal_json_txn_decoder.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

not enough arguments in call to newTableInfo

Check failure on line 117 in pkg/sink/codec/canal/canal_json_txn_decoder.go

View workflow job for this annotation

GitHub Actions / E2E Test Group 1 on 7.5

not enough arguments in call to newTableInfo

Check failure on line 117 in pkg/sink/codec/canal/canal_json_txn_decoder.go

View workflow job for this annotation

GitHub Actions / E2E Test Group 2

not enough arguments in call to newTableInfo

Check failure on line 117 in pkg/sink/codec/canal/canal_json_txn_decoder.go

View workflow job for this annotation

GitHub Actions / E2E Test Group 3

not enough arguments in call to newTableInfo

Check failure on line 117 in pkg/sink/codec/canal/canal_json_txn_decoder.go

View workflow job for this annotation

GitHub Actions / Failover E2E Test

not enough arguments in call to newTableInfo

Check failure on line 117 in pkg/sink/codec/canal/canal_json_txn_decoder.go

View workflow job for this annotation

GitHub Actions / E2E Test Group 1

not enough arguments in call to newTableInfo
result.CommitTs = msg.getCommitTs()

Expand Down

0 comments on commit 3fc434d

Please sign in to comment.