Skip to content

Commit

Permalink
Adjust ddl logic in maintainer (#895)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan authored Jan 17, 2025
1 parent bd24c45 commit 7b49a26
Show file tree
Hide file tree
Showing 20 changed files with 543 additions and 332 deletions.
9 changes: 4 additions & 5 deletions .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,10 @@ jobs:
run: |
export TICDC_NEWARCH=true && make integration_test CASE=http_api_tls
- name: Test http_api_tls_with_user_auth
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=http_api_tls_with_user_auth
# - name: Test http_api_tls_with_user_auth
# if: ${{ success() }}
# run: |
# export TICDC_NEWARCH=true && make integration_test CASE=http_api_tls_with_user_auth

- name: Test default_value
if: ${{ success() }}
Expand Down
5 changes: 5 additions & 0 deletions coordinator/changefeed/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ func (c *Changefeed) UpdateStatus(newStatus *heartbeatpb.MaintainerStatus) (bool
return false, model.StateNormal, nil
}

func (c *Changefeed) ForceUpdateStatus(newStatus *heartbeatpb.MaintainerStatus) (bool, model.FeedState, *heartbeatpb.RunningError) {
c.status.Store(newStatus)
return c.backoff.CheckStatus(newStatus)
}

func (c *Changefeed) IsMQSink() bool {
return c.isMQSink
}
Expand Down
2 changes: 1 addition & 1 deletion coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (c *Controller) ResumeChangefeed(ctx context.Context, id common.ChangeFeedI

status := cf.GetClonedStatus()
status.CheckpointTs = newCheckpointTs
_, _, err := cf.UpdateStatus(status)
_, _, err := cf.ForceUpdateStatus(status)
if err != nil {
return errors.New(err.Message)
}
Expand Down
15 changes: 12 additions & 3 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (d *Dispatcher) InitializeTableSchemaStore(schemaInfo []*heartbeatpb.Schema
// 1. If the action is a write, we need to add the ddl event to the sink for writing to downstream.
// 2. If the action is a pass, we just need to pass the event
func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.DispatcherStatus) {
log.Debug("dispatcher handle dispatcher status",
log.Info("dispatcher handle dispatcher status",
zap.Any("dispatcherStatus", dispatcherStatus),
zap.Stringer("dispatcher", d.id),
zap.Any("action", dispatcherStatus.GetAction()),
Expand All @@ -230,11 +230,22 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
if action != nil {
pendingEvent, blockStatus := d.blockEventStatus.getEventAndStage()
if pendingEvent == nil && action.CommitTs > d.GetResolvedTs() {
// we have not receive the block event, and the action is for the future event, so just ignore
log.Info("pending event is nil, and the action's commit is larger than dispatchers resolvedTs", zap.Any("resolvedTs", d.GetResolvedTs()), zap.Any("action commitTs", action.CommitTs), zap.Any("dispatcher", d.id))
// we have not receive the block event, and the action is for the future event, so just ignore
return
}
if pendingEvent != nil && action.CommitTs == pendingEvent.GetCommitTs() && blockStatus == heartbeatpb.BlockStage_WAITING {
log.Info("pending event get the action", zap.Any("action", action), zap.Any("dispatcher", d.id), zap.Any("pendingEvent commitTs", pendingEvent.GetCommitTs()))
d.blockEventStatus.updateBlockStage(heartbeatpb.BlockStage_WRITING)
pendingEvent.PushFrontFlushFunc(func() {
// clear blockEventStatus should be before wake ds.
// otherwise, there may happen:
// 1. wake ds
// 2. get new ds and set new pending event
// 3. clear blockEventStatus(should be the old pending event, but clear the new one)
d.blockEventStatus.clear()
})
if action.Action == heartbeatpb.Action_Write {
failpoint.Inject("BlockOrWaitBeforeWrite", nil)
err := d.AddBlockEventToSink(pendingEvent)
Expand All @@ -255,8 +266,6 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
d.PassBlockEventToSink(pendingEvent)
failpoint.Inject("BlockAfterPass", nil)
}

d.blockEventStatus.clear()
}

// whether the outdate message or not, we need to return message show we have finished the event.
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/dispatcher/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (r *ResendTaskMap) Get(identifier BlockEventIdentifier) *ResendTask {
}

func (r *ResendTaskMap) Set(identifier BlockEventIdentifier, task *ResendTask) {
log.Info("set resend task", zap.Any("identifier", identifier), zap.Any("task", task))
r.mutex.Lock()
defer r.mutex.Unlock()
r.m[identifier] = task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ One EventDispatcherManager has one backend sink.
*/
type EventDispatcherManager struct {
changefeedID common.ChangeFeedID
maintainerID node.ID

maintainerIDMutex sync.Mutex
maintainerID node.ID

pdClock pdutil.Clock

Expand Down Expand Up @@ -710,10 +712,14 @@ func (e *EventDispatcherManager) GetDispatcherMap() *DispatcherMap {
}

func (e *EventDispatcherManager) GetMaintainerID() node.ID {
e.maintainerIDMutex.Lock()
defer e.maintainerIDMutex.Unlock()
return e.maintainerID
}

func (e *EventDispatcherManager) SetMaintainerID(maintainerID node.ID) {
e.maintainerIDMutex.Lock()
defer e.maintainerIDMutex.Unlock()
e.maintainerID = maintainerID
}

Expand Down
Loading

0 comments on commit 7b49a26

Please sign in to comment.