Skip to content

Commit

Permalink
adjust decoder ddl event signature
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Feb 18, 2025
1 parent 376dcaa commit 759168d
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 20 deletions.
33 changes: 16 additions & 17 deletions pkg/sink/codec/canal/canal_json_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"path/filepath"
"strconv"
"strings"
Expand Down Expand Up @@ -369,7 +370,7 @@ func (b *canalJSONDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error)

// NextDDLEvent implements the RowEventDecoder interface
// `HasNext` should be called before this.
func (b *canalJSONDecoder) NextDDLEvent() (*model.DDLEvent, error) {
func (b *canalJSONDecoder) NextDDLEvent() (*commonEvent.DDLEvent, error) {
if b.msg == nil || b.msg.messageType() != common.MessageTypeDDL {
return nil, errors.ErrDecodeFailed.
GenWithStack("not found ddl event message")
Expand Down Expand Up @@ -424,23 +425,21 @@ func (b *canalJSONDecoder) NextResolvedEvent() (uint64, error) {
return withExtensionEvent.Extensions.WatermarkTs, nil
}

func canalJSONMessage2DDLEvent(msg canalJSONMessageInterface) *model.DDLEvent {
result := new(model.DDLEvent)
// we lost the startTs from kafka message
result.CommitTs = msg.getCommitTs()

result.TableInfo = new(model.TableInfo)
result.TableInfo.TableName = model.TableName{
Schema: *msg.getSchema(),
Table: *msg.getTable(),
}

// we lost DDL type from canal json format, only got the DDL SQL.
func canalJSONMessage2DDLEvent(msg canalJSONMessageInterface) *commonEvent.DDLEvent {
result := new(commonEvent.DDLEvent)
result.Query = msg.getQuery()

// hack the DDL Type to be compatible with MySQL sink's logic
// see https://github.com/pingcap/tiflow/blob/0578db337d/cdc/sink/mysql.go#L362-L370
result.Type = getDDLActionType(result.Query)
//// we lost the startTs from kafka message
//result.CommitTs = msg.getCommitTs()
//
//result.TableInfo = new(model.TableInfo)
//result.TableInfo.TableName = model.TableName{
// Schema: *msg.getSchema(),
// Table: *msg.getTable(),
//}
//
//// hack the DDL Type to be compatible with MySQL sink's logic
//// see https://github.com/pingcap/tiflow/blob/0578db337d/cdc/sink/mysql.go#L362-L370
//result.Type = getDDLActionType(result.Query)
return result
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/sink/codec/canal/canal_json_txn_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package canal
import (
"bytes"
"encoding/json"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/errors"
Expand Down Expand Up @@ -167,6 +168,6 @@ func (d *canalJSONTxnEventDecoder) NextResolvedEvent() (uint64, error) {
}

// NextDDLEvent implements the RowEventDecoder interface
func (d *canalJSONTxnEventDecoder) NextDDLEvent() (*model.DDLEvent, error) {
func (d *canalJSONTxnEventDecoder) NextDDLEvent() (*commonEvent.DDLEvent, error) {
return nil, nil
}
3 changes: 2 additions & 1 deletion pkg/sink/codec/common/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package common

import (
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/tiflow/cdc/model"
)

Expand All @@ -35,5 +36,5 @@ type RowEventDecoder interface {
// NextRowChangedEvent returns the next row changed event if exists
NextRowChangedEvent() (*model.RowChangedEvent, error)
// NextDDLEvent returns the next DDL event if exists
NextDDLEvent() (*model.DDLEvent, error)
NextDDLEvent() (*commonEvent.DDLEvent, error)
}
3 changes: 2 additions & 1 deletion pkg/sink/codec/open/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"encoding/binary"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"path/filepath"
"sort"
"strings"
Expand Down Expand Up @@ -169,7 +170,7 @@ func (b *BatchDecoder) NextResolvedEvent() (uint64, error) {
}

// NextDDLEvent implements the RowEventDecoder interface
func (b *BatchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
func (b *BatchDecoder) NextDDLEvent() (*commonEvent.DDLEvent, error) {
if b.nextKey.Type != common.MessageTypeDDL {
return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found ddl event message")
}
Expand Down

0 comments on commit 759168d

Please sign in to comment.