From 7260be773a672c76aae60d5a3889c9eb44b58597 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 9 Jan 2025 11:20:33 +0800 Subject: [PATCH 1/5] Revert "codec(ticdc): canal-json encode table id (#11875)" This reverts commit 44a46bf23fe31995f120454b29de6feb839d1bc6. --- pkg/sink/codec/canal/canal_json_message.go | 36 +-------------- .../canal/canal_json_row_event_encoder.go | 12 ----- .../canal_json_row_event_encoder_test.go | 46 ++++++------------- .../canal/canal_json_txn_event_decoder.go | 1 - .../canal_json_txn_event_decoder_test.go | 6 +-- 5 files changed, 18 insertions(+), 83 deletions(-) diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index fda1b3768e2..f184d53fcd6 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -38,9 +38,6 @@ type canalJSONMessageInterface interface { getSchema() *string getTable() *string getCommitTs() uint64 - getPhysicalTableID() int64 - getTableID() int64 - isPartition() bool getQuery() string getOld() map[string]interface{} getData() map[string]interface{} @@ -88,18 +85,6 @@ func (c *JSONMessage) getCommitTs() uint64 { return 0 } -func (c *JSONMessage) getTableID() int64 { - return 0 -} - -func (c *JSONMessage) getPhysicalTableID() int64 { - return 0 -} - -func (c *JSONMessage) isPartition() bool { - return false -} - func (c *JSONMessage) getQuery() string { return c.Query } @@ -152,8 +137,6 @@ func (c *JSONMessage) pkNameSet() map[string]struct{} { type tidbExtension struct { CommitTs uint64 `json:"commitTs,omitempty"` - TableID int64 `json:"tableId,omitempty"` - PhysicalTableID int64 `json:"partitionId,omitempty"` WatermarkTs uint64 `json:"watermarkTs,omitempty"` OnlyHandleKey bool `json:"onlyHandleKey,omitempty"` ClaimCheckLocation string `json:"claimCheckLocation,omitempty"` @@ -172,21 +155,6 @@ func (c *canalJSONMessageWithTiDBExtension) getCommitTs() uint64 { return c.Extensions.CommitTs } -func (c *canalJSONMessageWithTiDBExtension) getTableID() int64 { - return c.Extensions.TableID -} - -func (c *canalJSONMessageWithTiDBExtension) getPhysicalTableID() int64 { - if c.Extensions.PhysicalTableID != 0 { - return c.Extensions.PhysicalTableID - } - return c.Extensions.TableID -} - -func (c *canalJSONMessageWithTiDBExtension) isPartition() bool { - return c.Extensions.PhysicalTableID != 0 -} - func (b *batchDecoder) queryTableInfo(msg canalJSONMessageInterface) *model.TableInfo { schema := *msg.getSchema() table := *msg.getTable() @@ -274,10 +242,8 @@ func (b *batchDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, err result := new(model.RowChangedEvent) result.TableInfo = b.queryTableInfo(msg) result.CommitTs = msg.getCommitTs() - mysqlType := msg.getMySQLType() - result.TableInfo.TableName.IsPartition = msg.isPartition() - result.TableInfo.TableName.TableID = msg.getTableID() + mysqlType := msg.getMySQLType() var err error if msg.eventType() == canal.EventType_DELETE { // for `DELETE` event, `data` contain the old data, set it as the `PreColumns` diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder.go b/pkg/sink/codec/canal/canal_json_row_event_encoder.go index b5ad2d5e0db..e1efefb3f32 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder.go @@ -255,18 +255,6 @@ func newJSONMessageForDML( out.RawByte('{') out.RawString("\"commitTs\":") out.Uint64(e.CommitTs) - out.RawByte(',') - - // the logical table id - out.RawString("\"tableId\":") - out.Int64(e.TableInfo.ID) - - // the physical table id - if e.TableInfo.IsPartitionTable() { - out.RawByte(',') - out.RawString("\"partitionId\":") - out.Int64(e.GetTableID()) - } // only send handle key may happen in 2 cases: // 1. delete event, and set only handle key config. no need to encode `onlyHandleKey` field diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index dd7f891a0dd..bc904f79f07 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -132,11 +132,6 @@ func TestDMLE2E(t *testing.T) { decodedEvent, err = decoder.NextRowChangedEvent() require.NoError(t, err) require.True(t, decodedEvent.IsUpdate()) - if enableTiDBExtension { - require.Equal(t, updateEvent.CommitTs, decodedEvent.CommitTs) - require.Equal(t, updateEvent.GetTableID(), decodedEvent.GetTableID()) - require.Equal(t, updateEvent.TableInfo.IsPartitionTable(), decodedEvent.TableInfo.IsPartitionTable()) - } err = encoder.AppendRowChangedEvent(ctx, "", deleteEvent, func() {}) require.NoError(t, err) @@ -153,11 +148,6 @@ func TestDMLE2E(t *testing.T) { decodedEvent, err = decoder.NextRowChangedEvent() require.NoError(t, err) require.True(t, decodedEvent.IsDelete()) - if enableTiDBExtension { - require.Equal(t, deleteEvent.CommitTs, decodedEvent.CommitTs) - require.Equal(t, deleteEvent.GetTableID(), decodedEvent.GetTableID()) - require.Equal(t, deleteEvent.TableInfo.IsPartitionTable(), decodedEvent.TableInfo.IsPartitionTable()) - } } } @@ -278,8 +268,6 @@ func TestCanalJSONClaimCheckE2E(t *testing.T) { require.NoError(t, err, rawValue) require.Equal(t, insertEvent.CommitTs, decodedLargeEvent.CommitTs) - require.Equal(t, insertEvent.GetTableID(), decodedLargeEvent.GetTableID()) - require.Equal(t, insertEvent.TableInfo.IsPartitionTable(), decodedLargeEvent.TableInfo.IsPartitionTable()) require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedLargeEvent.TableInfo.GetSchemaName()) require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedLargeEvent.TableInfo.GetTableName()) require.Nil(t, nil, decodedLargeEvent.PreColumns) @@ -547,7 +535,6 @@ func TestDDLEventWithExtension(t *testing.T) { require.NoError(t, err) require.Equal(t, ddlEvent.Query, decodedDDL.Query) require.Equal(t, ddlEvent.CommitTs, decodedDDL.CommitTs) - require.Equal(t, ddlEvent.TableInfo.IsPartitionTable(), decodedDDL.TableInfo.IsPartitionTable()) require.Equal(t, ddlEvent.TableInfo.TableName.Schema, decodedDDL.TableInfo.TableName.Schema) require.Equal(t, ddlEvent.TableInfo.TableName.Table, decodedDDL.TableInfo.TableName.Table) } @@ -645,29 +632,26 @@ func TestMaxMessageBytes(t *testing.T) { ctx := context.Background() topic := "" - codecConfig := common.NewConfig(config.ProtocolCanalJSON) - for _, enableTiDBExtension := range []bool{true, false} { - codecConfig.EnableTiDBExtension = enableTiDBExtension - // the test message length is smaller than max-message-bytes - codecConfig.WithMaxMessageBytes(300) + // the test message length is smaller than max-message-bytes + maxMessageBytes := 300 + codecConfig := common.NewConfig(config.ProtocolCanalJSON).WithMaxMessageBytes(maxMessageBytes) - builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) - require.NoError(t, err) - encoder := builder.Build() + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() - err = encoder.AppendRowChangedEvent(ctx, topic, row, nil) - require.NoError(t, err) + err = encoder.AppendRowChangedEvent(ctx, topic, row, nil) + require.NoError(t, err) - // the test message length is larger than max-message-bytes - codecConfig = codecConfig.WithMaxMessageBytes(100) + // the test message length is larger than max-message-bytes + codecConfig = codecConfig.WithMaxMessageBytes(100) - builder, err = NewJSONRowEventEncoderBuilder(ctx, codecConfig) - require.NoError(t, err) + builder, err = NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) - encoder = builder.Build() - err = encoder.AppendRowChangedEvent(ctx, topic, row, nil) - require.Error(t, err, cerror.ErrMessageTooLarge) - } + encoder = builder.Build() + err = encoder.AppendRowChangedEvent(ctx, topic, row, nil) + require.Error(t, err, cerror.ErrMessageTooLarge) } func TestCanalJSONContentCompatibleE2E(t *testing.T) { diff --git a/pkg/sink/codec/canal/canal_json_txn_event_decoder.go b/pkg/sink/codec/canal/canal_json_txn_event_decoder.go index cf0b326084d..28aa79c0b39 100644 --- a/pkg/sink/codec/canal/canal_json_txn_event_decoder.go +++ b/pkg/sink/codec/canal/canal_json_txn_event_decoder.go @@ -115,7 +115,6 @@ func (d *canalJSONTxnEventDecoder) canalJSONMessage2RowChange() (*model.RowChang result := new(model.RowChangedEvent) result.TableInfo = newTableInfo(msg, nil) result.CommitTs = msg.getCommitTs() - result.PhysicalTableID = msg.getPhysicalTableID() mysqlType := msg.getMySQLType() var err error diff --git a/pkg/sink/codec/canal/canal_json_txn_event_decoder_test.go b/pkg/sink/codec/canal/canal_json_txn_event_decoder_test.go index 3f222082cfd..e2d394f6727 100644 --- a/pkg/sink/codec/canal/canal_json_txn_event_decoder_test.go +++ b/pkg/sink/codec/canal/canal_json_txn_event_decoder_test.go @@ -29,7 +29,7 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { _, insertEvent, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) ctx := context.Background() - for _, encodeEnable := range []bool{true, false} { + for _, encodeEnable := range []bool{false, true} { encodeConfig := common.NewConfig(config.ProtocolCanalJSON) encodeConfig.EnableTiDBExtension = encodeEnable encodeConfig.Terminator = config.CRLF @@ -45,7 +45,7 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { require.Equal(t, 1, len(messages)) msg := messages[0] - for _, decodeEnable := range []bool{true, false} { + for _, decodeEnable := range []bool{false, true} { decodeConfig := common.NewConfig(config.ProtocolCanalJSON) decodeConfig.EnableTiDBExtension = decodeEnable @@ -63,8 +63,6 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { if encodeEnable && decodeEnable { require.Equal(t, insertEvent.CommitTs, decodedEvent.CommitTs) - require.Equal(t, insertEvent.GetTableID(), decodedEvent.GetTableID()) - require.Equal(t, insertEvent.TableInfo.IsPartitionTable(), decodedEvent.TableInfo.IsPartitionTable()) } require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedEvent.TableInfo.GetSchemaName()) require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedEvent.TableInfo.GetTableName()) From 3b2d2e884cb0d589391500c3d58e7c3d55b03d1c Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 9 Jan 2025 11:42:17 +0800 Subject: [PATCH 2/5] add method --- pkg/sink/codec/canal/canal_json_message.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index f184d53fcd6..e66df1e2bfc 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -38,6 +38,8 @@ type canalJSONMessageInterface interface { getSchema() *string getTable() *string getCommitTs() uint64 + getPhysicalTableID() int64 + getTableID() int64 getQuery() string getOld() map[string]interface{} getData() map[string]interface{} @@ -85,6 +87,14 @@ func (c *JSONMessage) getCommitTs() uint64 { return 0 } +func (c *JSONMessage) getTableID() int64 { + return 0 +} + +func (c *JSONMessage) getPhysicalTableID() int64 { + return 0 +} + func (c *JSONMessage) getQuery() string { return c.Query } From 9fe977fdad719d1c2a07854dbdfc48c8e5bc9f47 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 9 Jan 2025 16:01:53 +0800 Subject: [PATCH 3/5] fix ut --- .../canal_json_row_event_encoder_test.go | 32 +++++-------------- 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index bc904f79f07..78e2319c90c 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -90,12 +90,9 @@ func TestDMLE2E(t *testing.T) { require.True(t, decodedEvent.IsInsert()) if enableTiDBExtension { - require.Equal(t, insertEvent.CommitTs, decodedEvent.CommitTs) - require.Equal(t, insertEvent.GetTableID(), decodedEvent.GetTableID()) - } else { - require.NotZero(t, decodedEvent.GetTableID()) - require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.ID) + require.Equal(t, insertEvent.GetCommitTs(), decodedEvent.GetCommitTs()) } + require.NotZero(t, decodedEvent.GetTableID()) require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedEvent.TableInfo.GetSchemaName()) require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedEvent.TableInfo.GetTableName()) @@ -793,13 +790,8 @@ func TestE2EPartitionTable(t *testing.T) { decodedEvent, err := decoder.NextRowChangedEvent() require.NoError(t, err) - - if enableTiDBExtension { - require.Equal(t, decodedEvent.GetTableID(), insertEvent.GetTableID()) - } else { - require.NotZero(t, decodedEvent.GetTableID()) - require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[0].ID) - } + require.NotZero(t, decodedEvent.GetTableID()) + require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[0].ID) err = encoder.AppendRowChangedEvent(ctx, "", insertEvent1, nil) require.NoError(t, err) @@ -815,12 +807,8 @@ func TestE2EPartitionTable(t *testing.T) { decodedEvent, err = decoder.NextRowChangedEvent() require.NoError(t, err) - if enableTiDBExtension { - require.Equal(t, decodedEvent.GetTableID(), insertEvent1.GetTableID()) - } else { - require.NotZero(t, decodedEvent.GetTableID()) - require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[1].ID) - } + require.NotZero(t, decodedEvent.GetTableID()) + require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[1].ID) err = encoder.AppendRowChangedEvent(ctx, "", insertEvent2, nil) require.NoError(t, err) @@ -836,11 +824,7 @@ func TestE2EPartitionTable(t *testing.T) { decodedEvent, err = decoder.NextRowChangedEvent() require.NoError(t, err) - if enableTiDBExtension { - require.Equal(t, decodedEvent.GetTableID(), insertEvent2.GetTableID()) - } else { - require.NotZero(t, decodedEvent.GetTableID()) - require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[2].ID) - } + require.NotZero(t, decodedEvent.GetTableID()) + require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[2].ID) } } From fda05d00c62455d185033850d787bbd60295b5f2 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 14 Jan 2025 16:15:30 +0800 Subject: [PATCH 4/5] fix cannot found partition --- pkg/sink/codec/canal/canal_json_message.go | 9 ++- .../canal_json_row_event_encoder_test.go | 57 +++++++++++++++++++ 2 files changed, 64 insertions(+), 2 deletions(-) diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index e66df1e2bfc..56a0bb086c0 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -227,11 +227,16 @@ func (b *batchDecoder) setPhysicalTableID(event *model.RowChangedEvent, physical } } for _, partition := range event.TableInfo.Partition.Definitions { - if partition.LessThan[0] == "MAXVALUE" { + lessThan := partition.LessThan[0] + if lessThan == "MAXVALUE" { event.PhysicalTableID = partition.ID return nil } - if strings.Compare(columnValue, partition.LessThan[0]) == -1 { + if len(columnValue) < len(lessThan) { + event.PhysicalTableID = partition.ID + return nil + } + if strings.Compare(columnValue, lessThan) == -1 { event.PhysicalTableID = partition.ID return nil } diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index 78e2319c90c..b1548a4cfab 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -729,6 +729,63 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) { } } +func TestE2EPartitionTableByRange(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + + createTableDDLEvent := helper.DDL2Event(`create table t (id int primary key, a int) PARTITION BY RANGE ( id ) ( + PARTITION p0 VALUES LESS THAN (6), + PARTITION p1 VALUES LESS THAN (11), + PARTITION p2 VALUES LESS THAN (21))`) + require.NotNil(t, createTableDDLEvent) + + insertEvent := helper.DML2Event(`insert into t (id) values (6)`, "test", "t", "p1") + require.NotNil(t, insertEvent) + + ctx := context.Background() + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + message, err := encoder.EncodeDDLEvent(createTableDDLEvent) + require.NoError(t, err) + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + + tp, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeDDL, tp) + + decodedDDL, err := decoder.NextDDLEvent() + require.NoError(t, err) + require.NotNil(t, decodedDDL) + + err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil) + require.NoError(t, err) + message = encoder.Build()[0] + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + tp, hasNext, err = decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeRow, tp) + + decodedEvent, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.NotZero(t, decodedEvent.GetTableID()) + require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[1].ID) +} + func TestE2EPartitionTable(t *testing.T) { helper := entry.NewSchemaTestHelper(t) defer helper.Close() From 3edb7e17ca37cecd2f67f288f2c689be0bae9a39 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 14 Jan 2025 17:10:54 +0800 Subject: [PATCH 5/5] add one ut to cover hash partition table scenario --- pkg/sink/codec/canal/canal_json_message.go | 16 ++++++ .../canal_json_row_event_encoder_test.go | 53 +++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index 56a0bb086c0..17754d805ac 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -244,6 +244,22 @@ func (b *batchDecoder) setPhysicalTableID(event *model.RowChangedEvent, physical return fmt.Errorf("cannot found partition for column value %s", columnValue) // todo: support following rule if meet the corresponding workload case pmodel.PartitionTypeHash: + targetColumnID := event.TableInfo.ForceGetColumnIDByName(strings.ReplaceAll(event.TableInfo.Partition.Expr, "`", "")) + columns := event.Columns + if columns == nil { + columns = event.PreColumns + } + var columnValue int64 + for _, col := range columns { + if col.ColumnID == targetColumnID { + columnValue = col.Value.(int64) + break + } + } + result := columnValue % int64(len(event.TableInfo.Partition.Definitions)) + partitionID := event.TableInfo.GetPartitionInfo().Definitions[result].ID + event.PhysicalTableID = partitionID + return nil case pmodel.PartitionTypeKey: case pmodel.PartitionTypeList: case pmodel.PartitionTypeNone: diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index b1548a4cfab..8ffb6b46b7e 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -729,6 +729,59 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) { } } +func TestE2EPartitionTableByHash(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + + createTableDDLEvent := helper.DDL2Event(`CREATE TABLE t (a INT,PRIMARY KEY(a)) PARTITION BY HASH (a) PARTITIONS 5`) + require.NotNil(t, createTableDDLEvent) + insertEvent := helper.DML2Event(`insert into t values (5)`, "test", "t", "p0") + require.NotNil(t, insertEvent) + + ctx := context.Background() + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + message, err := encoder.EncodeDDLEvent(createTableDDLEvent) + require.NoError(t, err) + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + + tp, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeDDL, tp) + + decodedDDL, err := decoder.NextDDLEvent() + require.NoError(t, err) + require.NotNil(t, decodedDDL) + + err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil) + require.NoError(t, err) + message = encoder.Build()[0] + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + tp, hasNext, err = decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeRow, tp) + + decodedEvent, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.NotZero(t, decodedEvent.GetTableID()) + require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[0].ID) +} + func TestE2EPartitionTableByRange(t *testing.T) { helper := entry.NewSchemaTestHelper(t) defer helper.Close()