From b7a4b1c12ac8a393d3f008617aba3581ba97a04b Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Tue, 11 Feb 2025 10:52:43 +0800 Subject: [PATCH] coordinator: improve the log format and code struct (#978) close pingcap/ticdc#963 --- coordinator/changefeed/backoff.go | 7 +- coordinator/changefeed/changefeed.go | 2 +- coordinator/changefeed/changefeed_db.go | 5 +- coordinator/changefeed/changefeed_db_test.go | 3 + coordinator/controller.go | 275 ++++++++++++------ coordinator/coordinator.go | 149 ++++++---- coordinator/operator/operator_controller.go | 50 ++-- coordinator/operator/operator_stop.go | 24 +- downstreamadapter/dispatcher/dispatcher.go | 76 +++-- logservice/eventstore/event_store.go | 18 +- maintainer/maintainer.go | 21 +- maintainer/maintainer_manager.go | 4 +- maintainer/operator/operator_controller.go | 11 + pkg/common/format.go | 3 +- pkg/eventservice/event_broker.go | 67 +++-- pkg/node/node.go | 6 + server/module_election.go | 12 +- .../changefeed_dup_error_restart/run.sh | 2 +- 18 files changed, 486 insertions(+), 249 deletions(-) diff --git a/coordinator/changefeed/backoff.go b/coordinator/changefeed/backoff.go index 587e85150..7ac74ccd8 100644 --- a/coordinator/changefeed/backoff.go +++ b/coordinator/changefeed/backoff.go @@ -188,15 +188,16 @@ func (m *Backoff) HandleError(errs []*heartbeatpb.RunningError) (bool, *heartbea log.Error("The changefeed won't be restarted as it has been experiencing failures for "+ "an extended duration", zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime), - zap.String("namespace", m.id.Namespace()), - zap.String("changefeed", m.id.Name()), + zap.Stringer("changefeed", m.id), + zap.Uint64("checkpointTs", m.checkpointTs), zap.Time("nextRetryTime", m.nextRetryTime.Load()), ) return true, lastError } // if any error is occurred , we should set the changefeed state to warning and stop the changefeed log.Warn("changefeed meets an error, will be stopped", - zap.String("namespace", m.id.Name()), + zap.Stringer("changefeed", m.id), + zap.Uint64("checkpointTs", m.checkpointTs), zap.Time("nextRetryTime", m.nextRetryTime.Load()), zap.Any("error", errs)) // patch the last error to changefeed info diff --git a/coordinator/changefeed/changefeed.go b/coordinator/changefeed/changefeed.go index 38e5111d2..e458d9eea 100644 --- a/coordinator/changefeed/changefeed.go +++ b/coordinator/changefeed/changefeed.go @@ -141,7 +141,7 @@ func (c *Changefeed) UpdateStatus(newStatus *heartbeatpb.MaintainerStatus) (bool c.status.Store(newStatus) if old.BootstrapDone != newStatus.BootstrapDone { log.Info("Received changefeed status with bootstrapDone", - zap.String("changefeed", c.ID.String()), + zap.Stringer("changefeed", c.ID), zap.Bool("bootstrapDone", newStatus.BootstrapDone)) return true, model.StateNormal, nil } diff --git a/coordinator/changefeed/changefeed_db.go b/coordinator/changefeed/changefeed_db.go index 019b299fb..fdb2bc95e 100644 --- a/coordinator/changefeed/changefeed_db.go +++ b/coordinator/changefeed/changefeed_db.go @@ -224,7 +224,10 @@ func (db *ChangefeedDB) Resume(id common.ChangeFeedID, resetBackoff bool, overwr delete(db.stopped, id) cf.isNew = overwriteCheckpointTs db.AddAbsentWithoutLock(cf) - log.Info("resume changefeed", zap.String("changefeed", id.String()), zap.Any("overwriteCheckpointTs", overwriteCheckpointTs)) + log.Info("resume changefeed", + zap.Stringer("changefeed", id), + zap.Uint64("checkpointTs", cf.GetStatus().CheckpointTs), + zap.Bool("overwriteCheckpointTs", overwriteCheckpointTs)) } } diff --git a/coordinator/changefeed/changefeed_db_test.go b/coordinator/changefeed/changefeed_db_test.go index 3a94d83cf..8f1cc2f07 100644 --- a/coordinator/changefeed/changefeed_db_test.go +++ b/coordinator/changefeed/changefeed_db_test.go @@ -79,6 +79,9 @@ func TestResume(t *testing.T) { cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test")} db.AddStoppedChangefeed(cf) cf.backoff = NewBackoff(cf.ID, 0, 0) + cf.status = atomic.NewPointer(&heartbeatpb.MaintainerStatus{ + CheckpointTs: 100, + }) db.Resume(cf.ID, true, false) diff --git a/coordinator/controller.go b/coordinator/controller.go index 5b4ebf740..b08898046 100644 --- a/coordinator/controller.go +++ b/coordinator/controller.go @@ -39,6 +39,11 @@ import ( "go.uber.org/zap" ) +const ( + bootstrapperID = "coordinator" + nodeChangeHandlerID = "coordinator-controller" +) + // Controller schedules and balance changefeeds, there are 3 main components: // 1. scheduler: generate operators for handling different scheduling tasks. // 2. operatorController: manage all operators and execute them periodically. @@ -56,8 +61,10 @@ type Controller struct { bootstrapped *atomic.Bool bootstrapper *bootstrap.Bootstrapper[heartbeatpb.CoordinatorBootstrapResponse] - mutex sync.Mutex // protect nodeChanged and do onNodeChanged() - nodeChanged bool + nodeChanged struct { + sync.Mutex + changed bool + } nodeManager *watcher.NodeManager taskScheduler threadpool.ThreadPool @@ -98,8 +105,23 @@ func NewController( version: version, bootstrapped: atomic.NewBool(false), scheduler: scheduler.NewController(map[string]scheduler.Scheduler{ - scheduler.BasicScheduler: scheduler.NewBasicScheduler(selfNode.ID.String(), batchSize, oc, changefeedDB, nodeManager, oc.NewAddMaintainerOperator), - scheduler.BalanceScheduler: scheduler.NewBalanceScheduler(selfNode.ID.String(), batchSize, oc, changefeedDB, nodeManager, balanceInterval, oc.NewMoveMaintainerOperator), + scheduler.BasicScheduler: scheduler.NewBasicScheduler( + selfNode.ID.String(), + batchSize, + oc, + changefeedDB, + nodeManager, + oc.NewAddMaintainerOperator, + ), + scheduler.BalanceScheduler: scheduler.NewBalanceScheduler( + selfNode.ID.String(), + batchSize, + oc, + changefeedDB, + nodeManager, + balanceInterval, + oc.NewMoveMaintainerOperator, + ), }), eventCh: eventCh, operatorController: oc, @@ -108,23 +130,33 @@ func NewController( nodeManager: nodeManager, taskScheduler: taskScheduler, backend: backend, - nodeChanged: false, updatedChangefeedCh: updatedChangefeedCh, stateChangedCh: stateChangedCh, lastPrintStatusTime: time.Now(), } - c.bootstrapper = bootstrap.NewBootstrapper[heartbeatpb.CoordinatorBootstrapResponse]("coordinator", c.newBootstrapMessage) + c.nodeChanged.changed = false + + c.bootstrapper = bootstrap.NewBootstrapper[heartbeatpb.CoordinatorBootstrapResponse]( + bootstrapperID, + c.newBootstrapMessage, + ) // init bootstrapper nodes nodes := c.nodeManager.GetAliveNodes() // detect the capture changes - c.nodeManager.RegisterNodeChangeHandler("coordinator-controller", func(allNodes map[node.ID]*node.Info) { - c.mutex.Lock() - c.nodeChanged = true - c.mutex.Unlock() - }) + c.nodeManager.RegisterNodeChangeHandler( + nodeChangeHandlerID, + func(allNodes map[node.ID]*node.Info) { + c.nodeChanged.Lock() + defer c.nodeChanged.Unlock() + c.nodeChanged.changed = true + }, + ) log.Info("coordinator bootstrap initial nodes", - zap.Int("nodes", len(nodes))) + zap.Int("nodeNum", len(nodes)), + zap.Any("nodes", nodes), + ) + newNodes := make([]*node.Info, 0, len(nodes)) for _, n := range nodes { newNodes = append(newNodes, n) @@ -137,21 +169,23 @@ func NewController( } // HandleEvent implements the event-driven process mode -func (c *Controller) HandleEvent(event *Event) bool { +func (c *Controller) HandleEvent(event *Event) { if event == nil { - return false + return } start := time.Now() defer func() { duration := time.Since(start) if duration > time.Second { - log.Info("coordinator is too slow", + log.Info("coordinator is slow, handle a event takes too long", zap.Int("type", event.eventType), zap.Duration("duration", duration)) } }() - // first check the online/offline nodes + + // Before processing the event, we need to check the online/offline nodes, + // the following logic is based on whether the node changed. c.checkOnNodeChanged() switch event.eventType { @@ -160,15 +194,15 @@ func (c *Controller) HandleEvent(event *Event) bool { case EventPeriod: c.onPeriodTask() } - return false } func (c *Controller) checkOnNodeChanged() { - c.mutex.Lock() - defer c.mutex.Unlock() - if c.nodeChanged { + c.nodeChanged.Lock() + defer c.nodeChanged.Unlock() + + if c.nodeChanged.changed { c.onNodeChanged() - c.nodeChanged = false + c.nodeChanged.changed = false } } @@ -185,7 +219,7 @@ func (c *Controller) onMessage(msg *messaging.TargetMessage) { case messaging.TypeMaintainerHeartbeatRequest: if c.bootstrapper.CheckAllNodeInitialized() { req := msg.Message[0].(*heartbeatpb.MaintainerHeartbeat) - c.HandleStatus(msg.From, req.Statuses) + c.handleMaintainerStatus(msg.From, req.Statuses) } default: log.Panic("unexpected message type", @@ -210,13 +244,19 @@ func (c *Controller) onNodeChanged() { c.RemoveNode(id) } } + log.Info("node changed", - zap.Int("new", len(newNodes)), - zap.Int("removed", len(removedNodes))) + zap.Int("newNodeNum", len(newNodes)), + zap.Int("removedNodeNum", len(removedNodes)), + zap.Any("newNodes", newNodes), + zap.Any("removedNodes", removedNodes), + ) + c.sendMessages(c.bootstrapper.HandleNewNodes(newNodes)) cachedResponse := c.bootstrapper.HandleRemoveNodes(removedNodes) if cachedResponse != nil { - log.Info("bootstrap done after removed some nodes") + log.Info("bootstrap done after removed some nodes", + zap.Any("removedNodes", removedNodes)) c.onBootstrapDone(cachedResponse) } } @@ -229,7 +269,7 @@ func (c *Controller) sendMessages(msgs []*messaging.TargetMessage) { func (c *Controller) onMaintainerBootstrapResponse(msg *messaging.TargetMessage) { log.Info("received maintainer bootstrap response", - zap.Any("server", msg.From)) + zap.Stringer("node", msg.From)) cachedResp := c.bootstrapper.HandleBootstrapResponse( msg.From, msg.Message[0].(*heartbeatpb.CoordinatorBootstrapResponse), @@ -250,18 +290,20 @@ func (c *Controller) onBootstrapDone(cachedResp map[node.ID]*heartbeatpb.Coordin zap.Int("size", len(cachedResp))) // runningCfs is the changefeeds that are already running on other nodes runningCfs := make(map[common.ChangeFeedID]remoteMaintainer) - for server, bootstrapMsg := range cachedResp { + for node, bootstrapMsg := range cachedResp { log.Info("received bootstrap response", - zap.Any("server", server), + zap.Stringer("node", node), zap.Int("size", len(bootstrapMsg.Statuses))) for _, info := range bootstrapMsg.Statuses { cfID := common.NewChangefeedIDFromPB(info.ChangefeedID) - if _, ok := runningCfs[cfID]; ok { + if old, ok := runningCfs[cfID]; ok { log.Panic("maintainer runs on multiple node", - zap.String("cf", cfID.Name())) + zap.Stringer("oldNode", old.nodeID), + zap.Stringer("newNode", node), + zap.Stringer("cf", cfID)) } runningCfs[cfID] = remoteMaintainer{ - nodeID: server, + nodeID: node, status: info, } } @@ -269,70 +311,119 @@ func (c *Controller) onBootstrapDone(cachedResp map[node.ID]*heartbeatpb.Coordin c.FinishBootstrap(runningCfs) } -// HandleStatus handle the status report from the node -func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.MaintainerStatus) { - cfs := make(map[common.ChangeFeedID]*changefeed.Changefeed, len(statusList)) +// handleMaintainerStatus handle the status report from the maintainers +func (c *Controller) handleMaintainerStatus(from node.ID, statusList []*heartbeatpb.MaintainerStatus) { + changedCfs := make(map[common.ChangeFeedID]*changefeed.Changefeed, len(statusList)) + for _, status := range statusList { cfID := common.NewChangefeedIDFromPB(status.ChangefeedID) - c.operatorController.UpdateOperatorStatus(cfID, from, status) - cf := c.GetTask(cfID) - if cf == nil { - if status.State != heartbeatpb.ComponentState_Working { - continue - } - if op := c.operatorController.GetOperator(cfID); op == nil { - log.Warn("no changgefeed found and no operator for it, ignore", - zap.String("changefeed", cfID.Name()), - zap.String("from", from.String()), - zap.Any("status", status)) - // if the changefeed is not found, and the status is working, we need to remove it from maintainer - _ = c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(cfID, from, true, true)) - } - continue - } - nodeID := cf.GetNodeID() - if nodeID == "" { - // the changefeed is stopped - continue - } - if nodeID != from { - // todo: handle the case that the node id is mismatch - log.Warn("remote changefeed maintainer nodeID mismatch with local record", - zap.String("changefeed", cfID.Name()), - zap.Stringer("remoteNodeID", from), - zap.Stringer("localNodeID", nodeID)) - continue - } - cfs[cfID] = cf - - changed, state, err := cf.UpdateStatus(status) - if changed { - log.Info("changefeed status changed", - zap.String("changefeed", cfID.Name()), - zap.Any("state", state), - zap.Any("error", err)) - var mErr *model.RunningError - if err != nil { - mErr = &model.RunningError{ - Time: time.Now(), - Addr: err.Node, - Code: err.Code, - Message: err.Message, - } - } - c.stateChangedCh <- &ChangefeedStateChangeEvent{ - ChangefeedID: cfID, - State: state, - err: mErr, - } + cf := c.handleSingleMaintainerStatus(from, status, cfID) + if cf != nil { + changedCfs[cfID] = cf } } + + // Try to send updated changefeeds without blocking select { - case c.updatedChangefeedCh <- cfs: + case c.updatedChangefeedCh <- changedCfs: default: } } +func (c *Controller) handleSingleMaintainerStatus( + from node.ID, + status *heartbeatpb.MaintainerStatus, + cfID common.ChangeFeedID, +) *changefeed.Changefeed { + // Update the operator status first + c.operatorController.UpdateOperatorStatus(cfID, from, status) + + cf := c.getChangefeed(cfID) + if cf == nil { + c.handleNonExistentChangefeed(cfID, from, status) + return nil + } + + if !c.validateMaintainerNode(cf, from, cfID) { + return nil + } + + c.updateChangefeedStatus(cf, cfID, status) + return cf +} + +func (c *Controller) handleNonExistentChangefeed( + cfID common.ChangeFeedID, + from node.ID, + status *heartbeatpb.MaintainerStatus, +) { + // If the changefeed is not in changefeedDB, and the maintainer is not working, just ignore it + if status.State != heartbeatpb.ComponentState_Working { + return + } + + if op := c.operatorController.GetOperator(cfID); op == nil { + log.Warn("no changefeed found and no operator for it, removing from maintainer", + zap.Stringer("changefeed", cfID), + zap.Stringer("sourceNode", from), + zap.String("status", common.FormatMaintainerStatus(status))) + + // Remove working changefeed from maintainer if it's not in changefeedDB + _ = c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(cfID, from, true, true)) + } +} + +func (c *Controller) validateMaintainerNode( + cf *changefeed.Changefeed, + from node.ID, + cfID common.ChangeFeedID, +) bool { + nodeID := cf.GetNodeID() + if nodeID == "" { + return false + } + + if nodeID != from { + log.Warn("remote changefeed maintainer nodeID mismatch with local record", + zap.Stringer("changefeed", cfID), + zap.Stringer("localNode", nodeID), + zap.Stringer("remoteNode", from)) + return false + } + return true +} + +func (c *Controller) updateChangefeedStatus( + cf *changefeed.Changefeed, + cfID common.ChangeFeedID, + status *heartbeatpb.MaintainerStatus, +) { + changed, state, err := cf.UpdateStatus(status) + if !changed { + return + } + + log.Info("changefeed status changed", + zap.Stringer("changefeed", cfID), + zap.String("state", string(state)), + zap.Stringer("error", err)) + + var mErr *model.RunningError + if err != nil { + mErr = &model.RunningError{ + Time: time.Now(), + Addr: err.Node, + Code: err.Code, + Message: err.Message, + } + } + c.stateChangedCh <- &ChangefeedStateChangeEvent{ + ChangefeedID: cfID, + State: state, + err: mErr, + } +} + // FinishBootstrap is called when all nodes have sent bootstrap response // It will load all changefeeds from metastore, and compare with running changefeeds // Then initialize the changefeeds that are not running on other nodes @@ -549,8 +640,8 @@ func (c *Controller) GetChangefeed( return cf.GetInfo(), status, nil } -// GetTask queries a task by channgefeed ID, return nil if not found -func (c *Controller) GetTask(id common.ChangeFeedID) *changefeed.Changefeed { +// getChangefeed returns the changefeed by id, return nil if not found +func (c *Controller) getChangefeed(id common.ChangeFeedID) *changefeed.Changefeed { return c.changefeedDB.GetByID(id) } @@ -586,3 +677,11 @@ func (c *Controller) collectMetrics() { c.lastPrintStatusTime = time.Now() } } + +func shouldRunChangefeed(state model.FeedState) bool { + switch state { + case model.StateStopped, model.StateFailed, model.StateFinished: + return false + } + return true +} diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index e80a0ef96..d63ed41d7 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -39,12 +39,33 @@ import ( pd "github.com/tikv/pd/client" "go.uber.org/atomic" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) -var ( - metricsDSInputChanLen = metrics.DynamicStreamEventChanSize.WithLabelValues("coordinator") - metricsDSPendingQueueLen = metrics.DynamicStreamPendingQueueLen.WithLabelValues("coordinator") -) +// Message Flow in Coordinator: +// (from maintainer) +// External Messages Coordinator Controller Storage +// | | | | +// | ----message-----> | | | +// | | | | +// | | ---event.message----> | | +// | | | | +// | | <---state change----- | | +// | | | | +// | | ----update state----------------> | +// | | | | +// | | <---checkpoint ts---- | | +// | | | | +// | | ----save checkpoint ts-------------> | +// | | | | +// +// Flow Description: +// 1. External messages arrive at Coordinator via MessageCenter +// 2. Coordinator forwards messages as events to Controller +// 3. Controller processes events and reports state changes back +// 4. Coordinator updates state in meta store +// 5. Controller reports checkpoint TS +// 6. Coordinator saves checkpoint TS to meta store var updateGCTickerInterval = 1 * time.Minute @@ -56,6 +77,7 @@ type coordinator struct { lastTickTime time.Time controller *Controller + backend changefeed.Backend mc messaging.MessageCenter taskScheduler threadpool.ThreadPool @@ -64,10 +86,13 @@ type coordinator struct { pdClient pd.Client pdClock pdutil.Clock - eventCh *chann.DrainableChann[*Event] - updatedChangefeedCh chan map[common.ChangeFeedID]*changefeed.Changefeed - stateChangedCh chan *ChangefeedStateChangeEvent - backend changefeed.Backend + // eventCh is used to receive the event from message center, basically these messages + // are from maintainer. + eventCh *chann.DrainableChann[*Event] + // changefeedProgressReportCh is used to receive the changefeed progress report from the controller + changefeedProgressReportCh chan map[common.ChangeFeedID]*changefeed.Changefeed + // changefeedStateChangedCh is used to receive the changefeed state changed event from the controller + changefeedStateChangedCh chan *ChangefeedStateChangeEvent cancel func() closed atomic.Bool @@ -84,28 +109,31 @@ func New(node *node.Info, ) server.Coordinator { mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter) c := &coordinator{ - version: version, - nodeInfo: node, - gcServiceID: gcServiceID, - lastTickTime: time.Now(), - gcManager: gc.NewManager(gcServiceID, pdClient, pdClock), - eventCh: chann.NewAutoDrainChann[*Event](), - pdClient: pdClient, - pdClock: pdClock, - mc: mc, - updatedChangefeedCh: make(chan map[common.ChangeFeedID]*changefeed.Changefeed, 1024), - stateChangedCh: make(chan *ChangefeedStateChangeEvent, 1024), - backend: backend, + version: version, + nodeInfo: node, + gcServiceID: gcServiceID, + lastTickTime: time.Now(), + gcManager: gc.NewManager(gcServiceID, pdClient, pdClock), + eventCh: chann.NewAutoDrainChann[*Event](), + pdClient: pdClient, + pdClock: pdClock, + mc: mc, + changefeedProgressReportCh: make(chan map[common.ChangeFeedID]*changefeed.Changefeed, 1024), + changefeedStateChangedCh: make(chan *ChangefeedStateChangeEvent, 1024), + backend: backend, } + // handle messages from message center + mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessages) + c.taskScheduler = threadpool.NewThreadPoolDefault() c.closed.Store(false) controller := NewController( c.version, c.nodeInfo, - c.updatedChangefeedCh, - c.stateChangedCh, - backend, + c.changefeedProgressReportCh, + c.changefeedStateChangedCh, + c.backend, c.eventCh, c.taskScheduler, batchSize, @@ -114,19 +142,17 @@ func New(node *node.Info, c.controller = controller - // receive messages - mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessages) - nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) - - nodeManager.RegisterOwnerChangeHandler(string(c.nodeInfo.ID), func(newCoordinatorID string) { - if newCoordinatorID != string(c.nodeInfo.ID) { - log.Info("Coordinator changed, and I am not the coordinator, stop myself", - zap.String("selfID", string(c.nodeInfo.ID)), - zap.String("newCoordinatorID", newCoordinatorID)) - c.AsyncStop() - } - }) + nodeManager.RegisterOwnerChangeHandler( + string(c.nodeInfo.ID), + func(newCoordinatorID string) { + if newCoordinatorID != string(c.nodeInfo.ID) { + log.Info("Coordinator changed, and I am not the coordinator, stop myself", + zap.String("selfID", string(c.nodeInfo.ID)), + zap.String("newCoordinatorID", newCoordinatorID)) + c.AsyncStop() + } + }) return c } @@ -139,29 +165,38 @@ func (c *coordinator) recvMessages(_ context.Context, msg *messaging.TargetMessa return nil } -// Run is the entrance of the coordinator, it will be called by the etcd watcher every 50ms. -// 1. Handle message reported by other modules. -// 2. Check if the node is changed: -// - if a new node is added, send bootstrap message to that node , -// - if a node is removed, clean related state machine that bind to that node. -// 3. Schedule changefeeds if all node is bootstrapped. +// Run spawns two goroutines to handle messages and run the coordinator. func (c *coordinator) Run(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + c.cancel = cancel + + eg, cctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return c.run(cctx) + }) + eg.Go(func() error { + return c.runHandleEvent(cctx) + }) + return eg.Wait() +} + +// run handles the following: +// 1. update the gc safepoint to PD +// 2. store the changefeed checkpointTs to meta store +// 3. handle the state changed event +func (c *coordinator) run(ctx context.Context) error { failpoint.Inject("InjectUpdateGCTickerInterval", func(val failpoint.Value) { updateGCTickerInterval = time.Duration(val.(int) * int(time.Millisecond)) }) gcTick := time.NewTicker(updateGCTickerInterval) - ctx, cancel := context.WithCancel(ctx) - c.cancel = cancel + defer gcTick.Stop() updateMetricsTicker := time.NewTicker(time.Second * 1) defer updateMetricsTicker.Stop() - go c.runHandleEvent(ctx) - failpoint.Inject("coordinator-run-with-error", func() error { return errors.New("coordinator run with error") }) - for { select { case <-ctx.Done(): @@ -174,11 +209,11 @@ func (c *coordinator) Run(ctx context.Context) error { now := time.Now() metrics.CoordinatorCounter.Add(float64(now.Sub(c.lastTickTime)) / float64(time.Second)) c.lastTickTime = now - case cfs := <-c.updatedChangefeedCh: + case cfs := <-c.changefeedProgressReportCh: if err := c.saveCheckpointTs(ctx, cfs); err != nil { return errors.Trace(err) } - case event := <-c.stateChangedCh: + case event := <-c.changefeedStateChangedCh: if err := c.handleStateChangedEvent(ctx, event); err != nil { return errors.Trace(err) } @@ -186,6 +221,7 @@ func (c *coordinator) Run(ctx context.Context) error { } } +// runHandleEvent handles messages from the other modules. func (c *coordinator) runHandleEvent(ctx context.Context) error { for { select { @@ -197,8 +233,11 @@ func (c *coordinator) runHandleEvent(ctx context.Context) error { } } -func (c *coordinator) handleStateChangedEvent(ctx context.Context, event *ChangefeedStateChangeEvent) error { - cf := c.controller.GetTask(event.ChangefeedID) +func (c *coordinator) handleStateChangedEvent( + ctx context.Context, + event *ChangefeedStateChangeEvent, +) error { + cf := c.controller.getChangefeed(event.ChangefeedID) if cf == nil { log.Warn("changefeed not found", zap.String("changefeed", event.ChangefeedID.String())) return nil @@ -263,7 +302,7 @@ func (c *coordinator) checkStaleCheckpointTs(ctx context.Context, id common.Chan zap.String("changefeed", id.String()), zap.Error(ctx.Err())) return - case c.stateChangedCh <- &ChangefeedStateChangeEvent{ + case c.changefeedStateChangedCh <- &ChangefeedStateChangeEvent{ ChangefeedID: id, State: state, err: &model.RunningError{ @@ -340,14 +379,6 @@ func (c *coordinator) GetChangefeed(ctx context.Context, changefeedDisplayName c return c.controller.GetChangefeed(ctx, changefeedDisplayName) } -func shouldRunChangefeed(state model.FeedState) bool { - switch state { - case model.StateStopped, model.StateFailed, model.StateFinished: - return false - } - return true -} - func (c *coordinator) AsyncStop() { if c.closed.CompareAndSwap(false, true) { c.mc.DeRegisterHandler(messaging.CoordinatorTopic) diff --git a/coordinator/operator/operator_controller.go b/coordinator/operator/operator_controller.go index c8c782017..128caa134 100644 --- a/coordinator/operator/operator_controller.go +++ b/coordinator/operator/operator_controller.go @@ -34,6 +34,9 @@ import ( // Controller is the operator controller, it manages all operators. // And the Controller is responsible for the execution of the operator. type Controller struct { + mu sync.RWMutex + + role string changefeedDB *changefeed.ChangefeedDB operators map[common.ChangeFeedID]*operator.OperatorWithTime[common.ChangeFeedID, *heartbeatpb.MaintainerStatus] runningQueue operator.OperatorQueue[common.ChangeFeedID, *heartbeatpb.MaintainerStatus] @@ -42,8 +45,6 @@ type Controller struct { selfNode *node.Info backend changefeed.Backend nodeManger *watcher.NodeManager - - lock sync.RWMutex } func NewOperatorController(mc messaging.MessageCenter, @@ -54,6 +55,7 @@ func NewOperatorController(mc messaging.MessageCenter, batchSize int, ) *Controller { oc := &Controller{ + role: "coordinator", operators: make(map[common.ChangeFeedID]*operator.OperatorWithTime[common.ChangeFeedID, *heartbeatpb.MaintainerStatus]), runningQueue: make(operator.OperatorQueue[common.ChangeFeedID, *heartbeatpb.MaintainerStatus], 0), messageCenter: mc, @@ -79,13 +81,14 @@ func (oc *Controller) Execute() time.Time { continue } - oc.lock.RLock() + oc.mu.RLock() msg := r.Schedule() - oc.lock.RUnlock() + oc.mu.RUnlock() if msg != nil { _ = oc.messageCenter.SendCommand(msg) log.Info("send command to maintainer", + zap.String("role", oc.role), zap.String("operator", r.String())) } executedItem++ @@ -97,17 +100,19 @@ func (oc *Controller) Execute() time.Time { // AddOperator adds an operator to the controller, if the operator already exists, return false. func (oc *Controller) AddOperator(op operator.Operator[common.ChangeFeedID, *heartbeatpb.MaintainerStatus]) bool { - oc.lock.Lock() - defer oc.lock.Unlock() + oc.mu.Lock() + defer oc.mu.Unlock() if pre, ok := oc.operators[op.ID()]; ok { log.Info("add operator failed, operator already exists", + zap.String("role", oc.role), zap.Stringer("operator", op), zap.Stringer("previousOperator", pre.OP)) return false } cf := oc.changefeedDB.GetByID(op.ID()) if cf == nil { log.Warn("add operator failed, changefeed not found", + zap.String("role", oc.role), zap.String("operator", op.String())) return false } @@ -119,12 +124,13 @@ func (oc *Controller) AddOperator(op operator.Operator[common.ChangeFeedID, *hea // if remove is true, it will remove the changefeed from the chagnefeed DB // if remove is false, it only marks as the changefeed stooped in changefeed DB, so we will not schedule the changefeed again func (oc *Controller) StopChangefeed(_ context.Context, cfID common.ChangeFeedID, removed bool) { - oc.lock.Lock() - defer oc.lock.Unlock() + oc.mu.Lock() + defer oc.mu.Unlock() scheduledNode := oc.changefeedDB.StopByChangefeedID(cfID, removed) if scheduledNode == "" { log.Info("changefeed is not scheduled, try stop maintainer using coordinator node", + zap.String("role", oc.role), zap.Bool("removed", removed), zap.String("changefeed", cfID.Name())) scheduledNode = oc.selfNode.ID @@ -142,11 +148,13 @@ func (oc *Controller) pushStopChangefeedOperator(cfID common.ChangeFeedID, nodeI if ok { if oldStop.changefeedIsRemoved { log.Info("changefeed is in removing progress, skip the stop operator", + zap.String("role", oc.role), zap.String("changefeed", cfID.Name())) return } } log.Info("changefeed is stopped , replace the old one", + zap.String("role", oc.role), zap.String("changefeed", cfID.Name()), zap.String("operator", old.OP.String())) old.OP.OnTaskRemoved() @@ -160,8 +168,8 @@ func (oc *Controller) pushStopChangefeedOperator(cfID common.ChangeFeedID, nodeI func (oc *Controller) UpdateOperatorStatus(id common.ChangeFeedID, from node.ID, status *heartbeatpb.MaintainerStatus, ) { - oc.lock.RLock() - defer oc.lock.RUnlock() + oc.mu.RLock() + defer oc.mu.RUnlock() op, ok := oc.operators[id] if ok { @@ -173,8 +181,8 @@ func (oc *Controller) UpdateOperatorStatus(id common.ChangeFeedID, from node.ID, // the controller will mark all maintainers on the node as absent if no operator is handling it, // then the controller will notify all operators. func (oc *Controller) OnNodeRemoved(n node.ID) { - oc.lock.RLock() - defer oc.lock.RUnlock() + oc.mu.RLock() + defer oc.mu.RUnlock() for _, cf := range oc.changefeedDB.GetByNodeID(n) { _, ok := oc.operators[cf.ID] @@ -189,8 +197,8 @@ func (oc *Controller) OnNodeRemoved(n node.ID) { // GetOperator returns the operator by id. func (oc *Controller) GetOperator(id common.ChangeFeedID) operator.Operator[common.ChangeFeedID, *heartbeatpb.MaintainerStatus] { - oc.lock.RLock() - defer oc.lock.RUnlock() + oc.mu.RLock() + defer oc.mu.RUnlock() if op, ok := oc.operators[id]; !ok { return nil @@ -201,8 +209,8 @@ func (oc *Controller) GetOperator(id common.ChangeFeedID) operator.Operator[comm // HasOperator returns true if the operator with the ChangeFeedDisplayName exists in the controller. func (oc *Controller) HasOperator(dispName common.ChangeFeedDisplayName) bool { - oc.lock.RLock() - defer oc.lock.RUnlock() + oc.mu.RLock() + defer oc.mu.RUnlock() for id := range oc.operators { if id.DisplayName == dispName { @@ -214,8 +222,8 @@ func (oc *Controller) HasOperator(dispName common.ChangeFeedDisplayName) bool { // OperatorSize returns the number of operators in the controller. func (oc *Controller) OperatorSize() int { - oc.lock.RLock() - defer oc.lock.RUnlock() + oc.mu.RLock() + defer oc.mu.RUnlock() return len(oc.operators) } @@ -223,8 +231,8 @@ func (oc *Controller) OperatorSize() int { // "next" is true to indicate that it may exist in next attempt, // and false is the end for the poll. func (oc *Controller) pollQueueingOperator() (operator.Operator[common.ChangeFeedID, *heartbeatpb.MaintainerStatus], bool) { - oc.lock.Lock() - defer oc.lock.Unlock() + oc.mu.Lock() + defer oc.mu.Unlock() if oc.runningQueue.Len() == 0 { return nil, false @@ -243,6 +251,7 @@ func (oc *Controller) pollQueueingOperator() (operator.Operator[common.ChangeFee metrics.CoordinatorFinishedOperatorCount.WithLabelValues(op.Type()).Inc() metrics.CoordinatorOperatorDuration.WithLabelValues(op.Type()).Observe(time.Since(item.EnqueueTime).Seconds()) log.Info("operator finished", + zap.String("role", oc.role), zap.String("operator", opID.String()), zap.String("operator", op.String())) return nil, true @@ -262,6 +271,7 @@ func (oc *Controller) pollQueueingOperator() (operator.Operator[common.ChangeFee func (oc *Controller) pushOperator(op operator.Operator[common.ChangeFeedID, *heartbeatpb.MaintainerStatus]) { oc.checkAffectedNodes(op) log.Info("add operator to running queue", + zap.String("role", oc.role), zap.String("operator", op.String())) opWithTime := operator.NewOperatorWithTime(op, time.Now()) oc.operators[op.ID()] = opWithTime diff --git a/coordinator/operator/operator_stop.go b/coordinator/operator/operator_stop.go index 7789329c1..da5a9345b 100644 --- a/coordinator/operator/operator_stop.go +++ b/coordinator/operator/operator_stop.go @@ -56,7 +56,7 @@ func NewStopChangefeedOperator(cfID common.ChangeFeedID, func (m *StopChangefeedOperator) Check(_ node.ID, status *heartbeatpb.MaintainerStatus) { if !m.finished.Load() && status.State != heartbeatpb.ComponentState_Working { log.Info("maintainer report non-working status", - zap.String("maintainer", m.cfID.String())) + zap.Stringer("maintainer", m.cfID)) m.finished.Store(true) } } @@ -69,8 +69,8 @@ func (m *StopChangefeedOperator) Schedule() *messaging.TargetMessage { func (m *StopChangefeedOperator) OnNodeRemove(n node.ID) { if n == m.nodeID { log.Info("node is stopped during stop maintainer, schedule stop command to coordinator node", - zap.String("changefeed", m.cfID.String()), - zap.String("node", n.String())) + zap.Stringer("changefeed", m.cfID), + zap.Stringer("node", n)) m.nodeID = m.coordinatorNodeID } } @@ -93,23 +93,29 @@ func (m *StopChangefeedOperator) OnTaskRemoved() { func (m *StopChangefeedOperator) Start() { log.Info("start remove maintainer operator", - zap.String("changefeed", m.cfID.String())) + zap.Stringer("changefeed", m.cfID)) } func (m *StopChangefeedOperator) PostFinish() { if m.changefeedIsRemoved { if err := m.backend.DeleteChangefeed(context.Background(), m.cfID); err != nil { log.Warn("failed to delete changefeed", - zap.String("changefeed", m.cfID.String()), zap.Error(err)) + zap.Stringer("changefeed", m.cfID), + zap.Error(err)) } } else { - if err := m.backend.SetChangefeedProgress(context.Background(), m.cfID, config.ProgressNone); err != nil { + if err := m.backend.SetChangefeedProgress( + context.Background(), + m.cfID, + config.ProgressNone, + ); err != nil { log.Warn("failed to set changefeed progress", - zap.String("changefeed", m.cfID.String()), zap.Error(err)) + zap.Stringer("changefeed", m.cfID), + zap.Error(err), + ) } } - log.Info("stop maintainer operator finished", - zap.String("changefeed", m.cfID.String())) + log.Info("stop maintainer operator finished", zap.Stringer("changefeed", m.cfID)) } func (m *StopChangefeedOperator) String() string { diff --git a/downstreamadapter/dispatcher/dispatcher.go b/downstreamadapter/dispatcher/dispatcher.go index 65bc0b666..1f1466c2a 100644 --- a/downstreamadapter/dispatcher/dispatcher.go +++ b/downstreamadapter/dispatcher/dispatcher.go @@ -199,7 +199,7 @@ func (d *Dispatcher) InitializeTableSchemaStore(schemaInfo []*heartbeatpb.Schema } if d.tableSchemaStore != nil { - log.Info("tableSchemaStore has already been initialized", zap.Any("dispatcher", d.id)) + log.Info("tableSchemaStore has already been initialized", zap.Stringer("dispatcher", d.id)) return nil } d.tableSchemaStore = util.NewTableSchemaStore(schemaInfo, d.sink.SinkType()) @@ -235,12 +235,18 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat pendingEvent, blockStatus := d.blockEventStatus.getEventAndStage() if pendingEvent == nil && action.CommitTs > d.GetResolvedTs() { // we have not receive the block event, and the action is for the future event, so just ignore - log.Info("pending event is nil, and the action's commit is larger than dispatchers resolvedTs", zap.Uint64("resolvedTs", d.GetResolvedTs()), zap.Uint64("actionCommitTs", action.CommitTs), zap.Any("dispatcher", d.id)) + log.Info("pending event is nil, and the action's commit is larger than dispatchers resolvedTs", + zap.Uint64("resolvedTs", d.GetResolvedTs()), + zap.Uint64("actionCommitTs", action.CommitTs), + zap.Stringer("dispatcher", d.id)) // we have not receive the block event, and the action is for the future event, so just ignore return } if pendingEvent != nil && action.CommitTs == pendingEvent.GetCommitTs() && blockStatus == heartbeatpb.BlockStage_WAITING { - log.Info("pending event get the action", zap.Any("action", action), zap.Any("dispatcher", d.id), zap.Uint64("pendingEventCommitTs", pendingEvent.GetCommitTs())) + log.Info("pending event get the action", + zap.Any("action", action), + zap.Stringer("dispatcher", d.id), + zap.Uint64("pendingEventCommitTs", pendingEvent.GetCommitTs())) d.blockEventStatus.updateBlockStage(heartbeatpb.BlockStage_WRITING) pendingEvent.PushFrontFlushFunc(func() { // clear blockEventStatus should be before wake ds. @@ -258,8 +264,8 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat case d.errCh <- err: default: log.Error("error channel is full, discard error", - zap.Any("changefeedID", d.changefeedID.String()), - zap.Any("dispatcherID", d.id.String()), + zap.Stringer("changefeedID", d.changefeedID), + zap.Stringer("dispatcherID", d.id), zap.Error(err)) } return @@ -297,7 +303,8 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba // Dispatcher is ready, handle the events for _, dispatcherEvent := range dispatcherEvents { log.Debug("dispatcher receive all event", - zap.Stringer("dispatcher", d.id), zap.Any("event", dispatcherEvent.Event)) + zap.Stringer("dispatcher", d.id), + zap.Any("event", dispatcherEvent.Event)) failpoint.Inject("HandleEventsSlowly", func() { lag := time.Duration(rand.Intn(5000)) * time.Millisecond log.Warn("handle events slowly", zap.Duration("lag", lag)) @@ -308,11 +315,11 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba // Pre-check, make sure the event is not stale if event.GetCommitTs() < d.GetResolvedTs() { log.Warn("Received a stale event, should ignore it", - zap.Any("dispatcherResolvedTs", d.GetResolvedTs()), - zap.Any("EVentCommitTs", event.GetCommitTs()), - zap.Any("seq", event.GetSeq()), - zap.Any("eventType", event.GetType()), - zap.Any("dispatcher", d.id)) + zap.Uint64("dispatcherResolvedTs", d.GetResolvedTs()), + zap.Uint64("eventCommitTs", event.GetCommitTs()), + zap.Uint64("seq", event.GetSeq()), + zap.Int("eventType", event.GetType()), + zap.Stringer("dispatcher", d.id)) continue } @@ -338,7 +345,8 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba d.AddDMLEventToSink(dml) case commonEvent.TypeDDLEvent: if len(dispatcherEvents) != 1 { - log.Panic("ddl event should only be singly handled", zap.Any("dispatcherID", d.id)) + log.Panic("ddl event should only be singly handled", + zap.Stringer("dispatcherID", d.id)) } failpoint.Inject("BlockOrWaitBeforeDealWithDDL", nil) block = true @@ -352,8 +360,8 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba case d.errCh <- err: default: log.Error("error channel is full, discard error", - zap.Any("changefeedID", d.changefeedID.String()), - zap.Any("dispatcherID", d.id.String()), + zap.Stringer("changefeedID", d.changefeedID), + zap.Stringer("dispatcherID", d.id), zap.Error(err)) } return @@ -375,7 +383,8 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba d.dealWithBlockEvent(ddl) case commonEvent.TypeSyncPointEvent: if len(dispatcherEvents) != 1 { - log.Panic("sync point event should only be singly handled", zap.Any("dispatcherID", d.id)) + log.Panic("sync point event should only be singly handled", + zap.Stringer("dispatcherID", d.id)) } block = true syncPoint := event.(*commonEvent.SyncPointEvent) @@ -389,10 +398,11 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba d.dealWithBlockEvent(syncPoint) case commonEvent.TypeHandshakeEvent: log.Warn("Receive handshake event unexpectedly", - zap.Stringer("dispatcher", d.id), zap.Any("event", event)) + zap.Stringer("dispatcher", d.id), + zap.Any("event", event)) default: log.Panic("Unexpected event type", - zap.Any("eventType", event.GetType()), + zap.Int("eventType", event.GetType()), zap.Stringer("dispatcher", d.id), zap.Uint64("commitTs", event.GetCommitTs())) } @@ -474,8 +484,8 @@ func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) { case d.errCh <- err: default: log.Error("error channel is full, discard error", - zap.Any("changefeedID", d.changefeedID.String()), - zap.Any("dispatcherID", d.id.String()), + zap.Stringer("changefeedID", d.changefeedID), + zap.Stringer("dispatcherID", d.id), zap.Error(err)) } return @@ -557,10 +567,10 @@ func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) { if schemaIDChange.TableID == d.tableSpan.TableID { if schemaIDChange.OldSchemaID != d.schemaID { log.Error("Wrong Schema ID", - zap.Any("dispatcherID", d.id), - zap.Any("exceptSchemaID", schemaIDChange.OldSchemaID), - zap.Any("actualSchemaID", d.schemaID), - zap.Any("tableSpan", d.tableSpan.String())) + zap.Stringer("dispatcherID", d.id), + zap.Int64("exceptSchemaID", schemaIDChange.OldSchemaID), + zap.Int64("actualSchemaID", d.schemaID), + zap.String("tableSpan", common.FormatTableSpan(d.tableSpan))) return } else { d.schemaID = schemaIDChange.NewSchemaID @@ -641,13 +651,24 @@ func (d *Dispatcher) GetStartTsIsSyncpoint() bool { func (d *Dispatcher) Remove() { log.Info("table event dispatcher component status changed to stopping", - zap.String("table", d.tableSpan.String())) + zap.Stringer("changefeedID", d.changefeedID), + zap.Stringer("dispatcher", d.id), + zap.String("table", common.FormatTableSpan(d.tableSpan)), + zap.Uint64("checkpointTs", d.GetCheckpointTs()), + zap.Uint64("resolvedTs", d.GetResolvedTs()), + ) d.isRemoving.Store(true) dispatcherStatusDS := GetDispatcherStatusDynamicStream() err := dispatcherStatusDS.RemovePath(d.id) if err != nil { - log.Error("remove dispatcher from dynamic stream failed", zap.Error(err)) + log.Error("remove dispatcher from dynamic stream failed", + zap.Stringer("changefeedID", d.changefeedID), + zap.Stringer("dispatcher", d.id), + zap.String("table", common.FormatTableSpan(d.tableSpan)), + zap.Uint64("checkpointTs", d.GetCheckpointTs()), + zap.Uint64("resolvedTs", d.GetResolvedTs()), + zap.Error(err)) } } @@ -656,7 +677,10 @@ func (d *Dispatcher) addToStatusDynamicStream() { dispatcherStatusDS := GetDispatcherStatusDynamicStream() err := dispatcherStatusDS.AddPath(d.id, d) if err != nil { - log.Error("add dispatcher to dynamic stream failed", zap.Error(err)) + log.Error("add dispatcher to dynamic stream failed", + zap.Stringer("changefeedID", d.changefeedID), + zap.Stringer("dispatcher", d.id), + zap.Error(err)) } } diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 2ecb75660..ea8a20544 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -400,15 +400,15 @@ func (e *eventStore) RegisterDispatcher( onlyReuse bool, ) (bool, error) { log.Info("register dispatcher", - zap.Any("dispatcherID", dispatcherID), - zap.String("span", tableSpan.String()), + zap.Stringer("dispatcherID", dispatcherID), + zap.String("span", common.FormatTableSpan(tableSpan)), zap.Uint64("startTs", startTs)) start := time.Now() defer func() { log.Info("register dispatcher done", - zap.Any("dispatcherID", dispatcherID), - zap.String("span", tableSpan.String()), + zap.Stringer("dispatcherID", dispatcherID), + zap.String("span", common.FormatTableSpan(tableSpan)), zap.Uint64("startTs", startTs), zap.Duration("duration", time.Since(start))) }() @@ -444,7 +444,7 @@ func (e *eventStore) RegisterDispatcher( candidateIDs[dispatcherID] = true e.dispatcherMeta.Unlock() log.Info("reuse existing subscription", - zap.Any("dispatcherID", dispatcherID), + zap.Stringer("dispatcherID", dispatcherID), zap.Uint64("subID", uint64(stat.subID)), zap.Uint64("checkpointTs", subscriptionStat.checkpointTs.Load()), zap.Uint64("startTs", startTs)) @@ -532,7 +532,7 @@ func (e *eventStore) RegisterDispatcher( func (e *eventStore) UnregisterDispatcher(dispatcherID common.DispatcherID) error { log.Info("unregister dispatcher", zap.Stringer("dispatcherID", dispatcherID)) defer func() { - log.Info("unregister dispatcher done", zap.Any("dispatcherID", dispatcherID)) + log.Info("unregister dispatcher done", zap.Stringer("dispatcherID", dispatcherID)) }() e.dispatcherMeta.Lock() defer e.dispatcherMeta.Unlock() @@ -623,7 +623,7 @@ func (e *eventStore) GetDispatcherDMLEventState(dispatcherID common.DispatcherID defer e.dispatcherMeta.RUnlock() stat, ok := e.dispatcherMeta.dispatcherStats[dispatcherID] if !ok { - log.Warn("fail to find dispatcher", zap.Any("dispatcherID", dispatcherID)) + log.Warn("fail to find dispatcher", zap.Stringer("dispatcherID", dispatcherID)) return false, DMLEventState{ // ResolvedTs: subscriptionStat.resolvedTs, MaxEventCommitTs: math.MaxUint64, @@ -640,14 +640,14 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com e.dispatcherMeta.RLock() stat, ok := e.dispatcherMeta.dispatcherStats[dispatcherID] if !ok { - log.Warn("fail to find dispatcher", zap.Any("dispatcherID", dispatcherID)) + log.Warn("fail to find dispatcher", zap.Stringer("dispatcherID", dispatcherID)) e.dispatcherMeta.RUnlock() return nil, nil } subscriptionStat := e.dispatcherMeta.subscriptionStats[stat.subID] if dataRange.StartTs < subscriptionStat.checkpointTs.Load() { log.Panic("should not happen", - zap.Any("dispatcherID", dispatcherID), + zap.Stringer("dispatcherID", dispatcherID), zap.Uint64("checkpointTs", subscriptionStat.checkpointTs.Load()), zap.Uint64("startTs", dataRange.StartTs)) } diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index a3d58bf14..6aa4b2e54 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -278,7 +278,10 @@ func (m *Maintainer) HandleEvent(event *Event) bool { if m.scheduleState.Load() == int32(heartbeatpb.ComponentState_Stopped) { log.Warn("maintainer is stopped, stop handling event", - zap.String("changefeed", m.id.String())) + zap.String("changefeed", m.id.String()), + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), + zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs), + ) return false } @@ -444,6 +447,9 @@ func (m *Maintainer) onRemoveMaintainer(cascade, changefeedRemoved bool) { m.changefeedRemoved.Store(changefeedRemoved) closed := m.tryCloseChangefeed() if closed { + log.Info("changefeed maintainer closed", + zap.Stringer("changefeed", m.id), + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs)) m.removed.Store(true) m.scheduleState.Store(int32(heartbeatpb.ComponentState_Stopped)) metrics.MaintainerGauge.WithLabelValues(m.id.Namespace(), m.id.Name()).Dec() @@ -572,7 +578,7 @@ func (m *Maintainer) sendMessages(msgs []*messaging.TargetMessage) { } func (m *Maintainer) onHeartBeatRequest(msg *messaging.TargetMessage) { - // ignore the heartbeat if the maintianer not bootstrapped + // ignore the heartbeat if the maintainer not bootstrapped if !m.bootstrapped.Load() { return } @@ -586,8 +592,8 @@ func (m *Maintainer) onHeartBeatRequest(msg *messaging.TargetMessage) { m.controller.HandleStatus(msg.From, req.Statuses) if req.Err != nil { log.Warn("dispatcher report an error", - zap.String("changefeed", m.id.Name()), - zap.String("from", msg.From.String()), + zap.Stringer("changefeed", m.id), + zap.Stringer("sourceNode", msg.From), zap.String("error", req.Err.Message)) m.onError(msg.From, req.Err) } @@ -749,9 +755,10 @@ func (m *Maintainer) trySendMaintainerCloseRequestToAllNode() bool { } if len(msgs) > 0 { m.sendMessages(msgs) - log.Info("send maintainer close request", - zap.String("changefeed", m.id.Name()), - zap.Int("count", len(msgs))) + log.Info("send maintainer close request to all dispatcher managers", + zap.Stringer("changefeed", m.id), + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), + zap.Int("nodeCount", len(msgs))) } return len(msgs) == 0 } diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index 5e2897777..231d5f420 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -152,7 +152,9 @@ func (m *Manager) Run(ctx context.Context) error { if cf.removed.Load() { cf.Close() log.Info("maintainer removed, remove it from dynamic stream", - zap.Stringer("changefeed", cf.id)) + zap.Stringer("changefeed", cf.id), + zap.Uint64("checkpointTs", cf.getWatermark().CheckpointTs), + ) m.maintainers.Delete(key) } return true diff --git a/maintainer/operator/operator_controller.go b/maintainer/operator/operator_controller.go index 8cea2261e..a1de352b8 100644 --- a/maintainer/operator/operator_controller.go +++ b/maintainer/operator/operator_controller.go @@ -37,6 +37,7 @@ var _ operator.Controller[common.DispatcherID, *heartbeatpb.TableSpanStatus] = & // Controller is the operator controller, it manages all operators. // And the Controller is responsible for the execution of the operator. type Controller struct { + role string changefeedID common.ChangeFeedID batchSize int messageCenter messaging.MessageCenter @@ -54,6 +55,7 @@ func NewOperatorController( batchSize int, ) *Controller { oc := &Controller{ + role: "maintainer", changefeedID: changefeedID, operators: make(map[common.DispatcherID]*operator.OperatorWithTime[common.DispatcherID, *heartbeatpb.TableSpanStatus]), runningQueue: make(operator.OperatorQueue[common.DispatcherID, *heartbeatpb.TableSpanStatus], 0), @@ -86,6 +88,7 @@ func (oc *Controller) Execute() time.Time { if msg != nil { _ = oc.messageCenter.SendCommand(msg) log.Info("send command to dispatcher", + zap.String("role", oc.role), zap.String("changefeed", oc.changefeedID.Name()), zap.String("operator", r.String())) } @@ -134,6 +137,7 @@ func (oc *Controller) AddOperator(op operator.Operator[common.DispatcherID, *hea if _, ok := oc.operators[op.ID()]; ok { log.Info("add operator failed, operator already exists", + zap.String("role", oc.role), zap.String("changefeed", oc.changefeedID.Name()), zap.String("operator", op.String())) return false @@ -141,6 +145,7 @@ func (oc *Controller) AddOperator(op operator.Operator[common.DispatcherID, *hea span := oc.replicationDB.GetTaskByID(op.ID()) if span == nil { log.Warn("add operator failed, span not found", + zap.String("role", oc.role), zap.String("changefeed", oc.changefeedID.Name()), zap.String("operator", op.String())) return false @@ -219,6 +224,7 @@ func (oc *Controller) pollQueueingOperator() (operator.Operator[common.Dispatche metrics.FinishedOperatorCount.WithLabelValues(model.DefaultNamespace, oc.changefeedID.Name(), op.Type()).Inc() metrics.OperatorDuration.WithLabelValues(model.DefaultNamespace, oc.changefeedID.Name(), op.Type()).Observe(time.Since(item.EnqueueTime).Seconds()) log.Info("operator finished", + zap.String("role", oc.role), zap.String("changefeed", oc.changefeedID.Name()), zap.String("operator", opID.String()), zap.String("operator", op.String())) @@ -240,6 +246,7 @@ func (oc *Controller) pollQueueingOperator() (operator.Operator[common.Dispatche func (oc *Controller) removeReplicaSet(op *RemoveDispatcherOperator) { if old, ok := oc.operators[op.ID()]; ok { log.Info("replica set is removed , replace the old one", + zap.String("role", oc.role), zap.String("changefeed", oc.changefeedID.Name()), zap.String("replicaset", old.OP.ID().String()), zap.String("operator", old.OP.String())) @@ -255,6 +262,7 @@ func (oc *Controller) removeReplicaSet(op *RemoveDispatcherOperator) { func (oc *Controller) pushOperator(op operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) { oc.checkAffectedNodes(op) log.Info("add operator to running queue", + zap.String("role", oc.role), zap.String("changefeed", oc.changefeedID.Name()), zap.String("operator", op.String())) withTime := operator.NewOperatorWithTime(op, time.Now()) @@ -318,6 +326,7 @@ func (oc *Controller) AddMergeSplitOperator( for _, replicaSet := range affectedReplicaSets { if _, ok := oc.operators[replicaSet.ID]; ok { log.Info("add operator failed, operator already exists", + zap.String("role", oc.role), zap.String("changefeed", oc.changefeedID.Name()), zap.String("dispatcherID", replicaSet.ID.String()), ) @@ -326,6 +335,7 @@ func (oc *Controller) AddMergeSplitOperator( span := oc.replicationDB.GetTaskByID(replicaSet.ID) if span == nil { log.Warn("add operator failed, span not found", + zap.String("role", oc.role), zap.String("changefeed", oc.changefeedID.Name()), zap.String("dispatcherID", replicaSet.ID.String())) return false @@ -344,6 +354,7 @@ func (oc *Controller) AddMergeSplitOperator( oc.pushOperator(op) } log.Info("add merge split operator", + zap.String("role", oc.role), zap.String("changefeed", oc.changefeedID.Name()), zap.String("primary", primaryID.String()), zap.Int64("tableID", splitSpans[0].TableID), diff --git a/pkg/common/format.go b/pkg/common/format.go index 2caf76142..142c612e9 100644 --- a/pkg/common/format.go +++ b/pkg/common/format.go @@ -112,11 +112,12 @@ func FormatMaintainerStatus(s *heartbeatpb.MaintainerStatus) string { } sb := strings.Builder{} sb.WriteString(fmt.Sprintf( - "changefeed: %s, feedState: %s, state: %s, checkpointTs: %d, errs: [", + "changefeed: %s, feedState: %s, state: %s, checkpointTs: %d, bootstrapDone: %t, errs: [", s.ChangefeedID.GetName(), s.FeedState, s.State.String(), s.CheckpointTs, + s.BootstrapDone, )) for _, err := range s.Err { sb.WriteString(err.String()) diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 036d3ba1e..ac87fd41a 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -815,9 +815,14 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) { dispatcher := newDispatcherStat(startTs, info, filter, workerIndex, changefeedStatus) if span.Equal(heartbeatpb.DDLSpan) { c.tableTriggerDispatchers.Store(id, dispatcher) - log.Info("table trigger dispatcher register dispatcher", zap.Uint64("clusterID", c.tidbClusterID), - zap.Any("dispatcherID", id), zap.Int64("tableID", span.TableID), - zap.Uint64("startTs", startTs), zap.Duration("brokerRegisterDuration", time.Since(start))) + log.Info("table trigger dispatcher register dispatcher", + zap.Uint64("clusterID", c.tidbClusterID), + zap.Stringer("changefeedID", changefeedID), + zap.Stringer("dispatcherID", id), + zap.String("span", common.FormatTableSpan(span)), + zap.Uint64("startTs", startTs), + zap.Duration("brokerRegisterDuration", time.Since(start)), + ) return } @@ -832,7 +837,10 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) { info.IsOnlyReuse(), ) if err != nil { - log.Panic("register dispatcher to eventStore failed", zap.Error(err), zap.Any("dispatcherInfo", info)) + log.Panic("register dispatcher to eventStore failed", + zap.Error(err), + zap.Any("dispatcherInfo", info), + ) } if !success { if !info.IsOnlyReuse() { @@ -846,20 +854,33 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) { err = c.schemaStore.RegisterTable(span.GetTableID(), info.GetStartTs()) if err != nil { - log.Panic("register table to schemaStore failed", zap.Error(err), zap.Int64("tableID", span.TableID), zap.Uint64("startTs", info.GetStartTs())) + log.Panic("register table to schemaStore failed", + zap.Error(err), + zap.String("span", common.FormatTableSpan(span)), + zap.Uint64("startTs", info.GetStartTs()), + ) } tableInfo, err := c.schemaStore.GetTableInfo(span.GetTableID(), info.GetStartTs()) if err != nil { - log.Panic("get table info from schemaStore failed", zap.Error(err), zap.Int64("tableID", span.TableID), zap.Uint64("startTs", info.GetStartTs())) + log.Panic("get table info from schemaStore failed", + zap.Error(err), + zap.Int64("tableID", span.TableID), + zap.Uint64("startTs", info.GetStartTs()), + ) } dispatcher.updateTableInfo(tableInfo) eventStoreRegisterDuration := time.Since(start) c.dispatchers.Store(id, dispatcher) - log.Info("register dispatcher", zap.Uint64("clusterID", c.tidbClusterID), - zap.Any("dispatcherID", id), zap.Int64("tableID", span.TableID), - zap.Uint64("startTs", startTs), zap.Duration("brokerRegisterDuration", brokerRegisterDuration), - zap.Duration("eventStoreRegisterDuration", eventStoreRegisterDuration)) + log.Info("register dispatcher", + zap.Uint64("clusterID", c.tidbClusterID), + zap.Stringer("changefeedID", changefeedID), + zap.Stringer("dispatcherID", id), + zap.String("span", common.FormatTableSpan(span)), + zap.Uint64("startTs", startTs), + zap.Duration("brokerRegisterDuration", brokerRegisterDuration), + zap.Duration("eventStoreRegisterDuration", eventStoreRegisterDuration), + ) } func (c *eventBroker) removeDispatcher(dispatcherInfo DispatcherInfo) { @@ -877,8 +898,10 @@ func (c *eventBroker) removeDispatcher(dispatcherInfo DispatcherInfo) { c.dispatchers.Delete(id) log.Info("remove dispatcher", zap.Uint64("clusterID", c.tidbClusterID), - zap.Int64("tableID", dispatcherInfo.GetTableSpan().TableID), - zap.Any("dispatcherID", id)) + zap.Stringer("changefeedID", dispatcherInfo.GetChangefeedID()), + zap.Stringer("dispatcherID", id), + zap.String("span", common.FormatTableSpan(dispatcherInfo.GetTableSpan())), + ) } func (c *eventBroker) pauseDispatcher(dispatcherInfo DispatcherInfo) { @@ -888,8 +911,9 @@ func (c *eventBroker) pauseDispatcher(dispatcherInfo DispatcherInfo) { } log.Info("pause dispatcher", zap.Uint64("clusterID", c.tidbClusterID), - zap.Int64("tableID", stat.info.GetTableSpan().TableID), - zap.Any("dispatcher", stat.id), + zap.Stringer("changefeedID", stat.changefeedStat.changefeedID), + zap.Stringer("dispatcherID", stat.id), + zap.String("span", common.FormatTableSpan(stat.info.GetTableSpan())), zap.Uint64("sentResolvedTs", stat.sentResolvedTs.Load()), zap.Uint64("seq", stat.seq.Load())) stat.isRunning.Store(false) @@ -901,8 +925,9 @@ func (c *eventBroker) resumeDispatcher(dispatcherInfo DispatcherInfo) { return } log.Info("resume dispatcher", - zap.Int64("tableID", stat.info.GetTableSpan().TableID), - zap.Any("dispatcher", stat.id), + zap.Stringer("changefeedID", stat.changefeedStat.changefeedID), + zap.Stringer("dispatcherID", stat.id), + zap.String("span", common.FormatTableSpan(stat.info.GetTableSpan())), zap.Uint64("sentResolvedTs", stat.sentResolvedTs.Load()), zap.Uint64("seq", stat.seq.Load())) stat.isRunning.Store(true) @@ -915,8 +940,9 @@ func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) { } stat.resetState(dispatcherInfo.GetStartTs()) log.Info("reset dispatcher", - zap.Int64("tableID", stat.info.GetTableSpan().TableID), - zap.Any("dispatcher", stat.id), + zap.Stringer("changefeedID", stat.changefeedStat.changefeedID), + zap.Stringer("dispatcherID", stat.id), + zap.String("span", common.FormatTableSpan(stat.info.GetTableSpan())), zap.Uint64("startTs", stat.info.GetStartTs())) } @@ -925,8 +951,9 @@ func (c *eventBroker) getOrSetChangefeedStatus(changefeedID common.ChangeFeedID) if !ok { stat = newChangefeedStatus(changefeedID) log.Info("new changefeed status", - zap.Any("changefeedID", changefeedID.String()), - zap.Any("stat", stat.(*changefeedStatus).isRunning.Load())) + zap.Stringer("changefeedID", changefeedID), + zap.Bool("isRunning", stat.(*changefeedStatus).isRunning.Load()), + ) c.changefeedMap.Store(changefeedID, stat) } return stat.(*changefeedStatus) diff --git a/pkg/node/node.go b/pkg/node/node.go index 9fcb5c4c4..72c46dc7f 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -15,6 +15,7 @@ package node import ( "encoding/json" + "fmt" "time" "github.com/google/uuid" @@ -63,6 +64,11 @@ func NewInfo(addr string, deployPath string) *Info { } } +func (c *Info) String() string { + return fmt.Sprintf("ID: %s, AdvertiseAddr: %s, Version: %s, GitHash: %s, DeployPath: %s, StartTimestamp: %d, Epoch: %d", + c.ID, c.AdvertiseAddr, c.Version, c.GitHash, c.DeployPath, c.StartTimestamp, c.Epoch) +} + // Marshal using json.Marshal. func (c *Info) Marshal() ([]byte, error) { data, err := json.Marshal(c) diff --git a/server/module_election.go b/server/module_election.go index e08c93f14..0e759fbea 100644 --- a/server/module_election.go +++ b/server/module_election.go @@ -131,10 +131,16 @@ func (e *elector) campaignCoordinator(ctx context.Context) error { zap.String("captureID", string(e.svr.info.ID)), zap.Int64("coordinatorVersion", coordinatorVersion)) - co := coordinator.New(e.svr.info, - e.svr.pdClient, e.svr.PDClock, changefeed.NewEtcdBackend(e.svr.EtcdClient), + co := coordinator.New( + e.svr.info, + e.svr.pdClient, + e.svr.PDClock, + changefeed.NewEtcdBackend(e.svr.EtcdClient), e.svr.EtcdClient.GetGCServiceID(), - coordinatorVersion, 10000, time.Minute) + coordinatorVersion, + 10000, + time.Minute, + ) e.svr.setCoordinator(co) err = co.Run(ctx) // When coordinator exits, we need to stop it. diff --git a/tests/integration_tests/changefeed_dup_error_restart/run.sh b/tests/integration_tests/changefeed_dup_error_restart/run.sh index b07aee74a..7d703c64f 100755 --- a/tests/integration_tests/changefeed_dup_error_restart/run.sh +++ b/tests/integration_tests/changefeed_dup_error_restart/run.sh @@ -41,7 +41,7 @@ function run() { go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_dup_error_restart run_sql "CREATE TABLE changefeed_dup_error_restart.finish_mark_2 (a int primary key);" check_table_exists "changefeed_dup_error_restart.finish_mark_2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180 cleanup_process $CDC_BINARY }