Skip to content

Commit

Permalink
support write ddl ts downstream && make ddl execution idempotent (#387)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan authored Oct 21, 2024
1 parent 4594615 commit 3b89639
Show file tree
Hide file tree
Showing 20 changed files with 636 additions and 201 deletions.
38 changes: 22 additions & 16 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/flowbehappy/tigate/pkg/common"
commonEvent "github.com/flowbehappy/tigate/pkg/common/event"
"github.com/flowbehappy/tigate/pkg/filter"
"github.com/flowbehappy/tigate/pkg/sink/util"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/parser/model"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -97,7 +97,7 @@ type Dispatcher struct {
schemaID int64

// only exist when the dispatcher is a table trigger event dispatcher
tableNameStore *TableNameStore
tableSchemaStore *util.TableSchemaStore

// isReady is used to indicate whether the dispatcher is ready.
// If false, the dispatcher will drop the event it received.
Expand Down Expand Up @@ -133,10 +133,10 @@ func NewDispatcher(
}
dispatcher.startTs.Store(startTs)

// only when is not mysql sink, table trigger event dispatcher need tableNameStore to store the table name
// Only when the sink is not a MySQL sink does the table trigger event dispatcher need a tableSchemaStore to store the table names,
// which are used to calculate all the topics when sending checkpointTs to downstream.
if tableSpan.Equal(heartbeatpb.DDLSpan) && dispatcher.sink.SinkType() != tisink.MysqlSinkType {
dispatcher.tableNameStore = NewTableNameStore()
dispatcher.tableSchemaStore = util.NewTableSchemaStore()
}

dispatcher.AddToDynamicStream()
Expand All @@ -148,7 +148,7 @@ func NewDispatcher(
// If we get a ack info, we need to check whether the ack is for the current pending ddl event. If so, we can cancel the resend task.
// If we get a dispatcher action, we need to check whether the action is for the current pending ddl event. If so, we can deal the ddl event based on the action.
// 1. If the action is a write, we need to add the ddl event to the sink for writing to downstream(async).
// 2. If the action is a pass, we just need to pass the event in tableProgress(for correct calculation) and wake the dispatcherEventsHandler
// 2. If the action is a pass, we just need to pass the event in tableProgress(for correct calculation) and wake the dispatcherEventHandler
func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.DispatcherStatus) {
if d.blockPendingEvent == nil {
// receive outdated status
Expand Down Expand Up @@ -258,8 +258,8 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent) (block boo
zap.Int64("table", event.TableID),
zap.Uint64("commitTs", event.GetCommitTs()),
zap.Uint64("seq", event.GetSeq()))
if d.tableNameStore != nil {
d.tableNameStore.AddEvent(event)
if d.tableSchemaStore != nil {
d.tableSchemaStore.AddEvent(event)
}

event.AddPostFlushFunc(func() {
Expand Down Expand Up @@ -332,7 +332,19 @@ func shouldBlock(event commonEvent.BlockEvent) bool {
switch event.GetType() {
case commonEvent.TypeDDLEvent:
ddlEvent := event.(*commonEvent.DDLEvent)
return filter.ShouldBlock(model.ActionType(ddlEvent.Type))
if ddlEvent.BlockedTables != nil {
switch ddlEvent.GetBlockedTables().InfluenceType {
case commonEvent.InfluenceTypeNormal:
if len(ddlEvent.GetBlockedTables().TableIDs) != 0 {
return true
} else {
return false
}
case commonEvent.InfluenceTypeDB, commonEvent.InfluenceTypeAll:
return true
}
}
return false
case commonEvent.TypeSyncPointEvent:
return true
default:
Expand Down Expand Up @@ -377,6 +389,7 @@ func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) {
d.blockStatusesChan <- message
}
} else {
log.Info("Send block event to maintainer")
message := &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
State: &heartbeatpb.State{
Expand Down Expand Up @@ -522,7 +535,6 @@ func (d *Dispatcher) TryClose() (w heartbeatpb.Watermark, ok bool) {
w.CheckpointTs = d.GetCheckpointTs()
w.ResolvedTs = d.GetResolvedTs()

//d.MemoryUsage.Clear()
d.componentStatus.Set(heartbeatpb.ComponentState_Stopped)
return w, true
}
Expand Down Expand Up @@ -550,11 +562,5 @@ func (d *Dispatcher) CollectDispatcherHeartBeatInfo(h *HeartBeatInfo) {
}

func (d *Dispatcher) HandleCheckpointTs(checkpointTs uint64) {
if d.tableNameStore == nil {
log.Error("Should not HandleCheckpointTs for table trigger event dispatcher without tableNameStore")
return
}

tableNames := d.tableNameStore.GetAllTableNames(checkpointTs)
d.sink.AddCheckpointTs(checkpointTs, tableNames)
d.sink.AddCheckpointTs(checkpointTs)
}
77 changes: 0 additions & 77 deletions downstreamadapter/dispatcher/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,80 +312,3 @@ func GetDispatcherStatusDynamicStream() dynstream.DynamicStream[common.Dispatche
// SetDispatcherStatusDynamicStream replaces the package-level
// dispatcherStatusDynamicStream with the given dynamic stream.
func SetDispatcherStatusDynamicStream(dynamicStream dynstream.DynamicStream[common.DispatcherID, DispatcherStatusWithID, *Dispatcher]) {
dispatcherStatusDynamicStream = dynamicStream
}

// TableNameStore stores all table names (schema name together with table name).
// It only exists in the table trigger event dispatcher, which means each changefeed has only one TableNameStore.
// It can provide all the table names as of a specified ts (only increasing ts values are supported).
// When meeting Create Table / Create Tables / Drop Table / Rename Table / Rename Tables / Drop Database / Recover Table,
// the TableNameStore needs to update its table name info.
type TableNameStore struct {
// existingTables holds every table that existed as of the latest queried ts.
existingTables map[string]map[string]*commonEvent.SchemaTableName // databaseName -> {tableName -> SchemaTableName}
// latestTableChanges holds the table name changes from the latest queried ts up to now (the latest event).
latestTableChanges *LatestTableChanges
}

// NewTableNameStore returns a TableNameStore with an empty set of existing
// tables and no pending table name changes.
func NewTableNameStore() *TableNameStore {
	pending := &LatestTableChanges{
		m: make(map[uint64]*commonEvent.TableNameChange),
	}
	store := &TableNameStore{
		existingTables:     make(map[string]map[string]*commonEvent.SchemaTableName),
		latestTableChanges: pending,
	}
	return store
}

// AddEvent records the table name change carried by the DDL event.
// Events whose TableNameChange is nil are ignored.
func (s *TableNameStore) AddEvent(event *commonEvent.DDLEvent) {
	if event.TableNameChange == nil {
		return
	}
	s.latestTableChanges.Add(event)
}

// GetAllTableNames returns every table name that exists as of ts.
//
// It is only called when the maintainer asks the dispatcher to write
// checkpointTs to downstream, so ts must be <= the latest received event ts
// of the table trigger event dispatcher.
//
// Pending changes whose commitTs is <= ts are applied to existingTables in
// ascending commitTs order and then discarded. Changes whose commitTs is > ts
// stay pending so that a later call with a larger ts still applies them.
// The returned slice is never nil.
func (s *TableNameStore) GetAllTableNames(ts uint64) []*commonEvent.SchemaTableName {
	pending := s.latestTableChanges
	// The lock is held for the whole call so that existingTables is not
	// mutated by a concurrent call while the result is being built.
	pending.mutex.Lock()
	defer pending.mutex.Unlock()

	// Collect the commitTs of every change that is due. Map iteration order is
	// random, so the keys are ordered explicitly below; otherwise a create and
	// a later drop of the same table could be replayed in the wrong order.
	due := make([]uint64, 0, len(pending.m))
	for commitTs := range pending.m {
		if commitTs <= ts {
			due = append(due, commitTs)
		}
	}
	// A small insertion sort keeps this block free of extra imports.
	// NOTE(review): assumes few DDL changes accumulate between two calls — confirm.
	for i := 1; i < len(due); i++ {
		for j := i; j > 0 && due[j-1] > due[j]; j-- {
			due[j-1], due[j] = due[j], due[j-1]
		}
	}

	for _, commitTs := range due {
		change := pending.m[commitTs]
		// Only changes that are actually applied are removed from the pending set.
		delete(pending.m, commitTs)
		if change == nil {
			continue
		}

		if change.DropDatabaseName != "" {
			delete(s.existingTables, change.DropDatabaseName)
			continue
		}

		for _, added := range change.AddName {
			// Copy the element so the stored pointer does not alias the range
			// variable (which is shared across iterations before Go 1.22).
			name := added
			tables := s.existingTables[name.SchemaName]
			if tables == nil {
				tables = make(map[string]*commonEvent.SchemaTableName)
				s.existingTables[name.SchemaName] = tables
			}
			tables[name.TableName] = &name
		}

		for _, dropped := range change.DropName {
			tables := s.existingTables[dropped.SchemaName]
			delete(tables, dropped.TableName)
			// Remove the schema entry once its last table is gone.
			if len(tables) == 0 {
				delete(s.existingTables, dropped.SchemaName)
			}
		}
	}

	tableNames := make([]*commonEvent.SchemaTableName, 0)
	for _, tables := range s.existingTables {
		for _, tableName := range tables {
			tableNames = append(tableNames, tableName)
		}
	}
	return tableNames
}

// LatestTableChanges is a mutex-guarded map of pending table name changes.
type LatestTableChanges struct {
// mutex is locked by Add and by TableNameStore.GetAllTableNames while they access m.
mutex sync.Mutex
// m maps a DDL event's FinishedTs to the TableNameChange of that event.
m map[uint64]*commonEvent.TableNameChange
}

// Add stores the TableNameChange of ddlEvent under the event's FinishedTs,
// overwriting any change previously stored for the same ts.
func (l *LatestTableChanges) Add(ddlEvent *commonEvent.DDLEvent) {
	commitTs, change := ddlEvent.FinishedTs, ddlEvent.TableNameChange

	l.mutex.Lock()
	defer l.mutex.Unlock()

	l.m[commitTs] = change
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func NewEventDispatcherManager(changefeedID model.ChangeFeedID,
}
manager.filter = filter

err = manager.InitSink()
err = manager.InitSink(ctx)
if err != nil {
return nil, apperror.ErrCreateEventDispatcherManagerFailed.Wrap(err).GenWithStackByArgs("event dispatcher manager init sink failed")
}
Expand All @@ -164,8 +164,8 @@ func NewEventDispatcherManager(changefeedID model.ChangeFeedID,
return manager, nil
}

func (e *EventDispatcherManager) InitSink() error {
sink, err := sink.NewSink(e.config, e.changefeedID)
func (e *EventDispatcherManager) InitSink(ctx context.Context) error {
sink, err := sink.NewSink(ctx, e.config, e.changefeedID)
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions downstreamadapter/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/flowbehappy/tigate/pkg/sink/codec"
"github.com/flowbehappy/tigate/pkg/sink/kafka"
v2 "github.com/flowbehappy/tigate/pkg/sink/kafka/v2"
sinkutil "github.com/flowbehappy/tigate/pkg/sink/util"
tiutils "github.com/flowbehappy/tigate/pkg/sink/util"
"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -172,9 +173,12 @@ func (s *KafkaSink) AddBlockEvent(event commonEvent.BlockEvent, tableProgress *t
}
}

func (s *KafkaSink) AddCheckpointTs(ts uint64, tableNames []*commonEvent.SchemaTableName) {
s.ddlWorker.GetCheckpointInfoChan() <- &worker.CheckpointInfo{Ts: ts, TableNames: tableNames}
func (s *KafkaSink) AddCheckpointTs(ts uint64) {
s.ddlWorker.GetCheckpointTsChan() <- ts
}

// SetTableSchemaStore hands the given table schema store to the sink's DDL worker.
func (s *KafkaSink) SetTableSchemaStore(tableSchemaStore *sinkutil.TableSchemaStore) {
s.ddlWorker.SetTableSchemaStore(tableSchemaStore)
}
// Close currently does nothing for KafkaSink.
func (s *KafkaSink) Close() {
}
11 changes: 8 additions & 3 deletions downstreamadapter/sink/mysql_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import (

"github.com/flowbehappy/tigate/downstreamadapter/sink/types"
"github.com/flowbehappy/tigate/downstreamadapter/worker"
"github.com/flowbehappy/tigate/downstreamadapter/writer"
commonEvent "github.com/flowbehappy/tigate/pkg/common/event"
"github.com/flowbehappy/tigate/pkg/sink/mysql"
"github.com/flowbehappy/tigate/pkg/sink/util"

"github.com/pingcap/tiflow/cdc/model"
)
Expand All @@ -42,7 +43,7 @@ type MysqlSink struct {
}

// NewMysqlSink creates the MysqlSink object; it is called when the event dispatcher manager is initialized.
func NewMysqlSink(changefeedID model.ChangeFeedID, workerCount int, cfg *writer.MysqlConfig, db *sql.DB) *MysqlSink {
func NewMysqlSink(changefeedID model.ChangeFeedID, workerCount int, cfg *mysql.MysqlConfig, db *sql.DB) *MysqlSink {
ctx := context.Background()
mysqlSink := MysqlSink{
changefeedID: changefeedID,
Expand All @@ -62,6 +63,10 @@ func (s *MysqlSink) SinkType() SinkType {
return MysqlSinkType
}

// SetTableSchemaStore hands the given table schema store to the sink's DDL worker.
func (s *MysqlSink) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) {
s.ddlWorker.SetTableSchemaStore(tableSchemaStore)
}

func (s *MysqlSink) AddDMLEvent(event *commonEvent.DMLEvent, tableProgress *types.TableProgress) {
if event.Len() == 0 {
return
Expand All @@ -83,6 +88,6 @@ func (s *MysqlSink) AddBlockEvent(event commonEvent.BlockEvent, tableProgress *t
s.ddlWorker.GetDDLEventChan() <- event
}

func (s *MysqlSink) AddCheckpointTs(ts uint64, tableNames []*commonEvent.SchemaTableName) {}
func (s *MysqlSink) AddCheckpointTs(ts uint64) {}

// Close currently does nothing for MysqlSink.
func (s *MysqlSink) Close() {}
16 changes: 7 additions & 9 deletions downstreamadapter/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package sink

import (
"context"
"net/url"

"github.com/flowbehappy/tigate/downstreamadapter/sink/types"
"github.com/flowbehappy/tigate/downstreamadapter/writer"
commonEvent "github.com/flowbehappy/tigate/pkg/common/event"
"github.com/flowbehappy/tigate/pkg/config"
"github.com/flowbehappy/tigate/pkg/sink/mysql"
sinkutil "github.com/flowbehappy/tigate/pkg/sink/util"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
Expand All @@ -37,25 +39,21 @@ type Sink interface {
AddDMLEvent(event *commonEvent.DMLEvent, tableProgress *types.TableProgress)
AddBlockEvent(event commonEvent.BlockEvent, tableProgress *types.TableProgress)
PassBlockEvent(event commonEvent.BlockEvent, tableProgress *types.TableProgress)
AddCheckpointTs(ts uint64, tableNames []*commonEvent.SchemaTableName)
// IsEmpty(tableSpan *common.TableSpan) bool
// AddTableSpan(tableSpan *common.TableSpan)
// RemoveTableSpan(tableSpan *common.TableSpan)
// StopTableSpan(tableSpan *common.TableSpan)
// GetCheckpointTs(tableSpan *common.TableSpan) (uint64, bool)
AddCheckpointTs(ts uint64)
SetTableSchemaStore(tableSchemaStore *sinkutil.TableSchemaStore)
Close()
SinkType() SinkType
}

func NewSink(config *config.ChangefeedConfig, changefeedID model.ChangeFeedID) (Sink, error) {
func NewSink(ctx context.Context, config *config.ChangefeedConfig, changefeedID model.ChangeFeedID) (Sink, error) {
sinkURI, err := url.Parse(config.SinkURI)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
switch scheme {
case sink.MySQLScheme, sink.MySQLSSLScheme, sink.TiDBScheme, sink.TiDBSSLScheme:
cfg, db, err := writer.NewMysqlConfigAndDB(sinkURI)
cfg, db, err := mysql.NewMysqlConfigAndDB(ctx, sinkURI)
if err != nil {
return nil, err
}
Expand Down
28 changes: 15 additions & 13 deletions downstreamadapter/worker/kafka_ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
commonEvent "github.com/flowbehappy/tigate/pkg/common/event"
"github.com/flowbehappy/tigate/pkg/metrics"
"github.com/flowbehappy/tigate/pkg/sink/codec/encoder"
"github.com/flowbehappy/tigate/pkg/sink/util"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
Expand All @@ -18,19 +19,14 @@ import (
"go.uber.org/zap"
)

// CheckpointInfo bundles a checkpoint ts with a list of table names.
type CheckpointInfo struct {
// Ts is the checkpoint ts.
Ts uint64
// TableNames is the list of schema/table names that accompanies Ts.
TableNames []*commonEvent.SchemaTableName
}

// worker will send messages to the DML producer on a batch basis.
type KafkaDDLWorker struct {
// changeFeedID indicates this sink belongs to which processor(changefeed).
changeFeedID model.ChangeFeedID
// protocol indicates the protocol used by this sink.
protocol config.Protocol
ddlEventChan chan *commonEvent.DDLEvent
checkpointInfoChan chan *CheckpointInfo
protocol config.Protocol
ddlEventChan chan *commonEvent.DDLEvent
checkpointTsChan chan uint64
// ticker used to force flush the batched messages when the interval is reached.
ticker *time.Ticker

Expand All @@ -44,6 +40,8 @@ type KafkaDDLWorker struct {
// producer is used to send the messages to the Kafka broker.
producer ddlproducer.DDLProducer

tableSchemaStore *util.TableSchemaStore

statistics *metrics.Statistics
partitionRule DDLDispatchRule
ctx context.Context
Expand Down Expand Up @@ -107,8 +105,12 @@ func (w *KafkaDDLWorker) GetDDLEventChan() chan<- *commonEvent.DDLEvent {
return w.ddlEventChan
}

func (w *KafkaDDLWorker) GetCheckpointInfoChan() chan<- *CheckpointInfo {
return w.checkpointInfoChan
func (w *KafkaDDLWorker) GetCheckpointTsChan() chan<- uint64 {
return w.checkpointTsChan
}

// SetTableSchemaStore stores the given table schema store on the worker.
// NOTE(review): encodeAndSendCheckpointEvents calls GetAllTableNames on this
// field without a nil check — confirm it is always set before checkpoints are sent.
func (w *KafkaDDLWorker) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) {
w.tableSchemaStore = tableSchemaStore
}

func (w *KafkaDDLWorker) encodeAndSendDDLEvents() error {
Expand Down Expand Up @@ -177,16 +179,16 @@ func (w *KafkaDDLWorker) encodeAndSendCheckpointEvents() error {
select {
case <-w.ctx.Done():
return errors.Trace(w.ctx.Err())
case checkpointInfo, ok := <-w.checkpointInfoChan:
case ts, ok := <-w.checkpointTsChan:
if !ok {
log.Warn("MQ sink flush worker channel closed",
zap.String("namespace", w.changeFeedID.Namespace),
zap.String("changefeed", w.changeFeedID.ID))
return nil
}
start := time.Now()
ts := checkpointInfo.Ts
tableNames := checkpointInfo.TableNames

tableNames := w.tableSchemaStore.GetAllTableNames(ts)
msg, err := w.encoder.EncodeCheckpointEvent(ts)
if err != nil {
return errors.Trace(err)
Expand Down
Loading

0 comments on commit 3b89639

Please sign in to comment.