Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec(ticdc): Revert "codec(ticdc): canal-json encode table id (#11875)" #11992

Merged
merged 5 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 24 additions & 27 deletions pkg/sink/codec/canal/canal_json_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ type canalJSONMessageInterface interface {
getCommitTs() uint64
getPhysicalTableID() int64
getTableID() int64
isPartition() bool
getQuery() string
getOld() map[string]interface{}
getData() map[string]interface{}
Expand Down Expand Up @@ -96,10 +95,6 @@ func (c *JSONMessage) getPhysicalTableID() int64 {
return 0
}

func (c *JSONMessage) isPartition() bool {
return false
}

func (c *JSONMessage) getQuery() string {
return c.Query
}
Expand Down Expand Up @@ -152,8 +147,6 @@ func (c *JSONMessage) pkNameSet() map[string]struct{} {

type tidbExtension struct {
CommitTs uint64 `json:"commitTs,omitempty"`
TableID int64 `json:"tableId,omitempty"`
PhysicalTableID int64 `json:"partitionId,omitempty"`
WatermarkTs uint64 `json:"watermarkTs,omitempty"`
OnlyHandleKey bool `json:"onlyHandleKey,omitempty"`
ClaimCheckLocation string `json:"claimCheckLocation,omitempty"`
Expand All @@ -172,21 +165,6 @@ func (c *canalJSONMessageWithTiDBExtension) getCommitTs() uint64 {
return c.Extensions.CommitTs
}

func (c *canalJSONMessageWithTiDBExtension) getTableID() int64 {
return c.Extensions.TableID
}

func (c *canalJSONMessageWithTiDBExtension) getPhysicalTableID() int64 {
if c.Extensions.PhysicalTableID != 0 {
return c.Extensions.PhysicalTableID
}
return c.Extensions.TableID
}

func (c *canalJSONMessageWithTiDBExtension) isPartition() bool {
return c.Extensions.PhysicalTableID != 0
}

func (b *batchDecoder) queryTableInfo(msg canalJSONMessageInterface) *model.TableInfo {
schema := *msg.getSchema()
table := *msg.getTable()
Expand Down Expand Up @@ -249,18 +227,39 @@ func (b *batchDecoder) setPhysicalTableID(event *model.RowChangedEvent, physical
}
}
for _, partition := range event.TableInfo.Partition.Definitions {
if partition.LessThan[0] == "MAXVALUE" {
lessThan := partition.LessThan[0]
if lessThan == "MAXVALUE" {
event.PhysicalTableID = partition.ID
return nil
}
if len(columnValue) < len(lessThan) {
event.PhysicalTableID = partition.ID
return nil
}
if strings.Compare(columnValue, partition.LessThan[0]) == -1 {
if strings.Compare(columnValue, lessThan) == -1 {
event.PhysicalTableID = partition.ID
return nil
}
}
return fmt.Errorf("cannot found partition for column value %s", columnValue)
// todo: support following rule if meet the corresponding workload
case pmodel.PartitionTypeHash:
targetColumnID := event.TableInfo.ForceGetColumnIDByName(strings.ReplaceAll(event.TableInfo.Partition.Expr, "`", ""))
columns := event.Columns
if columns == nil {
columns = event.PreColumns
}
var columnValue int64
for _, col := range columns {
if col.ColumnID == targetColumnID {
columnValue = col.Value.(int64)
break
}
}
result := columnValue % int64(len(event.TableInfo.Partition.Definitions))
partitionID := event.TableInfo.GetPartitionInfo().Definitions[result].ID
event.PhysicalTableID = partitionID
return nil
case pmodel.PartitionTypeKey:
case pmodel.PartitionTypeList:
case pmodel.PartitionTypeNone:
Expand All @@ -274,10 +273,8 @@ func (b *batchDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, err
result := new(model.RowChangedEvent)
result.TableInfo = b.queryTableInfo(msg)
result.CommitTs = msg.getCommitTs()
mysqlType := msg.getMySQLType()
result.TableInfo.TableName.IsPartition = msg.isPartition()
result.TableInfo.TableName.TableID = msg.getTableID()

mysqlType := msg.getMySQLType()
var err error
if msg.eventType() == canal.EventType_DELETE {
// for `DELETE` event, `data` contain the old data, set it as the `PreColumns`
Expand Down
12 changes: 0 additions & 12 deletions pkg/sink/codec/canal/canal_json_row_event_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,18 +255,6 @@ func newJSONMessageForDML(
out.RawByte('{')
out.RawString("\"commitTs\":")
out.Uint64(e.CommitTs)
out.RawByte(',')

// the logical table id
out.RawString("\"tableId\":")
out.Int64(e.TableInfo.ID)

// the physical table id
if e.TableInfo.IsPartitionTable() {
out.RawByte(',')
out.RawString("\"partitionId\":")
out.Int64(e.GetTableID())
}

// only send handle key may happen in 2 cases:
// 1. delete event, and set only handle key config. no need to encode `onlyHandleKey` field
Expand Down
188 changes: 133 additions & 55 deletions pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,9 @@ func TestDMLE2E(t *testing.T) {

require.True(t, decodedEvent.IsInsert())
if enableTiDBExtension {
require.Equal(t, insertEvent.CommitTs, decodedEvent.CommitTs)
require.Equal(t, insertEvent.GetTableID(), decodedEvent.GetTableID())
} else {
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.ID)
require.Equal(t, insertEvent.GetCommitTs(), decodedEvent.GetCommitTs())
}
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedEvent.TableInfo.GetSchemaName())
require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedEvent.TableInfo.GetTableName())

Expand Down Expand Up @@ -132,11 +129,6 @@ func TestDMLE2E(t *testing.T) {
decodedEvent, err = decoder.NextRowChangedEvent()
require.NoError(t, err)
require.True(t, decodedEvent.IsUpdate())
if enableTiDBExtension {
require.Equal(t, updateEvent.CommitTs, decodedEvent.CommitTs)
require.Equal(t, updateEvent.GetTableID(), decodedEvent.GetTableID())
require.Equal(t, updateEvent.TableInfo.IsPartitionTable(), decodedEvent.TableInfo.IsPartitionTable())
}

err = encoder.AppendRowChangedEvent(ctx, "", deleteEvent, func() {})
require.NoError(t, err)
Expand All @@ -153,11 +145,6 @@ func TestDMLE2E(t *testing.T) {
decodedEvent, err = decoder.NextRowChangedEvent()
require.NoError(t, err)
require.True(t, decodedEvent.IsDelete())
if enableTiDBExtension {
require.Equal(t, deleteEvent.CommitTs, decodedEvent.CommitTs)
require.Equal(t, deleteEvent.GetTableID(), decodedEvent.GetTableID())
require.Equal(t, deleteEvent.TableInfo.IsPartitionTable(), decodedEvent.TableInfo.IsPartitionTable())
}
}
}

Expand Down Expand Up @@ -278,8 +265,6 @@ func TestCanalJSONClaimCheckE2E(t *testing.T) {
require.NoError(t, err, rawValue)

require.Equal(t, insertEvent.CommitTs, decodedLargeEvent.CommitTs)
require.Equal(t, insertEvent.GetTableID(), decodedLargeEvent.GetTableID())
require.Equal(t, insertEvent.TableInfo.IsPartitionTable(), decodedLargeEvent.TableInfo.IsPartitionTable())
require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedLargeEvent.TableInfo.GetSchemaName())
require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedLargeEvent.TableInfo.GetTableName())
require.Nil(t, nil, decodedLargeEvent.PreColumns)
Expand Down Expand Up @@ -547,7 +532,6 @@ func TestDDLEventWithExtension(t *testing.T) {
require.NoError(t, err)
require.Equal(t, ddlEvent.Query, decodedDDL.Query)
require.Equal(t, ddlEvent.CommitTs, decodedDDL.CommitTs)
require.Equal(t, ddlEvent.TableInfo.IsPartitionTable(), decodedDDL.TableInfo.IsPartitionTable())
require.Equal(t, ddlEvent.TableInfo.TableName.Schema, decodedDDL.TableInfo.TableName.Schema)
require.Equal(t, ddlEvent.TableInfo.TableName.Table, decodedDDL.TableInfo.TableName.Table)
}
Expand Down Expand Up @@ -645,29 +629,26 @@ func TestMaxMessageBytes(t *testing.T) {
ctx := context.Background()
topic := ""

codecConfig := common.NewConfig(config.ProtocolCanalJSON)
for _, enableTiDBExtension := range []bool{true, false} {
codecConfig.EnableTiDBExtension = enableTiDBExtension
// the test message length is smaller than max-message-bytes
codecConfig.WithMaxMessageBytes(300)
// the test message length is smaller than max-message-bytes
maxMessageBytes := 300
codecConfig := common.NewConfig(config.ProtocolCanalJSON).WithMaxMessageBytes(maxMessageBytes)

builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()
builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()

err = encoder.AppendRowChangedEvent(ctx, topic, row, nil)
require.NoError(t, err)
err = encoder.AppendRowChangedEvent(ctx, topic, row, nil)
require.NoError(t, err)

// the test message length is larger than max-message-bytes
codecConfig = codecConfig.WithMaxMessageBytes(100)
// the test message length is larger than max-message-bytes
codecConfig = codecConfig.WithMaxMessageBytes(100)

builder, err = NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
builder, err = NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)

encoder = builder.Build()
err = encoder.AppendRowChangedEvent(ctx, topic, row, nil)
require.Error(t, err, cerror.ErrMessageTooLarge)
}
encoder = builder.Build()
err = encoder.AppendRowChangedEvent(ctx, topic, row, nil)
require.Error(t, err, cerror.ErrMessageTooLarge)
}

func TestCanalJSONContentCompatibleE2E(t *testing.T) {
Expand Down Expand Up @@ -748,6 +729,116 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) {
}
}

func TestE2EPartitionTableByHash(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")

createTableDDLEvent := helper.DDL2Event(`CREATE TABLE t (a INT,PRIMARY KEY(a)) PARTITION BY HASH (a) PARTITIONS 5`)
require.NotNil(t, createTableDDLEvent)
insertEvent := helper.DML2Event(`insert into t values (5)`, "test", "t", "p0")
require.NotNil(t, insertEvent)

ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolCanalJSON)

builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()

decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

message, err := encoder.EncodeDDLEvent(createTableDDLEvent)
require.NoError(t, err)

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)

tp, hasNext, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, tp)

decodedDDL, err := decoder.NextDDLEvent()
require.NoError(t, err)
require.NotNil(t, decodedDDL)

err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil)
require.NoError(t, err)
message = encoder.Build()[0]

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)
tp, hasNext, err = decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, tp)

decodedEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[0].ID)
}

func TestE2EPartitionTableByRange(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")

createTableDDLEvent := helper.DDL2Event(`create table t (id int primary key, a int) PARTITION BY RANGE ( id ) (
PARTITION p0 VALUES LESS THAN (6),
PARTITION p1 VALUES LESS THAN (11),
PARTITION p2 VALUES LESS THAN (21))`)
require.NotNil(t, createTableDDLEvent)

insertEvent := helper.DML2Event(`insert into t (id) values (6)`, "test", "t", "p1")
require.NotNil(t, insertEvent)

ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolCanalJSON)

builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()

decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

message, err := encoder.EncodeDDLEvent(createTableDDLEvent)
require.NoError(t, err)

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)

tp, hasNext, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, tp)

decodedDDL, err := decoder.NextDDLEvent()
require.NoError(t, err)
require.NotNil(t, decodedDDL)

err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil)
require.NoError(t, err)
message = encoder.Build()[0]

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)
tp, hasNext, err = decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, tp)

decodedEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[1].ID)
}

func TestE2EPartitionTable(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
Expand Down Expand Up @@ -809,13 +900,8 @@ func TestE2EPartitionTable(t *testing.T) {

decodedEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err)

if enableTiDBExtension {
require.Equal(t, decodedEvent.GetTableID(), insertEvent.GetTableID())
} else {
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[0].ID)
}
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[0].ID)

