diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index e6a96c2d3..871ed72fa 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -304,11 +304,10 @@ jobs: run: | export TICDC_NEWARCH=true && make integration_test CASE=http_api_tls - - - name: Test http_api_tls_with_user_auth - if: ${{ success() }} - run: | - export TICDC_NEWARCH=true && make integration_test CASE=http_api_tls_with_user_auth + # - name: Test http_api_tls_with_user_auth + # if: ${{ success() }} + # run: | + # export TICDC_NEWARCH=true && make integration_test CASE=http_api_tls_with_user_auth - name: Test default_value if: ${{ success() }} diff --git a/coordinator/changefeed/changefeed.go b/coordinator/changefeed/changefeed.go index 106317655..b28f34c44 100644 --- a/coordinator/changefeed/changefeed.go +++ b/coordinator/changefeed/changefeed.go @@ -139,6 +139,11 @@ func (c *Changefeed) UpdateStatus(newStatus *heartbeatpb.MaintainerStatus) (bool return false, model.StateNormal, nil } +func (c *Changefeed) ForceUpdateStatus(newStatus *heartbeatpb.MaintainerStatus) (bool, model.FeedState, *heartbeatpb.RunningError) { + c.status.Store(newStatus) + return c.backoff.CheckStatus(newStatus) +} + func (c *Changefeed) IsMQSink() bool { return c.isMQSink } diff --git a/coordinator/controller.go b/coordinator/controller.go index 4c3234f3c..2e33e8328 100644 --- a/coordinator/controller.go +++ b/coordinator/controller.go @@ -464,7 +464,7 @@ func (c *Controller) ResumeChangefeed(ctx context.Context, id common.ChangeFeedI status := cf.GetClonedStatus() status.CheckpointTs = newCheckpointTs - _, _, err := cf.UpdateStatus(status) + _, _, err := cf.ForceUpdateStatus(status) if err != nil { return errors.New(err.Message) } diff --git a/downstreamadapter/dispatcher/dispatcher.go b/downstreamadapter/dispatcher/dispatcher.go index 5e71591a4..bbd0139ab 100644 --- a/downstreamadapter/dispatcher/dispatcher.go +++ b/downstreamadapter/dispatcher/dispatcher.go @@ -210,7 +210,7 @@ func (d *Dispatcher) InitializeTableSchemaStore(schemaInfo []*heartbeatpb.Schema // 1. If the action is a write, we need to add the ddl event to the sink for writing to downstream. // 2. 
If the action is a pass, we just need to pass the event func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.DispatcherStatus) { - log.Debug("dispatcher handle dispatcher status", + log.Info("dispatcher handle dispatcher status", zap.Any("dispatcherStatus", dispatcherStatus), zap.Stringer("dispatcher", d.id), zap.Any("action", dispatcherStatus.GetAction()), @@ -230,11 +230,22 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat if action != nil { pendingEvent, blockStatus := d.blockEventStatus.getEventAndStage() if pendingEvent == nil && action.CommitTs > d.GetResolvedTs() { + // we have not received the block event, and the action is for a future event, so just ignore it + log.Info("pending event is nil, and the action's commitTs is larger than the dispatcher's resolvedTs", zap.Any("resolvedTs", d.GetResolvedTs()), zap.Any("action commitTs", action.CommitTs), zap.Any("dispatcher", d.id)) // we have not received the block event, and the action is for a future event, so just ignore it return } if pendingEvent != nil && action.CommitTs == pendingEvent.GetCommitTs() && blockStatus == heartbeatpb.BlockStage_WAITING { + log.Info("pending event gets the action", zap.Any("action", action), zap.Any("dispatcher", d.id), zap.Any("pendingEvent commitTs", pendingEvent.GetCommitTs())) d.blockEventStatus.updateBlockStage(heartbeatpb.BlockStage_WRITING) + pendingEvent.PushFrontFlushFunc(func() { + // clearing blockEventStatus must happen before waking the ds. + // otherwise, the following may happen: + // 1. wake the ds + // 2. a new event arrives and is set as the new pending event + // 3. clear blockEventStatus (it should clear the old pending event, but clears the new one) + d.blockEventStatus.clear() + }) if action.Action == heartbeatpb.Action_Write { failpoint.Inject("BlockOrWaitBeforeWrite", nil) err := d.AddBlockEventToSink(pendingEvent) @@ -255,8 +266,6 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat d.PassBlockEventToSink(pendingEvent) failpoint.Inject("BlockAfterPass", nil) } - - d.blockEventStatus.clear() } // whether it is an outdated message or not, we need to return a message to show we have finished the event. diff --git a/downstreamadapter/dispatcher/helper.go b/downstreamadapter/dispatcher/helper.go index aa59a3bf7..550d32c6f 100644 --- a/downstreamadapter/dispatcher/helper.go +++ b/downstreamadapter/dispatcher/helper.go @@ -45,6 +45,7 @@ func (r *ResendTaskMap) Get(identifier BlockEventIdentifier) *ResendTask { } func (r *ResendTaskMap) Set(identifier BlockEventIdentifier, task *ResendTask) { + log.Info("set resend task", zap.Any("identifier", identifier), zap.Any("task", task)) r.mutex.Lock() defer r.mutex.Unlock() r.m[identifier] = task diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index ecc02fb71..0baffbaf7 100644 --- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -55,7 +55,9 @@ One EventDispatcherManager has one backend sink.
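The comment inside PushFrontFlushFunc above describes a clear-vs-wake ordering hazard. Below is a minimal, single-goroutine sketch (hypothetical names, not the real dispatcher API) of why clearing after waking can erase the next pending event:

```go
package main

import "fmt"

// blockEventStatus stands in for the dispatcher's pending-block-event slot.
type blockEventStatus struct{ pending string }

func (s *blockEventStatus) clear() { s.pending = "" }

func main() {
	s := &blockEventStatus{pending: "ddl@ts=100"}
	// wake stands in for waking the stream, which may immediately deliver
	// the next block event and install it as the new pending event.
	wake := func() { s.pending = "ddl@ts=200" }

	// buggy order: wake first, clear afterwards; the clear hits the new event
	wake()
	s.clear()
	fmt.Printf("wake-then-clear leaves %q\n", s.pending) // ""

	// fixed order (what the flush callback enforces): clear, then wake
	s.pending = "ddl@ts=100"
	s.clear()
	wake()
	fmt.Printf("clear-then-wake leaves %q\n", s.pending) // "ddl@ts=200"
}
```

Registering the clear as a front-of-queue flush callback, as the hunk does, ties it to the old event's flush instead of letting it run after the wake.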
*/ type EventDispatcherManager struct { changefeedID common.ChangeFeedID - maintainerID node.ID + + maintainerIDMutex sync.Mutex + maintainerID node.ID pdClock pdutil.Clock @@ -710,10 +712,14 @@ func (e *EventDispatcherManager) GetDispatcherMap() *DispatcherMap { } func (e *EventDispatcherManager) GetMaintainerID() node.ID { + e.maintainerIDMutex.Lock() + defer e.maintainerIDMutex.Unlock() return e.maintainerID } func (e *EventDispatcherManager) SetMaintainerID(maintainerID node.ID) { + e.maintainerIDMutex.Lock() + defer e.maintainerIDMutex.Unlock() e.maintainerID = maintainerID } diff --git a/maintainer/barrier.go b/maintainer/barrier.go index da3e2d29a..bb7dc7e00 100644 --- a/maintainer/barrier.go +++ b/maintainer/barrier.go @@ -14,6 +14,8 @@ package maintainer import ( + "sync" + "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/maintainer/range_checker" @@ -34,11 +36,51 @@ import ( // 6. maintainer waits for all dispatchers to report the event(pass) done message // 7. maintainer clears the event, and schedules the block event? todo: what if we schedule first then wait for all dispatchers? type Barrier struct { - blockedTs map[eventKey]*BarrierEvent + blockedEvents *BlockedEventMap controller *Controller splitTableEnabled bool } +type BlockedEventMap struct { + mutex sync.Mutex + m map[eventKey]*BarrierEvent +} + +func NewBlockEventMap() *BlockedEventMap { + return &BlockedEventMap{ + m: make(map[eventKey]*BarrierEvent), + } +} + +func (b *BlockedEventMap) Range(f func(key eventKey, value *BarrierEvent) bool) { + b.mutex.Lock() + defer b.mutex.Unlock() + for k, v := range b.m { + if !f(k, v) { + break + } + } +} + +func (b *BlockedEventMap) Get(key eventKey) (*BarrierEvent, bool) { + b.mutex.Lock() + defer b.mutex.Unlock() + event, ok := b.m[key] + return event, ok +} + +func (b *BlockedEventMap) Set(key eventKey, event *BarrierEvent) { + b.mutex.Lock() + defer b.mutex.Unlock() + b.m[key] = event +} + +func (b *BlockedEventMap) Delete(key eventKey) { + b.mutex.Lock() + defer b.mutex.Unlock() + delete(b.m, key) +} + // eventKey is the key of the block event, // the ddl and sync point are identified by the blockTs and isSyncPoint since they can share the same blockTs type eventKey struct { @@ -49,7 +91,7 @@ type eventKey struct { // NewBarrier creates a new barrier for the changefeed func NewBarrier(controller *Controller, splitTableEnabled bool) *Barrier { return &Barrier{ - blockedTs: make(map[eventKey]*BarrierEvent), + blockedEvents: NewBlockEventMap(), controller: controller, splitTableEnabled: splitTableEnabled, } } @@ -59,13 +101,17 @@ func NewBarrier(controller *Controller, splitTableEnabled bool) *Barrier { func (b *Barrier) HandleStatus(from node.ID, request *heartbeatpb.BlockStatusRequest, ) *messaging.TargetMessage { - log.Debug("handle block status", zap.String("from", from.String()), + log.Info("handle block status", zap.String("from", from.String()), zap.String("changefeed", request.ChangefeedID.GetName()), zap.Any("detail", request)) - eventMap := make(map[*BarrierEvent][]*heartbeatpb.DispatcherID) + eventDispatcherIDsMap := make(map[*BarrierEvent][]*heartbeatpb.DispatcherID) + actions := []*heartbeatpb.DispatcherStatus{} var dispatcherStatus []*heartbeatpb.DispatcherStatus for _, status := range request.BlockStatuses { - event := b.handleOneStatus(request.ChangefeedID, status) + // deal with the block status, and check whether we need to return an action. + // we need to handle the block statuses in order, otherwise the scheduler may have problems + // e.g.
TODO(truncate + create table) + event, action := b.handleOneStatus(request.ChangefeedID, status) if event == nil { // should not happen log.Error("handle block status failed, event is nil", @@ -74,9 +120,12 @@ func (b *Barrier) HandleStatus(from node.ID, zap.String("detail", status.String())) continue } - eventMap[event] = append(eventMap[event], status.ID) + eventDispatcherIDsMap[event] = append(eventDispatcherIDsMap[event], status.ID) + if action != nil { + actions = append(actions, action) + } } - for event, dispatchers := range eventMap { + for event, dispatchers := range eventDispatcherIDsMap { dispatcherStatus = append(dispatcherStatus, &heartbeatpb.DispatcherStatus{ InfluencedDispatchers: &heartbeatpb.InfluencedDispatchers{ InfluenceType: heartbeatpb.InfluenceType_Normal, @@ -84,11 +133,11 @@ func (b *Barrier) HandleStatus(from node.ID, }, Ack: ackEvent(event.commitTs, event.isSyncPoint), }) - // check if all dispatchers reported the block event - if writeAction := b.checkEvent(event, dispatchers); writeAction != nil { - dispatcherStatus = append(dispatcherStatus, writeAction) - } } + for _, action := range actions { + dispatcherStatus = append(dispatcherStatus, action) + } + if len(dispatcherStatus) <= 0 { log.Warn("no dispatcher status to send", zap.String("from", from.String()), @@ -115,10 +164,10 @@ func (b *Barrier) HandleBootstrapResponse(bootstrapRespMap map[node.ID]*heartbea blockState := span.BlockState key := getEventKey(blockState.BlockTs, blockState.IsSyncPoint) - event, ok := b.blockedTs[key] + event, ok := b.blockedEvents.Get(key) if !ok { - event = NewBlockEvent(common.NewChangefeedIDFromPB(resp.ChangefeedID), b.controller, blockState, b.splitTableEnabled) - b.blockedTs[key] = event + event = NewBlockEvent(common.NewChangefeedIDFromPB(resp.ChangefeedID), common.NewDispatcherIDFromPB(span.ID), b.controller, blockState, b.splitTableEnabled) + b.blockedEvents.Set(key, event) } switch blockState.Stage { case heartbeatpb.BlockStage_WAITING: case heartbeatpb.BlockStage_WRITING: // it's in writing stage, must be the writer dispatcher // it's the maintainer's responsibility to resend the write action - event.selected = true + event.selected.Store(true) event.writerDispatcher = common.NewDispatcherIDFromPB(span.ID) case heartbeatpb.BlockStage_DONE: // it's the maintainer's responsibility to resend the pass action - event.selected = true + event.selected.Store(true) event.writerDispatcherAdvanced = true } event.markDispatcherEventDone(common.NewDispatcherIDFromPB(span.ID)) @@ -148,19 +197,19 @@ func (b *Barrier) HandleBootstrapResponse(bootstrapRespMap map[node.ID]*heartbea // While for the table1 in NodeB, it's still waiting for the pass action. // So we need to check the block event when the maintainer is restarted to help the block event decide its state. // TODO: double check the logic here - for _, barrierEvent := range b.blockedTs { + b.blockedEvents.Range(func(key eventKey, barrierEvent *BarrierEvent) bool { if barrierEvent.allDispatcherReported() { // it means the dispatchers involved in the block event are all in the cached resp, not restarted. // so we don't do a special check for this event // just use the usual logic to handle it // Besides, if the dispatchers all reported the waiting status, it means at least one dispatcher // did not get acked, so the event must be resent by the dispatcher later.
- continue + return true } switch barrierEvent.blockedDispatchers.InfluenceType { case heartbeatpb.InfluenceType_Normal: for _, tableId := range barrierEvent.blockedDispatchers.TableIDs { - replications := b.controller.replicationDB.GetTasksByTableIDs(tableId) + replications := b.controller.replicationDB.GetTasksByTableID(tableId) for _, replication := range replications { if replication.GetStatus().CheckpointTs >= barrierEvent.commitTs { barrierEvent.rangeChecker.AddSubRange(replication.Span.TableID, replication.Span.StartKey, replication.Span.EndKey) } } @@ -185,18 +234,31 @@ func (b *Barrier) HandleBootstrapResponse(bootstrapRespMap map[node.ID]*heartbea } } // meet the target state (which means the ddl is written), we need to send pass actions in resend if barrierEvent.allDispatcherReported() { - barrierEvent.selected = true + barrierEvent.selected.Store(true) barrierEvent.writerDispatcherAdvanced = true } - } + return true + }) } // Resend resends the message to the dispatcher manager, the pass action is handled here func (b *Barrier) Resend() []*messaging.TargetMessage { var msgs []*messaging.TargetMessage - for _, event := range b.blockedTs { + + eventList := make([]*BarrierEvent, 0) + b.blockedEvents.Range(func(key eventKey, barrierEvent *BarrierEvent) bool { // todo: we can limit the number of messages to send in one round here - msgs = append(msgs, event.resend()...) + msgs = append(msgs, barrierEvent.resend()...) + + eventList = append(eventList, barrierEvent) + return true + }) + + for _, event := range eventList { + if event != nil { + // check whether the event is finished + b.checkEventFinish(event) + } } return msgs } @@ -205,15 +267,18 @@ func (b *Barrier) Resend() []*messaging.TargetMessage { // currently, when the block event is a create table event, we should block the checkpoint ts forwarding // because on the func (b *Barrier) ShouldBlockCheckpointTs() bool { - for _, event := range b.blockedTs { - if event.hasNewTable { - return true + flag := false + b.blockedEvents.Range(func(key eventKey, barrierEvent *BarrierEvent) bool { + if barrierEvent.hasNewTable { + flag = true + return false } - } - return false + return true + }) + return flag } -func (b *Barrier) handleOneStatus(changefeedID *heartbeatpb.ChangefeedID, status *heartbeatpb.TableSpanBlockStatus) *BarrierEvent { +func (b *Barrier) handleOneStatus(changefeedID *heartbeatpb.ChangefeedID, status *heartbeatpb.TableSpanBlockStatus) (*BarrierEvent, *heartbeatpb.DispatcherStatus) { cfID := common.NewChangefeedIDFromPB(changefeedID) dispatcherID := common.NewDispatcherIDFromPB(status.ID) @@ -231,17 +296,17 @@ func (b *Barrier) handleOneStatus(changefeedID *heartbeatpb.ChangefeedID, status } } if status.State.Stage == heartbeatpb.BlockStage_DONE { - return b.handleEventDone(cfID, dispatcherID, status) + return b.handleEventDone(cfID, dispatcherID, status), nil } return b.handleBlockState(cfID, dispatcherID, status) } func (b *Barrier) handleEventDone(changefeedID common.ChangeFeedID, dispatcherID common.DispatcherID, status *heartbeatpb.TableSpanBlockStatus) *BarrierEvent { key := getEventKey(status.State.BlockTs, status.State.IsSyncPoint) - event, ok := b.blockedTs[key] + event, ok := b.blockedEvents.Get(key) if !ok { // no block event found - be := NewBlockEvent(changefeedID, b.controller, status.State, b.splitTableEnabled) + be := NewBlockEvent(changefeedID, dispatcherID, b.controller, status.State, b.splitTableEnabled) // the event is a fake event, the dispatcher will not send the block event be.rangeChecker =
range_checker.NewBoolRangeChecker(false) return be @@ -258,18 +323,19 @@ func (b *Barrier) handleEventDone(changefeedID common.ChangeFeedID, dispatcherID // checkpoint ts is advanced, clear the map, so no need to resend messages anymore event.markDispatcherEventDone(dispatcherID) + b.checkEventFinish(event) return event } func (b *Barrier) handleBlockState(changefeedID common.ChangeFeedID, dispatcherID common.DispatcherID, status *heartbeatpb.TableSpanBlockStatus, -) *BarrierEvent { +) (*BarrierEvent, *heartbeatpb.DispatcherStatus) { blockState := status.State if blockState.IsBlocked { key := getEventKey(blockState.BlockTs, blockState.IsSyncPoint) // insert an event, or get the old one to check if the event is already tracked - event := b.getOrInsertNewEvent(changefeedID, key, blockState) + event := b.getOrInsertNewEvent(changefeedID, dispatcherID, key, blockState) if dispatcherID == b.controller.ddlDispatcherID { log.Info("the block event is sent by ddl dispatcher", zap.String("changefeed", changefeedID.Name()), @@ -277,58 +343,54 @@ func (b *Barrier) handleBlockState(changefeedID common.ChangeFeedID, zap.Uint64("commitTs", blockState.BlockTs)) event.tableTriggerDispatcherRelated = true } - if event.selected { + if event.selected.Load() { // the event is already in the selected state, ignore the block event and just send an ack log.Warn("the block event already selected, ignore the block event", zap.String("changefeed", changefeedID.Name()), zap.String("dispatcher", dispatcherID.String()), zap.Uint64("commitTs", blockState.BlockTs), ) - return event + // check whether the event can be finished. + b.checkEventFinish(event) + return event, nil } - // the block event, and check whether we need to send write action + // the block event, and check whether we need to send write action event.markDispatcherEventDone(dispatcherID) - return event + return event, event.checkEventAction(dispatcherID) } - // it's not a blocked event, it must be sent by table event trigger dispatcher - // and the ddl already synced to downstream , e.g.: create table, drop table + // it's not a blocked event, it must be sent by the table event trigger dispatcher, just for scheduling + // and the ddl is already synced to downstream, e.g. create table // if ack failed, dispatcher will send a heartbeat again, so we do not need to care about resend message here - event := NewBlockEvent(changefeedID, b.controller, blockState, b.splitTableEnabled) - // mark the event as selected, so we do not need to wait for all dispatchers to report the event - // and make the rangeChecker always return true - event.rangeChecker = range_checker.NewBoolRangeChecker(true) - event.selected = true - return event + event := NewBlockEvent(changefeedID, dispatcherID, b.controller, blockState, b.splitTableEnabled) + event.scheduleBlockEvent() + return event, nil } // getOrInsertNewEvent gets the block event from the map, if not found, create a new one -func (b *Barrier) getOrInsertNewEvent(changefeedID common.ChangeFeedID, key eventKey, - blockState *heartbeatpb.State, +func (b *Barrier) getOrInsertNewEvent(changefeedID common.ChangeFeedID, dispatcherID common.DispatcherID, + key eventKey, blockState *heartbeatpb.State, ) *BarrierEvent { - event, ok := b.blockedTs[key] + event, ok := b.blockedEvents.Get(key) if !ok { - event = NewBlockEvent(changefeedID, b.controller, blockState, b.splitTableEnabled) - b.blockedTs[key] = event + event = NewBlockEvent(changefeedID, dispatcherID, b.controller, blockState, b.splitTableEnabled) + b.blockedEvents.Set(key,
event) } return event } -func (b *Barrier) checkEvent(be *BarrierEvent, - dispatchers []*heartbeatpb.DispatcherID, -) *heartbeatpb.DispatcherStatus { +// checkEventFinish checks whether the event has received all the done messages from dispatchers +// if so, remove the event from blockedEvents, no need to resend messages anymore +func (b *Barrier) checkEventFinish(be *BarrierEvent) { if !be.allDispatcherReported() { - return nil + return } - if be.selected && be.allDispatcherReported() { - log.Info("the all dispatchers reported event done, remove event and schedule it", + if be.selected.Load() { + log.Info("all dispatchers reported the event done, remove the event", zap.String("changefeed", be.cfID.Name()), zap.Uint64("committs", be.commitTs)) // already selected a dispatcher to write, now all dispatchers reported the block event - delete(b.blockedTs, getEventKey(be.commitTs, be.isSyncPoint)) - be.scheduleBlockEvent() - return nil + b.blockedEvents.Delete(getEventKey(be.commitTs, be.isSyncPoint)) } - return be.onAllDispatcherReportedBlockEvent(dispatchers) } // ackEvent creates an ack event diff --git a/maintainer/barrier_event.go b/maintainer/barrier_event.go index 41d665e86..b01dcca92 100644 --- a/maintainer/barrier_event.go +++ b/maintainer/barrier_event.go @@ -23,6 +23,7 @@ import ( commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -32,7 +33,7 @@ type BarrierEvent struct { cfID common.ChangeFeedID commitTs uint64 controller *Controller - selected bool + selected atomic.Bool hasNewTable bool // table trigger event dispatcher reported the block event, we should use it as the writer tableTriggerDispatcherRelated bool @@ -47,6 +48,14 @@ type BarrierEvent struct { // if split table is enabled for this changefeed; if not, we can use the table id to check coverage dynamicSplitEnabled bool + // used to store the dispatchers that reported the waiting status before the rangeChecker is created + // when BlockTables.InfluenceType is not Normal, we store the reported dispatchers first + // and wait for the report from the table trigger event dispatcher (all/db type events must involve the table trigger event dispatcher), + // then create the rangeChecker and replay the reported dispatchers into it. + // Why do we need to wait for the table trigger event dispatcher? + // Because we need to take the tables added/dropped by other ddls into account, + // so only creating the rangeChecker from the table trigger event dispatcher's report can ensure the coverage is correct.
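The reportedDispatchers field documented above is a buffer-and-replay device: reports that arrive before the rangeChecker exists are buffered, and the report from the table trigger event dispatcher (the only reporter that knows the full table set) creates the checker and replays the buffer. The real replay logic lives in markDispatcherEventDone further down this diff; the following is a simplified, self-contained sketch with hypothetical types:

```go
package main

import "fmt"

// checker plays the role of the rangeChecker: it can only be built by the
// authoritative reporter, here the table trigger event dispatcher.
type checker struct{ seen map[int64]struct{} }

func (c *checker) add(id int64) { c.seen[id] = struct{}{} }

type blockEvent struct {
	triggerID int64   // the table trigger event dispatcher's id
	buffered  []int64 // reports that arrived before the checker existed
	chk       *checker
}

func (e *blockEvent) report(id int64) {
	if e.chk != nil {
		e.chk.add(id) // checker exists: record directly
		return
	}
	if id != e.triggerID {
		e.buffered = append(e.buffered, id) // not authoritative yet: buffer
		return
	}
	// the trigger knows the full table set at this ts: create and replay
	e.chk = &checker{seen: make(map[int64]struct{})}
	e.chk.add(id)
	for _, b := range e.buffered {
		e.chk.add(b)
	}
	e.buffered = nil
}

func main() {
	e := &blockEvent{triggerID: 0}
	e.report(7)                  // early report: buffered
	e.report(0)                  // trigger arrives: checker created, buffer replayed
	e.report(9)                  // recorded directly
	fmt.Println(len(e.chk.seen)) // 3
}
```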
+ reportedDispatchers map[common.DispatcherID]struct{} // rangeChecker is used to check if all the dispatchers reported the block events rangeChecker range_checker.RangeChecker lastResendTime time.Time @@ -54,12 +63,15 @@ type BarrierEvent struct { lastWarningLogTime time.Time } -func NewBlockEvent(cfID common.ChangeFeedID, controller *Controller, - status *heartbeatpb.State, dynamicSplitEnabled bool, +func NewBlockEvent(cfID common.ChangeFeedID, + dispatcherID common.DispatcherID, + controller *Controller, + status *heartbeatpb.State, + dynamicSplitEnabled bool, ) *BarrierEvent { event := &BarrierEvent{ controller: controller, - selected: false, + selected: atomic.Bool{}, hasNewTable: len(status.NeedAddedTables) > 0, cfID: cfID, commitTs: status.BlockTs, @@ -68,6 +80,7 @@ func NewBlockEvent(cfID common.ChangeFeedID, controller *Controller, dropDispatchers: status.NeedDroppedTables, schemaIDChange: status.UpdatedSchemas, lastResendTime: time.Time{}, + reportedDispatchers: make(map[common.DispatcherID]struct{}), isSyncPoint: status.IsSyncPoint, dynamicSplitEnabled: dynamicSplitEnabled, lastWarningLogTime: time.Now(), @@ -81,31 +94,21 @@ func NewBlockEvent(cfID common.ChangeFeedID, controller *Controller, event.rangeChecker = range_checker.NewTableCountChecker(len(status.BlockTables.TableIDs)) } case heartbeatpb.InfluenceType_DB: - // add table trigger event dispatcher for InfluenceType_DB: - if dynamicSplitEnabled { - reps := controller.GetTasksBySchemaID(status.BlockTables.SchemaID) - tbls := make([]int64, 0, len(reps)) - for _, rep := range reps { - tbls = append(tbls, rep.Span.TableID) - } - - tbls = append(tbls, heartbeatpb.DDLSpan.TableID) - event.rangeChecker = range_checker.NewTableSpanRangeChecker(tbls) + // TODO: clean up this code + // create the range checker if the dispatcher is the ddl dispatcher, + // otherwise store the dispatcherID in reportedDispatchers and do not create the rangeChecker yet + if dispatcherID == controller.ddlDispatcherID { + event.createRangeCheckerForTypeDB() } else { - event.rangeChecker = range_checker.NewTableCountChecker( - controller.GetTaskSizeBySchemaID(status.BlockTables.SchemaID) + 1 /*table trigger event dispatcher*/) + event.reportedDispatchers[dispatcherID] = struct{}{} } case heartbeatpb.InfluenceType_All: - if dynamicSplitEnabled { - reps := controller.GetAllTasks() - tbls := make([]int64, 0, len(reps)) - for _, rep := range reps { - tbls = append(tbls, rep.Span.TableID) - } - tbls = append(tbls, heartbeatpb.DDLSpan.TableID) - event.rangeChecker = range_checker.NewTableSpanRangeChecker(tbls) + // create the range checker if the dispatcher is the ddl dispatcher, + // otherwise store the dispatcherID in reportedDispatchers and do not create the rangeChecker yet + if dispatcherID == controller.ddlDispatcherID { + event.createRangeCheckerForTypeAll() } else { - event.rangeChecker = range_checker.NewTableCountChecker(controller.TaskSize()) + event.reportedDispatchers[dispatcherID] = struct{}{} } } } @@ -117,10 +120,49 @@ func NewBlockEvent(cfID common.ChangeFeedID, controller *Controller, return event } +func (be *BarrierEvent) createRangeCheckerForTypeAll() { + if be.dynamicSplitEnabled { + reps := be.controller.GetAllTasks() + tbls := make([]int64, 0, len(reps)) + for _, rep := range reps { + tbls = append(tbls, rep.Span.TableID) + } + tbls = append(tbls, heartbeatpb.DDLSpan.TableID) + be.rangeChecker = range_checker.NewTableSpanRangeChecker(tbls) + } else { + be.rangeChecker = range_checker.NewTableCountChecker(be.controller.TaskSize()) + } + log.Info("create range checker for block event",
zap.Any("influenceType", be.blockedDispatchers.InfluenceType), zap.Any("commitTs", be.commitTs)) +} + +func (be *BarrierEvent) createRangeCheckerForTypeDB() { + if be.dynamicSplitEnabled { + reps := be.controller.GetTasksBySchemaID(be.blockedDispatchers.SchemaID) + tbls := make([]int64, 0, len(reps)) + for _, rep := range reps { + tbls = append(tbls, rep.Span.TableID) + } + + tbls = append(tbls, heartbeatpb.DDLSpan.TableID) + be.rangeChecker = range_checker.NewTableSpanRangeChecker(tbls) + } else { + be.rangeChecker = range_checker.NewTableCountChecker( + be.controller.GetTaskSizeBySchemaID(be.blockedDispatchers.SchemaID) + 1 /*table trigger event dispatcher*/) + } + log.Info("create range checker for block event", zap.Any("influenceType", be.blockedDispatchers.InfluenceType), zap.Any("commitTs", be.commitTs)) +} + +func (be *BarrierEvent) checkEventAction(dispatcherID common.DispatcherID) *heartbeatpb.DispatcherStatus { + if !be.allDispatcherReported() { + return nil + } + return be.onAllDispatcherReportedBlockEvent(dispatcherID) +} + // onAllDispatcherReportedBlockEvent is called when all dispatchers reported the block event // it will select a dispatcher as the writer, reset the range checker, and move the event to the selected state // returns the dispatcher status to the dispatcher manager -func (be *BarrierEvent) onAllDispatcherReportedBlockEvent(dispatchers []*heartbeatpb.DispatcherID) *heartbeatpb.DispatcherStatus { +func (be *BarrierEvent) onAllDispatcherReportedBlockEvent(dispatcherID common.DispatcherID) *heartbeatpb.DispatcherStatus { var dispatcher common.DispatcherID switch be.blockedDispatchers.InfluenceType { case heartbeatpb.InfluenceType_DB, heartbeatpb.InfluenceType_All: @@ -131,7 +173,7 @@ func (be *BarrierEvent) onAllDispatcherReportedBlockEvent(dispatchers []*heartbe zap.Uint64("commitTs", be.commitTs)) dispatcher = be.controller.ddlDispatcherID default: - selected := dispatchers[len(dispatchers)-1] + selected := dispatcherID.ToPB() if be.tableTriggerDispatcherRelated { // select the last one as the writer // or the table trigger event dispatcher if it's one of the blocked dispatchers @@ -146,13 +188,14 @@ // reset the range checker be.rangeChecker.Reset() - be.selected = true + be.selected.Store(true) be.writerDispatcher = dispatcher - log.Info("all dispatcher reported heartbeat, select one to write", + log.Info("all dispatchers reported the heartbeat, schedule the event, and select one to write", zap.String("changefeed", be.cfID.Name()), zap.String("dispatcher", be.writerDispatcher.String()), zap.Uint64("commitTs", be.commitTs), zap.String("barrierType", be.blockedDispatchers.InfluenceType.String())) + be.scheduleBlockEvent() return &heartbeatpb.DispatcherStatus{ InfluencedDispatchers: &heartbeatpb.InfluencedDispatchers{ InfluenceType: heartbeatpb.InfluenceType_Normal, @@ -163,6 +206,7 @@ } func (be *BarrierEvent) scheduleBlockEvent() { + log.Info("schedule block event", zap.Uint64("commitTs", be.commitTs)) // dispatchers notify us to drop some tables, by dispatcher ID or schema ID if be.dropDispatchers != nil { switch be.dropDispatchers.InfluenceType { @@ -208,6 +252,10 @@ } } +func (be *BarrierEvent) markTableDone(tableID int64) { + be.rangeChecker.AddSubRange(tableID, nil, nil) +} + func (be *BarrierEvent) markDispatcherEventDone(dispatcherID common.DispatcherID) {
replicaSpan := be.controller.GetTask(dispatcherID) if replicaSpan == nil { @@ -216,13 +264,61 @@ func (be *BarrierEvent) markDispatcherEventDone(dispatcherID common.DispatcherID zap.String("dispatcher", dispatcherID.String())) return } - be.rangeChecker.AddSubRange(replicaSpan.Span.TableID, replicaSpan.Span.StartKey, replicaSpan.Span.EndKey) + if be.rangeChecker == nil { + // the rangeChecker has not been created yet + if dispatcherID == be.controller.ddlDispatcherID { + // create the rangeChecker + switch be.blockedDispatchers.InfluenceType { + case heartbeatpb.InfluenceType_Normal: + log.Panic("influence type should not be normal when range checker is nil") + case heartbeatpb.InfluenceType_DB: + // create the range checker first + be.createRangeCheckerForTypeDB() + // add the reported dispatchers' sub ranges to the range checker + be.rangeChecker.AddSubRange(replicaSpan.Span.TableID, replicaSpan.Span.StartKey, replicaSpan.Span.EndKey) + for dispatcher := range be.reportedDispatchers { + replicaSpan := be.controller.GetTask(dispatcher) + if replicaSpan == nil { + log.Warn("dispatcher not found, ignore", + zap.String("changefeed", be.cfID.Name()), + zap.String("dispatcher", dispatcherID.String())) + return + } + be.rangeChecker.AddSubRange(replicaSpan.Span.TableID, replicaSpan.Span.StartKey, replicaSpan.Span.EndKey) + } + case heartbeatpb.InfluenceType_All: + // create the range checker first + be.createRangeCheckerForTypeAll() + // add the reported dispatchers' sub ranges to the range checker + be.rangeChecker.AddSubRange(replicaSpan.Span.TableID, replicaSpan.Span.StartKey, replicaSpan.Span.EndKey) + for dispatcher := range be.reportedDispatchers { + replicaSpan := be.controller.GetTask(dispatcher) + if replicaSpan == nil { + log.Warn("dispatcher not found, ignore", + zap.String("changefeed", be.cfID.Name()), + zap.String("dispatcher", dispatcherID.String())) + return + } + be.rangeChecker.AddSubRange(replicaSpan.Span.TableID, replicaSpan.Span.StartKey, replicaSpan.Span.EndKey) + } + } + } else { + be.reportedDispatchers[dispatcherID] = struct{}{} + } + } else { + be.rangeChecker.AddSubRange(replicaSpan.Span.TableID, replicaSpan.Span.StartKey, replicaSpan.Span.EndKey) + } } func (be *BarrierEvent) allDispatcherReported() bool { + if be.rangeChecker == nil { + return false + } return be.rangeChecker.IsFullyCovered() } +// sendPassAction sends the pass action to the related dispatchers; if the related dispatchers are all removed, mark the range checker covered +// else return the pass action messages func (be *BarrierEvent) sendPassAction() []*messaging.TargetMessage { if be.blockedDispatchers == nil { return []*messaging.TargetMessage{} @@ -230,43 +326,49 @@ msgMap := make(map[node.ID]*messaging.TargetMessage) switch be.blockedDispatchers.InfluenceType { case heartbeatpb.InfluenceType_DB: - for _, stm := range be.controller.GetTasksBySchemaID(be.blockedDispatchers.SchemaID) { - nodeID := stm.GetNodeID() - if nodeID == "" { - continue - } - _, ok := msgMap[nodeID] - if !ok { - msgMap[nodeID] = be.newPassActionMessage(nodeID) + spans := be.controller.GetTasksBySchemaID(be.blockedDispatchers.SchemaID) + if len(spans) == 0 { + // the tables are removed, mark the event done + be.rangeChecker.MarkCovered() + return nil + } else { + for _, stm := range spans { + nodeID := stm.GetNodeID() + if nodeID == "" { + continue + } + _, ok := msgMap[nodeID] + if !ok { + msgMap[nodeID] = be.newPassActionMessage(nodeID) + } } } case heartbeatpb.InfluenceType_All: + // the all influence type will not come with a drop-type ddl.
for _, n := range be.controller.GetAllNodes() { msgMap[n] = be.newPassActionMessage(n) } case heartbeatpb.InfluenceType_Normal: - // send pass action - for _, stm := range be.controller.GetTasksByTableIDs(be.blockedDispatchers.TableIDs...) { - if stm == nil { - log.Warn("nil span replication, ignore", - zap.String("changefeed", be.cfID.Name()), - zap.Uint64("commitTs", be.commitTs), - zap.Bool("isSyncPoint", be.isSyncPoint), - ) - continue - } - nodeID := stm.GetNodeID() - dispatcherID := stm.ID - if dispatcherID == be.writerDispatcher { - continue - } - msg, ok := msgMap[nodeID] - if !ok { - msg = be.newPassActionMessage(nodeID) - msgMap[nodeID] = msg + for _, tableID := range be.blockedDispatchers.TableIDs { + spans := be.controller.GetTasksByTableID(tableID) + if len(spans) == 0 { + be.markTableDone(tableID) + } else { + for _, stm := range spans { + nodeID := stm.GetNodeID() + dispatcherID := stm.ID + if dispatcherID == be.writerDispatcher { + continue + } + msg, ok := msgMap[nodeID] + if !ok { + msg = be.newPassActionMessage(nodeID) + msgMap[nodeID] = msg + } + influencedDispatchers := msg.Message[0].(*heartbeatpb.HeartBeatResponse).DispatcherStatuses[0].InfluencedDispatchers + influencedDispatchers.DispatcherIDs = append(influencedDispatchers.DispatcherIDs, dispatcherID.ToPB()) + } } - influencedDispatchers := msg.Message[0].(*heartbeatpb.HeartBeatResponse).DispatcherStatuses[0].InfluencedDispatchers - influencedDispatchers.DispatcherIDs = append(influencedDispatchers.DispatcherIDs, dispatcherID.ToPB()) } } msgs := make([]*messaging.TargetMessage, 0, len(msgMap)) @@ -283,22 +385,34 @@ func (be *BarrierEvent) resend() []*messaging.TargetMessage { var msgs []*messaging.TargetMessage defer func() { if time.Since(be.lastWarningLogTime) > time.Second*10 { - log.Warn("barrier event is not resolved", - zap.String("changefeed", be.cfID.Name()), - zap.Uint64("commitTs", be.commitTs), - zap.Bool("isSyncPoint", be.isSyncPoint), - zap.Bool("selected", be.selected), - zap.Bool("writerDispatcherAdvanced", be.writerDispatcherAdvanced), - zap.String("coverage", be.rangeChecker.Detail()), - zap.Any("blocker", be.blockedDispatchers), - zap.Any("resend", msgs), - ) + if be.rangeChecker != nil { + log.Warn("barrier event is not resolved", + zap.String("changefeed", be.cfID.Name()), + zap.Uint64("commitTs", be.commitTs), + zap.Bool("isSyncPoint", be.isSyncPoint), + zap.Bool("selected", be.selected.Load()), + zap.Bool("writerDispatcherAdvanced", be.writerDispatcherAdvanced), + zap.String("coverage", be.rangeChecker.Detail()), + zap.Any("blocker", be.blockedDispatchers), + zap.Any("resend", msgs), + ) + } else { + log.Warn("barrier event is not resolved", + zap.String("changefeed", be.cfID.Name()), + zap.Uint64("commitTs", be.commitTs), + zap.Bool("isSyncPoint", be.isSyncPoint), + zap.Bool("selected", be.selected.Load()), + zap.Bool("writerDispatcherAdvanced", be.writerDispatcherAdvanced), + zap.Any("blocker", be.blockedDispatchers), + zap.Any("resend", msgs), + ) + } be.lastWarningLogTime = time.Now() } }() // still waiting for all dispatcher to reach the block commit ts - if !be.selected { + if !be.selected.Load() { return nil } be.lastResendTime = time.Now() @@ -317,7 +431,7 @@ func (be *BarrierEvent) resend() []*messaging.TargetMessage { msgs = []*messaging.TargetMessage{be.newWriterActionMessage(stm.GetNodeID())} } else { // the writer dispatcher is advanced, resend pass action - msgs = be.sendPassAction() + return be.sendPassAction() } return msgs } diff --git a/maintainer/barrier_event_test.go 
b/maintainer/barrier_event_test.go index 9772f294c..758f8c4c2 100644 --- a/maintainer/barrier_event_test.go +++ b/maintainer/barrier_event_test.go @@ -45,7 +45,7 @@ func TestScheduleEvent(t *testing.T) { }, "test1") controller := NewController(cfID, 1, nil, tsoClient, nil, nil, nil, ddlSpan, 1000, 0) controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1) - event := NewBlockEvent(cfID, controller, &heartbeatpb.State{ + event := NewBlockEvent(cfID, tableTriggerEventDispatcherID, controller, &heartbeatpb.State{ IsBlocked: true, BlockTs: 10, NeedDroppedTables: &heartbeatpb.InfluencedTables{InfluenceType: heartbeatpb.InfluenceType_All}, @@ -55,7 +55,7 @@ func TestScheduleEvent(t *testing.T) { // drop table will be executed first require.Equal(t, 2, controller.replicationDB.GetAbsentSize()) - event = NewBlockEvent(cfID, controller, &heartbeatpb.State{ + event = NewBlockEvent(cfID, tableTriggerEventDispatcherID, controller, &heartbeatpb.State{ IsBlocked: true, BlockTs: 10, NeedDroppedTables: &heartbeatpb.InfluencedTables{ @@ -68,7 +68,7 @@ func TestScheduleEvent(t *testing.T) { // drop table will be executed first, then add the new table require.Equal(t, 1, controller.replicationDB.GetAbsentSize()) - event = NewBlockEvent(cfID, controller, &heartbeatpb.State{ + event = NewBlockEvent(cfID, tableTriggerEventDispatcherID, controller, &heartbeatpb.State{ IsBlocked: true, BlockTs: 10, NeedDroppedTables: &heartbeatpb.InfluencedTables{ @@ -105,7 +105,7 @@ func TestResendAction(t *testing.T) { controller.replicationDB.MarkSpanReplicating(stm) dispatcherIDs = append(dispatcherIDs, stm.ID) } - event := NewBlockEvent(cfID, controller, &heartbeatpb.State{ + event := NewBlockEvent(cfID, tableTriggerEventDispatcherID, controller, &heartbeatpb.State{ IsBlocked: true, BlockTs: 10, BlockTables: &heartbeatpb.InfluencedTables{ @@ -114,24 +114,24 @@ func TestResendAction(t *testing.T) { }, false) // time is not reached event.lastResendTime = time.Now() - event.selected = true + event.selected.Store(true) msgs := event.resend() require.Len(t, msgs, 0) // time is not reached event.lastResendTime = time.Time{} - event.selected = false + event.selected.Store(false) msgs = event.resend() require.Len(t, msgs, 0) // resend write action - event.selected = true + event.selected.Store(true) event.writerDispatcherAdvanced = false event.writerDispatcher = dispatcherIDs[0] msgs = event.resend() require.Len(t, msgs, 1) - event = NewBlockEvent(cfID, controller, &heartbeatpb.State{ + event = NewBlockEvent(cfID, tableTriggerEventDispatcherID, controller, &heartbeatpb.State{ IsBlocked: true, BlockTs: 10, BlockTables: &heartbeatpb.InfluencedTables{ @@ -139,7 +139,7 @@ func TestResendAction(t *testing.T) { SchemaID: 1, }, }, false) - event.selected = true + event.selected.Store(true) event.writerDispatcherAdvanced = true msgs = event.resend() require.Len(t, msgs, 1) @@ -149,7 +149,7 @@ func TestResendAction(t *testing.T) { require.Equal(t, resp.DispatcherStatuses[0].InfluencedDispatchers.InfluenceType, heartbeatpb.InfluenceType_DB) require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10)) - event = NewBlockEvent(cfID, controller, &heartbeatpb.State{ + event = NewBlockEvent(cfID, tableTriggerEventDispatcherID, controller, &heartbeatpb.State{ IsBlocked: true, BlockTs: 10, BlockTables: &heartbeatpb.InfluencedTables{ @@ -157,7 +157,7 @@ func TestResendAction(t *testing.T) { SchemaID: 1, }, }, false) - event.selected = true + event.selected.Store(true) event.writerDispatcherAdvanced = true msgs = event.resend() 
require.Len(t, msgs, 1) @@ -167,7 +167,7 @@ func TestResendAction(t *testing.T) { require.Equal(t, resp.DispatcherStatuses[0].InfluencedDispatchers.InfluenceType, heartbeatpb.InfluenceType_All) require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10)) - event = NewBlockEvent(cfID, controller, &heartbeatpb.State{ + event = NewBlockEvent(cfID, dispatcherIDs[0], controller, &heartbeatpb.State{ IsBlocked: true, BlockTs: 10, BlockTables: &heartbeatpb.InfluencedTables{ @@ -176,7 +176,7 @@ func TestResendAction(t *testing.T) { SchemaID: 1, }, }, false) - event.selected = true + event.selected.Store(true) event.writerDispatcherAdvanced = true msgs = event.resend() require.Len(t, msgs, 1) @@ -204,7 +204,7 @@ func TestUpdateSchemaID(t *testing.T) { controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1) require.Equal(t, 1, controller.replicationDB.GetAbsentSize()) require.Len(t, controller.GetTasksBySchemaID(1), 1) - event := NewBlockEvent(cfID, controller, &heartbeatpb.State{ + event := NewBlockEvent(cfID, tableTriggerEventDispatcherID, controller, &heartbeatpb.State{ IsBlocked: true, BlockTs: 10, BlockTables: &heartbeatpb.InfluencedTables{ @@ -224,7 +224,7 @@ func TestUpdateSchemaID(t *testing.T) { // check the schema id and map is updated require.Len(t, controller.GetTasksBySchemaID(1), 0) require.Len(t, controller.GetTasksBySchemaID(2), 1) - require.Equal(t, controller.GetTasksByTableIDs(1)[0].GetSchemaID(), int64(2)) + require.Equal(t, controller.GetTasksByTableID(1)[0].GetSchemaID(), int64(2)) } func setNodeManagerAndMessageCenter() *watcher.NodeManager { diff --git a/maintainer/barrier_test.go b/maintainer/barrier_test.go index 9619a4e26..73c39e354 100644 --- a/maintainer/barrier_test.go +++ b/maintainer/barrier_test.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/ticdc/maintainer/replica" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" - "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -44,7 +43,7 @@ func TestOneBlockEvent(t *testing.T) { nil, nil, nil, ddlSpan, 1000, 0) startTs := uint64(10) controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, startTs) - stm := controller.GetTasksByTableIDs(1)[0] + stm := controller.GetTasksByTableID(1)[0] controller.replicationDB.BindSpanToNode("", "node1", stm) controller.replicationDB.MarkSpanReplicating(stm) @@ -82,10 +81,10 @@ func TestOneBlockEvent(t *testing.T) { isSyncPoint: true, } resp := msg.Message[0].(*heartbeatpb.HeartBeatResponse) - event := barrier.blockedTs[key] + event := barrier.blockedEvents.m[key] require.Equal(t, uint64(10), event.commitTs) require.True(t, event.writerDispatcher == controller.ddlDispatcherID) - require.True(t, event.selected) + require.True(t, event.selected.Load()) require.False(t, event.writerDispatcherAdvanced) require.Len(t, resp.DispatcherStatuses, 2) require.Equal(t, resp.DispatcherStatuses[0].Ack.CommitTs, uint64(10)) @@ -125,7 +124,7 @@ func TestOneBlockEvent(t *testing.T) { require.NotNil(t, msg) resp = msg.Message[0].(*heartbeatpb.HeartBeatResponse) require.Equal(t, resp.DispatcherStatuses[0].Ack.CommitTs, uint64(10)) - require.Len(t, barrier.blockedTs, 0) + require.Len(t, barrier.blockedEvents.m, 0) // send event done again msg = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{ @@ -151,7 +150,7 @@ func TestOneBlockEvent(t *testing.T) { }, }, }) - require.Len(t, barrier.blockedTs, 0) + require.Len(t, 
barrier.blockedEvents.m, 0) // no event if found, no message will be sent require.NotNil(t, msg) require.Equal(t, resp.DispatcherStatuses[0].Ack.CommitTs, uint64(10)) @@ -173,7 +172,7 @@ func TestNormalBlock(t *testing.T) { var blockedDispatcherIDS []*heartbeatpb.DispatcherID for id := 1; id < 4; id++ { controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: int64(id)}, 10) - stm := controller.GetTasksByTableIDs(int64(id))[0] + stm := controller.GetTasksByTableID(int64(id))[0] blockedDispatcherIDS = append(blockedDispatcherIDS, stm.ID.ToPB()) controller.replicationDB.BindSpanToNode("", "node1", stm) controller.replicationDB.MarkSpanReplicating(stm) @@ -183,7 +182,6 @@ func TestNormalBlock(t *testing.T) { selectDispatcherID := common.NewDispatcherIDFromPB(blockedDispatcherIDS[2]) selectedRep := controller.GetTask(selectDispatcherID) controller.replicationDB.BindSpanToNode("node1", "node2", selectedRep) - dropID := selectedRep.Span.TableID newSpan := &heartbeatpb.Table{TableID: 10, SchemaID: 1} barrier := NewBarrier(controller, false) @@ -201,10 +199,6 @@ func TestNormalBlock(t *testing.T) { InfluenceType: heartbeatpb.InfluenceType_Normal, TableIDs: []int64{1, 2, 3}, }, - NeedDroppedTables: &heartbeatpb.InfluencedTables{ - InfluenceType: heartbeatpb.InfluenceType_Normal, - TableIDs: []int64{dropID}, - }, NeedAddedTables: []*heartbeatpb.Table{newSpan}, }, }, @@ -217,10 +211,6 @@ func TestNormalBlock(t *testing.T) { InfluenceType: heartbeatpb.InfluenceType_Normal, TableIDs: []int64{1, 2, 3}, }, - NeedDroppedTables: &heartbeatpb.InfluencedTables{ - InfluenceType: heartbeatpb.InfluenceType_Normal, - TableIDs: []int64{dropID}, - }, NeedAddedTables: []*heartbeatpb.Table{newSpan}, }, }, @@ -245,10 +235,6 @@ func TestNormalBlock(t *testing.T) { InfluenceType: heartbeatpb.InfluenceType_Normal, TableIDs: []int64{1, 2, 3}, }, - NeedDroppedTables: &heartbeatpb.InfluencedTables{ - InfluenceType: heartbeatpb.InfluenceType_Normal, - TableIDs: []int64{dropID}, - }, NeedAddedTables: []*heartbeatpb.Table{newSpan}, }, }, @@ -259,7 +245,7 @@ func TestNormalBlock(t *testing.T) { blockTs: 10, isSyncPoint: false, } - event := barrier.blockedTs[key] + event := barrier.blockedEvents.m[key] require.Equal(t, uint64(10), event.commitTs) require.True(t, event.writerDispatcher == selectDispatcherID) // all dispatcher reported, the reported status is reset @@ -278,10 +264,6 @@ func TestNormalBlock(t *testing.T) { InfluenceType: heartbeatpb.InfluenceType_Normal, TableIDs: []int64{1, 2, 3}, }, - NeedDroppedTables: &heartbeatpb.InfluencedTables{ - InfluenceType: heartbeatpb.InfluenceType_Normal, - TableIDs: []int64{dropID}, - }, NeedAddedTables: []*heartbeatpb.Table{newSpan}, }, }, @@ -294,10 +276,6 @@ func TestNormalBlock(t *testing.T) { InfluenceType: heartbeatpb.InfluenceType_Normal, TableIDs: []int64{1, 2, 3}, }, - NeedDroppedTables: &heartbeatpb.InfluencedTables{ - InfluenceType: heartbeatpb.InfluenceType_Normal, - TableIDs: []int64{dropID}, - }, NeedAddedTables: []*heartbeatpb.Table{newSpan}, }, }, @@ -320,7 +298,7 @@ func TestNormalBlock(t *testing.T) { }, }, }) - require.Len(t, barrier.blockedTs, 1) + require.Len(t, barrier.blockedEvents.m, 1) msg = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{ ChangefeedID: cfID.ToPB(), BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ @@ -342,7 +320,7 @@ func TestNormalBlock(t *testing.T) { }, }, }) - require.Len(t, barrier.blockedTs, 0) + require.Len(t, barrier.blockedEvents.m, 0) } func TestNormalBlockWithTableTrigger(t *testing.T) { @@ -361,7 +339,7 @@ 
func TestNormalBlockWithTableTrigger(t *testing.T) { var blockedDispatcherIDS []*heartbeatpb.DispatcherID for id := 1; id < 3; id++ { controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: int64(id)}, 10) - stm := controller.GetTasksByTableIDs(int64(id))[0] + stm := controller.GetTasksByTableID(int64(id))[0] blockedDispatcherIDS = append(blockedDispatcherIDS, stm.ID.ToPB()) controller.replicationDB.BindSpanToNode("", "node1", stm) controller.replicationDB.MarkSpanReplicating(stm) @@ -397,7 +375,7 @@ func TestNormalBlockWithTableTrigger(t *testing.T) { require.Len(t, resp.DispatcherStatuses, 1) require.True(t, resp.DispatcherStatuses[0].Ack.CommitTs == 10) require.Len(t, resp.DispatcherStatuses[0].InfluencedDispatchers.DispatcherIDs, 1) - require.False(t, barrier.blockedTs[eventKey{blockTs: 10, isSyncPoint: false}].tableTriggerDispatcherRelated) + require.False(t, barrier.blockedEvents.m[eventKey{blockTs: 10, isSyncPoint: false}].tableTriggerDispatcherRelated) // table trigger block request msg = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{ @@ -426,7 +404,7 @@ func TestNormalBlockWithTableTrigger(t *testing.T) { BlockTs: 10, BlockTables: &heartbeatpb.InfluencedTables{ InfluenceType: heartbeatpb.InfluenceType_Normal, - TableIDs: []int64{0, 2, 3}, + TableIDs: []int64{0, 1, 2}, }, NeedDroppedTables: &heartbeatpb.InfluencedTables{ InfluenceType: heartbeatpb.InfluenceType_Normal, @@ -442,7 +420,7 @@ func TestNormalBlockWithTableTrigger(t *testing.T) { blockTs: 10, isSyncPoint: false, } - event := barrier.blockedTs[key] + event := barrier.blockedEvents.m[key] require.Equal(t, uint64(10), event.commitTs) require.True(t, event.writerDispatcher == tableTriggerEventDispatcherID) // all dispatcher reported, the reported status is reset @@ -463,7 +441,7 @@ func TestNormalBlockWithTableTrigger(t *testing.T) { }, }, }) - require.Len(t, barrier.blockedTs, 1) + require.Len(t, barrier.blockedEvents.m, 1) msg = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{ ChangefeedID: cfID.ToPB(), BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ @@ -475,17 +453,13 @@ func TestNormalBlockWithTableTrigger(t *testing.T) { Stage: heartbeatpb.BlockStage_DONE, }, }, - { - ID: blockedDispatcherIDS[1], - State: &heartbeatpb.State{ - IsBlocked: true, - BlockTs: 10, - Stage: heartbeatpb.BlockStage_DONE, - }, - }, }, }) - require.Len(t, barrier.blockedTs, 0) + require.Len(t, barrier.blockedEvents.m, 1) + // resend to check removed tables + event.resend() + barrier.checkEventFinish(event) + require.Len(t, barrier.blockedEvents.m, 0) } func TestSchemaBlock(t *testing.T) { @@ -597,7 +571,7 @@ func TestSchemaBlock(t *testing.T) { require.True(t, resp.DispatcherStatuses[1].Action.CommitTs == 10) require.True(t, resp.DispatcherStatuses[1].Action.Action == heartbeatpb.Action_Write) key := eventKey{blockTs: 10} - event := barrier.blockedTs[key] + event := barrier.blockedEvents.m[key] require.Equal(t, uint64(10), event.commitTs) // the ddl dispatcher will be the writer require.Equal(t, event.writerDispatcher, controller.ddlDispatcherID) @@ -628,7 +602,7 @@ func TestSchemaBlock(t *testing.T) { resp = msg.Message[0].(*heartbeatpb.HeartBeatResponse) require.Len(t, resp.DispatcherStatuses, 1) require.True(t, resp.DispatcherStatuses[0].Ack.CommitTs == 10) - event = barrier.blockedTs[key] + event = barrier.blockedEvents.m[key] require.Equal(t, uint64(10), event.commitTs) // the ddl dispatcher will be the writer require.Equal(t, event.writerDispatcher, controller.ddlDispatcherID) @@ -647,37 +621,10 @@ 
func TestSchemaBlock(t *testing.T) { }, }, }) - // 1 pass action message to one node + // pass action message to no node, because tables are removed msgs := barrier.Resend() - require.Len(t, msgs, 1) - msg = msgs[0] - require.Equal(t, messaging.TypeHeartBeatResponse, msg.Type) - require.Equal(t, msg.Message[0].(*heartbeatpb.HeartBeatResponse).DispatcherStatuses[0].Action.Action, - heartbeatpb.Action_Pass) - require.Len(t, barrier.blockedTs, 1) - // other dispatcher advanced checkpoint ts - msg = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{ - ChangefeedID: cfID.ToPB(), - BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ - { - ID: dispatcherIDs[0], - State: &heartbeatpb.State{ - IsBlocked: true, - BlockTs: 10, - Stage: heartbeatpb.BlockStage_DONE, - }, - }, - { - ID: dispatcherIDs[1], - State: &heartbeatpb.State{ - IsBlocked: true, - BlockTs: 10, - Stage: heartbeatpb.BlockStage_DONE, - }, - }, - }, - }) - require.Len(t, barrier.blockedTs, 0) + require.Len(t, msgs, 0) + require.Len(t, barrier.blockedEvents.m, 0) require.Equal(t, 1, controller.replicationDB.GetAbsentSize()) require.Equal(t, 2, controller.operatorController.OperatorSize()) @@ -715,7 +662,6 @@ func TestSyncPointBlock(t *testing.T) { controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1) controller.AddNewTable(commonEvent.Table{SchemaID: 2, TableID: 3}, 1) var dispatcherIDs []*heartbeatpb.DispatcherID - dropTables := []int64{1, 2, 3} absents := controller.replicationDB.GetAbsentForTest(make([]*replica.SpanReplication, 0), 10000) for _, stm := range absents { dispatcherIDs = append(dispatcherIDs, stm.ID.ToPB()) @@ -726,7 +672,6 @@ func TestSyncPointBlock(t *testing.T) { selectedRep := controller.GetTask(selectDispatcherID) controller.replicationDB.BindSpanToNode("node1", "node2", selectedRep) - newSpan := &heartbeatpb.Table{TableID: 10, SchemaID: 2} barrier := NewBarrier(controller, true) // first dispatcher block request msg := barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{ @@ -741,12 +686,7 @@ func TestSyncPointBlock(t *testing.T) { InfluenceType: heartbeatpb.InfluenceType_All, SchemaID: 1, }, - NeedDroppedTables: &heartbeatpb.InfluencedTables{ - InfluenceType: heartbeatpb.InfluenceType_Normal, - TableIDs: dropTables, - }, - NeedAddedTables: []*heartbeatpb.Table{newSpan}, - IsSyncPoint: true, + IsSyncPoint: true, }, }, { @@ -758,12 +698,7 @@ func TestSyncPointBlock(t *testing.T) { InfluenceType: heartbeatpb.InfluenceType_All, SchemaID: 1, }, - NeedDroppedTables: &heartbeatpb.InfluencedTables{ - InfluenceType: heartbeatpb.InfluenceType_Normal, - TableIDs: dropTables, - }, - NeedAddedTables: []*heartbeatpb.Table{newSpan}, - IsSyncPoint: true, + IsSyncPoint: true, }, }, { @@ -775,12 +710,7 @@ func TestSyncPointBlock(t *testing.T) { InfluenceType: heartbeatpb.InfluenceType_All, SchemaID: 1, }, - NeedDroppedTables: &heartbeatpb.InfluencedTables{ - InfluenceType: heartbeatpb.InfluenceType_Normal, - TableIDs: dropTables, - }, - NeedAddedTables: []*heartbeatpb.Table{newSpan}, - IsSyncPoint: true, + IsSyncPoint: true, }, }, }, @@ -804,12 +734,7 @@ func TestSyncPointBlock(t *testing.T) { InfluenceType: heartbeatpb.InfluenceType_All, SchemaID: 1, }, - NeedDroppedTables: &heartbeatpb.InfluencedTables{ - InfluenceType: heartbeatpb.InfluenceType_Normal, - TableIDs: dropTables, - }, - NeedAddedTables: []*heartbeatpb.Table{newSpan}, - IsSyncPoint: true, + IsSyncPoint: true, }, }, }, @@ -821,7 +746,7 @@ func TestSyncPointBlock(t *testing.T) { require.True(t, 
resp.DispatcherStatuses[1].Action.CommitTs == 10) require.True(t, resp.DispatcherStatuses[1].Action.Action == heartbeatpb.Action_Write) key := eventKey{blockTs: 10, isSyncPoint: true} - event := barrier.blockedTs[key] + event := barrier.blockedEvents.m[key] require.Equal(t, uint64(10), event.commitTs) // the last one will be the writer require.Equal(t, event.writerDispatcher, controller.ddlDispatcherID) @@ -844,7 +769,7 @@ func TestSyncPointBlock(t *testing.T) { msgs := barrier.Resend() // 2 pass action messages to one node require.Len(t, msgs, 2) - require.Len(t, barrier.blockedTs, 1) + require.Len(t, barrier.blockedEvents.m, 1) // other dispatcher advanced checkpoint ts msg = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{ ChangefeedID: cfID.ToPB(), @@ -878,7 +803,7 @@ func TestSyncPointBlock(t *testing.T) { }, }, }) - require.Len(t, barrier.blockedTs, 0) + require.Len(t, barrier.blockedEvents.m, 0) } func TestNonBlocked(t *testing.T) { @@ -926,7 +851,7 @@ func TestNonBlocked(t *testing.T) { require.Equal(t, uint64(10), resp.DispatcherStatuses[0].Ack.CommitTs) require.True(t, heartbeatpb.InfluenceType_Normal == resp.DispatcherStatuses[0].InfluencedDispatchers.InfluenceType) require.Equal(t, resp.DispatcherStatuses[0].InfluencedDispatchers.DispatcherIDs[0], blockedDispatcherIDS[0]) - require.Len(t, barrier.blockedTs, 0) + require.Len(t, barrier.blockedEvents.m, 0) require.Equal(t, 2, barrier.controller.replicationDB.GetAbsentSize(), 2) } @@ -970,10 +895,10 @@ func TestUpdateCheckpointTs(t *testing.T) { isSyncPoint: false, } resp := msg.Message[0].(*heartbeatpb.HeartBeatResponse) - event := barrier.blockedTs[key] + event := barrier.blockedEvents.m[key] require.Equal(t, uint64(10), event.commitTs) require.True(t, event.writerDispatcher == controller.ddlDispatcherID) - require.True(t, event.selected) + require.True(t, event.selected.Load()) require.False(t, event.writerDispatcherAdvanced) require.Len(t, resp.DispatcherStatuses, 2) require.Equal(t, resp.DispatcherStatuses[0].Ack.CommitTs, uint64(10)) @@ -1004,7 +929,7 @@ func TestHandleBlockBootstrapResponse(t *testing.T) { var dispatcherIDs []*heartbeatpb.DispatcherID for id := 1; id < 4; id++ { controller.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: int64(id)}, 2) - stm := controller.GetTasksByTableIDs(int64(id))[0] + stm := controller.GetTasksByTableID(int64(id))[0] dispatcherIDs = append(dispatcherIDs, stm.ID.ToPB()) controller.replicationDB.BindSpanToNode("", "node1", stm) controller.replicationDB.MarkSpanReplicating(stm) @@ -1043,9 +968,9 @@ func TestHandleBlockBootstrapResponse(t *testing.T) { }, }, }) - event := barrier.blockedTs[getEventKey(6, false)] + event := barrier.blockedEvents.m[getEventKey(6, false)] require.NotNil(t, event) - require.False(t, event.selected) + require.False(t, event.selected.Load()) require.False(t, event.writerDispatcherAdvanced) require.True(t, event.allDispatcherReported()) @@ -1082,9 +1007,9 @@ func TestHandleBlockBootstrapResponse(t *testing.T) { }, }, }) - event = barrier.blockedTs[getEventKey(6, false)] + event = barrier.blockedEvents.m[getEventKey(6, false)] require.NotNil(t, event) - require.True(t, event.selected) + require.True(t, event.selected.Load()) require.False(t, event.writerDispatcherAdvanced) // two done dispatchers @@ -1120,9 +1045,9 @@ func TestHandleBlockBootstrapResponse(t *testing.T) { }, }, }) - event = barrier.blockedTs[getEventKey(6, false)] + event = barrier.blockedEvents.m[getEventKey(6, false)] require.NotNil(t, event) - require.True(t, event.selected) + 
require.True(t, event.selected.Load()) require.True(t, event.writerDispatcherAdvanced) // nil, none stage @@ -1149,7 +1074,7 @@ func TestHandleBlockBootstrapResponse(t *testing.T) { }, }, }) - event = barrier.blockedTs[getEventKey(6, false)] + event = barrier.blockedEvents.m[getEventKey(6, false)] require.Nil(t, event) } diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index b18092538..4ae2b652a 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -351,9 +351,9 @@ func (c *Controller) RemoveTasksByTableIDs(tables ...int64) { c.operatorController.RemoveTasksByTableIDs(tables...) } -// GetTasksByTableIDs get all tasks by table id -func (c *Controller) GetTasksByTableIDs(tableIDs ...int64) []*replica.SpanReplication { - return c.replicationDB.GetTasksByTableIDs(tableIDs...) +// GetTasksByTableID gets all tasks for the given table id +func (c *Controller) GetTasksByTableID(tableID int64) []*replica.SpanReplication { + return c.replicationDB.GetTasksByTableID(tableID) } // GetAllTasks get all tasks @@ -435,7 +435,7 @@ func (c *Controller) moveTable(tableId int64, targetNode node.ID) error { return apperror.ErrNodeIsNotFound.GenWithStackByArgs("targetNode", targetNode) } - replications := c.replicationDB.GetTasksByTableIDs(tableId) + replications := c.replicationDB.GetTasksByTableID(tableId) if len(replications) != 1 { return apperror.ErrTableIsNotFounded.GenWithStackByArgs("unexpected number of replications found for table in this node; tableID is %s, replication count is %s", tableId, len(replications)) } diff --git a/maintainer/range_checker/bool_range_checker.go b/maintainer/range_checker/bool_range_checker.go index acd2b8b44..1b98e5ba7 100644 --- a/maintainer/range_checker/bool_range_checker.go +++ b/maintainer/range_checker/bool_range_checker.go @@ -13,17 +13,20 @@ package range_checker -import "fmt" +import ( + "fmt" + "sync/atomic" +) // BoolRangeChecker is a range checker that always returns the same value. type BoolRangeChecker struct { - covered bool + covered atomic.Bool } func NewBoolRangeChecker(covered bool) *BoolRangeChecker { - return &BoolRangeChecker{ - covered: covered, - } + boolRangeChecker := &BoolRangeChecker{} + boolRangeChecker.covered.Store(covered) + return boolRangeChecker } // AddSubRange adds a sub table span to the range checker. @@ -32,7 +35,7 @@ func (f *BoolRangeChecker) AddSubRange(_ int64, _, _ []byte) { // IsFullyCovered checks if the entire range from start to end is covered. func (f *BoolRangeChecker) IsFullyCovered() bool { - return f.covered + return f.covered.Load() } // Reset resets the range checker reported sub spans @@ -40,5 +43,9 @@ func (f *BoolRangeChecker) Reset() { } func (f *BoolRangeChecker) Detail() string { - return fmt.Sprintf("covered: %v", f.covered) + return fmt.Sprintf("covered: %v", f.covered.Load()) +} + +func (f *BoolRangeChecker) MarkCovered() { + f.covered.Store(true) } diff --git a/maintainer/range_checker/range_checker.go b/maintainer/range_checker/range_checker.go index 9c0c846ad..97052ba44 100644 --- a/maintainer/range_checker/range_checker.go +++ b/maintainer/range_checker/range_checker.go @@ -23,4 +23,6 @@ type RangeChecker interface { Reset() // Detail returns the detail status of the range checker, it's used for debugging.
Detail() string + // MarkCovered marks the range checker as fully covered + MarkCovered() } diff --git a/maintainer/range_checker/table_count_range_checker.go b/maintainer/range_checker/table_count_range_checker.go index 865fb41ad..41043b38b 100644 --- a/maintainer/range_checker/table_count_range_checker.go +++ b/maintainer/range_checker/table_count_range_checker.go @@ -13,12 +13,16 @@ package range_checker -import "fmt" +import ( + "fmt" + "sync/atomic" +) // TableIDRangeChecker is used to check if all table IDs are covered. type TableIDRangeChecker struct { needCount int reportedMap map[int64]struct{} + covered atomic.Bool } // NewTableCountChecker creates a new TableIDRangeChecker. @@ -26,6 +30,7 @@ func NewTableCountChecker(tables int) *TableIDRangeChecker { tc := &TableIDRangeChecker{ needCount: tables, reportedMap: make(map[int64]struct{}, tables), + covered: atomic.Bool{}, } return tc } @@ -37,14 +42,19 @@ func (rc *TableIDRangeChecker) AddSubRange(tableID int64, _, _ []byte) { // IsFullyCovered checks if all table IDs are covered. func (rc *TableIDRangeChecker) IsFullyCovered() bool { - return len(rc.reportedMap) >= rc.needCount + return rc.covered.Load() || len(rc.reportedMap) >= rc.needCount } // Reset resets the reported tables. func (rc *TableIDRangeChecker) Reset() { rc.reportedMap = make(map[int64]struct{}, rc.needCount) + rc.covered.Store(false) } func (rc *TableIDRangeChecker) Detail() string { return fmt.Sprintf("reported count: %d, require count: %d", len(rc.reportedMap), rc.needCount) } + +func (rc *TableIDRangeChecker) MarkCovered() { + rc.covered.Store(true) +} diff --git a/maintainer/range_checker/table_span_range_checker.go b/maintainer/range_checker/table_span_range_checker.go index 461f92902..c10253853 100644 --- a/maintainer/range_checker/table_span_range_checker.go +++ b/maintainer/range_checker/table_span_range_checker.go @@ -17,6 +17,7 @@ import ( "bytes" "fmt" "strings" + "sync/atomic" "github.com/google/btree" "github.com/pingcap/tiflow/pkg/spanz" @@ -25,12 +26,14 @@ import ( // TableSpanRangeChecker is used to check if all ranges cover the start and end byte slices. type TableSpanRangeChecker struct { tableSpans map[int64]*SpanCoverageChecker + covered atomic.Bool } // NewTableSpanRangeChecker creates a new TableSpanRangeChecker with given start and end. func NewTableSpanRangeChecker(tables []int64) *TableSpanRangeChecker { sc := &TableSpanRangeChecker{ tableSpans: make(map[int64]*SpanCoverageChecker), + covered: atomic.Bool{}, } for _, table := range tables { span := spanz.TableIDToComparableSpan(table) @@ -48,6 +51,9 @@ func (rc *TableSpanRangeChecker) AddSubRange(tableID int64, newStart, newEnd []b // IsFullyCovered checks if the entire range from start to end is covered. func (rc *TableSpanRangeChecker) IsFullyCovered() bool { + if rc.covered.Load() { + return true + } for _, span := range rc.tableSpans { if !span.IsFullyCovered() { return false @@ -61,6 +67,11 @@ func (rc *TableSpanRangeChecker) Reset() { for _, span := range rc.tableSpans { span.Reset() } + rc.covered.Store(false) +} + +func (rc *TableSpanRangeChecker) MarkCovered() { + rc.covered.Store(true) } func (rc *TableSpanRangeChecker) Detail() string { @@ -79,19 +90,26 @@ func (rc *TableSpanRangeChecker) Detail() string { type SpanCoverageChecker struct { start, end []byte tree *btree.BTreeG[*RangeNode] + covered atomic.Bool // it is only set to true when a nil range is received, which means the table has been removed.
} // NewTableSpanCoverageChecker creates a new SpanCoverageChecker with given start and end. func NewTableSpanCoverageChecker(start, end []byte) *SpanCoverageChecker { return &SpanCoverageChecker{ - start: start, - end: end, - tree: btree.NewG[*RangeNode](4, rangeLockEntryLess), + start: start, + end: end, + tree: btree.NewG[*RangeNode](4, rangeLockEntryLess), + covered: atomic.Bool{}, } } // AddSubRange adds a sub-range to the range checker. func (rc *SpanCoverageChecker) AddSubRange(newStart, newEnd []byte) { + if newStart == nil && newEnd == nil { + rc.covered.Store(true) + return + } + // Iterate through the B-tree to find overlapping or adjacent ranges var toDelete []*RangeNode @@ -140,6 +158,9 @@ func (rc *SpanCoverageChecker) AddSubRange(newStart, newEnd []byte) { // IsFullyCovered checks if the entire range from start to end is covered. func (rc *SpanCoverageChecker) IsFullyCovered() bool { + if rc.covered.Load() { + return true + } if rc.tree.Len() == 0 { return false } @@ -163,6 +184,11 @@ func (rc *SpanCoverageChecker) IsFullyCovered() bool { // Reset resets the range checker reported sub spans func (rc *SpanCoverageChecker) Reset() { rc.tree = btree.NewG[*RangeNode](4, rangeLockEntryLess) + rc.covered.Store(false) +} + +func (rc *SpanCoverageChecker) MarkCovered() { + rc.covered.Store(true) } // RangeNode represents a node in the BTree. diff --git a/maintainer/replica/replication_db.go b/maintainer/replica/replication_db.go index ef37ae951..b0f75e53a 100644 --- a/maintainer/replica/replication_db.go +++ b/maintainer/replica/replication_db.go @@ -125,20 +125,32 @@ func (db *ReplicationDB) TryRemoveBySchemaID(schemaID int64) []*SpanReplication return tasks } -// GetTasksByTableIDs returns the spans by the table ids -func (db *ReplicationDB) GetTasksByTableIDs(tableIDs ...int64) []*SpanReplication { +// GetTasksByTableID returns the spans by the table id +func (db *ReplicationDB) GetTasksByTableID(tableID int64) []*SpanReplication { db.lock.RLock() defer db.lock.RUnlock() var tasks []*SpanReplication - for _, tableID := range tableIDs { - for _, task := range db.tableTasks[tableID] { - tasks = append(tasks, task) - } + for _, task := range db.tableTasks[tableID] { + tasks = append(tasks, task) } return tasks } +// // GetTasksByTableIDs returns the spans by the table ids +// func (db *ReplicationDB) GetTasksByTableIDs(tableIDs ...int64) []*SpanReplication { +// db.lock.RLock() +// defer db.lock.RUnlock() + +// var tasks []*SpanReplication +// for _, tableID := range tableIDs { +// for _, task := range db.tableTasks[tableID] { +// tasks = append(tasks, task) +// } +// } +// return tasks +// } + // GetAllTasks returns all the spans in the db; it's used when the block event type is all, and it will return the ddl span func (db *ReplicationDB) GetAllTasks() []*SpanReplication { db.lock.RLock() defer db.lock.RUnlock() diff --git a/maintainer/replica/replication_db_test.go b/maintainer/replica/replication_db_test.go index e5d7804f5..adea20909 100644 --- a/maintainer/replica/replication_db_test.go +++ b/maintainer/replica/replication_db_test.go @@ -59,8 +59,8 @@ func TestBasicFunction(t *testing.T) { require.Len(t, db.GetTasksBySchemaID(2), 0) require.Equal(t, 2, db.GetTaskSizeBySchemaID(1)) require.Equal(t, 0, db.GetTaskSizeBySchemaID(2)) - require.Len(t, db.GetTasksByTableIDs(3), 1) - require.Len(t, db.GetTasksByTableIDs(3, 4), 2) + require.Len(t, db.GetTasksByTableID(3), 1) + require.Len(t, db.GetTasksByTableID(4), 1) require.Len(t, db.GetTaskByNodeID("node1"), 1) require.Len(t,
db.GetTaskByNodeID("node2"), 0) require.Equal(t, 0, db.GetTaskSizeByNodeID("node2")) diff --git a/pkg/common/table_info.go b/pkg/common/table_info.go index fcaaf4527..3391adcb5 100644 --- a/pkg/common/table_info.go +++ b/pkg/common/table_info.go @@ -19,6 +19,7 @@ import ( "fmt" "runtime" "strings" + "sync" "sync/atomic" "github.com/pingcap/log" @@ -302,7 +303,8 @@ type TableInfo struct { columnSchema *columnSchema `json:"-"` preSQLs struct { - isInitialized atomic.Bool + mutex sync.Mutex + isInitialized bool m [4]string } `json:"-"` } @@ -314,7 +316,9 @@ func (ti *TableInfo) InitPrivateFields() { return } - if ti.preSQLs.isInitialized.Load() { + ti.preSQLs.mutex.Lock() + defer ti.preSQLs.mutex.Unlock() + if ti.preSQLs.isInitialized { return } @@ -322,7 +326,7 @@ func (ti *TableInfo) InitPrivateFields() { ti.preSQLs.m[preSQLInsert] = fmt.Sprintf(ti.columnSchema.PreSQLs[preSQLInsert], ti.TableName.QuoteString()) ti.preSQLs.m[preSQLReplace] = fmt.Sprintf(ti.columnSchema.PreSQLs[preSQLReplace], ti.TableName.QuoteString()) ti.preSQLs.m[preSQLUpdate] = fmt.Sprintf(ti.columnSchema.PreSQLs[preSQLUpdate], ti.TableName.QuoteString()) - ti.preSQLs.isInitialized.Store(true) + ti.preSQLs.isInitialized = true } func (ti *TableInfo) Marshal() ([]byte, error) { diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 8d3dcd6d7..aa767a7d8 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -414,13 +414,14 @@ func (c *eventBroker) checkAndSendHandshake(task scanTask) bool { // We need to call this function every time we send an event (whether dml/ddl/resolvedTs), // thus to ensure the sync point events are in the correct order for each dispatcher. func (c *eventBroker) emitSyncPointEventIfNeeded(ts uint64, d *dispatcherStat, remoteID node.ID) { + log.Info("emitSyncPointEventIfNeeded", zap.Uint64("ts", ts), zap.Uint64("nextSyncPoint", d.nextSyncPoint)) if d.enableSyncPoint && ts > d.nextSyncPoint { // Send the sync point event. syncPointEvent := newWrapSyncPointEvent( remoteID, &pevent.SyncPointEvent{ DispatcherID: d.id, - CommitTs: ts, + CommitTs: d.nextSyncPoint, }, d.getEventSenderState()) c.getMessageCh(d.workerIndex) <- syncPointEvent diff --git a/pkg/sink/mysql/mysql_writer_ddl.go b/pkg/sink/mysql/mysql_writer_ddl.go index ecb604c7b..045dc3bcd 100644 --- a/pkg/sink/mysql/mysql_writer_ddl.go +++ b/pkg/sink/mysql/mysql_writer_ddl.go @@ -145,6 +145,7 @@ func (w *MysqlWriter) execDDL(event *commonEvent.DDLEvent) error { } shouldSwitchDB := needSwitchDB(event) + // Convert vector type to string type for unsupported databases if w.needFormat { if newQuery := formatQuery(event.Query); newQuery != event.Query { event.Query = newQuery } } + tx, err := w.db.BeginTx(ctx, nil) if err != nil { return err } @@ -236,6 +238,10 @@ func (w *MysqlWriter) waitAsyncDDLDone(event *commonEvent.DDLEvent) { } for _, tableID := range relatedTableIDs { + // tableID 0 means the table trigger, which cannot do async ddl + if tableID == 0 { + continue + } state, ok := w.asyncDDLState.Load(tableID) if !ok { // query the downstream, @@ -274,10 +280,59 @@ WHERE TABLE_ID = "%s" AND (STATE = "running" OR STATE = "queueing"); ` +// doQueryAsyncDDL returns true if the async ddl is still running, false if it is done.
+func (w *MysqlWriter) doQueryAsyncDDL(tableID int64, query string) (bool, error) { + start := time.Now() + rows, err := w.db.QueryContext(w.ctx, query) + log.Debug("query duration", zap.Any("duration", time.Since(start)), zap.Any("query", query)) + if err != nil { + return false, cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("failed to query ddl jobs table; Query is %s", query))) + } + + defer rows.Close() + var jobID int64 + var jobType string + var schemaState string + var state string + + noRows := true + for rows.Next() { + noRows = false + err := rows.Scan(&jobID, &jobType, &schemaState, &state) + if err != nil { + return false, cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("failed to query ddl jobs table; Query is %s", query))) + } + + log.Info("async ddl is still running", + zap.String("changefeed", w.ChangefeedID.String()), + zap.Duration("checkDuration", time.Since(start)), + zap.Any("tableID", tableID), + zap.Any("jobID", jobID), + zap.String("jobType", jobType), + zap.String("schemaState", schemaState), + zap.String("state", state)) + break + } + + if noRows { + return false, nil + } + + return true, nil +} + // query the ddl jobs to find the state of the async ddl // if the ddl is still running, we should wait for it. func (w *MysqlWriter) checkAndWaitAsyncDDLDoneDownstream(tableID int64) error { query := fmt.Sprintf(checkRunningAddIndexSQL, tableID) + running, err := w.doQueryAsyncDDL(tableID, query) + if err != nil { + return err + } + if !running { + return nil + } + ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() @@ -286,38 +341,11 @@ func (w *MysqlWriter) checkAndWaitAsyncDDLDoneDownstream(tableID int64) error { case <-w.ctx.Done(): return nil case <-ticker.C: - start := time.Now() - rows, err := w.db.QueryContext(w.ctx, query) + running, err := w.doQueryAsyncDDL(tableID, query) if err != nil { - return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("failed to query ddl jobs table; Query is %s", query))) + return err } - - defer rows.Close() - var jobID int64 - var jobType string - var schemaState string - var state string - - noRows := true - for rows.Next() { - noRows = false - err := rows.Scan(&jobID, &jobType, &schemaState, &state) - if err != nil { - return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("failed to query ddl jobs table; Query is %s", query))) - } - - log.Info("async ddl is still running", - zap.String("changefeed", w.ChangefeedID.String()), - zap.Duration("checkDuration", time.Since(start)), - zap.Any("tableID", tableID), - zap.Any("jobID", jobID), - zap.String("jobType", jobType), - zap.String("schemaState", schemaState), - zap.String("state", state)) - break - } - - if noRows { + if !running { return nil } }
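The MarkCovered hook added to the range checkers lets a caller force full coverage, and SpanCoverageChecker.AddSubRange now treats a nil start/end pair as the table-removed signal. A minimal usage sketch of that behavior, assuming this repo's import path for the package and that TableSpanRangeChecker.AddSubRange forwards the range to the per-table SpanCoverageChecker (that forwarding body is outside the hunks shown):

package main

import (
	"fmt"

	"github.com/pingcap/ticdc/maintainer/range_checker" // assumed import path
)

func main() {
	// Track coverage for table 1 only.
	rc := range_checker.NewTableSpanRangeChecker([]int64{1})
	fmt.Println(rc.IsFullyCovered()) // false: nothing reported yet

	// A nil start/end pair is the "table removed" signal: it marks the
	// per-table checker covered without any real span reports.
	rc.AddSubRange(1, nil, nil)
	fmt.Println(rc.IsFullyCovered()) // true

	// MarkCovered forces the whole checker to covered; Reset clears both
	// the reported spans and the covered flags again.
	rc.Reset()
	rc.MarkCovered()
	fmt.Println(rc.IsFullyCovered()) // true
}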
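The table_info.go hunk replaces the atomic.Bool guard with a mutex because the old pattern was check-then-act: two goroutines could both observe isInitialized == false and then fill preSQLs.m concurrently. sync.Once expresses the same publish-once intent without an explicit flag; a self-contained sketch of that alternative (the type and field names are illustrative, not the actual TableInfo fields):

package main

import (
	"fmt"
	"sync"
)

type lazySQLs struct {
	once sync.Once
	m    [4]string
}

func (l *lazySQLs) init(table string) {
	l.once.Do(func() {
		// Runs at most once, even when init is called concurrently.
		for i := range l.m {
			l.m[i] = fmt.Sprintf("/* preSQL %d for %s */", i, table)
		}
	})
}

func main() {
	var s lazySQLs
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.init("t1") // concurrent callers are safe
		}()
	}
	wg.Wait()
	fmt.Println(s.m[0])
}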
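The event_broker.go fix stamps the emitted sync point event with d.nextSyncPoint rather than the commit ts that happened to trigger it, so the event always carries the scheduled sync point timestamp. A hedged sketch of that semantics; the interval-based advance of nextSyncPoint is an assumption for illustration, since the hunk only shows the CommitTs change:

package main

import "fmt"

// emitSyncPoints emits one sync point per crossed boundary; each event
// carries the sync point ts itself (the old code used the event ts).
func emitSyncPoints(ts, nextSyncPoint, interval uint64) uint64 {
	for ts > nextSyncPoint {
		fmt.Println("sync point event, commitTs =", nextSyncPoint)
		nextSyncPoint += interval
	}
	return nextSyncPoint
}

func main() {
	next := emitSyncPoints(25, 10, 10) // an event at ts 25 crosses sync points 10 and 20
	fmt.Println("next sync point:", next)
}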
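Extracting doQueryAsyncDDL out of checkAndWaitAsyncDDLDoneDownstream also removes a latent leak: the old loop ran defer rows.Close() inside the for/select, and a defer only fires when the enclosing function returns, so one result set stayed open per tick. With the query in its own function, the defer is scoped to a single call. A minimal illustration of the fixed shape (db and query are placeholders):

package sketch

import "database/sql"

// pollOnce runs a single check. Because the defer lives in its own
// function, the rows are closed when each call returns, instead of
// piling up for the lifetime of a polling loop.
func pollOnce(db *sql.DB, query string) (bool, error) {
	rows, err := db.Query(query)
	if err != nil {
		return false, err
	}
	defer rows.Close()
	return rows.Next(), rows.Err() // true while the job is still reported
}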