err = encoder.AppendRowChangedEvent(ctx, "", insertEvent1, nil)
require.NoError(t, err)
Expand All @@ -831,12 +917,8 @@ func TestE2EPartitionTable(t *testing.T) {
decodedEvent, err = decoder.NextRowChangedEvent()
require.NoError(t, err)

if enableTiDBExtension {
require.Equal(t, decodedEvent.GetTableID(), insertEvent1.GetTableID())
} else {
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[1].ID)
}
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[1].ID)

err = encoder.AppendRowChangedEvent(ctx, "", insertEvent2, nil)
require.NoError(t, err)
Expand All @@ -852,11 +934,7 @@ func TestE2EPartitionTable(t *testing.T) {
decodedEvent, err = decoder.NextRowChangedEvent()
require.NoError(t, err)

if enableTiDBExtension {
require.Equal(t, decodedEvent.GetTableID(), insertEvent2.GetTableID())
} else {
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[2].ID)
}
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[2].ID)
}
}
1 change: 0 additions & 1 deletion pkg/sink/codec/canal/canal_json_txn_event_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func (d *canalJSONTxnEventDecoder) canalJSONMessage2RowChange() (*model.RowChang
result := new(model.RowChangedEvent)
result.TableInfo = newTableInfo(msg, nil)
result.CommitTs = msg.getCommitTs()
result.PhysicalTableID = msg.getPhysicalTableID()

mysqlType := msg.getMySQLType()
var err error
Expand Down
Loading
Loading